power.py (LALBurst 2.0.7.1-3a66518)
2# This program is free software; you can redistribute it and/or modify it under
3# the terms of the GNU General Public License as published by the Free Software
4# Foundation; either version 2 of the License, or (at your option) any later
5# version.
6#
7# This program is distributed in the hope that it will be useful, but WITHOUT
8# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
10# details.
11#
12# You should have received a copy of the GNU General Public License along with
13# this program; if not, write to the Free Software Foundation, Inc., 51
14# Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
15
16
17#
18# =============================================================================
19#
20# Preamble
21#
22# =============================================================================
23#
24
25
26"""
27Excess power pipeline construction tools.
28"""
29
30
31import errno
32import os
33import sys
34import time
35
36
37import igwn_segments as segments
38from igwn_segments import utils as segmentsUtils
39import lal
40from lal import iterutils
41from lal import pipeline
42from lal.utils import CacheEntry
43import lalburst
44from . import cafe
45
46
47__author__ = "Duncan Brown <duncan@gravity.phys.uwm.edu>, Kipp Cannon <kipp@gravity.phys.uwm.edu>"
48__date__ = "$Date$"
49__version__ = "$Revision$"
50
51
52#
53# =============================================================================
54#
55# Helpers
56#
57# =============================================================================
58#
59
60
61def get_universe(config_parser):
62 return config_parser.get("condor", "universe")
63
64
65def get_accounting_group(config_parser):
66 return config_parser.get("condor", "accounting_group")
67
68
69def get_executable(config_parser, name):
70 return config_parser.get("condor", name)
71
72
73def get_out_dir(config_parser):
74 return config_parser.get("pipeline", "out_dir")
75
76
77def get_cache_dir(config_parser):
78 return config_parser.get("pipeline", "cache_dir")
79
80
81def get_triggers_dir(config_parser):
82 return config_parser.get("pipeline", "triggers_dir")
83
84
85def make_dir_if_not_exists(dir):
86 try:
87 os.mkdir(dir)
88 except OSError as e:
89 if e.errno != errno.EEXIST:
90 # OK if directory exists, otherwise report error
91 raise e
92
93
94def make_dag_directories(config_parser):
95 make_dir_if_not_exists(get_cache_dir(config_parser))
96 make_dir_if_not_exists(get_out_dir(config_parser))
97
98
99def get_files_per_bucluster(config_parser):
100 return config_parser.getint("pipeline", "files_per_bucluster")
101
102
103def get_files_per_bucut(config_parser):
104 return config_parser.getint("pipeline", "files_per_bucut")
105
106
107def get_files_per_burca(config_parser):
108 return config_parser.getint("pipeline", "files_per_burca")
109
110
111def get_files_per_binjfind(config_parser):
112 return config_parser.getint("pipeline", "files_per_binjfind")
113
114
115class TimingParameters(object):
116 """
117 A class to hold timing parameter values.
118 """
119 def __init__(self, config_parser):
120 # initialize from config file
121 self.resample_rate = config_parser.getfloat("lalapps_power", "resample-rate")
122 self.window_length = config_parser.getint("lalapps_power", "window-length")
123 self.max_tile_length = int(config_parser.getfloat("lalapps_power", "max-tile-duration") * self.resample_rate)
124 self.tile_stride_fraction = config_parser.getfloat("lalapps_power", "tile-stride-fraction")
125 self.filter_corruption = config_parser.getint("lalapps_power", "filter-corruption")
126 self.max_tile_bandwidth = config_parser.getfloat("lalapps_power", "max-tile-bandwidth")
127
128 # populate additional computed parameters from library code
129 self.psd_length, self.psd_shift, self.window_shift, self.window_pad, self.tiling_length = lalburst.EPGetTimingParameters(
130 self.window_length,
131 self.max_tile_length,
132   self.tile_stride_fraction,
133   config_parser.getint("lalapps_power", "psd-average-points")
134 )
135
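# Illustrative usage sketch (the ini file name is hypothetical): TimingParameters
# is normally built from the same ConfigParser object that drives the rest of
# the pipeline; the derived psd_length, psd_shift, window_shift, window_pad and
# tiling_length attributes are filled in by lalburst.EPGetTimingParameters().
#
#   from configparser import ConfigParser
#   cp = ConfigParser()
#   cp.read("power_pipeline.ini")   # must provide a [lalapps_power] section
#   timing_params = TimingParameters(cp)
#   print(timing_params.psd_length, timing_params.psd_shift)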
136
137def make_cache_entry(input_cache, description, path):
138 # summarize segment information
139 seglists = segments.segmentlistdict()
140 for c in input_cache:
141 seglists |= c.segmentlistdict
142
143 # obtain instrument list
144 instruments = list(seglists.keys())
145 if None in instruments:
146  instruments.remove(None)
147 instruments.sort()
148
149 # remove empty segment lists to allow extent_all() to work
150 for instrument in list(seglists.keys()):
151 if not seglists[instrument]:
152 del seglists[instrument]
153
154 # make the URL
155 if path:
156 url = "file://localhost%s" % os.path.abspath(path)
157 else:
158 # FIXME: old version of CacheEntry allowed None for URL,
159 # new version doesn't. correct fix is to modify calling
160 # code to not try to initialize the output cache until
161 # after the input is known, but for now we'll just do this
162 # stupid hack.
163 url = "file://localhost/dev/null"
164
165 # construct a cache entry from the instruments and
166 # segments that remain
167 return CacheEntry("+".join(instruments) or None, description, seglists.extent_all(), url)
168
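# Example (illustrative; paths and GPS times are made up): make_cache_entry()
# merges the instruments and spans of its inputs into a single entry.
#
#   a = CacheEntry("H1", "POWER", segments.segment(lal.LIGOTimeGPS(1000000000), lal.LIGOTimeGPS(1000000256)), "file://localhost/tmp/H1-POWER-1000000000-256.xml.gz")
#   b = CacheEntry("L1", "POWER", segments.segment(lal.LIGOTimeGPS(1000000128), lal.LIGOTimeGPS(1000000384)), "file://localhost/tmp/L1-POWER-1000000128-256.xml.gz")
#   c = make_cache_entry([a, b], "COMBINED", "/tmp/combined.xml.gz")
#   # c.observatory == "H1+L1"
#   # c.segment == segments.segment(1000000000, 1000000384)
#   # c.url == "file://localhost/tmp/combined.xml.gz"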
169
170def collect_output_caches(parents):
171 cache = [(cache_entry, parent) for parent in parents for cache_entry in parent.get_output_cache()]
172 cache.sort(key = lambda x: x[0].segment)
173 return cache
174
175
176def match_nodes_to_caches(nodes, caches):
177 """
178 For each cache, get the set of nodes whose output files it
179 contains. A node is allowed to provide more than one output file,
180 and thus can be listed in more than one set.
181 """
182 # cache_entry --> node look-up table
183 nodes = set(nodes)
184 index = {}
185 for node in nodes:
186 for cache_entry in node.get_output_cache():
187 index[cache_entry] = node
188
189 # can't use [set()] * len(caches) for the normal reason
190 node_groups = [set() for cache in caches]
191
192 # form node groups matching input caches
193 for node_group, cache in zip(node_groups, caches):
194 for cache_entry in cache:
195 node_group.add(index[cache_entry])
196
197 # how many nodes didn't get used?
198 unused = len(nodes) - len(set.union(*node_groups))
199
200 # done
201 return node_groups, unused
202
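# Example (illustrative; the strings and the DummyNode class are stand-ins for
# CacheEntry objects and for the node classes defined below, all of which
# provide get_output_cache()):
#
#   class DummyNode(object):
#           def __init__(self, cache):
#                   self.cache = list(cache)
#           def get_output_cache(self):
#                   return self.cache
#
#   e1, e2, e3 = "e1", "e2", "e3"
#   node_a, node_b = DummyNode([e1]), DummyNode([e2, e3])
#   groups, unused = match_nodes_to_caches([node_a, node_b], [[e1, e2], [e3]])
#   # groups == [{node_a, node_b}, {node_b}] and unused == 0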
203
204def cache_span(cache):
205 a = min([cache_entry.segment[0] for cache_entry in cache])
206 b = max([cache_entry.segment[1] for cache_entry in cache])
207 return segments.segment(a, b)
208
209
210#
211# How to write an output cache
212#
213
214
215def write_output_cache(nodes, filename):
216 f = open(filename, "w")
217 for cache_entry, node in collect_output_caches(nodes):
218 print(str(cache_entry), file=f)
219
220
221#
222# =============================================================================
223#
224# DAG Node and Job Class
225#
226# =============================================================================
227#
228
229
230class RMJob(pipeline.CondorDAGJob):
231 def __init__(self, config_parser):
232 """
233 config_parser = ConfigParser object
234 """
235 pipeline.CondorDAGJob.__init__(self, "local", "/bin/rm")
236 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "rm-$(cluster)-$(process).out"))
237 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "rm-$(cluster)-$(process).err"))
238 self.add_condor_cmd("getenv", "True")
239 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
240 self.add_opt("force", "")
241 self.set_sub_file("rm.sub")
242
243
244class RMNode(pipeline.CondorDAGNode):
245 def __init__(self, job):
246 pipeline.CondorDAGNode.__init__(self, job)
247 self.input_cache = set()
248 self.output_cache = set()
249 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
250
251 def add_input_cache(self, cache):
252 self.input_cache |= cache
253 for cache_entry in cache:
254 pipeline.CondorDAGNode.add_file_arg(self, cache_entry.path)
255
256 def get_output_cache(self):
257  return set()
258
259
260class BurstInjJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
261 """
262 A lalapps_binj job used by the power pipeline. The static options
263 are read from the [lalapps_binj] section in the ini file. The
264 stdout and stderr from the job are directed to the logs directory.
265 The job runs in the universe specified in the ini file. The path
266 to the executable is determined from the ini file.
267 """
268 def __init__(self, config_parser):
269 """
270 config_parser = ConfigParser object
271 """
272 pipeline.CondorDAGJob.__init__(self, get_universe(config_parser), get_executable(config_parser, "lalapps_binj"))
273 pipeline.AnalysisJob.__init__(self, config_parser)
274
275 # do this many injections between flow and fhigh inclusively
276 if config_parser.has_option("pipeline", "injection_bands"):
277 self.injection_bands = config_parser.getint("pipeline", "injection_bands")
278 else:
279 self.injection_bands = None
280
281 self.add_ini_opts(config_parser, "lalapps_binj")
282 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalapps_binj-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).out"))
283 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalapps_binj-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).err"))
284 self.add_condor_cmd("getenv", "True")
285 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
286 self.set_sub_file("lalapps_binj.sub")
287
288 self.output_dir = "."
289
290 # one injection every time-step seconds
291 self.time_step = config_parser.getfloat("lalapps_binj", "time-step")
292
293
294class BurstInjNode(pipeline.CondorDAGNode,pipeline.AnalysisNode):
295 def __init__(self, job):
296 pipeline.CondorDAGNode.__init__(self, job)
297 pipeline.AnalysisNode.__init__(self)
298 self.__usertag = None
299 self.output_cache = []
300 self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
301 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
302
303 def set_user_tag(self, tag):
304 self.__usertag = tag
305 self.add_var_opt("user-tag", self.__usertag)
306
307 def get_user_tag(self):
308 if self.output_cache:
309 raise AttributeError("cannot change attributes after computing output cache")
310 return self.__usertag
311
312 def set_time_slide_file(self, filename):
313 self.add_var_opt("time-slide-file", filename)
314
315 def get_time_slide_file(self):
316  return self.get_opts().get("macrotimeslidefile", None)
317
318 def set_start(self, start):
319 if self.output_cache:
320 raise AttributeError("cannot change attributes after computing output cache")
321 self.add_var_opt("gps-start-time", start)
322
323 def get_start(self):
324 return self.get_opts().get("macrogpsstarttime", None)
325
326 def set_end(self, end):
327 if self.output_cache:
328 raise AttributeError("cannot change attributes after computing output cache")
329 self.add_var_opt("gps-end-time", end)
330
331 def get_end(self):
332 return self.get_opts().get("macrogpsendtime", None)
333
334 def get_output_cache(self):
335 """
336 Returns a LAL cache of the output file name. Calling this
337 method also induces the output name to get set, so it must
338 be called at least once.
339 """
340 if not self.output_cache:
341 # FIXME: instruments hardcoded to "everything"
342 self.output_cache = [CacheEntry("G1+H1+H2+L1+T1+V1", self.__usertag, segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end())), "file://localhost" + os.path.abspath(self.get_output()))]
343 return self.output_cache
344
345 def get_output_files(self):
346  raise NotImplementedError
347
348 def get_output(self):
349 if self._AnalysisNode__output is None:
350 if None in (self.get_start(), self.get_end(), self.__usertag):
351   raise ValueError("start time, end time, or user tag has not been set")
352 seg = segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end()))
353 self.set_output(os.path.join(self.output_dir, "G1+H1+H2+L1+T1+V1-INJECTIONS_%s-%d-%d.xml.gz" % (self.__usertag, int(self.get_start()), int(self.get_end() - self.get_start()))))
354 return self._AnalysisNode__output
355
356
357class PowerJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
358 """
359 A lalapps_power job used by the power pipeline. The static options
360 are read from the [lalapps_power] and [lalapps_power_<inst>]
361 sections in the ini file. The stdout and stderr from the job are
362 directed to the logs directory. The job runs in the universe
363 specified in the ini file. The path to the executable is determined
364 from the ini file.
365 """
366 def __init__(self, config_parser):
367 """
368 config_parser = ConfigParser object
369 """
370 pipeline.CondorDAGJob.__init__(self, get_universe(config_parser), get_executable(config_parser, "lalapps_power"))
371 pipeline.AnalysisJob.__init__(self, config_parser)
372 self.add_ini_opts(config_parser, "lalapps_power")
373 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalapps_power-$(cluster)-$(process).out"))
374 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalapps_power-$(cluster)-$(process).err"))
375 self.add_condor_cmd("getenv", "True")
376 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
377 self.set_sub_file("lalapps_power.sub")
378
379 self.output_dir = "."
380
381
382class PowerNode(pipeline.AnalysisNode):
383 def __init__(self, job):
384 pipeline.CondorDAGNode.__init__(self, job)
385 pipeline.AnalysisNode.__init__(self)
386 self.__usertag = None
387 self.output_cache = []
388 self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
389 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
390
391 def set_ifo(self, instrument):
392 """
393 Load additional options from the per-instrument section in
394 the config file.
395 """
396 if self.output_cache:
397 raise AttributeError("cannot change attributes after computing output cache")
398 pipeline.AnalysisNode.set_ifo(self, instrument)
399 for optvalue in self.job()._AnalysisJob__cp.items("lalapps_power_%s" % instrument):
400 self.add_var_arg("--%s %s" % optvalue)
401
402 def set_user_tag(self, tag):
403 if self.output_cache:
404 raise AttributeError("cannot change attributes after computing output cache")
405 self.__usertag = tag
406 self.add_var_opt("user-tag", self.__usertag)
407
408 def get_user_tag(self):
409 return self.__usertag
410
411 def get_output_cache(self):
412 """
413 Returns a LAL cache of the output file name. Calling this
414 method also induces the output name to get set, so it must
415 be called at least once.
416 """
417 if not self.output_cache:
418 self.output_cache = [CacheEntry(self.get_ifo(), self.__usertag, segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end())), "file://localhost" + os.path.abspath(self.get_output()))]
419 return self.output_cache
420
421 def get_output_files(self):
422  raise NotImplementedError
423
424 def get_output(self):
425 if self._AnalysisNode__output is None:
426 if None in (self.get_start(), self.get_end(), self.get_ifo(), self.__usertag):
427 raise ValueError("start time, end time, ifo, or user tag has not been set")
428 seg = segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end()))
429 self.set_output(os.path.join(self.output_dir, "%s-POWER_%s-%d-%d.xml.gz" % (self.get_ifo(), self.__usertag, int(self.get_start()), int(self.get_end()) - int(self.get_start()))))
430 return self._AnalysisNode__output
431
432 def set_mdccache(self, file):
433 """
434  Set the LAL cache of MDC frames to use. The cache is
435  passed to the job with the --mdc-cache argument. @param
436  file: MDC frame cache file to use.
437 """
438 self.add_var_opt("mdc-cache", file)
439 self.add_input_file(file)
440
441 def set_injection_file(self, file):
442 """
443 Set the name of the XML file from which to read a list of
444 software injections.
445 """
446 self.add_var_opt("injection-file", file)
447 self.add_input_file(file)
448
449
450class LigolwAddNode(pipeline.LigolwAddNode):
451 def __init__(self, job, remove_input, *args):
452 pipeline.LigolwAddNode.__init__(self, job, *args)
453 self.input_cache = []
454 self.output_cache = []
455 self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
456 self.output_dir = os.path.join(os.getcwd(), ".") # "." == self.job().output_dir except the job class doesn't yet have this info
457 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
458 self.remove_input = bool(remove_input)
459 if self.remove_input:
460 self.add_var_arg("--remove-input")
461
462 def __update_output_cache(self, observatory = None, segment = None):
463 del self.output_cache[:]
464 cache_entry = make_cache_entry(self.input_cache, None, self._AnalysisNode__output)
465 if observatory is not None:
466 cache_entry.observatory = observatory
467 if segment is not None:
468 cache_entry.segment = segment
469  self.output_cache.append(cache_entry)
470
471 def set_name(self, *args):
472 pipeline.LigolwAddNode.set_name(self, *args)
473 self.cache_name = os.path.join(self.cache_dir, "%s.cache" % self.get_name())
474 self.add_var_opt("input-cache", self.cache_name)
475
476 def add_input_cache(self, cache):
477 self.input_cache.extend(cache)
478  self.__update_output_cache()
479
480 def set_output(self, path = None, observatory = None, segment = None):
481 pipeline.LigolwAddNode.set_output(self, path)
482 self.__update_output_cache(observatory = observatory, segment = segment)
483
484 def add_preserve_cache(self, cache):
485 if self.remove_input:
486 for c in cache:
487 self.add_var_arg("--remove-input-except %s" % c.path)
488
489 def get_input_cache(self):
490  return self.input_cache
491
492 def get_output_cache(self):
493  return self.output_cache
494
495 def write_input_files(self, *args):
496  f = open(self.cache_name, "w")
497 for c in self.input_cache:
498 print(str(c), file=f)
499 pipeline.LigolwAddNode.write_input_files(self, *args)
500
501 def get_output_files(self):
502  raise NotImplementedError
503
504 def get_output(self):
505 raise NotImplementedError
506
507
508class BucutJob(pipeline.CondorDAGJob):
509 def __init__(self, config_parser):
510 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_cut"))
511 self.set_sub_file("lalburst_cut.sub")
512 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_cut-$(cluster)-$(process).out"))
513 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_cut-$(cluster)-$(process).err"))
514 self.add_condor_cmd("getenv", "True")
515 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
516 self.add_condor_cmd("Requirements", "Memory > 1100")
517 self.add_ini_opts(config_parser, "lalburst_cut")
518
519  self.files_per_bucut = get_files_per_bucut(config_parser)
520  if self.files_per_bucut < 1:
521 raise ValueError("files_per_bucut < 1")
522
523
524class BucutNode(pipeline.CondorDAGNode):
525 def __init__(self, *args):
526 pipeline.CondorDAGNode.__init__(self, *args)
527 self.input_cache = []
528  self.output_cache = self.input_cache
529  self._CondorDAGNode__macros["initialdir"] = os.getcwd()
530
531 def add_input_cache(self, cache):
532 self.input_cache.extend(cache)
533 for c in cache:
534 filename = c.path
535 pipeline.CondorDAGNode.add_file_arg(self, filename)
536 self.add_output_file(filename)
537
538 def add_file_arg(self, filename):
539 raise NotImplementedError
540
541 def get_input_cache(self):
542  return self.input_cache
543
544 def get_output_cache(self):
545  return self.output_cache
546
547 def get_output_files(self):
548  raise NotImplementedError
549
550 def get_output(self):
551 raise NotImplementedError
552
553
554class BuclusterJob(pipeline.CondorDAGJob):
555 def __init__(self, config_parser):
556 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_cluster"))
557 self.set_sub_file("lalburst_cluster.sub")
558 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_cluster-$(cluster)-$(process).out"))
559 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_cluster-$(cluster)-$(process).err"))
560 self.add_condor_cmd("getenv", "True")
561 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
562 self.add_condor_cmd("Requirements", "Memory > 1100")
563 self.add_ini_opts(config_parser, "lalburst_cluster")
564
565 self.cache_dir = get_cache_dir(config_parser)
566
567  self.files_per_bucluster = get_files_per_bucluster(config_parser)
568  if self.files_per_bucluster < 1:
569 raise ValueError("files_per_bucluster < 1")
570
571
572class BuclusterNode(pipeline.CondorDAGNode):
573 def __init__(self, *args):
574 pipeline.CondorDAGNode.__init__(self, *args)
575 self.input_cache = []
576  self.output_cache = self.input_cache
577  self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
578 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
579
580 def set_name(self, *args):
581 pipeline.CondorDAGNode.set_name(self, *args)
582 self.cache_name = os.path.join(self.cache_dir, "%s.cache" % self.get_name())
583 self.add_var_opt("input-cache", self.cache_name)
584
585 def add_input_cache(self, cache):
586 self.input_cache.extend(cache)
587
588 def add_file_arg(self, filename):
589 raise NotImplementedError
590
591 def write_input_files(self, *args):
592  f = open(self.cache_name, "w")
593 for c in self.input_cache:
594 print(str(c), file=f)
595 pipeline.CondorDAGNode.write_input_files(self, *args)
596
597 def get_input_cache(self):
598  return self.input_cache
599
600 def get_output_cache(self):
601  return self.output_cache
602
603 def get_output_files(self):
604  raise NotImplementedError
605
606 def get_output(self):
607 raise NotImplementedError
608
609
610class BinjfindJob(pipeline.CondorDAGJob):
611 def __init__(self, config_parser):
612 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_injfind"))
613 self.set_sub_file("lalburst_injfind.sub")
614 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_injfind-$(cluster)-$(process).out"))
615 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_injfind-$(cluster)-$(process).err"))
616 self.add_condor_cmd("getenv", "True")
617 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
618 self.add_ini_opts(config_parser, "lalburst_injfind")
619
620  self.files_per_binjfind = get_files_per_binjfind(config_parser)
621  if self.files_per_binjfind < 1:
622 raise ValueError("files_per_binjfind < 1")
623
624
625class BinjfindNode(pipeline.CondorDAGNode):
626 def __init__(self, *args):
627 pipeline.CondorDAGNode.__init__(self, *args)
628 self.input_cache = []
629  self.output_cache = self.input_cache
630  self._CondorDAGNode__macros["initialdir"] = os.getcwd()
631
632 def add_input_cache(self, cache):
633 self.input_cache.extend(cache)
634 for c in cache:
635 filename = c.path
636 pipeline.CondorDAGNode.add_file_arg(self, filename)
637 self.add_output_file(filename)
638
639 def add_file_arg(self, filename):
640 raise NotImplementedError
641
642 def get_input_cache(self):
643  return self.input_cache
644
645 def get_output_cache(self):
646  return self.output_cache
647
648 def get_output_files(self):
649  raise NotImplementedError
650
651 def get_output(self):
652 raise NotImplementedError
653
654
655class BurcaJob(pipeline.CondorDAGJob):
656 def __init__(self, config_parser):
657 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_coinc"))
658 self.set_sub_file("lalburst_coinc.sub")
659 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc-$(cluster)-$(process).out"))
660 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc-$(cluster)-$(process).err"))
661 self.add_condor_cmd("getenv", "True")
662 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
663 self.add_condor_cmd("Requirements", "Memory >= $(macrominram)")
664 self.add_ini_opts(config_parser, "lalburst_coinc")
665
666  self.files_per_burca = get_files_per_burca(config_parser)
667  if self.files_per_burca < 1:
668 raise ValueError("files_per_burca < 1")
669
670
671class Burca2Job(pipeline.CondorDAGJob):
672 def __init__(self, config_parser):
673 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_coinc"))
674 self.set_sub_file("lalburst_coinc2.sub")
675 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc2-$(cluster)-$(process).out"))
676 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc2-$(cluster)-$(process).err"))
677 self.add_condor_cmd("getenv", "True")
678 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
679 self.add_ini_opts(config_parser, "lalburst_coinc2")
680
681 self.cache_dir = get_cache_dir(config_parser)
682
683
684class BurcaNode(pipeline.CondorDAGNode):
685 def __init__(self, *args):
686 pipeline.CondorDAGNode.__init__(self, *args)
687 self.input_cache = []
688  self.output_cache = self.input_cache
689  self._CondorDAGNode__macros["initialdir"] = os.getcwd()
690
691 def add_input_cache(self, cache):
692 self.input_cache.extend(cache)
693 for c in cache:
694 filename = c.path
695 pipeline.CondorDAGNode.add_file_arg(self, filename)
696 self.add_output_file(filename)
697 longest_duration = max(abs(cache_entry.segment) for cache_entry in self.input_cache)
698 if longest_duration > 25000:
699 # ask for >= 1300 MB
700 self.add_macro("macrominram", 1300)
701 elif longest_duration > 10000:
702 # ask for >= 800 MB
703 self.add_macro("macrominram", 800)
704 else:
705 # run on any node
706 self.add_macro("macrominram", 0)
707
708 def add_file_arg(self, filename):
709 raise NotImplementedError
710
711 def get_input_cache(self):
712  return self.input_cache
713
714 def get_output_cache(self):
715  return self.output_cache
716
717 def get_output_files(self):
718  raise NotImplementedError
719
720 def get_output(self):
721 raise NotImplementedError
722
723 def set_coincidence_segments(self, seglist):
724 self.add_var_arg("--coincidence-segments %s" % ",".join(segmentsUtils.to_range_strings(seglist)))
725
726
727class SQLiteJob(pipeline.CondorDAGJob):
728 def __init__(self, config_parser):
729 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "ligolw_sqlite"))
730 self.set_sub_file("ligolw_sqlite.sub")
731 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "ligolw_sqlite-$(cluster)-$(process).out"))
732 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "ligolw_sqlite-$(cluster)-$(process).err"))
733 self.add_condor_cmd("getenv", "True")
734 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
735 self.add_ini_opts(config_parser, "ligolw_sqlite")
736
737
738class SQLiteNode(pipeline.CondorDAGNode):
739 def __init__(self, *args):
740 pipeline.CondorDAGNode.__init__(self, *args)
741 self.input_cache = []
742 self.output_cache = []
743 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
744
745 def add_input_cache(self, cache):
746 if self.output_cache:
747 raise AttributeError("cannot change attributes after computing output cache")
748 self.input_cache.extend(cache)
749 for c in cache:
750 filename = c.path
751 pipeline.CondorDAGNode.add_file_arg(self, filename)
752 self.add_output_file(filename)
753
754 def add_file_arg(self, filename):
755 raise NotImplementedError
756
757 def set_output(self, filename):
758 if self.output_cache:
759 raise AttributeError("cannot change attributes after computing output cache")
760 self.add_macro("macrodatabase", filename)
761
762 def get_input_cache(self):
763  return self.input_cache
764
765 def get_output_cache(self):
766  if not self.output_cache:
767 self.output_cache = [make_cache_entry(self.input_cache, None, self.get_opts()["macrodatabase"])]
768 return self.output_cache
769
770 def get_output_files(self):
771  raise NotImplementedError
772
773 def get_output(self):
774 raise NotImplementedError
775
776
777class BurcaTailorJob(pipeline.CondorDAGJob):
778 def __init__(self, config_parser):
779 pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_power_meas_likelihood"))
780 self.set_sub_file("lalburst_power_meas_likelihood.sub")
781 self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_power_meas_likelihood-$(cluster)-$(process).out"))
782 self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_power_meas_likelihood-$(cluster)-$(process).err"))
783 self.add_condor_cmd("getenv", "True")
784 self.add_condor_cmd("accounting_group", get_accounting_group(config_parser))
785 self.add_ini_opts(config_parser, "lalburst_power_meas_likelihood")
786
787 self.cache_dir = get_cache_dir(config_parser)
788 self.output_dir = "."
789
790
791class BurcaTailorNode(pipeline.CondorDAGNode):
792 def __init__(self, *args):
793 pipeline.CondorDAGNode.__init__(self, *args)
794 self.input_cache = []
795 self.output_cache = []
796 self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
797 self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
798 self._CondorDAGNode__macros["initialdir"] = os.getcwd()
799
800 def set_name(self, *args):
801 pipeline.CondorDAGNode.set_name(self, *args)
802 self.cache_name = os.path.join(self.cache_dir, "%s.cache" % self.get_name())
803
804 def add_input_cache(self, cache):
805 if self.output_cache:
806 raise AttributeError("cannot change attributes after computing output cache")
807 self.input_cache.extend(cache)
808 for c in cache:
809 filename = c.path
810 pipeline.CondorDAGNode.add_file_arg(self, filename)
811 self.add_output_file(filename)
812
813 def add_file_arg(self, filename):
814 raise NotImplementedError
815
816 def set_output(self, description):
817 if self.output_cache:
818 raise AttributeError("cannot change attributes after computing output cache")
819 cache_entry = make_cache_entry(self.input_cache, description, "")
820 filename = os.path.join(self.output_dir, "%s-%s-%d-%d.xml.gz" % (cache_entry.observatory, cache_entry.description, int(cache_entry.segment[0]), int(abs(cache_entry.segment))))
821 self.add_var_opt("output", filename)
822 cache_entry.url = "file://localhost" + os.path.abspath(filename)
823 del self.output_cache[:]
824 self.output_cache.append(cache_entry)
825 return filename
826
827 def get_input_cache(self):
828  return self.input_cache
829
830 def get_output_cache(self):
831  if not self.output_cache:
832 raise AttributeError("must call set_output(description) first")
833 return self.output_cache
834
835 def write_input_files(self, *args):
836 # oh. my. god. this is fscked.
837 for arg in self.get_args():
838 if "--add-from-cache" in arg:
839    f = open(self.cache_name, "w")
840 for c in self.input_cache:
841 print(str(c), file=f)
842 pipeline.CondorDAGNode.write_input_files(self, *args)
843 break
844
845 def get_output_files(self):
846  raise NotImplementedError
847
848 def get_output(self):
849 raise NotImplementedError
850
851
852#
853# =============================================================================
854#
855# DAG Job Types
856#
857# =============================================================================
858#
859
860
861#
862# This is *SUCH* a hack I don't know where to begin. Please, shoot me.
863#
864
865
866datafindjob = None
867binjjob = None
868powerjob = None
869lladdjob = None
870binjfindjob = None
871bucutjob = None
872buclusterjob = None
873burcajob = None
874burca2job = None
875sqlitejob = None
876burcatailorjob = None
877
878
879def init_job_types(config_parser, job_types = ("datafind", "rm", "binj", "power", "lladd", "binjfind", "bucluster", "bucut", "burca", "burca2", "sqlite", "burcatailor")):
880 """
881 Construct definitions of the submit files.
882 """
883 global datafindjob, rmjob, binjjob, powerjob, lladdjob, binjfindjob, buclusterjob, llb2mjob, bucutjob, burcajob, burca2job, sqlitejob, burcatailorjob
884
885 # ligo_data_find
886 if "datafind" in job_types:
887 datafindjob = pipeline.LSCDataFindJob(os.path.join(os.getcwd(), get_cache_dir(config_parser)), os.path.join(os.getcwd(), get_out_dir(config_parser)), config_parser)
888
889 # rm
890 if "rm" in job_types:
891 rmjob = RMJob(config_parser)
892
893 # lalapps_binj
894 if "binj" in job_types:
895 binjjob = BurstInjJob(config_parser)
896
897 # lalapps_power
898 if "power" in job_types:
899 powerjob = PowerJob(config_parser)
900
901 # ligolw_add
902 if "lladd" in job_types:
903 lladdjob = pipeline.LigolwAddJob(os.path.join(get_out_dir(config_parser)), config_parser)
904 lladdjob.cache_dir = get_cache_dir(config_parser)
905
906 # lalburst_injfind
907 if "binjfind" in job_types:
908 binjfindjob = BinjfindJob(config_parser)
909
910 # lalburst_cut
911 if "bucut" in job_types:
912 bucutjob = BucutJob(config_parser)
913
914 # lalburst_cluster
915 if "bucluster" in job_types:
916 buclusterjob = BuclusterJob(config_parser)
917
918 # lalburst_coinc
919 if "burca" in job_types:
920 burcajob = BurcaJob(config_parser)
921
922 # lalburst_coinc2
923 if "burca2" in job_types:
924 burca2job = Burca2Job(config_parser)
925
926 # ligolw_sqlite
927 if "sqlite" in job_types:
928 sqlitejob = SQLiteJob(config_parser)
929
930 # lalburst_power_meas_likelihood
931 if "burcatailor" in job_types:
932 burcatailorjob = BurcaTailorJob(config_parser)
933
934
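# Illustrative usage sketch (the ini file name is hypothetical): a pipeline
# script typically creates the work directories and then initializes only the
# job types it needs before building any nodes.
#
#   from configparser import ConfigParser
#   cp = ConfigParser()
#   cp.read("power_pipeline.ini")
#   make_dag_directories(cp)
#   init_job_types(cp, job_types = ("datafind", "power", "lladd", "bucluster"))
#   # the module-level datafindjob, powerjob, lladdjob and buclusterjob
#   # variables are now set and are used by the make_*_fragment() functions
#   # below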
935#
936# =============================================================================
937#
938# Segmentation
939#
940# =============================================================================
941#
942
943
944def psds_from_job_length(timing_params, t):
945 """
946 Return the number of PSDs that can fit into a job of length t
947 seconds. In general, the return value is a non-integer.
948 """
949 if t < 0:
950 raise ValueError(t)
951 # convert to samples, and remove filter corruption
952 t = t * timing_params.resample_rate - 2 * timing_params.filter_corruption
953 if t < timing_params.psd_length:
954 return 0
955 return (t - timing_params.psd_length) / timing_params.psd_shift + 1
956
957
958def job_length_from_psds(timing_params, psds):
959 """
960 From the analysis parameters and a count of PSDs, return the length
961 of the job in seconds.
962 """
963 if psds < 1:
964 raise ValueError(psds)
965 # number of samples
966 result = (psds - 1) * timing_params.psd_shift + timing_params.psd_length
967 # add filter corruption
968 result += 2 * timing_params.filter_corruption
969 # convert to seconds
970 return result / timing_params.resample_rate
971
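# Worked example (hypothetical numbers; the real psd_length, psd_shift and
# filter_corruption values come from TimingParameters above): with
# resample_rate = 4096 Hz, psd_length = 2097152 samples, psd_shift = 1048576
# samples and filter_corruption = 16384 samples,
#
#   job_length_from_psds(timing_params, 16)
#     = ((16 - 1) * 1048576 + 2097152 + 2 * 16384) / 4096 = 4360.0 s
#
# and the relation inverts as expected:
#
#   psds_from_job_length(timing_params, 4360.0)
#     = (4360.0 * 4096 - 2 * 16384 - 2097152) / 1048576 + 1 = 16.0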
972
973def split_segment(timing_params, segment, psds_per_job):
974 """
975 Split the data segment into correctly-overlapping segments. We try
976 to make the number of PSDs in each segment equal to
977 psds_per_job, with a short segment at the end if needed.
978 """
979 # in seconds
980 joblength = job_length_from_psds(timing_params, psds_per_job)
981 # in samples
982 joboverlap = 2 * timing_params.filter_corruption + (timing_params.psd_length - timing_params.psd_shift)
983 # in seconds
984 joboverlap /= timing_params.resample_rate
985
986 segs = segments.segmentlist()
987 t = segment[0]
988 while t + joblength <= segment[1]:
989 segs.append(segments.segment(t, t + joblength) & segment)
990 t += joblength - joboverlap
991
992 extra_psds = int(psds_from_job_length(timing_params, float(segment[1] - t)))
993 if extra_psds:
994 segs.append(segments.segment(t, t + job_length_from_psds(timing_params, extra_psds)))
995 return segs
996
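# Example (using the same hypothetical timing numbers as above, which give a
# job length of 4360 s and a job overlap of
# (2 * 16384 + 2097152 - 1048576) / 4096 = 264 s): a 10000 s segment is split
# into overlapping jobs, with the final job shortened to hold a whole number
# of PSDs:
#
#   split_segment(timing_params, segments.segment(1000000000, 1000010000), 16)
#   # [segment(1000000000, 1000004360),
#   #  segment(1000004096, 1000008456),
#   #  segment(1000008192, 1000009992)]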
997
998def segment_ok(timing_params, segment):
999 """
1000 Return True if the segment can be analyzed using lalapps_power.
1001 """
1002 return psds_from_job_length(timing_params, float(abs(segment))) >= 1.0
1003
1004
1005def remove_too_short_segments(seglistdict, timing_params):
1006 """
1007 Remove segments from seglistdict that are too short to analyze.
1008
1009 CAUTION: this function modifies seglistdict in place.
1010 """
1011 for seglist in seglistdict.values():
1012 iterutils.inplace_filter(lambda seg: segment_ok(timing_params, seg), seglist)
1013
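# Example (hypothetical timing numbers as above, for which the shortest
# analyzable job is job_length_from_psds(timing_params, 1) = 520 s):
#
#   seglistdict = segments.segmentlistdict({
#           "H1": segments.segmentlist([segments.segment(1000000000, 1000000400),
#                                        segments.segment(1000001000, 1000002000)])
#   })
#   remove_too_short_segments(seglistdict, timing_params)
#   # the 400 s segment is removed in place; the 1000 s segment survives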
1014
1015#
1016# =============================================================================
1017#
1018# Single Node Fragments
1019#
1020# =============================================================================
1021#
1022
1023
1024datafind_pad = 512
1025
1026
1027def make_datafind_fragment(dag, instrument, seg):
1028 node = pipeline.LSCDataFindNode(datafindjob)
1029 node.set_name("ligo_data_find-%s-%d-%d" % (instrument, int(seg[0]), int(abs(seg))))
1030 node.set_start(seg[0] - datafind_pad)
1031 node.set_end(seg[1] + 1)
1032 # FIXME: argh, I need the node to know what instrument it's for,
1033 # but can't call set_ifo() because that adds a --channel-name
1034 # command line argument (!?)
1035 node._AnalysisNode__ifo = instrument
1036 node.set_observatory(instrument[0])
1037 if node.get_type() is None:
1038 node.set_type(datafindjob.get_config_file().get("datafind", "type_%s" % instrument))
1039 node.set_retry(3)
1040 dag.add_node(node)
1041 return set([node])
1042
1043
1044def make_lladd_fragment(dag, parents, tag, segment = None, input_cache = None, remove_input = False, preserve_cache = None, extra_input_cache = None):
1045 node = LigolwAddNode(lladdjob, remove_input = remove_input)
1046
1047 # link to parents
1048 for parent in parents:
1049 node.add_parent(parent)
1050
1051 # build input cache
1052 if input_cache is None:
1053 # default is to use all output files from parents
1054 for parent in parents:
1055 node.add_input_cache(parent.get_output_cache())
1056 else:
1057 # but calling code can provide its own collection
1058 node.add_input_cache(input_cache)
1059 if extra_input_cache is not None:
1060 # sometimes it helps to add some extra
1061 node.add_input_cache(extra_input_cache)
1062 if preserve_cache is not None:
1063 node.add_preserve_cache(preserve_cache)
1064
1065 # construct names for the node and output file, and override the
1066 # segment if needed
1067 [cache_entry] = node.get_output_cache()
1068 if segment is None:
1069 segment = cache_entry.segment
1070 node.set_name("lladd_%s_%d_%d" % (tag, int(segment[0]), int(abs(segment))))
1071 node.set_output(os.path.join(node.output_dir, "%s-%s-%d-%d.xml.gz" % (cache_entry.observatory, tag, int(segment[0]), int(abs(segment)))), segment = segment)
1072
1073 node.set_retry(3)
1074 dag.add_node(node)
1075 return set([node])
1076
1077
1078def make_power_fragment(dag, parents, instrument, seg, tag, framecache, injargs = {}):
1079 node = PowerNode(powerjob)
1080 node.set_name("lalapps_power_%s_%s_%d_%d" % (tag, instrument, int(seg[0]), int(abs(seg))))
1081 for parent in parents: node.add_parent(parent)
1082 # FIXME: PowerNode should not be subclassed from AnalysisNode,
1083 # because that class is too hard-coded. For example, there is no
1084 # way to switch to analysing gaussian noise except to comment out
1085 # this line in the code.
1086 node.set_cache(framecache)
1087 node.set_ifo(instrument)
1088 node.set_start(seg[0])
1089 node.set_end(seg[1])
1090 node.set_user_tag(tag)
1091 for arg, value in injargs.items():
1092 # this is a hack, but I can't be bothered
1093 node.add_var_arg("--%s %s" % (arg, value))
1094 dag.add_node(node)
1095 return set([node])
1096
1097
1098def make_binj_fragment(dag, seg, time_slides_cache_entry, tag, offset, flow = None, fhigh = None):
1099 # adjust start time to be commensurate with injection period
1100 start = seg[0] - seg[0] % binjjob.time_step + binjjob.time_step * offset
1101
1102 node = BurstInjNode(binjjob)
1103 node.set_time_slide_file(time_slides_cache_entry.path)
1104 node.set_start(start)
1105 node.set_end(seg[1])
1106 if flow is not None:
1107 node.set_name("lalapps_binj_%s_%d_%d" % (tag, int(start), int(flow)))
1108 else:
1109 node.set_name("lalapps_binj_%s_%d" % (tag, int(start)))
1110 node.set_user_tag(tag)
1111 if flow is not None:
1112 node.add_macro("macroflow", flow)
1113 if fhigh is not None:
1114 node.add_macro("macrofhigh", fhigh)
1115 node.add_macro("macroseed", int(time.time()%100 + start))
1116 dag.add_node(node)
1117 return set([node])
1118
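# Worked example (hypothetical numbers): the injection start time is snapped
# to the lalapps_binj time step.  With binjjob.time_step = 100.0 s,
# seg[0] = 1000000042 and offset = 0.5,
#
#   start = 1000000042 - 1000000042 % 100.0 + 100.0 * 0.5 = 1000000050.0
#
# so calling make_binj_fragment() with different offsets yields interleaved
# injection streams covering the same segment.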
1119
1120def make_binjfind_fragment(dag, parents, tag, verbose = False):
1121 input_cache = collect_output_caches(parents)
1122 nodes = set()
1123 while input_cache:
1124 node = BinjfindNode(binjfindjob)
1125 node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:binjfindjob.files_per_binjfind]])
1126 for parent in set(parent for cache_entry, parent in input_cache[:binjfindjob.files_per_binjfind]):
1127 node.add_parent(parent)
1128 del input_cache[:binjfindjob.files_per_binjfind]
1129 seg = cache_span(node.get_input_cache())
1130 node.set_name("lalburst_injfind_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1131 node.add_macro("macrocomment", tag)
1132 dag.add_node(node)
1133 nodes.add(node)
1134 return nodes
1135
1136
1137def make_bucluster_fragment(dag, parents, tag, verbose = False):
1138 input_cache = collect_output_caches(parents)
1139 nodes = set()
1140 while input_cache:
1141 node = BuclusterNode(buclusterjob)
1142 node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:buclusterjob.files_per_bucluster]])
1143 for parent in set(parent for cache_entry, parent in input_cache[:buclusterjob.files_per_bucluster]):
1144 node.add_parent(parent)
1145 del input_cache[:buclusterjob.files_per_bucluster]
1146 seg = cache_span(node.get_input_cache())
1147 node.set_name("lalburst_cluster_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1148 node.add_macro("macrocomment", tag)
1149 node.set_retry(3)
1150 dag.add_node(node)
1151 nodes.add(node)
1152 return nodes
1153
1154
1155def make_bucut_fragment(dag, parents, tag, verbose = False):
1156 input_cache = collect_output_caches(parents)
1157 nodes = set()
1158 while input_cache:
1159 node = BucutNode(bucutjob)
1160 node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:bucutjob.files_per_bucut]])
1161 for parent in set(parent for cache_entry, parent in input_cache[:bucutjob.files_per_bucut]):
1162 node.add_parent(parent)
1163 del input_cache[:bucutjob.files_per_bucut]
1164 seg = cache_span(node.get_input_cache())
1165 node.set_name("lalburst_cut_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1166 node.add_macro("macrocomment", tag)
1167 dag.add_node(node)
1168 nodes.add(node)
1169 return nodes
1170
1171
1172def make_burca_fragment(dag, parents, tag, coincidence_segments = None, verbose = False):
1173 input_cache = collect_output_caches(parents)
1174 if coincidence_segments is not None:
1175  # it doesn't make sense to supply this keyword argument for
1176 # more than one input file
1177 assert len(input_cache) == 1
1178 nodes = set()
1179 while input_cache:
1180 node = BurcaNode(burcajob)
1181 node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:burcajob.files_per_burca]])
1182 for parent in set(parent for cache_entry, parent in input_cache[:burcajob.files_per_burca]):
1183 node.add_parent(parent)
1184 del input_cache[:burcajob.files_per_burca]
1185 seg = cache_span(node.get_input_cache())
1186 node.set_name("lalburst_coinc_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1187 if coincidence_segments is not None:
1188 node.set_coincidence_segments(coincidence_segments)
1189 node.add_macro("macrocomment", tag)
1190 dag.add_node(node)
1191 nodes.add(node)
1192 return nodes
1193
1194
1195def make_sqlite_fragment(dag, parents, tag, verbose = False):
1196 input_cache = collect_output_caches(parents)
1197 nodes = set()
1198 for cache_entry, parent in input_cache:
1199 node = SQLiteNode(sqlitejob)
1200 node.add_input_cache([cache_entry])
1201 node.add_parent(parent)
1202 node.set_name("ligolw_sqlite_%s_%d" % (tag, len(nodes)))
1203 node.set_output(cache_entry.path.replace(".xml.gz", ".sqlite"))
1204 dag.add_node(node)
1205 nodes.add(node)
1206 return nodes
1207
1208
1209def make_burca_tailor_fragment(dag, input_cache, seg, tag):
1210 input_cache = list(input_cache)
1211 input_cache.sort(reverse = True)
1212 nodes = set()
1213 max_cost_per_job = 25 # 10000 s-equivalent files
1214 while input_cache:
1215 cache = []
1216 cost = 0
1217 while input_cache and cost <= max_cost_per_job:
1218 cache.append(input_cache.pop())
1219   # cost proportional to segment duration squared
1220 cost += (float(abs(cache[-1].segment)) / 10000.0)**2
1221 node = BurcaTailorNode(burcatailorjob)
1222 node.add_input_cache(cache)
1223 node.set_name("lalburst_power_meas_likelihood_%s_%d_%d_%d" % (tag, int(seg[0]), int(abs(seg)), len(nodes)))
1224 node.set_output("%s_%d" % (tag, len(nodes)))
1225 dag.add_node(node)
1226 nodes.add(node)
1227 node = BurcaTailorNode(burcatailorjob)
1228 node.set_name("lalburst_power_meas_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1229 for parent in nodes:
1230 node.add_parent(parent)
1231 node.add_input_cache(parent.get_output_cache())
1232 del node.get_args()[:]
1233 node.add_var_arg("--add-from-cache %s" % node.cache_name)
1234 node.set_output(tag)
1235 dag.add_node(node)
1236 delete_cache = set(node.get_input_cache()) - set(node.get_output_cache())
1237 if delete_cache:
1238 rmnode = RMNode(rmjob)
1239 rmnode.set_name("lalburst_power_meas_likelihood_rm_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
1240 rmnode.add_parent(node)
1241 rmnode.add_input_cache(delete_cache)
1242 dag.add_node(rmnode)
1243 return set([node])
1244
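# Worked example (hypothetical durations): with max_cost_per_job = 25 the cost
# of each input file is (duration / 10000 s)**2, so for inputs spanning
# 20000 s each the cost per file is 4 and a single
# lalburst_power_meas_likelihood job receives 7 of them (the accumulated cost
# only passes 25 after the 7th file has been added).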
1245
1246def make_burca2_fragment(dag, coinc_cache, likelihood_parents, tag):
1247 # FIXME: pass a node set instead of a cache
1248 #input_cache = collect_output_caches(coinc_parents)
1249 coinc_cache = list(coinc_cache)
1250 coinc_cache.sort(reverse = True)
1251
1252 likelihood_data_cache_filename = os.path.join(burca2job.cache_dir, "burca2_%s.cache" % tag)
1253 likelihood_data_cache_file = open(likelihood_data_cache_filename, "w")
1254 for cache_entry in [cache_entry for node in likelihood_parents for cache_entry in node.get_output_cache()]:
1255 print(str(cache_entry), file=likelihood_data_cache_file)
1256
1257 nodes = set()
1258 max_cost_per_job = 10 # 10000 s-equivalent files
1259 while coinc_cache:
1260 cache = []
1261 cost = 0
1262 while coinc_cache and cost <= max_cost_per_job:
1263 cache.append(coinc_cache.pop())
1264   # cost proportional to segment duration squared
1265 cost += (float(abs(cache[-1].segment)) / 10000.0)**2
1266 node = BurcaNode(burca2job)
1267 node.set_name("lalburst_coinc2_%s_%d" % (tag, len(nodes)))
1268 node.add_macro("macrocomment", tag)
1269 node.add_var_arg("--likelihood-data-cache %s" % likelihood_data_cache_filename)
1270 node.add_input_cache(cache)
1271 for parent in likelihood_parents:
1272 node.add_parent(parent)
1273 dag.add_node(node)
1274 nodes.add(node)
1275 return nodes
1276
1277
1278#
1279# =============================================================================
1280#
1281# ligo_data_find Stage
1282#
1283# =============================================================================
1284#
1285
1286
1287def make_datafind_stage(dag, seglists, verbose = False):
1288 if verbose:
1289 print("building ligo_data_find jobs ...", file=sys.stderr)
1290
1291 #
1292 # Fill gaps smaller than the padding added to each datafind job.
1293 # Filling in the gaps ensures that exactly 1 datafind job is
1294 # suitable for each lalapps_power job, and also hugely reduces the
1295 # number of ligo_data_find nodes in the DAG.
1296 #
1297
1298 filled = seglists.copy().protract(datafind_pad / 2).contract(datafind_pad / 2)
1299
1300 #
1301 # Build the nodes. Do this in time order to assist depth-first job
1302 # submission on clusters.
1303 #
1304
1305 segs = [(seg, instrument) for instrument, seglist in filled.items() for seg in seglist]
1306 segs.sort()
1307
1308 nodes = set()
1309 for seg, instrument in segs:
1310 if verbose:
1311 print("making datafind job for %s spanning %s" % (instrument, seg), file=sys.stderr)
1312 new_nodes = make_datafind_fragment(dag, instrument, seg)
1313 nodes |= new_nodes
1314
1315 # add a post script to check the file list
1316 #required_segs_string = ",".join(segmentsUtils.to_range_strings(seglists[instrument] & segments.segmentlist([seg])))
1317 #for node in new_nodes:
1318 # node.set_post_script(datafindjob.get_config_file().get("condor", "LSCdataFindcheck") + " --dagman-return $RETURN --stat --gps-segment-list %s %s" % (required_segs_string, node.get_output()))
1319
1320 return nodes
1321
1322
1323#
1324# =============================================================================
1325#
1326# Analyze All Segments in a segmentlistdict Using lalapps_power
1327#
1328# =============================================================================
1329#
1330
1331
1332#
1333# one segment
1334#
1335
1336
1337def make_power_segment_fragment(dag, datafindnodes, instrument, segment, tag, timing_params, psds_per_job, binjnodes = set(), verbose = False):
1338 """
1339 Construct a DAG fragment for an entire segment, splitting the
1340 segment into multiple trigger generator jobs.
1341 """
1342 # only one frame cache file can be provided as input, and only one
1343 # injection description file can be provided as input
1344 # the unpacking indirectly tests that the file count is correct
1345 [framecache] = [node.get_output() for node in datafindnodes]
1346 if binjnodes:
1347 [simfile] = [cache_entry.path for node in binjnodes for cache_entry in node.get_output_cache()]
1348 injargs = {"injection-file": simfile}
1349 else:
1350 injargs = {}
1351 seglist = split_segment(timing_params, segment, psds_per_job)
1352 if verbose:
1353 print("Segment split: " + str(seglist), file=sys.stderr)
1354 nodes = set()
1355 for seg in seglist:
1356 nodes |= make_power_fragment(dag, datafindnodes | binjnodes, instrument, seg, tag, framecache, injargs = injargs)
1357 return nodes
1358
1359
1360#
1361# all segments
1362#
1363
1364
1365def make_single_instrument_stage(dag, datafinds, seglistdict, tag, timing_params, psds_per_job, binjnodes = set(), verbose = False):
1366 nodes = []
1367 for instrument, seglist in seglistdict.items():
1368 for seg in seglist:
1369 if verbose:
1370 print("generating %s fragment %s" % (instrument, str(seg)), file=sys.stderr)
1371
1372 # find the datafind job this job is going to need
1373 dfnodes = set([node for node in datafinds if (node.get_ifo() == instrument) and (seg in segments.segment(node.get_start(), node.get_end()))])
1374 if len(dfnodes) != 1:
1375 raise ValueError("error, not exactly 1 datafind is suitable for trigger generator job at %s in %s" % (str(seg), instrument))
1376
1377 # trigger generator jobs
1378 nodes += make_power_segment_fragment(dag, dfnodes, instrument, seg, tag, timing_params, psds_per_job, binjnodes = binjnodes, verbose = verbose)
1379
1380 # done
1381 return nodes
1382
1383
1384#
1385# =============================================================================
1386#
1387# Coincidence Post-Processing
1388#
1389# =============================================================================
1390#
1391
1392
1393def group_coinc_parents(parents, offset_vectors, extentlimit = None, verbose = False):
1394 if not offset_vectors:
1395 # no-op
1396 return []
1397
1398 if verbose:
1399 print("Grouping jobs for coincidence analysis:", file=sys.stderr)
1400
1401 #
1402 # use ligolw_cafe to group each output file according to how they
1403 # need to be combined to perform the coincidence analysis
1404 #
1405
1406 seglists, bins = cafe.ligolw_cafe([cache_entry for parent in parents for cache_entry in parent.get_output_cache()], offset_vectors, extentlimit = extentlimit, verbose = verbose)
1407
1408 #
1409 # retrieve the file caches and segments. note that ligolw_cafe
1410 # returns the bins sorted by segment, so we do too
1411 #
1412
1413 caches = [frozenset(bin.objects) for bin in bins]
1414 assert len(set(caches)) == len(caches)
1415 segs = [cache_span(bin.objects) for bin in bins]
1416
1417 #
1418 # determine the clipping boundaries to use for each coincidence job
1419 # if an extentlimit has been imposed
1420 #
1421
1422 clipsegs = [None] * len(bins)
1423 if extentlimit is not None:
1424 extents = [bin.extent for bin in bins]
1425 for i, extent in enumerate(extents):
1426 # FIXME: when we can rely on Python >= 2.5,
1427 #lo = segments.NegInfinity if i == 0 or extents[i - 1].disjoint(extent) else extent[0]
1428 # etc.
1429 if i == 0 or extents[i - 1].disjoint(extent):
1430 lo = segments.NegInfinity
1431 else:
1432 lo = extent[0]
1433 if i >= len(extents) - 1 or extents[i + 1].disjoint(extent):
1434 hi = segments.PosInfinity
1435 else:
1436 hi = extent[1]
1437 if lo is not segments.NegInfinity or hi is not segments.PosInfinity:
1438 clipsegs[i] = segments.segment(lo, hi)
1439
1440 #
1441 # match parents to caches
1442 #
1443
1444 if verbose:
1445 print("Matching jobs to caches ...", file=sys.stderr)
1446 parent_groups, unused = match_nodes_to_caches(parents, caches)
1447 if verbose and unused:
1448 # there were parents that didn't match any caches. this
1449 # happens if ligolw_cafe decides their outputs aren't
1450 # needed
1451 print("Notice: %d jobs (of %d) produce output that will not be used by a coincidence job" % (unused, len(parents)), file=sys.stderr)
1452
1453 #
1454 # done
1455 #
1456
1457 return zip(segs, parent_groups, caches, clipsegs)