"""
Standalone string pipeline driver script

This script produces the condor submit and dag files to run
the standalone cosmic string search code on LIGO data
"""
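# Example invocation (script and file names hypothetical;
# --background-time-slides may be repeated to parallelize the background
# analysis across time slide files):
#
#	$ ./lalapps_string_pipe \
#		--config-file string_search.ini \
#		--log-path /usr1/${USER} \
#		--background-time-slides zero_lag.xml.gz \
#		--background-time-slides background_slides.xml.gz \
#		--injection-time-slides injection_slides.xml.gz \
#		--segments-file segments.xml.gz \
#		--verbose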
from __future__ import print_function


# itertools, os, sys, tempfile, reduce and CacheEntry are used below but
# were missing from the original import block
import itertools
import os
import sys
import tempfile
from functools import reduce
from optparse import OptionParser
from configparser import ConfigParser

from igwn_ligolw import lsctables
from igwn_ligolw import utils as ligolw_utils
from igwn_ligolw.utils import segments as ligolw_segments
from lal import LIGOTimeGPS
from lal import pipeline
from lal.utils import CacheEntry
from lalburst import offsetvector
from lalburst import power
from lalapps import cosmicstring
import igwn_segments as segments
__author__ = 'Xavier Siemens <siemens@gravity.phys.uwm.edu>'
__version__ = '$Revision$'
def parse_command_line():
	parser = OptionParser(
		usage = "%prog [options] ...",
	)
	parser.add_option("-f", "--config-file", metavar = "filename", help = "Use this configuration file (required).")
	parser.add_option("-l", "--log-path", metavar = "path", help = "Make condor put log files in this directory (required).")
	parser.add_option("--background-time-slides", metavar = "filename", action = "append", help = "Set the name of the file from which to obtain the time slide table for use in the background branch of the pipeline (required).  This option can be given multiple times to parallelize the background analysis across time slides.  Make sure the time slide files have distinct vectors so the same analysis is not repeated, and in particular make sure only one of them contains a zero-lag vector.")
	parser.add_option("--injection-time-slides", metavar = "filename", help = "Set the name of the file from which to obtain the time slide table for use in the injection branch of the pipeline (required).")
	parser.add_option("--segments-file", metavar = "filename", help = "Set the name of the LIGO Light-Weight XML file from which to obtain segment lists (required).  See ligolw_segments and ligolw_segment_query for more information on constructing an XML-format segments file.  See also --segments-name.")
	parser.add_option("--segments-name", metavar = "name", default = "segments", help = "Set the name of the segment lists to retrieve from the segments file (default = \"segments\").  See also --segments-file.")
	parser.add_option("--vetoes-file", metavar = "filename", help = "Set the name of the LIGO Light-Weight XML file from which to obtain veto segment lists (optional).  See ligolw_segments and ligolw_segment_query for more information on constructing an XML-format segments file.  See also --vetoes-name.")
	parser.add_option("--vetoes-name", metavar = "name", default = "vetoes", help = "Set the name of the segment lists to retrieve from the veto segments file (default = \"vetoes\").  See also --vetoes-file.")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
	options, filenames = parser.parse_args()

	required_options = ["log_path", "config_file", "background_time_slides", "injection_time_slides", "segments_file"]
	missing_options = [option for option in required_options if getattr(options, option) is None]
	if missing_options:
		raise ValueError("missing required options %s" % ", ".join(sorted("--%s" % option.replace("_", "-") for option in missing_options)))

	if options.vetoes_file is not None:
		options.vetoes_cache = set([CacheEntry(None, "VETO", None, "file://localhost" + os.path.abspath(options.vetoes_file))])
	else:
		options.vetoes_cache = set()

	# the injection branch expects a list of time slide files
	options.injection_time_slides = [options.injection_time_slides]

	return options, filenames
#
# parse the command line and open the log file
#

options, filenames = parse_command_line()

basename = os.path.splitext(os.path.basename(options.config_file))[0]
log_fh = open(basename + '.pipeline.log', 'w')

print("$Id$\nInvoked with arguments:", file=log_fh)
for name_value in options.__dict__.items():
	print("%s %s" % name_value, file=log_fh)


#
# read the configuration file
#

config_parser = ConfigParser()
config_parser.read(options.config_file)
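# A sketch of the configuration file, with illustrative values for the
# options this script reads directly (a real file also needs the sections
# and options consumed by power.init_job_types() and by the individual
# condor jobs):
#
#	[pipeline]
#	ifos = H1,L1
#	injection-runs = 2
#	segment-length = 2048
#	trig_overlap = 0
#	user_tag = STRING_SEARCH
#
#	[lalapps_StringSearch]
#	short-segment-duration = 16
#	pad = 8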
#
# define the condor job types
#

power.init_job_types(config_parser, job_types = ("datafind", "binj", "lladd", "binjfind", "burca", "sqlite"))


#
# make directories to run in
#

def make_dag_directories(top_level_directory, config_parser):
	cwd = os.getcwd()
	power.make_dir_if_not_exists(top_level_directory)
	os.chdir(top_level_directory)
	power.make_dag_directories(config_parser)
	power.make_dir_if_not_exists(power.get_triggers_dir(config_parser))
	# return to the original working directory so the next call starts
	# from the same place
	os.chdir(cwd)
power.make_dag_directories(config_parser)
injection_folders = []
for i in range(config_parser.getint('pipeline', 'injection-runs')):
	injection_folders.append(os.path.abspath("injections%d" % i))
	make_dag_directories(injection_folders[-1], config_parser)
noninjection_folders = []
noninjection_folders.append(os.path.abspath("noninjections"))
make_dag_directories(noninjection_folders[-1], config_parser)
#
# create the DAG and the segment-clipping SQL file
#

logfile = tempfile.mkstemp(prefix = basename, suffix = '.log', dir = options.log_path)[1]
dag = pipeline.CondorDAG(logfile)
dag.set_dag_file(basename)
clipsegments_sql_filename = os.path.abspath("clipsegments.sql")
# call reconstructed from the lalapps.cosmicstring reference at the end of
# this file
cosmicstring.write_clip_segment_sql_file(clipsegments_sql_filename)
#
# get analysis parameters from the configuration file
#

short_segment_duration = config_parser.getint('lalapps_StringSearch', 'short-segment-duration')
pad = config_parser.getint('lalapps_StringSearch', 'pad')
min_segment_length = config_parser.getint('pipeline', 'segment-length')
trig_overlap = config_parser.getint('pipeline', 'trig_overlap')
# integer division:  these are integer second counts
overlap = short_segment_duration // 2 + 2 * pad
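# With the illustrative values above (short-segment-duration = 16, pad = 8)
# this gives overlap = 16 // 2 + 2 * 8 = 24 seconds:  adjacent analysis
# segments share half a short segment plus the padding stripped from each
# end.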
instruments = lsctables.instrumentsproperty.get(config_parser.get('pipeline', 'ifos'))
segments_cache = set([CacheEntry(None, "SEG", None, "file://localhost" + os.path.abspath(options.segments_file))])
seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.segments_file, verbose = options.verbose), options.segments_name).coalesce()
# warn about and discard segments for instruments not participating in the
# analysis
for instrument in set(seglists) - instruments:
	print("warning: ignoring segments for '%s' found in '%s'" % (instrument, options.segments_file), file=sys.stderr)
	del seglists[instrument]
if not instruments.issubset(set(seglists)):
	raise ValueError("segment lists retrieved from '%s' missing segments for instruments %s" % (options.segments_file, ", ".join(instruments - set(seglists))))

if options.verbose:
	print("computing segments for which lalapps_StringSearch jobs are required ...", file=sys.stderr)
background_time_slides = {}
background_seglists = segments.segmentlistdict()
for filename in options.background_time_slides:
	cache_entry = CacheEntry(None, "BG", None, "file://localhost" + os.path.abspath(filename))
	# list() is required:  .values() returns a non-indexable view in
	# Python 3
	background_time_slides[cache_entry] = list(lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(filename, verbose = options.verbose)).as_dict().values())
	for i in range(len(background_time_slides[cache_entry])):
		background_time_slides[cache_entry][i] = offsetvector.offsetvector((instrument, LIGOTimeGPS(offset)) for instrument, offset in background_time_slides[cache_entry][i].items())
	background_seglists |= cosmicstring.compute_segment_lists(seglists, offsetvector.component_offsetvectors(background_time_slides[cache_entry], 2), min_segment_length, pad)
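# offsetvector.component_offsetvectors(offsetvectors, 2) yields the distinct
# two-instrument components of the offset vectors.  For example (values
# hypothetical), {"H1": 0, "L1": 5, "V1": 10} decomposes into
# {"H1": 0, "L1": 5}, {"H1": 0, "V1": 10} and {"L1": 5, "V1": 10}:  the
# segment computation depends only on these pairwise relative offsets.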
injection_time_slides = {}
injection_seglists = segments.segmentlistdict()
for filename in options.injection_time_slides:
	cache_entry = CacheEntry(None, "INJ", None, "file://localhost" + os.path.abspath(filename))
	injection_time_slides[cache_entry] = list(lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(filename, verbose = options.verbose)).as_dict().values())
	for i in range(len(injection_time_slides[cache_entry])):
		injection_time_slides[cache_entry][i] = offsetvector.offsetvector((instrument, LIGOTimeGPS(offset)) for instrument, offset in injection_time_slides[cache_entry][i].items())
	injection_seglists |= cosmicstring.compute_segment_lists(seglists, offsetvector.component_offsetvectors(injection_time_slides[cache_entry], 2), min_segment_length, pad)
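# Two offset vectors with equal .deltas apply the same relative time shifts
# even if their absolute offsets differ.  For example (values hypothetical),
# offsetvector({"H1": 0, "L1": 10}) and offsetvector({"H1": 5, "L1": 15})
# both slide L1 by 10 s relative to H1, so using one for injections and the
# other for the background would analyze the same coincidences twice;  the
# check below guards against exactly this.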
def check_for_reused_offsetvectors(background_time_slides, injection_time_slides):
	for background_cache_entry, background_offsetvector in [(cache_entry, offsetvector) for cache_entry, offsetvectors in background_time_slides.items() for offsetvector in offsetvectors]:
		for injection_cache_entry, injection_offsetvector in [(cache_entry, offsetvector) for cache_entry, offsetvectors in injection_time_slides.items() for offsetvector in offsetvectors]:
			if background_offsetvector.deltas == injection_offsetvector.deltas:
				raise ValueError("injections offset vector %s from %s is the same as non-injections offset vector %s from %s.  to avoid a self-selection bias, injections must not be performed at the same relative time shifts as a non-injection run" % (str(injection_offsetvector), injection_cache_entry.url, str(background_offsetvector), background_cache_entry.url))

check_for_reused_offsetvectors(background_time_slides, injection_time_slides)
datafinds = power.make_datafind_stage(dag, injection_seglists | background_seglists, verbose = options.verbose)
def make_coinc_branch(dag, datafinds, seglists, time_slides, min_segment_length, pad, overlap, short_segment_duration, tag, vetoes_cache = set(), do_injections = False, injections_offset = 0.0, verbose = False):
	# injection jobs
	binjnodes = set()
	if do_injections:
		# the injection branch can only handle a single time slide
		# file
		assert len(time_slides) == 1

		# the largest injection offset's magnitude
		maxoffset = max(abs(offset) for offsetvectorlist in time_slides.values() for offsetvector in offsetvectorlist for offset in offsetvector.values())

		# a separate injection list is constructed for each block of
		# data to be analyzed;  the protract/coalesce/contract cycle
		# ensures two nearby injection lists cannot contain
		# injections for the same time
		for seg in seglists.union(seglists).protract(power.binjjob.time_step + maxoffset).coalesce().contract(power.binjjob.time_step + maxoffset):
			binjnodes |= power.make_binj_fragment(dag, seg.protract(maxoffset), list(time_slides.keys())[0], tag, offset = injections_offset)
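		# Worked example with hypothetical numbers:  if time_step +
		# maxoffset = 10, segments [0, 100) and [115, 200) protract to
		# [-10, 110) and [105, 210), coalesce to [-10, 210), then
		# contract back to [0, 200).  Gaps shorter than 2 * (time_step
		# + maxoffset) are swallowed, so neighbouring injection lists
		# can never cover the same times.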
		# link each binj job to the datafind jobs whose data
		# segments it overlaps
		for datafindnode in datafinds:
			seg = segments.segment(datafindnode.get_start(), datafindnode.get_end())
			for binjnode in binjnodes:
				if seg.intersects(power.cache_span(binjnode.get_output_cache())):
					binjnode.add_parent(datafindnode)

	# trigger generation jobs
	trigger_nodes = cosmicstring.make_single_instrument_stage(dag, datafinds, seglists, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length = 3600, binjnodes = binjnodes, verbose = verbose)
	# coincidence analysis:  one branch per time slide file
	coinc_nodes = []
	for n, (time_slides_cache_entry, these_time_slides) in enumerate(time_slides.items()):
		if verbose:
			print("%s %d/%d (%s):" % (tag, n + 1, len(time_slides), time_slides_cache_entry.path), file=sys.stderr)
		coinc_nodes.append(set())

		# ligolw_add stage
		tisi_cache = set([time_slides_cache_entry])
		lladd_nodes = set()
		for segnum, (seg, parents, cache, clipseg) in enumerate(power.group_coinc_parents(trigger_nodes, these_time_slides, extentlimit = 150000000.0 / (len(these_time_slides) or 1), verbose = verbose)):
			binj_cache = set(cache_entry for node in binjnodes for cache_entry in node.get_output_cache() if cache_entry.segment.intersects(seg))
			# otherwise too many copies of the offset vector will
			# be fed into burca
			assert len(binj_cache) < 2
			if do_injections:
				# the injection jobs have already copied the
				# time slide document into their output
				extra_input_cache = vetoes_cache
			else:
				# ligolw_add must copy the time slide
				# document into its output
				extra_input_cache = tisi_cache | vetoes_cache
			these_lladd_nodes = power.make_lladd_fragment(dag, parents | binjnodes, "%s_%d_%x" % (tag, n, segnum), segment = seg, input_cache = cache | binj_cache | segments_cache, extra_input_cache = extra_input_cache, remove_input = do_injections and clipseg is not None, preserve_cache = binj_cache | segments_cache | tisi_cache | vetoes_cache)
			if clipseg is not None:
				# fragment of a too-large burca job:
				# construct it specially, clipping the
				# output to clipseg
				assert len(these_lladd_nodes) == 1
				coinc_nodes[-1] |= power.make_burca_fragment(dag, these_lladd_nodes, "%s_%d" % (tag, n), coincidence_segments = segments.segmentlist([clipseg]), verbose = verbose)
			else:
				# add to the pool of files processed by the
				# bulk burca jobs below
				lladd_nodes |= these_lladd_nodes
		# burca pool:  jobs that don't require clipping can
		# bulk-process many files each
		if verbose:
			print("building burca jobs ...", file=sys.stderr)
		coinc_nodes[-1] |= power.make_burca_fragment(dag, lladd_nodes, "%s_%d" % (tag, n), verbose = verbose)
		if verbose:
			print("done %s %d/%d" % (tag, n + 1, len(time_slides)), file=sys.stderr)
	# binjfind jobs:  identify recovered injections
	if do_injections:
		if verbose:
			print("building binjfind jobs ...", file=sys.stderr)
		coinc_nodes = [power.make_binjfind_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), verbose = verbose) for n, these_coinc_nodes in enumerate(coinc_nodes)]
	# conversion to SQLite databases
	if verbose:
		print("building sqlite jobs ...", file=sys.stderr)
	coinc_nodes = [power.make_sqlite_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), verbose = verbose) for n, these_coinc_nodes in enumerate(coinc_nodes)]
	# call reconstructed from the lalapps.cosmicstring reference at the
	# end of this file:  apply clipsegments.sql to the databases
	coinc_nodes = [cosmicstring.make_run_sqlite_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), clipsegments_sql_filename) for n, these_coinc_nodes in enumerate(coinc_nodes)]
	# measure likelihood ratio ranking statistic distributions
	if verbose:
		print("building lalapps_string_meas_likelihood jobs ...", file=sys.stderr)
	# call reconstructed from the lalapps.cosmicstring reference at the
	# end of this file
	likelihood_nodes = [cosmicstring.make_meas_likelihood_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n)) for n, these_coinc_nodes in enumerate(coinc_nodes)]

	# write the output cache, one per time slide file
	if verbose:
		print("writing output cache ...", file=sys.stderr)
	for n, (these_coinc_nodes, these_likelihood_nodes) in enumerate(zip(coinc_nodes, likelihood_nodes)):
		power.write_output_cache(these_coinc_nodes | these_likelihood_nodes, "%s_%s_output.cache" % (os.path.splitext(dag.get_dag_file())[0], "%s_%d" % (tag, n)))

	# done
	return coinc_nodes, likelihood_nodes
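# make_coinc_branch() returns one set of coinc nodes and one set of
# likelihood nodes per time slide file passed to it;  e.g. two background
# time slide files yield ([coinc_0, coinc_1], [likelihood_0, likelihood_1]).
# The injection branch below passes exactly one time slide file, which is
# why its caller unpacks the result as [these_coinc_nodes].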
user_tag = config_parser.get('pipeline', 'user_tag')
#
# injection branch:  one analysis per injection run, each in its own
# directory
#

injection_coinc_nodes = []
injection_likelihood_nodes = []
cwd = os.getcwd()
for i, folder in enumerate(injection_folders):
	os.chdir(folder)
	# the [...] unpacking enforces the rule that each injection run uses
	# exactly one time slide file
	[these_coinc_nodes], [these_likelihood_nodes] = make_coinc_branch(dag, datafinds, injection_seglists, injection_time_slides, min_segment_length, pad, overlap, short_segment_duration, "%s_INJ_%d" % (user_tag, i), vetoes_cache = options.vetoes_cache, do_injections = True, injections_offset = float(i) / len(injection_folders), verbose = options.verbose)
	injection_coinc_nodes.append(these_coinc_nodes)
	injection_likelihood_nodes.append(these_likelihood_nodes)
#
# background branch
#

for i, folder in enumerate(noninjection_folders):
	os.chdir(folder)
	background_coinc_nodes, background_likelihood_nodes = make_coinc_branch(dag, datafinds, background_seglists, background_time_slides, min_segment_length, pad, overlap, short_segment_duration, "%s_%d" % (user_tag, i), vetoes_cache = options.vetoes_cache, do_injections = False, verbose = options.verbose)
# return to the top-level directory so the dag files land there
os.chdir(cwd)


def flatten_node_groups(node_groups):
	# union a sequence of node sets into a single set
	return reduce(lambda a, b: a | b, node_groups, set())
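# For example (node names hypothetical), flatten_node_groups([{node_a},
# {node_b, node_c}]) returns {node_a, node_b, node_c}.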
all_injection_likelihood_nodes = flatten_node_groups(injection_likelihood_nodes)

if options.verbose:
	print("building lalapps_string_calc_likelihood jobs ...", file=sys.stderr)


def round_robin_and_flatten(injection_coinc_node_groups, injection_likelihood_node_groups):
	# pair each injection run's coinc node group with the union of every
	# other injection run's likelihood node groups.  reversing B lines
	# each 1-element combination in A up with the (N-1)-element
	# combination in B that excludes it.  (tail of this function
	# reconstructed from its call site below)
	A = list(itertools.combinations(injection_coinc_node_groups, 1))
	B = list(itertools.combinations(injection_likelihood_node_groups, len(injection_likelihood_node_groups) - 1))
	B.reverse()
	return [(flatten_node_groups(a), flatten_node_groups(b)) for a, b in zip(A, B)]
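# For example, with three injection runs the pairing produced is
#
#	run 0's coincs  <-->  likelihood data from runs 1 and 2
#	run 1's coincs  <-->  likelihood data from runs 0 and 2
#	run 2's coincs  <-->  likelihood data from runs 0 and 1
#
# so each injection run is ranked using likelihood data measured from the
# other runs, and injections never inform their own ranking.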
coinc_nodes = set()
for n, (these_inj_coinc_nodes, these_inj_likelihood_nodes) in enumerate(round_robin_and_flatten(injection_coinc_nodes, injection_likelihood_nodes)):
	# NOTE:  this call is a reconstruction;  the tag format is an
	# assumption modelled on the other fragments in this script
	these_inj_coinc_nodes = cosmicstring.make_calc_likelihood_fragment(dag, these_inj_coinc_nodes, these_inj_likelihood_nodes, "%s_INJ_%d" % (user_tag, n), verbose = options.verbose)
	# the calc_likelihood jobs rewrite the coinc documents in place, so
	# they must also wait for this run's own meas_likelihood jobs, which
	# read those same documents
	for extra_parent in all_injection_likelihood_nodes - these_inj_likelihood_nodes:
		for coinc_node in these_inj_coinc_nodes:
			coinc_node.add_parent(extra_parent)
	coinc_nodes |= these_inj_coinc_nodes
#
# write the DAG
#

if options.verbose:
	print("writing dag ...", file=sys.stderr)
dag.write_sub_files()
dag.write_dag()
print("""Created a DAG file which can be submitted by executing

$ condor_submit_dag %s

from a condor submit machine (e.g. hydra.phys.uwm.edu)

Do not forget to initialize your grid proxy certificate on the condor
submit machine by running the commands

$ unset X509_USER_PROXY
$ grid-proxy-init -hours $((24*7)):00

""" % dag.get_dag_file())
#
# Reference:  helper function signatures, as documented for this pipeline.
#
# From lalapps.cosmicstring:
#	init_job_types(config_parser, job_types = ("string", "meas_likelihoodjob", "calc_likelihood", "runsqlite"))
#		Construct definitions of the submit files.
#	write_clip_segment_sql_file(filename)
#	compute_segment_lists(seglists, offset_vectors, min_segment_length, pad)
#	make_single_instrument_stage(dag, datafinds, seglistdict, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = set(), verbose = False)
#	make_meas_likelihood_fragment(dag, parents, tag, files_per_meas_likelihood = None)
#	make_run_sqlite_fragment(dag, parents, tag, sql_file, files_per_run_sqlite = None)
#	make_calc_likelihood_fragment(dag, parents, likelihood_parents, tag, files_per_calc_likelihood = None, verbose = False)
#
# Defined in this script:
#	make_dag_directories(top_level_directory, config_parser)
#	check_for_reused_offsetvectors(background_time_slides, injection_time_slides)
#	make_coinc_branch(dag, datafinds, seglists, time_slides, min_segment_length, pad, overlap, short_segment_duration, tag, vetoes_cache = set(), do_injections = False, injections_offset = 0.0, verbose = False)
#	flatten_node_groups(node_groups)
#	round_robin_and_flatten(injection_coinc_node_groups, injection_likelihood_node_groups)