32Excess power offline pipeline construction script.
37from optparse
import OptionParser
41from configparser
import (ConfigParser, NoOptionError)
44import igwn_segments
as segments
45from igwn_segments
import utils
as segmentsUtils
46from lal
import LIGOTimeGPS
47from lal
import pipeline
49from lalburst
import cafe
50from lalburst
import timeslides
51from lalburst
import power
54__author__ =
"Kipp Cannon <kipp@gravity.phys.uwm.edu>"
56__version__ =
"$Revision$"
def parse_command_line():
	"""
	Parse the command line, validate the option combination, and return
	(options, filenames).

	Returns:
		options: the optparse Values object, augmented with the derived
			booleans .do_injections / .do_noninjections, and with
			.injection_time_slides normalized to a list.
		filenames: the positional arguments, or ["power.dag"] if none
			were given (the DAG file name).

	Raises:
		ValueError: if --variant is unrecognized, or a required
			time-slide file option is missing for the selected variant.
	"""
	parser = OptionParser(
		version = "%prog CVS $Id$",
		description = "%prog builds an excess power pipeline DAG suitable for running at the various LSC Data Grid sites. The script requires a configuration file. An example file can be found in the LALApps CVS."
	)
	parser.add_option("--condor-log-dir", metavar = "path", default = ".", help = "Set the directory for Condor log files (default = \".\").")
	parser.add_option("--config-file", metavar = "filename", default = "power.ini", help = "Set .ini configuration file name (default = \"power.ini\").")
	parser.add_option("--full-segments", action = "store_true", help = "Analyze all data from segment lists, not just coincident times.")
	parser.add_option("--minimum-gap", metavar = "seconds", type = "float", default = 60.0, help = "Merge jobs analyzing data from the same instrument if the gap between them is less than this many seconds (default = 60).")
	parser.add_option("--variant", metavar = "[injections|noninjections|both]", default = "both", help = "Select the variant of the pipeline to construct. \"injections\" produces a simulations-only version of the pipeline, \"noninjections\" produces a version with no simulation jobs, and \"both\" produces a full pipeline with both simulation and non-simulation jobs.")
	parser.add_option("--background-time-slides", metavar = "filename", default = [], action = "append", help = "Set file from which to obtain the time slide table for use in the background branch of the pipeline (default = \"background_time_slides.xml.gz\"). Provide this argument multiple times to provide multiple time slide files, each will result in a separate set of lalburst_coinc jobs.")
	parser.add_option("--injection-time-slides", metavar = "filename", help = "Set file from which to obtain the time slide table for use in the injection branch of the pipeline (default = \"injection_time_slides.xml.gz\").")
	parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
	options, filenames = parser.parse_args()

	if options.variant not in ("injections", "noninjections", "both"):
		raise ValueError("unrecognized --variant %s" % options.variant)
	# derive convenience booleans from the selected variant
	options.do_injections = options.variant in ("injections", "both")
	options.do_noninjections = options.variant in ("noninjections", "both")

	if options.do_injections and not options.injection_time_slides:
		raise ValueError("missing required --injection-time-slides argument")
	if options.do_noninjections and not options.background_time_slides:
		raise ValueError("missing required --background-time-slides argument")

	# normalize the single-valued injection option to a list, to match
	# the "append"-style --background-time-slides option.  Guard against
	# None:  the option is optional when injections are disabled, and
	# wrapping None would yield the bogus list [None].
	if options.injection_time_slides is not None:
		options.injection_time_slides = [options.injection_time_slides]

	return options, (filenames or ["power.dag"])
def parse_config_file(options):
	"""
	Read the .ini configuration file named by options.config_file and
	return (seglistdict, tiling_phase, config).

	Side effects:  augments options with .tag, .enable_clustering,
	.psds_per_power, .psds_per_injection and .timing_params, all taken
	from the [pipeline] section of the configuration file.

	seglistdict maps each instrument name to its coalesced segwizard
	segment list;  tiling_phase maps instrument name to the (optional)
	tiling phase offset for that instrument.
	"""
	if options.verbose:
		print("reading %s ..." % options.config_file, file=sys.stderr)
	config = ConfigParser()
	config.read(options.config_file)

	options.tag = config.get("pipeline", "user_tag")
	options.enable_clustering = config.getboolean("pipeline", "enable_clustering")

	seglistdict = segments.segmentlistdict()
	tiling_phase = {}
	for ifo in config.get("pipeline", "ifos").split():
		# load and coalesce this instrument's segment list
		seglistdict[ifo] = segmentsUtils.fromsegwizard(open(config.get("pipeline", "seglist_%s" % ifo)), coltype = LIGOTimeGPS).coalesce()
		# tiling phase is optional;  a missing option means no offset.
		# NOTE(review): the try/else around the elided original lines is
		# reconstructed to pair with the visible "except NoOptionError".
		try:
			offset = config.getfloat("pipeline", "tiling_phase_%s" % ifo)
		except NoOptionError:
			offset = 0.0
		if offset:
			tiling_phase[ifo] = offset

	options.psds_per_power = config.getint("pipeline", "psds_per_power")
	options.psds_per_injection = config.getint("pipeline", "psds_per_injection")
	options.timing_params = power.TimingParameters(config)

	return seglistdict, tiling_phase, config
def compute_segment_lists(seglistdict, time_slides, minimum_gap, timing_params, full_segments = True, verbose = False):
	"""
	Compute the segments for which lalapps_power jobs are required.

	If full_segments is True the input segment lists are used as-is
	(apart from culling segments too short to analyze);  otherwise only
	the times that are coincident under at least one of the time_slides
	offset vectors are kept, with each surviving segment's length
	rounded up to an integer number of PSDs and nearby jobs merged when
	separated by less than minimum_gap seconds.

	The input seglistdict is not modified;  a new segmentlistdict is
	returned.
	"""
	if verbose:
		print("constructing segment list ...", file=sys.stderr)

	# don't modify the calling code's segmentlistdict
	seglistdict = seglistdict.copy()

	if not full_segments:
		# cull too-short single-instrument segments first;  they
		# cannot contribute analyzable coincidences
		power.remove_too_short_segments(seglistdict, timing_params)

		# extract the segments that are coincident under the time
		# slides
		new = cafe.get_coincident_segmentlistdict(seglistdict, time_slides)

		# round the lengths of the surviving segments up to integer
		# numbers of PSDs
		for seglist in new.values():
			# try adjusting the upper bounds:  count the PSDs in
			# each segment, round up to at least 1, convert back
			# to a job duration, and stretch each segment's end
			psds = [power.psds_from_job_length(timing_params, float(abs(seg))) for seg in seglist]
			psds = [int(math.ceil(max(n, 1.0))) for n in psds]
			durations = [power.job_length_from_psds(timing_params, n) for n in psds]
			for i, seg in enumerate(seglist):
				seglist[i] = segments.segment(seg[0], seg[0] + durations[i])
			# NOTE(review): the intersection with the original
			# segments (elided in the extraction) clamps the
			# stretched segments back inside the available data
			new &= seglistdict

			# try adjusting the lower bounds the same way,
			# stretching each segment's start backwards instead
			psds = [power.psds_from_job_length(timing_params, float(abs(seg))) for seg in seglist]
			psds = [int(math.ceil(max(n, 1.0))) for n in psds]
			durations = [power.job_length_from_psds(timing_params, n) for n in psds]
			for i, seg in enumerate(seglist):
				seglist[i] = segments.segment(seg[1] - durations[i], seg[1])
			new &= seglistdict

		# try to fill gaps between jobs:  merge segments closer than
		# minimum_gap, then clamp to the available data again
		new.protract(minimum_gap / 2).contract(minimum_gap / 2)
		new &= seglistdict
		seglistdict = new

	# remove segments that are (still) too short to analyze
	power.remove_too_short_segments(seglistdict, timing_params)

	return seglistdict
#
# Create jobs and compute the segments to analyze.
#

power.init_job_types(config_parser)

if options.verbose:
	print("Computing segments for which lalapps_power jobs are required ...", file=sys.stderr)

# background (non-injection) branch:  one time-slide file per entry,
# keyed by a CacheEntry describing the file
background_time_slides = {}
background_seglistdict = segments.segmentlistdict()
if options.do_noninjections:
	for filename in options.background_time_slides:
		cache_entry = CacheEntry(None, None, None, "file://localhost" + os.path.abspath(filename))
		# materialize the dict view with list():  under Python 3
		# .values() is a lazy view, and these offset vectors are
		# consumed more than once downstream
		background_time_slides[cache_entry] = list(timeslides.load_time_slides(filename, verbose = options.verbose).values())
		background_seglistdict |= compute_segment_lists(seglistdict, background_time_slides[cache_entry], options.minimum_gap, options.timing_params, full_segments = options.full_segments, verbose = options.verbose)

# injection branch, built the same way
injection_time_slides = {}
injection_seglistdict = segments.segmentlistdict()
if options.do_injections:
	for filename in options.injection_time_slides:
		cache_entry = CacheEntry(None, None, None, "file://localhost" + os.path.abspath(filename))
		injection_time_slides[cache_entry] = list(timeslides.load_time_slides(filename, verbose = options.verbose).values())
		injection_seglistdict |= compute_segment_lists(seglistdict, injection_time_slides[cache_entry], options.minimum_gap, options.timing_params, full_segments = options.full_segments, verbose = options.verbose)

# apply the per-instrument tiling phase offsets, then clamp both
# branches' segments to the available data
for key, offset in tiling_phase.items():
	if key in background_seglistdict:
		background_seglistdict[key].shift(offset)
	if key in injection_seglistdict:
		injection_seglistdict[key].shift(offset)
background_seglistdict &= seglistdict
injection_seglistdict &= seglistdict

#
# Start the DAG.
#

power.make_dag_directories(config_parser)
dag = pipeline.CondorDAG(tempfile.mkstemp(".log", "power_", options.condor_log_dir)[1])
dag.set_dag_file(os.path.splitext(filenames[0])[0])

# datafind jobs cover the union of both branches' segments
datafinds = power.make_datafind_stage(dag, injection_seglistdict | background_seglistdict, verbose = options.verbose)
def make_coinc_branch(dag, datafinds, seglistdict, time_slides, timing_params, psds_per_power, enable_clustering, tag, do_injections = False, verbose = False):
	"""
	Construct one branch (injection or non-injection) of the pipeline:
	optional lalapps_binj jobs, single-instrument lalapps_power jobs,
	optional clustering, ligolw_add + coincidence (burca) jobs per
	time-slide file, optional binjfind injection identification, and
	conversion to SQLite.  Returns the set of final coincidence nodes.

	time_slides maps a CacheEntry for each time-slide file to its
	sequence of offset vectors.
	"""
	# injection list
	if do_injections:
		# the binj fragment covers the whole extent with a single
		# offset vector, so only one time-slide file is supported
		assert len(time_slides) == 1
		if verbose:
			print("Building lalapps_binj jobs ...", file=sys.stderr)
		# dict views are not subscriptable in Python 3:
		# time_slides.keys()[0] raises TypeError, so materialize first
		binjnodes = power.make_binj_fragment(dag, seglistdict.extent_all(), list(time_slides.keys())[0], tag, 0.0, float(power.powerjob.get_opts()["low-freq-cutoff"]), float(power.powerjob.get_opts()["low-freq-cutoff"]) + float(power.powerjob.get_opts()["bandwidth"]))
		# make the datafinds depend on the binj jobs so injection
		# lists exist before analysis jobs are queued
		for node in datafinds:
			for binjnode in binjnodes:
				node.add_parent(binjnode)
	else:
		binjnodes = set()

	# single-instrument trigger generation, with optional pre-lladd
	# clustering
	trigger_nodes = power.make_single_instrument_stage(dag, datafinds, seglistdict, tag, timing_params, psds_per_power, binjnodes = binjnodes, verbose = verbose)
	if enable_clustering:
		if verbose:
			print("building pre-lladd bucluster jobs ...", file=sys.stderr)
		trigger_nodes = power.make_bucluster_fragment(dag, trigger_nodes, "PRELLADD_%s" % tag, verbose = verbose)

	# coincidence analysis, one pass per time-slide file
	coinc_nodes = set()
	binj_cache = set(cache_entry for node in binjnodes for cache_entry in node.get_output_cache())
	# otherwise the injection cache would be duplicated downstream
	assert len(binj_cache) < 2
	for n, (time_slides_cache_entry, these_time_slides) in enumerate(time_slides.items()):
		if verbose:
			print("%s %d/%d (%s):" % (tag, n + 1, len(time_slides), time_slides_cache_entry.path), file=sys.stderr)
		tisi_cache = set([time_slides_cache_entry])
		# NOTE(review): the branch condition around these two
		# assignments was elided in the extraction;  reconstructed as
		# do_injections, matching the remove_input/preserve_cache
		# handling below — confirm against the upstream script
		if do_injections:
			extra_input_cache = set()
		else:
			extra_input_cache = tisi_cache
		nodes = set()
		for seg, parents, cache, clipseg in power.group_coinc_parents(trigger_nodes, these_time_slides, verbose = verbose):
			nodes |= power.make_lladd_fragment(dag, parents | binjnodes, "%s_%d" % (tag, n), segment = seg, input_cache = cache | binj_cache, extra_input_cache = extra_input_cache, remove_input = do_injections, preserve_cache = binj_cache | tisi_cache)
		if enable_clustering:
			if verbose:
				print("building post-lladd bucluster jobs ...", file=sys.stderr)
			nodes = power.make_bucluster_fragment(dag, nodes, "POSTLLADD_%s_%d" % (tag, n), verbose = verbose)
		if verbose:
			print("building burca jobs ...", file=sys.stderr)
		coinc_nodes |= power.make_burca_fragment(dag, nodes, "%s_%d" % (tag, n), verbose = verbose)
		if verbose:
			print("done %s %d/%d" % (tag, n + 1, len(time_slides)), file=sys.stderr)

	# injection identification
	if do_injections:
		if verbose:
			print("building binjfind jobs ...", file=sys.stderr)
		coinc_nodes = power.make_binjfind_fragment(dag, coinc_nodes, tag, verbose = verbose)

	# conversion to SQLite databases
	if verbose:
		print("building sqlite jobs ...", file=sys.stderr)
	coinc_nodes = power.make_sqlite_fragment(dag, coinc_nodes, tag, verbose = verbose)

	# record the branch's output cache and hand the final nodes back
	power.write_output_cache(coinc_nodes, "%s_%s_output.cache" % (os.path.splitext(dag.get_dag_file())[0], tag))
	return coinc_nodes
#
# Build the non-injection and injection branches of the pipeline.
#

coinc_nodes = make_coinc_branch(dag, datafinds, background_seglistdict, background_time_slides, options.timing_params, options.psds_per_power, options.enable_clustering, options.tag, do_injections = False, verbose = options.verbose)
inj_coinc_nodes = make_coinc_branch(dag, datafinds, injection_seglistdict, injection_time_slides, options.timing_params, options.psds_per_injection, options.enable_clustering, "INJECTIONS_RUN_0_%s" % options.tag, do_injections = True, verbose = options.verbose)

#
# Write the DAG.  NOTE(review): the actual dag.write_*() calls follow
# this message in the part of the file beyond this chunk.
#

if options.verbose:
	print("writing dag ...", file=sys.stderr)
# Signatures of the functions defined above (reference only; kept as a
# comment because bare signature lines are not valid Python):
#   compute_segment_lists(seglistdict, time_slides, minimum_gap, timing_params, full_segments=True, verbose=False)
#   make_coinc_branch(dag, datafinds, seglistdict, time_slides, timing_params, psds_per_power, enable_clustering, tag, do_injections=False, verbose=False)
#   parse_config_file(options)