lalapps_power_pipe.py
##python
#
# Copyright (C) 2005--2010,2013,2015,2016,2019 Kipp Cannon
# Copyright (C) 2004--2006 Saikat Ray-Majumder
# Copyright (C) 2003--2005 Duncan Brown
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.


#
# =============================================================================
#
# Preamble
#
# =============================================================================
#


"""
Excess power offline pipeline construction script.
"""


import math
from optparse import OptionParser
import os
import sys
import tempfile
from configparser import ConfigParser, NoOptionError


import igwn_segments as segments
from igwn_segments import utils as segmentsUtils
from lal import LIGOTimeGPS
from lal import pipeline
from lal.utils import CacheEntry
from lalburst import cafe
from lalburst import timeslides
from lalburst import power


__author__ = "Kipp Cannon <kipp@gravity.phys.uwm.edu>"
__date__ = "$Date$"
__version__ = "$Revision$"


#
# =============================================================================
#
# Command Line
#
# =============================================================================
#


def parse_command_line():
    parser = OptionParser(
        version = "%prog CVS $Id$",
        description = "%prog builds an excess power pipeline DAG suitable for running at the various LSC Data Grid sites.  The script requires a configuration file.  An example file can be found in the LALApps CVS."
    )
    parser.add_option("--condor-log-dir", metavar = "path", default = ".", help = "Set the directory for Condor log files (default = \".\").")
    parser.add_option("--config-file", metavar = "filename", default = "power.ini", help = "Set the .ini configuration file name (default = \"power.ini\").")
    parser.add_option("--full-segments", action = "store_true", help = "Analyze all data from the segment lists, not just the coincident times.")
    parser.add_option("--minimum-gap", metavar = "seconds", type = "float", default = 60.0, help = "Merge jobs analyzing data from the same instrument if the gap between them is less than this many seconds (default = 60).")
    parser.add_option("--variant", metavar = "[injections|noninjections|both]", default = "both", help = "Select the variant of the pipeline to construct.  \"injections\" produces a simulations-only version of the pipeline, \"noninjections\" produces a version with no simulation jobs, and \"both\" produces a full pipeline with both simulation and non-simulation jobs.")
    parser.add_option("--background-time-slides", metavar = "filename", default = [], action = "append", help = "Set the file from which to obtain the time slide table for use in the background branch of the pipeline (default = \"background_time_slides.xml.gz\").  Provide this argument multiple times to supply multiple time slide files; each will result in a separate set of lalburst_coinc jobs.")
    parser.add_option("--injection-time-slides", metavar = "filename", help = "Set the file from which to obtain the time slide table for use in the injection branch of the pipeline (default = \"injection_time_slides.xml.gz\").")
    parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
    options, filenames = parser.parse_args()

    if options.variant not in ("injections", "noninjections", "both"):
        raise ValueError("unrecognized --variant %s" % options.variant)
    options.do_injections = options.variant in ("injections", "both")
    options.do_noninjections = options.variant in ("noninjections", "both")

    if options.do_injections and not options.injection_time_slides:
        raise ValueError("missing required --injection-time-slides argument")
    if options.do_noninjections and not options.background_time_slides:
        raise ValueError("missing required --background-time-slides argument")

    # wrapping the injection time slide file name in a list simplifies
    # life later by allowing the background and injection branches of
    # the dag to be constructed with nearly identical code
    options.injection_time_slides = [options.injection_time_slides]

    return options, (filenames or ["power.dag"])

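#
# Example invocation (illustrative only; the file names are hypothetical,
# and the trailing positional argument names the output .dag file,
# defaulting to "power.dag"):
#
#   lalapps_power_pipe --config-file power.ini \
#       --background-time-slides background_time_slides.xml.gz \
#       --injection-time-slides injection_time_slides.xml.gz \
#       --condor-log-dir /path/to/condor/logs
#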

#
# =============================================================================
#
# Config
#
# =============================================================================
#


def parse_config_file(options):
    if options.verbose:
        print("reading %s ..." % options.config_file, file=sys.stderr)
    config = ConfigParser()
    config.read(options.config_file)

    options.tag = config.get("pipeline", "user_tag")
    options.enable_clustering = config.getboolean("pipeline", "enable_clustering")

    seglistdict = segments.segmentlistdict()
    tiling_phase = {}
    for ifo in config.get("pipeline", "ifos").split():
        seglistdict[ifo] = segmentsUtils.fromsegwizard(open(config.get("pipeline", "seglist_%s" % ifo)), coltype = LIGOTimeGPS).coalesce()
        try:
            offset = config.getfloat("pipeline", "tiling_phase_%s" % ifo)
        except NoOptionError:
            offset = 0.0
        if offset:
            tiling_phase[ifo] = offset

    options.psds_per_power = config.getint("pipeline", "psds_per_power")
    options.psds_per_injection = config.getint("pipeline", "psds_per_injection")
    options.timing_params = power.TimingParameters(config)

    return seglistdict, tiling_phase, config

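#
# A minimal sketch of the [pipeline] options read above (the instrument
# names, file names and numeric values are hypothetical, and the sections
# consumed by power.init_job_types() and power.TimingParameters() are not
# shown; a complete example file can be found in the LALApps CVS):
#
#   [pipeline]
#   user_tag = EXCESSPOWER
#   enable_clustering = true
#   ifos = H1 L1
#   seglist_H1 = H1_segments.txt
#   seglist_L1 = L1_segments.txt
#   tiling_phase_L1 = 0.5
#   psds_per_power = 16
#   psds_per_injection = 16
#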

#
# =============================================================================
#
# Determine Segment List
#
# =============================================================================
#


def compute_segment_lists(seglistdict, time_slides, minimum_gap, timing_params, full_segments = True, verbose = False):
    if verbose:
        print("constructing segment list ...", file=sys.stderr)

    seglistdict = seglistdict.copy()

    if not full_segments:
        # cull too-short single-instrument segments from the input
        # segmentlist dictionary; this can significantly increase
        # the speed of the get_coincident_segmentlistdict()
        # function when the input segmentlists have had many data
        # quality holes poked out of them
        power.remove_too_short_segments(seglistdict, timing_params)

        # extract the segments that are coincident under the time
        # slides
        new = cafe.get_coincident_segmentlistdict(seglistdict, time_slides)

        # adjust surviving segment lengths up to the next integer
        # number of PSDs
        for seglist in new.values():
            # Try Adjusting Upper Bounds:

            # count the number of PSDs in each segment
            psds = [power.psds_from_job_length(timing_params, float(abs(seg))) for seg in seglist]

            # round up to the nearest integer.
            psds = [int(math.ceil(max(n, 1.0))) for n in psds]

            # compute the duration of each job
            durations = [power.job_length_from_psds(timing_params, n) for n in psds]

            # update segment list
            for i, seg in enumerate(seglist):
                seglist[i] = segments.segment(seg[0], seg[0] + durations[i])

            # and take intersection with original segments to
            # not exceed original bounds
            new &= seglistdict

            # Try Adjusting Lower Bounds:

            # count the number of PSDs in each segment
            psds = [power.psds_from_job_length(timing_params, float(abs(seg))) for seg in seglist]

            # round up to the nearest integer.
            psds = [int(math.ceil(max(n, 1.0))) for n in psds]

            # compute the duration of each job
            durations = [power.job_length_from_psds(timing_params, n) for n in psds]

            # update segment list
            for i, seg in enumerate(seglist):
                seglist[i] = segments.segment(seg[1] - durations[i], seg[1])

            # and take intersection with original segments to
            # not exceed original bounds
            new &= seglistdict


        # try to fill gaps between jobs
        new.protract(minimum_gap / 2).contract(minimum_gap / 2)

        # and take intersection with original segments to not
        # exceed original bounds
        seglistdict &= new

    # remove segments that are too short
    power.remove_too_short_segments(seglistdict, timing_params)

    # done
    return seglistdict

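#
# Illustration (not part of the pipeline):  the protract()/contract() pair
# above merges jobs separated by a gap smaller than minimum_gap, e.g. with
# minimum_gap = 60:
#
#   >>> from igwn_segments import segment, segmentlist
#   >>> jobs = segmentlist([segment(0, 100), segment(140, 240)])
#   >>> jobs.protract(30).contract(30)
#   [segment(0, 240)]
#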

#
# =============================================================================
#
# DAG Construction
#
# =============================================================================
#


#
# Command line
#


options, filenames = parse_command_line()


#
# Parse .ini file, loading the single-instrument segment lists while at it.
#


seglistdict, tiling_phase, config_parser = parse_config_file(options)


#
# Define .sub files
#


power.init_job_types(config_parser)


#
# Using time slide information, construct segment lists describing times
# requiring trigger construction.
#


if options.verbose:
    print("Computing segments for which lalapps_power jobs are required ...", file=sys.stderr)

background_time_slides = {}
background_seglistdict = segments.segmentlistdict()
if options.do_noninjections:
    for filename in options.background_time_slides:
        cache_entry = CacheEntry(None, None, None, "file://localhost" + os.path.abspath(filename))
        background_time_slides[cache_entry] = timeslides.load_time_slides(filename, verbose = options.verbose).values()
        background_seglistdict |= compute_segment_lists(seglistdict, background_time_slides[cache_entry], options.minimum_gap, options.timing_params, full_segments = options.full_segments, verbose = options.verbose)


injection_time_slides = {}
injection_seglistdict = segments.segmentlistdict()
if options.do_injections:
    for filename in options.injection_time_slides:
        cache_entry = CacheEntry(None, None, None, "file://localhost" + os.path.abspath(filename))
        injection_time_slides[cache_entry] = timeslides.load_time_slides(filename, verbose = options.verbose).values()
        injection_seglistdict |= compute_segment_lists(seglistdict, injection_time_slides[cache_entry], options.minimum_gap, options.timing_params, full_segments = options.full_segments, verbose = options.verbose)


# apply time shifts to segment lists to shift tiling phases, but take
# intersection with original segments to stay within allowed times.  Note:
# can't use segmentlistdict's offset mechanism to do this because we need
# the offsets to still be 0 for coincidence testing later.


for key, offset in tiling_phase.items():
    if key in background_seglistdict:
        background_seglistdict[key].shift(offset)
    if key in injection_seglistdict:
        injection_seglistdict[key].shift(offset)
background_seglistdict &= seglistdict
injection_seglistdict &= seglistdict

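#
# Illustration (not part of the pipeline):  segmentlist.shift() translates
# every segment in place, e.g.
#
#   >>> from igwn_segments import segment, segmentlist
#   >>> segmentlist([segment(0, 10)]).shift(0.5)
#   [segment(0.5, 10.5)]
#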

#
# Start DAG
#


power.make_dag_directories(config_parser)
dag = pipeline.CondorDAG(tempfile.mkstemp(".log", "power_", options.condor_log_dir)[1])
dag.set_dag_file(os.path.splitext(filenames[0])[0])


#
# Build datafind jobs.
#


datafinds = power.make_datafind_stage(dag, injection_seglistdict | background_seglistdict, verbose = options.verbose)


#
# Main analysis
#

def make_coinc_branch(dag, datafinds, seglistdict, time_slides, timing_params, psds_per_power, enable_clustering, tag, do_injections = False, verbose = False):
    # injection list


    if do_injections:
        assert len(time_slides) == 1
        if verbose:
            print("Building lalapps_binj jobs ...", file=sys.stderr)
        binjnodes = power.make_binj_fragment(dag, seglistdict.extent_all(), list(time_slides.keys())[0], tag, 0.0, float(power.powerjob.get_opts()["low-freq-cutoff"]), float(power.powerjob.get_opts()["low-freq-cutoff"]) + float(power.powerjob.get_opts()["bandwidth"]))
        # add binj nodes as parents of the datafinds to force the
        # binjs to be run first.  this ensures that once a datafind
        # has run, the power jobs that follow it will immediately be
        # able to run, which helps depth-first dagman do smarter
        # things.
        for node in datafinds:
            for binjnode in binjnodes:
                node.add_parent(binjnode)
    else:
        binjnodes = set()


    # single-instrument trigger generation


    trigger_nodes = power.make_single_instrument_stage(dag, datafinds, seglistdict, tag, timing_params, psds_per_power, binjnodes = binjnodes, verbose = verbose)
    if enable_clustering:
        if verbose:
            print("building pre-lladd bucluster jobs ...", file=sys.stderr)
        trigger_nodes = power.make_bucluster_fragment(dag, trigger_nodes, "PRELLADD_%s" % tag, verbose = verbose)


    # coincidence analysis


    coinc_nodes = set()
    binj_cache = set(cache_entry for node in binjnodes for cache_entry in node.get_output_cache())
    # otherwise too many copies of the offset vector will be fed into
    # burca
    assert len(binj_cache) < 2
    for n, (time_slides_cache_entry, these_time_slides) in enumerate(time_slides.items()):
        if verbose:
            print("%s %d/%d (%s):" % (tag, n + 1, len(time_slides), time_slides_cache_entry.path), file=sys.stderr)
        tisi_cache = set([time_slides_cache_entry])
        if do_injections:
            # lalapps_binj has already copied the time slide
            # document into its own output
            extra_input_cache = set()
        else:
            # ligolw_add needs to copy the time slide document
            # into its output
            extra_input_cache = tisi_cache
        nodes = set()
        for seg, parents, cache, clipseg in power.group_coinc_parents(trigger_nodes, these_time_slides, verbose = verbose):
            nodes |= power.make_lladd_fragment(dag, parents | binjnodes, "%s_%d" % (tag, n), segment = seg, input_cache = cache | binj_cache, extra_input_cache = extra_input_cache, remove_input = do_injections, preserve_cache = binj_cache | tisi_cache)
        if enable_clustering:
            if verbose:
                print("building post-lladd bucluster jobs ...", file=sys.stderr)
            nodes = power.make_bucluster_fragment(dag, nodes, "POSTLLADD_%s_%d" % (tag, n), verbose = verbose)
        if verbose:
            print("building burca jobs ...", file=sys.stderr)
        coinc_nodes |= power.make_burca_fragment(dag, nodes, "%s_%d" % (tag, n), verbose = verbose)
        if verbose:
            print("done %s %d/%d" % (tag, n + 1, len(time_slides)), file=sys.stderr)


    # injection identification


    if do_injections:
        if verbose:
            print("building binjfind jobs ...", file=sys.stderr)
        coinc_nodes = power.make_binjfind_fragment(dag, coinc_nodes, tag, verbose = verbose)


    # conversion to SQLite database files


    if verbose:
        print("building sqlite jobs ...", file=sys.stderr)
    coinc_nodes = power.make_sqlite_fragment(dag, coinc_nodes, tag, verbose = verbose)


    # done


    power.write_output_cache(coinc_nodes, "%s_%s_output.cache" % (os.path.splitext(dag.get_dag_file())[0], tag))
    return coinc_nodes


coinc_nodes = make_coinc_branch(dag, datafinds, background_seglistdict, background_time_slides, options.timing_params, options.psds_per_power, options.enable_clustering, options.tag, do_injections = False, verbose = options.verbose)
inj_coinc_nodes = make_coinc_branch(dag, datafinds, injection_seglistdict, injection_time_slides, options.timing_params, options.psds_per_injection, options.enable_clustering, "INJECTIONS_RUN_0_%s" % options.tag, do_injections = True, verbose = options.verbose)


#
# Output
#


if options.verbose:
    print("writing dag ...", file=sys.stderr)
dag.write_sub_files()
dag.write_dag()
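

#
# The DAG written above is submitted with HTCondor's standard tool, e.g.
# (assuming the default output file name):
#
#   condor_submit_dag power.dag
#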