LALApps 10.1.0.1-b246709
cosmicstring.py
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#


"""
Classes needed for the cosmic string analysis pipeline.
"""


from __future__ import print_function


import math
import os
import sys


import igwn_segments as segments
from lal import iterutils
from lal import LIGOTimeGPS
from lal import pipeline
from lal.utils import CacheEntry
from lalburst import cafe
from lalburst import power


__author__ = 'Xavier Siemens<siemens@gravity.phys.uwm.edu>'
__date__ = '$Date$'
__version__ = '$Revision$'


#
# =============================================================================
#
#                                 Configuration
#
# =============================================================================
#


def get_files_per_meas_likelihood(config_parser):
    return config_parser.getint("pipeline", "files_per_meas_likelihood")


def get_files_per_calc_likelihood(config_parser):
    return config_parser.getint("pipeline", "files_per_calc_likelihood")


def get_files_per_run_sqlite(config_parser):
    return config_parser.getint("pipeline", "files_per_run_sqlite")

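# The three getters above read per-stage file counts from the [pipeline]
# section of the ini file.  An illustrative (not canonical) configuration
# fragment:
#
#   [pipeline]
#   files_per_meas_likelihood = 25
#   files_per_calc_likelihood = 25
#   files_per_run_sqlite = 10
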
#
# =============================================================================
#
#                            DAG Node and Job Class
#
# =============================================================================
#


class MeasLikelihoodJob(pipeline.CondorDAGJob):
    def __init__(self, config_parser):
        pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_string_meas_likelihood"))
        self.set_sub_file("lalapps_string_meas_likelihood.sub")
        self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_meas_likelihood-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_meas_likelihood-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
        self.add_ini_opts(config_parser, "lalapps_string_meas_likelihood")

        self.cache_dir = power.get_cache_dir(config_parser)
        self.output_dir = "."
        self.files_per_meas_likelihood = get_files_per_meas_likelihood(config_parser)
        if self.files_per_meas_likelihood < 1:
            raise ValueError("files_per_meas_likelihood < 1")

class MeasLikelihoodNode(pipeline.CondorDAGNode):
    def __init__(self, *args):
        pipeline.CondorDAGNode.__init__(self, *args)
        self.input_cache = []
        self.output_cache = []

        self._CondorDAGNode__macros["initialdir"] = os.getcwd()
        self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
        self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)

    def set_name(self, *args):
        pipeline.CondorDAGNode.set_name(self, *args)
        self.cache_name = os.path.join(self.cache_dir, "%s.cache" % self.get_name())
        self.add_var_opt("input-cache", self.cache_name)

    def add_input_cache(self, cache):
        if self.output_cache:
            raise AttributeError("cannot change attributes after computing output cache")
        self.input_cache.extend(cache)

    def add_file_arg(self, filename):
        raise NotImplementedError

    def set_output(self, description):
        if self.output_cache:
            raise AttributeError("cannot change attributes after computing output cache")
        cache_entry = power.make_cache_entry(self.input_cache, description, "")
        filename = os.path.join(self.output_dir, "%s-STRING_LIKELIHOOD_%s-%d-%d.xml.gz" % (cache_entry.observatory, cache_entry.description, int(cache_entry.segment[0]), int(abs(cache_entry.segment))))
        cache_entry.url = "file://localhost" + os.path.abspath(filename)
        self.add_var_opt("output", filename)
        del self.output_cache[:]
        self.output_cache.append(cache_entry)
        return filename

    def get_input_cache(self):
        return self.input_cache

    def get_output_cache(self):
        if not self.output_cache:
            raise AttributeError("must call set_output(description) first")
        return self.output_cache

    def write_input_files(self, *args):
        f = open(self.cache_name, "w")
        for c in self.input_cache:
            print(str(c), file=f)
        pipeline.CondorDAGNode.write_input_files(self, *args)

    def get_output_files(self):
        raise NotImplementedError

    def get_output(self):
        raise NotImplementedError

class CalcLikelihoodJob(pipeline.CondorDAGJob):
    def __init__(self, config_parser):
        pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_string_calc_likelihood"))
        self.set_sub_file("lalapps_string_calc_likelihood.sub")
        self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_calc_likelihood-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_calc_likelihood-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
        self.add_ini_opts(config_parser, "lalapps_string_calc_likelihood")
        self.cache_dir = power.get_cache_dir(config_parser)
        self.files_per_calc_likelihood = get_files_per_calc_likelihood(config_parser)
        if self.files_per_calc_likelihood < 1:
            raise ValueError("files_per_calc_likelihood < 1")

class CalcLikelihoodNode(pipeline.CondorDAGNode):
    def __init__(self, *args):
        pipeline.CondorDAGNode.__init__(self, *args)
        self.input_cache = []
        self.likelihood_cache = []
        self.output_cache = self.input_cache
        self._CondorDAGNode__macros["initialdir"] = os.getcwd()
        self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)

    def set_name(self, *args):
        pipeline.CondorDAGNode.set_name(self, *args)
        self.cache_name = os.path.join(self.cache_dir, "%s.cache" % self.get_name())
        self.add_var_opt("input-cache", self.cache_name)
        self.likelihood_cache_name = os.path.join(self.cache_dir, "%s_likelihood.cache" % self.get_name())
        self.add_var_opt("likelihood-cache", self.likelihood_cache_name)

    def add_input_cache(self, cache):
        self.input_cache.extend(cache)
        for c in cache:
            self.add_output_file(c.path)

    def add_likelihood_cache(self, cache):
        self.likelihood_cache.extend(cache)

    def add_file_arg(self, filename):
        raise NotImplementedError

    def get_input_cache(self):
        return self.input_cache

    def get_output_cache(self):
        return self.output_cache

    def get_likelihood_cache(self):
        return self.likelihood_cache

    def write_input_files(self, *args):
        f = open(self.cache_name, "w")
        for c in self.input_cache:
            print(str(c), file=f)
        f = open(self.likelihood_cache_name, "w")
        for c in self.likelihood_cache:
            print(str(c), file=f)
        pipeline.CondorDAGNode.write_input_files(self, *args)

    def get_output_files(self):
        raise NotImplementedError

    def get_output(self):
        raise NotImplementedError

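# Note on the two likelihood node classes above:  command-line file arguments
# are deliberately disabled (add_file_arg() raises NotImplementedError) and the
# generic get_output()/get_output_files() accessors are not used; the input and
# likelihood file lists are instead communicated to the executables through the
# LAL cache files written out by write_input_files().
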

class StringJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    """
    A lalapps_StringSearch job used by the string pipeline. The static
    options are read from the [lalapps_StringSearch] section in the ini
    file. The stdout and stderr from the job are directed to the logs
    directory. The job runs in the universe specified in the ini file.
    The path to the executable is determined from the ini file.
    """
    def __init__(self, config_parser):
        """
        config_parser = ConfigParser object from which options are read.
        """
        pipeline.CondorDAGJob.__init__(self, power.get_universe(config_parser), power.get_executable(config_parser, "lalapps_StringSearch"))
        pipeline.AnalysisJob.__init__(self, config_parser)
        self.add_ini_opts(config_parser, "lalapps_StringSearch")
        self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_StringSearch-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_StringSearch-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
        self.set_sub_file("lalapps_StringSearch.sub")
        #self.add_condor_cmd("Requirements", "Memory > 1100")

        self.output_dir = power.get_triggers_dir(config_parser)

class StringNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
    """
    A StringNode runs an instance of the cosmic string search code
    (lalapps_StringSearch) in a Condor DAG.
    """
    def __init__(self, job):
        """
        job = A CondorDAGJob that can run an instance of lalapps_StringSearch.
        """
        pipeline.CondorDAGNode.__init__(self, job)
        pipeline.AnalysisNode.__init__(self)
        self.__usertag = job.get_config('pipeline', 'user_tag')
        self.output_cache = []
        self._CondorDAGNode__macros["initialdir"] = os.getcwd()
        self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)

    def set_ifo(self, instrument):
        """
        Load additional options from the per-instrument section in
        the config file.
        """
        if self.output_cache:
            raise AttributeError("cannot change attributes after computing output cache")
        pipeline.AnalysisNode.set_ifo(self, instrument)
        for optvalue in self.job()._AnalysisJob__cp.items("lalapps_StringSearch_%s" % instrument):
            self.add_var_arg("--%s %s" % optvalue)

    def set_user_tag(self, tag):
        if self.output_cache:
            raise AttributeError("cannot change attributes after computing output cache")
        self.__usertag = tag
        self.add_var_opt("user-tag", self.__usertag)

    def get_user_tag(self):
        return self.__usertag

    def get_output_cache(self):
        """
        Returns a LAL cache of the output file name. Calling this
        method also induces the output name to get set, so it must
        be called at least once.
        """
        if not self.output_cache:
            self.output_cache = [CacheEntry(self.get_ifo(), self.__usertag, segments.segment(LIGOTimeGPS(self.get_start()), LIGOTimeGPS(self.get_end())), "file://localhost" + os.path.abspath(self.get_output()))]
        return self.output_cache

    def get_output_files(self):
        raise NotImplementedError

    def get_output(self):
        """
        Returns the name of the output file from the string search code.
        This must be kept synchronized with the name of the output file
        constructed by lalapps_StringSearch itself.
        """
        if self._AnalysisNode__output is None:
            if None in (self.get_start(), self.get_end(), self.get_ifo(), self.__usertag):
                raise ValueError("start time, end time, ifo, or user tag has not been set")
            seg = segments.segment(LIGOTimeGPS(self.get_start()), LIGOTimeGPS(self.get_end()))
            self.set_output(os.path.join(self.output_dir, "%s-STRINGSEARCH_%s-%d-%d.xml.gz" % (self.get_ifo(), self.__usertag, int(self.get_start()), int(self.get_end()) - int(self.get_start()))))

        return self._AnalysisNode__output

    def set_injection_file(self, file):
        """
        Set the name of the XML file from which to read a list of
        software injections.
        """
        self.add_var_opt("injection-file", file)
        self.add_input_file(file)

class RunSqliteJob(pipeline.CondorDAGJob):
    """
    A lalapps_run_sqlite job used by the string pipeline. The static
    options are read from the [lalapps_run_sqlite] section in the ini
    file. The stdout and stderr from the job are directed to the logs
    directory. The job runs in the universe specified in the ini file.
    The path to the executable is determined from the ini file.
    """
    def __init__(self, config_parser):
        """
        config_parser = ConfigParser object
        """
        pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_run_sqlite"))
        self.add_ini_opts(config_parser, "lalapps_run_sqlite")
        self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_run_sqlite-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_run_sqlite-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
        self.set_sub_file("lalapps_run_sqlite.sub")
        self.files_per_run_sqlite = get_files_per_run_sqlite(config_parser)
        if self.files_per_run_sqlite < 1:
            raise ValueError("files_per_run_sqlite < 1")

class RunSqliteNode(pipeline.CondorDAGNode):
    def __init__(self, *args):
        pipeline.CondorDAGNode.__init__(self, *args)
        self.input_cache = []
        self.output_cache = self.input_cache
        self._CondorDAGNode__macros["initialdir"] = os.getcwd()

    def add_input_cache(self, cache):
        self.input_cache.extend(cache)
        for c in cache:
            filename = c.path
            pipeline.CondorDAGNode.add_file_arg(self, filename)
            self.add_output_file(filename)

    def get_input_cache(self):
        return self.input_cache

    def get_output_cache(self):
        return self.output_cache

    def set_sql_file(self, filename):
        self.add_var_opt("sql-file", filename)

#
# =============================================================================
#
#                                 Segmentation
#
# =============================================================================
#

def clip_segment_length(segment_length, pad, short_segment_duration):
    # clip segment to the length required by lalapps_StringSearch.  if
    #
    #   duration = segment length - padding
    #
    # then
    #
    #   duration / short_segment_duration - 0.5
    #
    # must be an odd integer, therefore
    #
    #   2 * duration + short_segment_duration
    #
    # must be divisible by (4 * short_segment_duration)
    assert segment_length >= 2 * pad
    duration = segment_length - 2 * pad
    extra = (2 * duration + short_segment_duration) % (4 * short_segment_duration)
    extra /= 2

    # clip
    segment_length -= extra

    # done.  negative return value not allowed
    assert segment_length >= 0
    return segment_length

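# A quick worked example of clip_segment_length() (numbers are purely
# illustrative, not taken from any production configuration):  with pad = 8 s
# and short_segment_duration = 8 s, a requested segment_length of 128 s gives
# duration = 112, extra = ((2 * 112 + 8) % (4 * 8)) / 2 = 8 / 2 = 4, so the
# clipped length is 124 s; the clipped duration 108 satisfies
# 108 / 8 - 0.5 = 13, an odd integer as required.
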

def segment_ok(seg, min_segment_length, pad):
    """
    Return True if the segment seg is long enough to be analyzed by
    lalapps_StringSearch.
    """
    return float(abs(seg)) - 2 * pad >= min_segment_length


def remove_too_short_segments(seglists, min_segment_length, pad):
    """
    Remove segments from the segmentlistdict seglists that are too short
    to analyze.

    CAUTION:  this function modifies seglists in place.
    """
    for seglist in seglists.values():
        iterutils.inplace_filter(lambda seg: segment_ok(seg, min_segment_length, pad), seglist)

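# Illustrative use of the two helpers above (values are hypothetical):  with
# min_segment_length = 32 and pad = 8, a 100 s segment survives because
# 100 - 2 * 8 >= 32, while a 10 s segment is filtered out, e.g.
#
#   seglists = segments.segmentlistdict({
#       "H1": segments.segmentlist([segments.segment(0, 100), segments.segment(200, 210)])
#   })
#   remove_too_short_segments(seglists, 32, 8)
#   # seglists["H1"] is now [segment(0, 100)]
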

def compute_segment_lists(seglists, offset_vectors, min_segment_length, pad):
    # don't modify original
    seglists = seglists.copy()

    # ignore offset vectors referencing instruments we don't have
    offset_vectors = [offset_vector for offset_vector in offset_vectors if set(offset_vector.keys()).issubset(set(seglists.keys()))]

    # cull too-short single-instrument segments from the input
    # segmentlist dictionary;  this can significantly increase
    # the speed of the get_coincident_segmentlistdict()
    # function when the input segmentlists have had many data
    # quality holes poked out of them
    remove_too_short_segments(seglists, min_segment_length, pad)

    # extract the segments that are coincident under the time
    # slides
    new = cafe.get_coincident_segmentlistdict(seglists, offset_vectors)

    # round to integer boundaries because lalapps_StringSearch can't accept
    # non-integer start/stop times
    # FIXME:  fix that in lalapps_StringSearch
    for seglist in new.values():
        for i in range(len(seglist)):
            seglist[i] = segments.segment(int(math.floor(seglist[i][0])), int(math.ceil(seglist[i][1])))
    # intersect with original segments to ensure we haven't expanded beyond
    # original bounds
    new &= seglists

    # again remove too-short segments
    remove_too_short_segments(new, min_segment_length, pad)

    # done
    return new

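# Sketch of a typical call (inputs are hypothetical):  the offset vectors
# would normally come from the analysis' time-slide document, e.g.
#
#   offset_vectors = [{"H1": 0.0, "L1": 0.0}, {"H1": 0.0, "L1": 31.41}]
#   live = compute_segment_lists(seglists, offset_vectors, min_segment_length = 32, pad = 8)
#
# live is then a segmentlistdict of integer-bounded segments that are
# coincident under at least one offset vector and long enough to analyze.
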

#
# =============================================================================
#
#                                 DAG Job Types
#
# =============================================================================
#


stringjob = None
meas_likelihoodjob = None
calc_likelihoodjob = None
runsqlitejob = None


def init_job_types(config_parser, job_types = ("string", "meas_likelihood", "calc_likelihood", "runsqlite")):
    """
    Construct definitions of the submit files.
    """
    global stringjob, meas_likelihoodjob, calc_likelihoodjob, runsqlitejob

    # lalapps_StringSearch
    if "string" in job_types:
        stringjob = StringJob(config_parser)

    # lalapps_string_meas_likelihood
    if "meas_likelihood" in job_types:
        meas_likelihoodjob = MeasLikelihoodJob(config_parser)

    # lalapps_string_calc_likelihood
    if "calc_likelihood" in job_types:
        calc_likelihoodjob = CalcLikelihoodJob(config_parser)

    # lalapps_run_sqlite
    if "runsqlite" in job_types:
        runsqlitejob = RunSqliteJob(config_parser)

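# Example of how a DAG-generation script might initialize the job definitions
# (the ini file name is hypothetical; config_parser is a standard library
# ConfigParser/configparser instance):
#
#   from configparser import ConfigParser
#   config_parser = ConfigParser()
#   config_parser.read("cosmic_string_pipe.ini")
#   init_job_types(config_parser)
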

#
# =============================================================================
#
#                          lalapps_StringSearch Jobs
#
# =============================================================================
#


#
# one job
#


def make_string_fragment(dag, parents, instrument, seg, tag, framecache, injargs = {}):
    node = StringNode(stringjob)
    node.set_name("lalapps_StringSearch_%s_%s_%d_%d" % (tag, instrument, int(seg[0]), int(abs(seg))))
    for parent in parents:
        node.add_parent(parent)
    # FIXME:  StringNode should not be subclassed from AnalysisNode,
    # because that class is too hard-coded.  For example, there is no
    # way to switch to analysing gaussian noise except to comment out
    # this line in the code.
    node.set_cache(framecache)
    node.set_ifo(instrument)
    node.set_start(seg[0])
    node.set_end(seg[1])
    node.set_user_tag(tag)
    for arg, value in injargs.items():
        # this is a hack, but I can't be bothered
        node.add_var_arg("--%s %s" % (arg, value))
    dag.add_node(node)
    return set([node])


#
# one segment
#


def split_segment(seg, min_segment_length, pad, overlap, short_segment_duration, max_job_length):
    # avoid infinite loop
    if min_segment_length + 2 * pad <= overlap:
        raise ValueError("infinite loop:  min_segment_length + 2 * pad must be > overlap")

    # clip max_job_length down to an allowed size
    max_job_length = clip_segment_length(max_job_length, pad, short_segment_duration)

    seglist = segments.segmentlist()
    while abs(seg) >= min_segment_length + 2 * pad:
        # try to use max_job_length each time
        if abs(seg) >= max_job_length:
            seglist.append(segments.segment(seg[0], seg[0] + max_job_length))
        else:
            seglist.append(segments.segment(seg[0], seg[0] + clip_segment_length(abs(seg), pad, short_segment_duration)))
        assert abs(seglist[-1]) != 0    # safety-check for no-op
        # bounds must be integers
        if abs((int(seglist[-1][0]) - seglist[-1][0]) / seglist[-1][0]) > 1e-14 or abs((int(seglist[-1][1]) - seglist[-1][1]) / seglist[-1][1]) > 1e-14:
            raise ValueError("segment %s does not have integer boundaries" % str(seglist[-1]))
        # advance segment
        seg = segments.segment(seglist[-1][1] - overlap, seg[1])
    if not seglist:
        raise ValueError("unable to use segment %s" % str(seg))
    return seglist

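# Worked example (all numbers illustrative):  with min_segment_length = 64,
# pad = 8, overlap = 16, short_segment_duration = 8 and max_job_length = 128,
# splitting segment(1000000000, 1000000300) first clips max_job_length to
# 124 s and then yields three jobs,
#
#   [1000000000, 1000000124), [1000000108, 1000000232), [1000000216, 1000000292),
#
# each overlapping its neighbour by 16 s; the final 24 s remainder is dropped
# because it is shorter than min_segment_length + 2 * pad = 80 s.
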

def make_string_segment_fragment(dag, datafindnodes, instrument, seg, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = set(), verbose = False):
    """
    Construct a DAG fragment for an entire segment, splitting the
    segment into multiple trigger generator jobs.
    """
    # figure out which binj nodes, if any, produce output for this job
    binjnodes = set(node for node in binjnodes if power.cache_span(node.get_output_cache()).intersects(seg))

    # only one frame cache file can be provided as input, and only one
    # injection description file can be provided as input.
    # the unpacking indirectly tests that the file count is correct
    [framecache] = [node.get_output() for node in datafindnodes]
    if binjnodes:
        [simfile] = [cache_entry.path for node in binjnodes for cache_entry in node.get_output_cache()]
        injargs = {"injection-file": simfile}
    else:
        injargs = {}
    seglist = split_segment(seg, min_segment_length, pad, overlap, short_segment_duration, max_job_length)
    if verbose:
        print("Segment split: " + str(seglist), file=sys.stderr)
    nodes = set()
    for seg in seglist:
        nodes |= make_string_fragment(dag, datafindnodes | binjnodes, instrument, seg, tag, framecache, injargs = injargs)
    return nodes


#
# all segments
#


def make_single_instrument_stage(dag, datafinds, seglistdict, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = set(), verbose = False):
    nodes = set()
    for instrument, seglist in seglistdict.items():
        for seg in seglist:
            if verbose:
                print("generating %s fragment %s" % (instrument, str(seg)), file=sys.stderr)

            # find the datafind job this job is going to need
            dfnodes = set([node for node in datafinds if (node.get_ifo() == instrument) and (seg in segments.segment(node.get_start(), node.get_end()))])
            if len(dfnodes) != 1:
                raise ValueError("error, not exactly 1 datafind is suitable for trigger generator job at %s in %s" % (str(seg), instrument))

            # trigger generator jobs
            nodes |= make_string_segment_fragment(dag, dfnodes, instrument, seg, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = binjnodes, verbose = verbose)

    # done
    return nodes


#
# =============================================================================
#
#                            lalapps_run_sqlite Jobs
#
# =============================================================================
#


def write_clip_segment_sql_file(filename):
    code = """DELETE FROM
    segment
WHERE
    (end_time + 1e-9 * end_time_ns < (SELECT MIN(in_start_time + 1e-9 * in_start_time_ns) FROM search_summary NATURAL JOIN process WHERE program == 'StringSearch'))
    OR
    (start_time + 1e-9 * start_time_ns > (SELECT MAX(in_end_time + 1e-9 * in_end_time_ns) FROM search_summary NATURAL JOIN process WHERE program == 'StringSearch'));

VACUUM;"""

    print(code, file=open(filename, "w"))

    return filename

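# Sketch of how the SQL file is typically wired into the DAG (the file name
# and the trigger_nodes/tag variables are illustrative):
#
#   sql_file = write_clip_segment_sql_file("clip_segment.sql")
#   runsqlite_nodes = make_run_sqlite_fragment(dag, trigger_nodes, tag, sql_file)
#
# Each lalapps_run_sqlite job then deletes segment rows lying entirely outside
# the interval actually searched by StringSearch and vacuums the database.
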

def make_run_sqlite_fragment(dag, parents, tag, sql_file, files_per_run_sqlite = None):
    if files_per_run_sqlite is None:
        files_per_run_sqlite = runsqlitejob.files_per_run_sqlite
    nodes = set()
    input_cache = power.collect_output_caches(parents)
    while input_cache:
        node = RunSqliteNode(runsqlitejob)
        node.set_sql_file(sql_file)
        node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_run_sqlite]])
        for parent in set(parent for cache_entry, parent in input_cache[:files_per_run_sqlite]):
            node.add_parent(parent)
        del input_cache[:files_per_run_sqlite]
        seg = power.cache_span(node.get_output_cache())
        node.set_name("lalapps_run_sqlite_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
        dag.add_node(node)
        nodes.add(node)
    return nodes


#
# =============================================================================
#
#                      lalapps_string_meas_likelihood Jobs
#
# =============================================================================
#


def make_meas_likelihood_fragment(dag, parents, tag, files_per_meas_likelihood = None):
    if files_per_meas_likelihood is None:
        files_per_meas_likelihood = meas_likelihoodjob.files_per_meas_likelihood
    nodes = set()
    input_cache = power.collect_output_caches(parents)
    while input_cache:
        node = MeasLikelihoodNode(meas_likelihoodjob)
        node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_meas_likelihood]])
        for parent in set(parent for cache_entry, parent in input_cache[:files_per_meas_likelihood]):
            node.add_parent(parent)
        del input_cache[:files_per_meas_likelihood]
        seg = power.cache_span(node.get_input_cache())
        node.set_name("lalapps_string_meas_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
        node.set_output(tag)
        dag.add_node(node)
        nodes.add(node)
    return nodes


#
# =============================================================================
#
#                      lalapps_string_calc_likelihood Jobs
#
# =============================================================================
#


def make_calc_likelihood_fragment(dag, parents, likelihood_parents, tag, files_per_calc_likelihood = None, verbose = False):
    if files_per_calc_likelihood is None:
        files_per_calc_likelihood = calc_likelihoodjob.files_per_calc_likelihood
    input_cache = power.collect_output_caches(parents)
    likelihood_cache = power.collect_output_caches(likelihood_parents)
    nodes = set()
    while input_cache:
        node = CalcLikelihoodNode(calc_likelihoodjob)
        node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_calc_likelihood]])
        for parent in set(parent for cache_entry, parent in input_cache[:files_per_calc_likelihood]):
            node.add_parent(parent)
        del input_cache[:files_per_calc_likelihood]
        seg = power.cache_span(node.get_input_cache())
        node.set_name("lalapps_string_calc_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
        for cache_entry, parent in likelihood_cache:
            node.add_parent(parent)
            node.add_likelihood_cache([cache_entry])
        dag.add_node(node)
        nodes.add(node)
    return nodes
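

# Minimal sketch of how the two likelihood fragments compose (the upstream
# stage that produces "coinc_nodes", i.e. trigger generation, time sliding and
# coincidence, is assumed to have been built elsewhere; all names here are
# illustrative):
#
#   meas_nodes = make_meas_likelihood_fragment(dag, coinc_nodes, tag)
#   calc_nodes = make_calc_likelihood_fragment(dag, coinc_nodes, meas_nodes, tag)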