lalapps_cosmicstring_pipe.py
##python
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#


"""
Cosmic string pipeline driver script

This script produces the Condor submit and DAG files required to run
the cosmic string search code on LIGO data.
"""


from __future__ import print_function


import itertools
from optparse import OptionParser
import os
import sys
import tempfile
from configparser import ConfigParser


from igwn_ligolw import lsctables
from igwn_ligolw import utils as ligolw_utils
from igwn_ligolw.utils import segments as ligolw_segments
from lal import LIGOTimeGPS
from lal import pipeline
from lal.utils import CacheEntry
from lalburst import offsetvector
from lalburst import power
from lalapps import cosmicstring
import igwn_segments as segments

__author__ = 'Xavier Siemens <siemens@gravity.phys.uwm.edu>'
__date__ = '$Date$'
__version__ = '$Revision$'

#
# =============================================================================
#
# Command Line
#
# =============================================================================
#


def parse_command_line():
	parser = OptionParser(
		usage = "%prog [options] ...",
		description = "FIXME"
	)
	parser.add_option("-f", "--config-file", metavar = "filename", help = "Use this configuration file (required).")
	parser.add_option("-l", "--log-path", metavar = "path", help = "Make condor put log files in this directory (required).")
	parser.add_option("--background-time-slides", metavar = "filename", action = "append", help = "Set the name of the file from which to obtain the time slide table for use in the background branch of the pipeline (required). This option can be given multiple times to parallelize the background analysis across time slides. Make sure the time slide files contain distinct offset vectors so the same analysis is not repeated, and in particular make sure only one of them contains a zero-lag vector.")
	parser.add_option("--injection-time-slides", metavar = "filename", help = "Set the name of the file from which to obtain the time slide table for use in the injection branch of the pipeline (required).")
	parser.add_option("--segments-file", metavar = "filename", help = "Set the name of the LIGO Light-Weight XML file from which to obtain segment lists (required). See ligolw_segments and ligolw_segment_query for more information on constructing an XML-format segments file. See also --segments-name.")
	parser.add_option("--segments-name", metavar = "name", default = "segments", help = "Set the name of the segment lists to retrieve from the segments file (default = \"segments\"). See also --segments-file.")
	parser.add_option("--vetoes-file", metavar = "filename", help = "Set the name of the LIGO Light-Weight XML file from which to obtain veto segment lists (optional). See ligolw_segments and ligolw_segment_query for more information on constructing an XML-format segments file. See also --vetoes-name.")
	parser.add_option("--vetoes-name", metavar = "name", default = "vetoes", help = "Set the name of the segment lists to retrieve from the veto segments file (default = \"vetoes\"). See also --vetoes-file.")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")

	options, filenames = parser.parse_args()

	required_options = ["log_path", "config_file", "background_time_slides", "injection_time_slides", "segments_file"]
	missing_options = [option for option in required_options if getattr(options, option) is None]
	if missing_options:
		raise ValueError("missing required options %s" % ", ".join(sorted("--%s" % option.replace("_", "-") for option in missing_options)))

	if options.vetoes_file is not None:
		options.vetoes_cache = set([CacheEntry(None, "VETO", None, "file://localhost" + os.path.abspath(options.vetoes_file))])
	else:
		options.vetoes_cache = set()

	options.injection_time_slides = [options.injection_time_slides]

	return options, filenames


options, filenames = parse_command_line()


#
# =============================================================================
#
# Initialize
#
# =============================================================================
#


#
# start a log file for this script
#

basename = os.path.splitext(os.path.basename(options.config_file))[0]
log_fh = open(basename + '.pipeline.log', 'w')
# FIXME: the following code uses obsolete CVS ID tags.
# It should be modified to use git version information.
print("$Id$\nInvoked with arguments:", file=log_fh)
for name_value in options.__dict__.items():
	print("%s %s" % name_value, file=log_fh)
print(file=log_fh)

#
# create the config parser object and read in the ini file
#

config_parser = ConfigParser()
config_parser.read(options.config_file)

#
# initialize lalapps.power and lalapps.cosmicstring modules
#

power.init_job_types(config_parser, job_types = ("datafind", "binj", "lladd", "binjfind", "burca", "sqlite"))
cosmicstring.init_job_types(config_parser, job_types = ("string", "meas_likelihood", "calc_likelihood", "runsqlite"))
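# note: in addition to the job-type sections consumed by the two
# init_job_types() calls above, this script itself reads the [pipeline]
# section (ifos, injection-runs, segment-length, trig_overlap, user_tag)
# and the [lalapps_StringSearch] section (short-segment-duration, pad)
# of the ini file via the config_parser.get*() calls below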

#
# make directories to store the cache files, job logs, and trigger output
#

def make_dag_directories(top_level_directory, config_parser):
	cwd = os.getcwd()
	power.make_dir_if_not_exists(top_level_directory)
	os.chdir(top_level_directory)
	power.make_dag_directories(config_parser)
	# FIXME: move this into make_dag_directories(). requires update
	# of excess power and gstlal dags
	power.make_dir_if_not_exists(power.get_triggers_dir(config_parser))
	os.chdir(cwd)

power.make_dag_directories(config_parser)
injection_folders = []
for i in range(config_parser.getint('pipeline', 'injection-runs')):
	injection_folders.append(os.path.abspath("injections%d" % i))
	make_dag_directories(injection_folders[-1], config_parser)
noninjection_folders = []
noninjection_folders.append(os.path.abspath("noninjections"))
make_dag_directories(noninjection_folders[-1], config_parser)

#
# create a log file that the Condor jobs will write to
#

logfile = tempfile.mkstemp(prefix = basename, suffix = '.log', dir = options.log_path)[1]

#
# create the DAG writing the log to the specified directory
#

dag = pipeline.CondorDAG(logfile)
dag.set_dag_file(basename)
clipsegments_sql_filename = os.path.abspath("clipsegments.sql")

#
# get chunk lengths from the values in the ini file
#

short_segment_duration = config_parser.getint('lalapps_StringSearch', 'short-segment-duration')
pad = config_parser.getint('lalapps_StringSearch', 'pad')
min_segment_length = config_parser.getint('pipeline', 'segment-length')	# not including pad at each end
trig_overlap = config_parser.getint('pipeline', 'trig_overlap')
overlap = short_segment_duration / 2 + 2 * pad	# FIXME: correct?

#
# get the instruments and raw segments
#

instruments = lsctables.instrumentsproperty.get(config_parser.get('pipeline', 'ifos'))
segments_cache = set([CacheEntry(None, "SEG", None, "file://localhost" + os.path.abspath(options.segments_file))])
seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.segments_file, verbose = options.verbose), options.segments_name).coalesce()
# remove extra instruments
for instrument in set(seglists) - instruments:
	if options.verbose:
		print("warning: ignoring segments for '%s' found in '%s'" % (instrument, options.segments_file), file=sys.stderr)
	del seglists[instrument]
# check for missing instruments
if not instruments.issubset(set(seglists)):
	raise ValueError("segment lists retrieved from '%s' missing segments for instruments %s" % (options.segments_file, ", ".join(instruments - set(seglists))))
# now rely on seglists' keys to provide the instruments
del instruments

#
# Using time slide information, construct segment lists describing times
# requiring trigger construction.
#

if options.verbose:
	print("Computing segments for which lalapps_StringSearch jobs are required ...", file=sys.stderr)

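# each time slide file is mapped to the list of offset vectors it
# contains; the offsets are converted to LIGOTimeGPS below, and the
# segments requiring analysis under those offset vectors are accumulated
# in background_seglists (the injection branch that follows is handled
# the same way)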
background_time_slides = {}
background_seglists = segments.segmentlistdict()
for filename in options.background_time_slides:
	cache_entry = CacheEntry(None, "BG", None, "file://localhost" + os.path.abspath(filename))

	background_time_slides[cache_entry] = list(lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(filename, verbose = options.verbose)).as_dict().values())

	for i in range(len(background_time_slides[cache_entry])):
		background_time_slides[cache_entry][i] = offsetvector.offsetvector((instrument, LIGOTimeGPS(offset)) for instrument, offset in background_time_slides[cache_entry][i].items())
	background_seglists |= cosmicstring.compute_segment_lists(seglists, offsetvector.component_offsetvectors(background_time_slides[cache_entry], 2), min_segment_length, pad)
211
212injection_time_slides = {}
213injection_seglists = segments.segmentlistdict()
214for filename in options.injection_time_slides:
215 cache_entry = CacheEntry(None, "INJ", None, "file://localhost" + os.path.abspath(filename))
216
217 injection_time_slides[cache_entry] = lsctables.TimeSlideTable.get_table(ligolw_utils.load_filename(filename, verbose = options.verbose)).as_dict().values()
218
219 for i in range(len(injection_time_slides[cache_entry])):
220 injection_time_slides[cache_entry][i] = offsetvector.offsetvector((instrument, LIGOTimeGPS(offset)) for instrument, offset in injection_time_slides[cache_entry][i].items())
221 injection_seglists |= cosmicstring.compute_segment_lists(seglists, offsetvector.component_offsetvectors(injection_time_slides[cache_entry], 2), min_segment_length, pad)


#
# check for duplicate offset vectors
#

def check_for_reused_offsetvectors(background_time_slides, injection_time_slides):
	# make sure none of the injection run offset vectors is also used
	# for a non-injection run

	for background_cache_entry, background_offsetvector in [(cache_entry, offsetvector) for cache_entry, offsetvectors in background_time_slides.items() for offsetvector in offsetvectors]:
		for injection_cache_entry, injection_offsetvector in [(cache_entry, offsetvector) for cache_entry, offsetvectors in injection_time_slides.items() for offsetvector in offsetvectors]:
			if background_offsetvector.deltas == injection_offsetvector.deltas:
				raise ValueError("injections offset vector %s from %s is the same as non-injections offset vector %s from %s. to avoid a self-selection bias, injections must not be performed at the same relative time shifts as a non-injection run" % (str(injection_offsetvector), injection_cache_entry.url, str(background_offsetvector), background_cache_entry.url))

check_for_reused_offsetvectors(background_time_slides, injection_time_slides)


#
# =============================================================================
#
# Build DAG
#
# =============================================================================
#


#
# datafinds
#


datafinds = power.make_datafind_stage(dag, injection_seglists | background_seglists, verbose = options.verbose)
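# the datafind jobs locate the frame data covering the union of the
# background and injection analysis segments; the per-instrument segment
# bookkeeping is handled inside power.make_datafind_stage()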


#
# trigger production, coincidence, injection identification, likelihood
# ratio probability density data collection
#


def make_coinc_branch(dag, datafinds, seglists, time_slides, min_segment_length, pad, overlap, short_segment_duration, tag, vetoes_cache = set(), do_injections = False, injections_offset = 0.0, verbose = False):
	#
	# injection job
	#

	binjnodes = set()
	if do_injections:
		# don't know what to do with more than one list of offset
		# vectors
		assert len(time_slides) == 1

		# get the largest injection offset's magnitude
		maxoffset = max(abs(offset) for offsetvectorlist in time_slides.values() for offsetvector in offsetvectorlist for offset in offsetvector.values())

		# to save disk space and speed the dag along we don't
		# generate a single injection list for the entire analysis
		# run; instead a separate list is constructed for each
		# block of data to be analyzed. we need to be careful that
		# two nearby injection lists don't contain injections for
		# the same time, so we protract the segments by the time
		# step and coalesce so that only gaps between segments
		# larger than twice the time step result in separate files
		# being generated. we could allow smaller gaps to survive,
		# but this way we don't have to worry about it.

		# injections_offset is a number between 0 and 1 in units of
		# the period between injections

		for seg in seglists.union(seglists).protract(power.binjjob.time_step + maxoffset).coalesce().contract(power.binjjob.time_step + maxoffset):
			binjnodes |= power.make_binj_fragment(dag, seg.protract(maxoffset), list(time_slides.keys())[0], tag, offset = injections_offset)

		# artificial parent-child relationship to induce dagman to
		# submit binj jobs as the corresponding datafinds complete
		# instead of submitting all of one kind before any of the
		# next. makes the dag run faster because it allows string
		# search jobs to start moving onto the cluster without
		# waiting for all the datafinds and/or all the binjs to
		# complete

		for datafindnode in datafinds:
			seg = segments.segment(datafindnode.get_start(), datafindnode.get_end())
			for binjnode in binjnodes:
				if seg.intersects(power.cache_span(binjnode.get_output_cache())):
					binjnode.add_parent(datafindnode)

	#
	# trigger generator jobs
	#

	# set max job length to ~3600 s (will be clipped to an allowed
	# size)
	trigger_nodes = cosmicstring.make_single_instrument_stage(dag, datafinds, seglists, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length = 3600, binjnodes = binjnodes, verbose = verbose)

	#
	# coincidence analysis
	#

	coinc_nodes = []
	for n, (time_slides_cache_entry, these_time_slides) in enumerate(time_slides.items()):
		if verbose:
			print("%s %d/%d (%s):" % (tag, n + 1, len(time_slides), time_slides_cache_entry.path), file=sys.stderr)
		coinc_nodes.append(set())

		#
		# lalapps_cafe & ligolw_add
		#

		tisi_cache = set([time_slides_cache_entry])
		lladd_nodes = set()
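		# group_coinc_parents() partitions the trigger files into
		# ligolw_add/burca jobs; the extentlimit below caps the GPS
		# extent a single job is asked to handle and shrinks as the
		# number of offset vectors grows, presumably so the data
		# volume per job stays roughly constant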
		for segnum, (seg, parents, cache, clipseg) in enumerate(power.group_coinc_parents(trigger_nodes, these_time_slides, extentlimit = 150000000.0 / (len(these_time_slides) or 1), verbose = verbose)):
			binj_cache = set(cache_entry for node in binjnodes for cache_entry in node.get_output_cache() if cache_entry.segment.intersects(seg))
			# otherwise too many copies of the offset vector
			# will be fed into burca
			assert len(binj_cache) < 2
			if do_injections:
				# lalapps_binj has already copied the time
				# slide document into its own output
				extra_input_cache = vetoes_cache
			else:
				# ligolw_add needs to copy the time slide
				# document into its output
				extra_input_cache = tisi_cache | vetoes_cache
			these_lladd_nodes = power.make_lladd_fragment(dag, parents | binjnodes, "%s_%d_%x" % (tag, n, segnum), segment = seg, input_cache = cache | binj_cache | segments_cache, extra_input_cache = extra_input_cache, remove_input = do_injections and clipseg is not None, preserve_cache = binj_cache | segments_cache | tisi_cache | vetoes_cache)
			if clipseg is not None:
				#
				# this is a fragment of a too-large burca
				# job, construct it specially and add the
				# command-line option needed to clip the
				# output
				#

				assert len(these_lladd_nodes) == 1
				coinc_nodes[-1] |= power.make_burca_fragment(dag, these_lladd_nodes, "%s_%d" % (tag, n), coincidence_segments = segments.segmentlist([clipseg]), verbose = verbose)

			else:
				#
				# this is not a fragment of a too-large
				# burca job, add it to the pool of files to
				# be processed by the burcas that don't
				# require special clipping command line
				# options
				#

				lladd_nodes |= these_lladd_nodes

		#
		# lalburst_coinc pool. these are the burca jobs that don't
		# require special clipping command line options, and so can
		# bulk-process many files with each job
		#

		if verbose:
			print("building burca jobs ...", file=sys.stderr)
		coinc_nodes[-1] |= power.make_burca_fragment(dag, lladd_nodes, "%s_%d" % (tag, n), verbose = verbose)
		if verbose:
			print("done %s %d/%d" % (tag, n + 1, len(time_slides)), file=sys.stderr)

	#
	# lalburst_injfind
	#

	if do_injections:
		if verbose:
			print("building binjfind jobs ...", file=sys.stderr)
		coinc_nodes = [power.make_binjfind_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), verbose = verbose) for n, these_coinc_nodes in enumerate(coinc_nodes)]

	#
	# ligolw_sqlite and ligolw_run_sqlite
	#

	if verbose:
		print("building sqlite jobs ...", file=sys.stderr)
	coinc_nodes = [power.make_sqlite_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), verbose = verbose) for n, these_coinc_nodes in enumerate(coinc_nodes)]
	coinc_nodes = [cosmicstring.make_run_sqlite_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n), clipsegments_sql_filename) for n, these_coinc_nodes in enumerate(coinc_nodes)]

	#
	# lalapps_string_meas_likelihood
	#

	if verbose:
		print("building lalapps_string_meas_likelihood jobs ...", file=sys.stderr)
	likelihood_nodes = [cosmicstring.make_meas_likelihood_fragment(dag, these_coinc_nodes, "%s_%d" % (tag, n)) for n, these_coinc_nodes in enumerate(coinc_nodes)]

	#
	# write output cache
	#

	if verbose:
		print("writing output cache ...", file=sys.stderr)
	for n, (these_coinc_nodes, these_likelihood_nodes) in enumerate(zip(coinc_nodes, likelihood_nodes)):
		power.write_output_cache(these_coinc_nodes | these_likelihood_nodes, "%s_%s_output.cache" % (os.path.splitext(dag.get_dag_file())[0], "%s_%d" % (tag, n)))

	#
	# done
	#

	return coinc_nodes, likelihood_nodes


user_tag = config_parser.get('pipeline', 'user_tag')


injection_coinc_nodes = []
injection_likelihood_nodes = []
for i, folder in enumerate(injection_folders):
	cwd = os.getcwd()
	os.chdir(folder)
	# note: the unpacking enforces the rule that each injection run
	# uses a single time-slides file
	[these_coinc_nodes], [these_likelihood_nodes] = make_coinc_branch(dag, datafinds, injection_seglists, injection_time_slides, min_segment_length, pad, overlap, short_segment_duration, "%s_INJ_%d" % (user_tag, i), vetoes_cache = options.vetoes_cache, do_injections = True, injections_offset = float(i) / len(injection_folders), verbose = options.verbose)
	os.chdir(cwd)
	injection_coinc_nodes.append(these_coinc_nodes)
	injection_likelihood_nodes.append(these_likelihood_nodes)


for i, folder in enumerate(noninjection_folders):
	assert i == 0	# loop only works for one iteration
	cwd = os.getcwd()
	os.chdir(folder)
	background_coinc_nodes, background_likelihood_nodes = make_coinc_branch(dag, datafinds, background_seglists, background_time_slides, min_segment_length, pad, overlap, short_segment_duration, "%s_%d" % (user_tag, i), vetoes_cache = options.vetoes_cache, do_injections = False, verbose = options.verbose)
	os.chdir(cwd)


def flatten_node_groups(node_groups):
	return set().union(*node_groups)


all_background_likelihood_nodes = flatten_node_groups(background_likelihood_nodes)
all_injection_likelihood_nodes = flatten_node_groups(injection_likelihood_nodes)


#
# round-robin parameter distribution density data for likelihood ratio
# computation.
#


if options.verbose:
	print("building lalapps_string_calc_likelihood jobs ...", file=sys.stderr)


def round_robin_and_flatten(injection_coinc_node_groups, injection_likelihood_node_groups):
	# round-robin the node lists
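	# with N injection runs, combinations(..., 1) lists each coinc node
	# group once and combinations(..., N - 1) lists the (N-1)-element
	# subsets of the likelihood node groups in lexicographic order;
	# reversing the latter lines up coinc group i with the subset that
	# omits likelihood group i, so no injection run's likelihood data
	# is used to rank its own candidates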
	A = list(itertools.combinations(injection_coinc_node_groups, 1))
	B = list(itertools.combinations(injection_likelihood_node_groups, len(injection_likelihood_node_groups) - 1))
	B.reverse()
	A = [flatten_node_groups(node_groups) for node_groups in A]
	B = [flatten_node_groups(node_groups) for node_groups in B]
	return zip(A, B)

coinc_nodes = set()
for n, (these_inj_coinc_nodes, these_inj_likelihood_nodes) in enumerate(round_robin_and_flatten(injection_coinc_nodes, injection_likelihood_nodes)):
	these_inj_coinc_nodes = cosmicstring.make_calc_likelihood_fragment(dag, these_inj_coinc_nodes, these_inj_likelihood_nodes | all_background_likelihood_nodes, "%s_INJ_%d" % (user_tag, n), verbose = options.verbose)
	# to prevent race conditions, none of the calc_likelihood jobs
	# should start before all the meas_likelihood jobs have completed,
	# even those that produce data not required by the calc_likelihood
	# jobs, because the calc_likelihood jobs might overwrite the
	# database files just as the meas_likelihood jobs are opening them
	# for reading. this is only a problem for injection
	# calc_likelihood jobs because of the round-robining; the
	# non-injection ones already have all meas_likelihood jobs as
	# parents.
	for extra_parent in all_injection_likelihood_nodes - these_inj_likelihood_nodes:
		for coinc_node in these_inj_coinc_nodes:
			coinc_node.add_parent(extra_parent)
	coinc_nodes |= these_inj_coinc_nodes
coinc_nodes |= cosmicstring.make_calc_likelihood_fragment(dag, flatten_node_groups(background_coinc_nodes), all_injection_likelihood_nodes | all_background_likelihood_nodes, user_tag, files_per_calc_likelihood = 1, verbose = options.verbose)


#
# =============================================================================
#
# Done
#
# =============================================================================
#

#
# write DAG
#

if options.verbose:
	print("writing dag ...", file=sys.stderr)
dag.write_sub_files()
dag.write_dag()
cosmicstring.write_clip_segment_sql_file(clipsegments_sql_filename)

#
# write a message telling the user that the DAG has been written
#

print("""Created a DAG file which can be submitted by executing

$ condor_submit_dag %s

from a condor submit machine (e.g. hydra.phys.uwm.edu)

Do not forget to initialize your grid proxy certificate on the condor
submit machine by running the commands

$ unset X509_USER_PROXY
$ grid-proxy-init -hours $((24*7)):00

""" % dag.get_dag_file())