Excess power pipeline construction tools.

import igwn_segments as segments
from igwn_segments import utils as segmentsUtils
from lal import iterutils
from lal import pipeline

__author__ = "Duncan Brown <duncan@gravity.phys.uwm.edu>, Kipp Cannon <kipp@gravity.phys.uwm.edu>"
__version__ = "$Revision$"
return config_parser.get("condor", "universe")

return config_parser.get("condor", "accounting_group")

return config_parser.get("condor", name)

return config_parser.get("pipeline", "out_dir")

return config_parser.get("pipeline", "cache_dir")

return config_parser.get("pipeline", "triggers_dir")

if e.errno != errno.EEXIST:

return config_parser.getint("pipeline", "files_per_bucluster")

return config_parser.getint("pipeline", "files_per_bucut")

return config_parser.getint("pipeline", "files_per_burca")

return config_parser.getint("pipeline", "files_per_binjfind")
A class to hold timing parameter values.

self.resample_rate = config_parser.getfloat("lalapps_power", "resample-rate")

self.psd_length, self.psd_shift, self.window_shift, self.window_pad, self.tiling_length = lalburst.EPGetTimingParameters(
    config_parser.getint("lalapps_power", "psd-average-points")
seglists = segments.segmentlistdict()
for c in input_cache:
    seglists |= c.segmentlistdict

instruments = list(seglists.keys())
if None in instruments:
    instruments.remove(None)

for instrument in list(seglists.keys()):
    if not seglists[instrument]:
        del seglists[instrument]
url = "file://localhost%s" % os.path.abspath(path)

url = "file://localhost/dev/null"

return CacheEntry("+".join(instruments) or None, description, seglists.extent_all(), url)
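A hedged illustration of that CacheEntry construction: given per-instrument segment lists, the "observatory" column becomes the "+"-joined instrument list and the entry's segment becomes the overall extent. The instruments, times, description and file URL below are invented for the example.

from lal.utils import CacheEntry
import igwn_segments as segments

seglists = segments.segmentlistdict({
    "H1": segments.segmentlist([segments.segment(100, 200)]),
    "L1": segments.segmentlist([segments.segment(150, 250)]),
})

entry = CacheEntry(
    "+".join(sorted(seglists)) or None,     # "H1+L1"
    "POWER",                                # description tag (illustrative)
    seglists.extent_all(),                  # [100, 250)
    "file://localhost/tmp/example.xml.gz"   # made-up URL
)
print(str(entry))   # observatory, description, start, duration, URL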
cache = [(cache_entry, parent) for parent in parents for cache_entry in parent.get_output_cache()]
cache.sort(key = lambda x: x[0].segment)
For each cache, get the set of nodes whose output files it
contains.  A node is allowed to provide more than one output file,
and thus can be listed in more than one set.

# cache_entry --> node look-up table
for cache_entry in node.get_output_cache():
    index[cache_entry] = node

node_groups = [set() for cache in caches]

for node_group, cache in zip(node_groups, caches):
    for cache_entry in cache:
        node_group.add(index[cache_entry])

unused = len(nodes) - len(set.union(*node_groups))

return node_groups, unused
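To make the matching logic concrete, here is a self-contained sketch using stand-in objects instead of Condor DAG nodes; the FakeNode class and the file names are invented for illustration only.

class FakeNode(object):
    def __init__(self, name, outputs):
        self.name = name
        self.outputs = outputs
    def get_output_cache(self):
        return self.outputs

nodes = [FakeNode("a", ["a.xml"]), FakeNode("b", ["b1.xml", "b2.xml"]), FakeNode("c", ["c.xml"])]
caches = [frozenset(["a.xml", "b1.xml"]), frozenset(["b2.xml"])]

# cache_entry --> node look-up table
index = {}
for node in nodes:
    for cache_entry in node.get_output_cache():
        index[cache_entry] = node

# translate each cache into the set of nodes that produced its entries
node_groups = [set(index[cache_entry] for cache_entry in cache) for cache in caches]
unused = len(nodes) - len(set.union(*node_groups))

print([sorted(n.name for n in group) for group in node_groups])    # [['a', 'b'], ['b']]
print(unused)    # 1: node "c" feeds no cache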
a = min([cache_entry.segment[0] for cache_entry in cache])
b = max([cache_entry.segment[1] for cache_entry in cache])
return segments.segment(a, b)
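A small usage example of that span computation, with a namedtuple standing in for CacheEntry and made-up times: the result is the single segment from the earliest start to the latest end, regardless of holes in between.

import igwn_segments as segments
from collections import namedtuple

Entry = namedtuple("Entry", ["segment"])    # stand-in for a CacheEntry
cache = [Entry(segments.segment(0, 100)), Entry(segments.segment(200, 300)), Entry(segments.segment(350, 400))]

a = min(entry.segment[0] for entry in cache)
b = max(entry.segment[1] for entry in cache)
print(segments.segment(a, b))    # [0, 400): the span ignores the holes between entries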
f = open(filename, "w")

print(str(cache_entry), file=f)
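A minimal sketch of writing a LAL cache file like the function above does: one CacheEntry per line in its standard text form. The entries and the output path are illustrative only.

from lal.utils import CacheEntry
import igwn_segments as segments

entries = [
    CacheEntry("H1", "POWER", segments.segment(100, 200), "file://localhost/tmp/H1-POWER-100-100.xml.gz"),
    CacheEntry("L1", "POWER", segments.segment(100, 200), "file://localhost/tmp/L1-POWER-100-100.xml.gz"),
]

with open("example_output.cache", "w") as f:
    for cache_entry in entries:
        print(str(cache_entry), file=f)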
config_parser = ConfigParser object

pipeline.CondorDAGJob.__init__(self, "local", "/bin/rm")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "rm-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "rm-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_opt("force", "")
self.set_sub_file("rm.sub")
pipeline.CondorDAGNode.__init__(self, job)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

for cache_entry in cache:
    pipeline.CondorDAGNode.add_file_arg(self, cache_entry.path)
class BurstInjJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    A lalapps_binj job used by the power pipeline.  The static options
    are read from the [lalapps_binj] section in the ini file.  The
    stdout and stderr from the job are directed to the logs directory.
    The job runs in the universe specified in the ini file.  The path
    to the executable is determined from the ini file.

    def __init__(self, config_parser):
        config_parser = ConfigParser object

        pipeline.AnalysisJob.__init__(self, config_parser)

        if config_parser.has_option("pipeline", "injection_bands"):

        self.add_ini_opts(config_parser, "lalapps_binj")
        self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalapps_binj-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalapps_binj-$(macrogpsstarttime)-$(macrogpsendtime)-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.set_sub_file("lalapps_binj.sub")

        self.time_step = config_parser.getfloat("lalapps_binj", "time-step")
pipeline.CondorDAGNode.__init__(self, job)
pipeline.AnalysisNode.__init__(self)

self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
self._CondorDAGNode__macros["initialdir"] = os.getcwd()

self.add_var_opt("user-tag", self.__usertag)

raise AttributeError("cannot change attributes after computing output cache")

self.add_var_opt("time-slide-file", filename)

return self.get_opts().get("macrotimeslidefile", None)

raise AttributeError("cannot change attributes after computing output cache")
self.add_var_opt("gps-start-time", start)

return self.get_opts().get("macrogpsstarttime", None)

raise AttributeError("cannot change attributes after computing output cache")
self.add_var_opt("gps-end-time", end)

return self.get_opts().get("macrogpsendtime", None)

Returns a LAL cache of the output file name.  Calling this
method also induces the output name to get set, so it must

raise NotImplementedError

if self._AnalysisNode__output is None:

    raise ValueError("start time, end time, ifo, or user tag has not been set")

seg = segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end()))

return self._AnalysisNode__output
class PowerJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
    A lalapps_power job used by the power pipeline.  The static options
    are read from the [lalapps_power] and [lalapps_power_<inst>]
    sections in the ini file.  The stdout and stderr from the job are
    directed to the logs directory.  The job runs in the universe
    specified in the ini file.  The path to the executable is determined
    from the ini file.

    def __init__(self, config_parser):
        config_parser = ConfigParser object

        pipeline.AnalysisJob.__init__(self, config_parser)
        self.add_ini_opts(config_parser, "lalapps_power")
        self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalapps_power-$(cluster)-$(process).out"))
        self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalapps_power-$(cluster)-$(process).err"))
        self.add_condor_cmd("getenv", "True")
        self.set_sub_file("lalapps_power.sub")
pipeline.CondorDAGNode.__init__(self, job)
pipeline.AnalysisNode.__init__(self)

self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
self._CondorDAGNode__macros["initialdir"] = os.getcwd()

Load additional options from the per-instrument section in
the config file.

raise AttributeError("cannot change attributes after computing output cache")
pipeline.AnalysisNode.set_ifo(self, instrument)
for optvalue in self.job()._AnalysisJob__cp.items("lalapps_power_%s" % instrument):
    self.add_var_arg("--%s %s" % optvalue)

raise AttributeError("cannot change attributes after computing output cache")

self.add_var_opt("user-tag", self.__usertag)
Returns a LAL cache of the output file name.  Calling this
method also induces the output name to get set, so it must

self.output_cache = [CacheEntry(self.get_ifo(), self.__usertag, segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end())), "file://localhost" + os.path.abspath(self.get_output()))]

raise NotImplementedError

if self._AnalysisNode__output is None:
    if None in (self.get_start(), self.get_end(), self.get_ifo(), self.__usertag):
        raise ValueError("start time, end time, ifo, or user tag has not been set")
    seg = segments.segment(lal.LIGOTimeGPS(self.get_start()), lal.LIGOTimeGPS(self.get_end()))
    self.set_output(os.path.join(self.output_dir, "%s-POWER_%s-%d-%d.xml.gz" % (self.get_ifo(), self.__usertag, int(self.get_start()), int(self.get_end()) - int(self.get_start()))))
return self._AnalysisNode__output
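A worked example of the trigger file naming convention used above, <ifo>-POWER_<usertag>-<start>-<duration>.xml.gz; the ifo, user tag and GPS times are made up for the illustration.

ifo = "H1"
usertag = "S5_EXAMPLE"              # illustrative user tag
start, end = 873247860, 873248244   # illustrative GPS times

filename = "%s-POWER_%s-%d-%d.xml.gz" % (ifo, usertag, int(start), int(end) - int(start))
print(filename)    # H1-POWER_S5_EXAMPLE-873247860-384.xml.gz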
Set the LAL frame cache of MDC frames to use.  The cache is
passed to the job with the --mdc-cache argument.
@param file: MDC frame cache file to use.

self.add_var_opt("mdc-cache", file)
self.add_input_file(file)

Set the name of the XML file from which to read a list of
software injections.

self.add_var_opt("injection-file", file)
self.add_input_file(file)
pipeline.LigolwAddNode.__init__(self, job, *args)

self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

self.add_var_arg("--remove-input")

def __update_output_cache(self, observatory = None, segment = None):

    if observatory is not None:
        cache_entry.observatory = observatory
    if segment is not None:
        cache_entry.segment = segment

pipeline.LigolwAddNode.set_name(self, *args)

self.add_var_opt("input-cache", self.cache_name)

def set_output(self, path = None, observatory = None, segment = None):
    pipeline.LigolwAddNode.set_output(self, path)

self.add_var_arg("--remove-input-except %s" % c.path)

print(str(c), file=f)
pipeline.LigolwAddNode.write_input_files(self, *args)

raise NotImplementedError

raise NotImplementedError
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_cut"))
self.set_sub_file("lalburst_cut.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_cut-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_cut-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_condor_cmd("Requirements", "Memory > 1100")
self.add_ini_opts(config_parser, "lalburst_cut")

raise ValueError("files_per_bucut < 1")
pipeline.CondorDAGNode.__init__(self, *args)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

pipeline.CondorDAGNode.add_file_arg(self, filename)
self.add_output_file(filename)

raise NotImplementedError

raise NotImplementedError

raise NotImplementedError
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_cluster"))
self.set_sub_file("lalburst_cluster.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_cluster-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_cluster-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_condor_cmd("Requirements", "Memory > 1100")
self.add_ini_opts(config_parser, "lalburst_cluster")

raise ValueError("files_per_bucluster < 1")
pipeline.CondorDAGNode.__init__(self, *args)

self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
self._CondorDAGNode__macros["initialdir"] = os.getcwd()

pipeline.CondorDAGNode.set_name(self, *args)
self.add_var_opt("input-cache", self.cache_name)

raise NotImplementedError

print(str(c), file=f)
pipeline.CondorDAGNode.write_input_files(self, *args)

raise NotImplementedError

raise NotImplementedError
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_injfind"))
self.set_sub_file("lalburst_injfind.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_injfind-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_injfind-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_ini_opts(config_parser, "lalburst_injfind")

raise ValueError("files_per_binjfind < 1")

pipeline.CondorDAGNode.__init__(self, *args)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

pipeline.CondorDAGNode.add_file_arg(self, filename)
self.add_output_file(filename)

raise NotImplementedError

raise NotImplementedError

raise NotImplementedError
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_coinc"))
self.set_sub_file("lalburst_coinc.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_condor_cmd("Requirements", "Memory >= $(macrominram)")
self.add_ini_opts(config_parser, "lalburst_coinc")

raise ValueError("files_per_burca < 1")

pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_coinc"))
self.set_sub_file("lalburst_coinc2.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc2-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_coinc2-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_ini_opts(config_parser, "lalburst_coinc2")
pipeline.CondorDAGNode.__init__(self, *args)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

pipeline.CondorDAGNode.add_file_arg(self, filename)
self.add_output_file(filename)
longest_duration = max(abs(cache_entry.segment) for cache_entry in self.input_cache)
if longest_duration > 25000:
    self.add_macro("macrominram", 1300)
elif longest_duration > 10000:
    self.add_macro("macrominram", 800)
else:
    self.add_macro("macrominram", 0)

raise NotImplementedError

raise NotImplementedError

raise NotImplementedError

self.add_var_arg("--coincidence-segments %s" % ",".join(segmentsUtils.to_range_strings(seglist)))
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "ligolw_sqlite"))
self.set_sub_file("ligolw_sqlite.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "ligolw_sqlite-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "ligolw_sqlite-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_ini_opts(config_parser, "ligolw_sqlite")

pipeline.CondorDAGNode.__init__(self, *args)

self._CondorDAGNode__macros["initialdir"] = os.getcwd()

raise AttributeError("cannot change attributes after computing output cache")

pipeline.CondorDAGNode.add_file_arg(self, filename)
self.add_output_file(filename)

raise NotImplementedError

raise AttributeError("cannot change attributes after computing output cache")
self.add_macro("macrodatabase", filename)

raise NotImplementedError

raise NotImplementedError
pipeline.CondorDAGJob.__init__(self, "vanilla", get_executable(config_parser, "lalburst_power_meas_likelihood"))
self.set_sub_file("lalburst_power_meas_likelihood.sub")
self.set_stdout_file(os.path.join(get_out_dir(config_parser), "lalburst_power_meas_likelihood-$(cluster)-$(process).out"))
self.set_stderr_file(os.path.join(get_out_dir(config_parser), "lalburst_power_meas_likelihood-$(cluster)-$(process).err"))
self.add_condor_cmd("getenv", "True")
self.add_ini_opts(config_parser, "lalburst_power_meas_likelihood")

pipeline.CondorDAGNode.__init__(self, *args)

self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
self._CondorDAGNode__macros["initialdir"] = os.getcwd()

pipeline.CondorDAGNode.set_name(self, *args)

raise AttributeError("cannot change attributes after computing output cache")

pipeline.CondorDAGNode.add_file_arg(self, filename)
self.add_output_file(filename)

raise NotImplementedError

raise AttributeError("cannot change attributes after computing output cache")

filename = os.path.join(self.output_dir, "%s-%s-%d-%d.xml.gz" % (cache_entry.observatory, cache_entry.description, int(cache_entry.segment[0]), int(abs(cache_entry.segment))))
self.add_var_opt("output", filename)
cache_entry.url = "file://localhost" + os.path.abspath(filename)

raise AttributeError("must call set_output(description) first")

for arg in self.get_args():
    if "--add-from-cache" in arg:

print(str(c), file=f)
pipeline.CondorDAGNode.write_input_files(self, *args)

raise NotImplementedError

raise NotImplementedError
def init_job_types(config_parser, job_types = ("datafind", "rm", "binj", "power", "lladd", "binjfind", "bucluster", "bucut", "burca", "burca2", "sqlite", "burcatailor")):
    Construct definitions of the submit files.

    global datafindjob, rmjob, binjjob, powerjob, lladdjob, binjfindjob, buclusterjob, llb2mjob, bucutjob, burcajob, burca2job, sqlitejob, burcatailorjob

    if "datafind" in job_types:
        datafindjob = pipeline.LSCDataFindJob(os.path.join(os.getcwd(), get_cache_dir(config_parser)), os.path.join(os.getcwd(), get_out_dir(config_parser)), config_parser)

    if "rm" in job_types:
        rmjob = RMJob(config_parser)

    if "binj" in job_types:

    if "power" in job_types:

    if "lladd" in job_types:
        lladdjob = pipeline.LigolwAddJob(os.path.join(get_out_dir(config_parser)), config_parser)

    if "binjfind" in job_types:

    if "bucut" in job_types:

    if "bucluster" in job_types:

    if "burca" in job_types:

    if "burca2" in job_types:

    if "sqlite" in job_types:

    if "burcatailor" in job_types:
Return the number of PSDs that can fit into a job of length t
seconds.  In general, the return value is a non-integer.

t = t * timing_params.resample_rate - 2 * timing_params.filter_corruption
if t < timing_params.psd_length:

return (t - timing_params.psd_length) / timing_params.psd_shift + 1

From the analysis parameters and a count of PSDs, return the length
of the job in seconds.

raise ValueError(psds)

result = (psds - 1) * timing_params.psd_shift + timing_params.psd_length

result += 2 * timing_params.filter_corruption

return result / timing_params.resample_rate
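A worked example of this arithmetic, re-implemented as stand-alone functions with made-up but plausible numbers (4096 Hz sample rate, 16 s PSD length, 8 s PSD shift, 2 s of filter corruption at each end). The two functions are rearrangements of the same relation, so they invert each other; the behaviour for fewer than one PSD is an assumption of the sketch.

from types import SimpleNamespace

timing_params = SimpleNamespace(
    resample_rate = 4096.0,        # samples per second
    psd_length = 16 * 4096,        # samples per PSD
    psd_shift = 8 * 4096,          # samples between PSD starts
    filter_corruption = 2 * 4096,  # corrupted samples at each end
)

def psds_from_job_length(timing_params, t):
    # convert seconds to samples, discard the corrupted ends
    t = t * timing_params.resample_rate - 2 * timing_params.filter_corruption
    if t < timing_params.psd_length:
        return 0
    return (t - timing_params.psd_length) / timing_params.psd_shift + 1

def job_length_from_psds(timing_params, psds):
    if psds < 1:
        raise ValueError(psds)
    result = (psds - 1) * timing_params.psd_shift + timing_params.psd_length
    result += 2 * timing_params.filter_corruption
    return result / timing_params.resample_rate

print(job_length_from_psds(timing_params, 16))      # 140.0 seconds
print(psds_from_job_length(timing_params, 140.0))   # 16.0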
Split the data segment into correctly-overlapping segments.  We try
to have the numbers of PSDs in each segment be equal to
psds_per_job, but with a short segment at the end if needed.

joblength = job_length_from_psds(timing_params, psds_per_job)

joboverlap = 2 * timing_params.filter_corruption + (timing_params.psd_length - timing_params.psd_shift)

joboverlap /= timing_params.resample_rate

segs = segments.segmentlist()

while t + joblength <= segment[1]:
    segs.append(segments.segment(t, t + joblength) & segment)
    t += joblength - joboverlap
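The overlap bookkeeping is easiest to see with numbers, so here is a simplified re-implementation of the splitting loop using the same made-up timing values as the previous example. Unlike the real function it does not append a final short segment covering the left-over data at the end.

import igwn_segments as segments
from types import SimpleNamespace

timing_params = SimpleNamespace(resample_rate = 4096.0, psd_length = 16 * 4096, psd_shift = 8 * 4096, filter_corruption = 2 * 4096)

def job_length_from_psds(timing_params, psds):
    result = (psds - 1) * timing_params.psd_shift + timing_params.psd_length
    result += 2 * timing_params.filter_corruption
    return result / timing_params.resample_rate

def split_segment(timing_params, segment, psds_per_job):
    joblength = job_length_from_psds(timing_params, psds_per_job)
    # consecutive jobs overlap by the corrupted ends plus one PSD shift's worth of data
    joboverlap = 2 * timing_params.filter_corruption + (timing_params.psd_length - timing_params.psd_shift)
    joboverlap /= timing_params.resample_rate
    segs = segments.segmentlist()
    t = segment[0]
    while t + joblength <= segment[1]:
        segs.append(segments.segment(t, t + joblength) & segment)
        t += joblength - joboverlap
    return segs

# three 140 s jobs, each overlapping the previous one by 12 s
print(split_segment(timing_params, segments.segment(1000000000, 1000000500), psds_per_job = 16))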
Return True if the segment can be analyzed using lalapps_power.

Remove segments from seglistdict that are too short to analyze.

CAUTION: this function modifies seglistdict in place.

for seglist in seglistdict.values():
    iterutils.inplace_filter(lambda seg: segment_ok(timing_params, seg), seglist)
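An illustration of that filtering with stand-in data: any segment too short to hold at least one PSD worth of analysable data is dropped in place. The segment_ok() helper here is a plausible stand-in rather than the module's definition, and the segment lists are invented.

import igwn_segments as segments
from lal import iterutils
from types import SimpleNamespace

timing_params = SimpleNamespace(resample_rate = 4096.0, psd_length = 16 * 4096, psd_shift = 8 * 4096, filter_corruption = 2 * 4096)

def psds_from_job_length(timing_params, t):
    t = t * timing_params.resample_rate - 2 * timing_params.filter_corruption
    if t < timing_params.psd_length:
        return 0
    return (t - timing_params.psd_length) / timing_params.psd_shift + 1

def segment_ok(timing_params, segment):
    # stand-in criterion: at least one whole PSD must fit in the segment
    return psds_from_job_length(timing_params, float(abs(segment))) >= 1

seglistdict = segments.segmentlistdict({
    "H1": segments.segmentlist([segments.segment(0, 10), segments.segment(100, 400)]),
    "L1": segments.segmentlist([segments.segment(0, 5)]),
})

for seglist in seglistdict.values():
    iterutils.inplace_filter(lambda seg: segment_ok(timing_params, seg), seglist)

print(seglistdict)    # H1 keeps only its 300 s segment; L1's list becomes empty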
node = pipeline.LSCDataFindNode(datafindjob)
node.set_name("ligo_data_find-%s-%d-%d" % (instrument, int(seg[0]), int(abs(seg))))
node.set_start(seg[0] - datafind_pad)
node.set_end(seg[1] + 1)

node._AnalysisNode__ifo = instrument
node.set_observatory(instrument[0])
if node.get_type() is None:
    node.set_type(datafindjob.get_config_file().get("datafind", "type_%s" % instrument))
def make_lladd_fragment(dag, parents, tag, segment = None, input_cache = None, remove_input = False, preserve_cache = None, extra_input_cache = None):

    for parent in parents:
        node.add_parent(parent)

    if input_cache is None:

        for parent in parents:
            node.add_input_cache(parent.get_output_cache())

    node.add_input_cache(input_cache)
    if extra_input_cache is not None:

        node.add_input_cache(extra_input_cache)
    if preserve_cache is not None:
        node.add_preserve_cache(preserve_cache)

    [cache_entry] = node.get_output_cache()

    segment = cache_entry.segment
    node.set_name("lladd_%s_%d_%d" % (tag, int(segment[0]), int(abs(segment))))
    node.set_output(os.path.join(node.output_dir, "%s-%s-%d-%d.xml.gz" % (cache_entry.observatory, tag, int(segment[0]), int(abs(segment)))), segment = segment)
node.set_name("lalapps_power_%s_%s_%d_%d" % (tag, instrument, int(seg[0]), int(abs(seg))))
for parent in parents:
    node.add_parent(parent)

node.set_cache(framecache)
node.set_ifo(instrument)
node.set_start(seg[0])
node.set_end(seg[1])
node.set_user_tag(tag)
for arg, value in injargs.items():

    node.add_var_arg("--%s %s" % (arg, value))
start = seg[0] - seg[0] % binjjob.time_step + binjjob.time_step * offset

node.set_time_slide_file(time_slides_cache_entry.path)
node.set_start(start)
node.set_end(seg[1])
if flow is not None:
    node.set_name("lalapps_binj_%s_%d_%d" % (tag, int(start), int(flow)))
else:
    node.set_name("lalapps_binj_%s_%d" % (tag, int(start)))
node.set_user_tag(tag)
if flow is not None:
    node.add_macro("macroflow", flow)
if fhigh is not None:
    node.add_macro("macrofhigh", fhigh)
node.add_macro("macroseed", int(time.time()%100 + start))
node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:binjfindjob.files_per_binjfind]])
for parent in set(parent for cache_entry, parent in input_cache[:binjfindjob.files_per_binjfind]):
    node.add_parent(parent)
del input_cache[:binjfindjob.files_per_binjfind]

node.set_name("lalburst_injfind_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
node.add_macro("macrocomment", tag)

node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:buclusterjob.files_per_bucluster]])
for parent in set(parent for cache_entry, parent in input_cache[:buclusterjob.files_per_bucluster]):
    node.add_parent(parent)
del input_cache[:buclusterjob.files_per_bucluster]

node.set_name("lalburst_cluster_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
node.add_macro("macrocomment", tag)

node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:bucutjob.files_per_bucut]])
for parent in set(parent for cache_entry, parent in input_cache[:bucutjob.files_per_bucut]):
    node.add_parent(parent)
del input_cache[:bucutjob.files_per_bucut]

node.set_name("lalburst_cut_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
node.add_macro("macrocomment", tag)
if coincidence_segments is not None:

    assert len(input_cache) == 1

node.add_input_cache([cache_entry for (cache_entry, parent) in input_cache[:burcajob.files_per_burca]])
for parent in set(parent for cache_entry, parent in input_cache[:burcajob.files_per_burca]):
    node.add_parent(parent)
del input_cache[:burcajob.files_per_burca]

node.set_name("lalburst_coinc_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
if coincidence_segments is not None:
    node.set_coincidence_segments(coincidence_segments)
node.add_macro("macrocomment", tag)
for cache_entry, parent in input_cache:

    node.add_input_cache([cache_entry])
    node.add_parent(parent)
    node.set_name("ligolw_sqlite_%s_%d" % (tag, len(nodes)))
    node.set_output(cache_entry.path.replace(".xml.gz", ".sqlite"))
input_cache = list(input_cache)
input_cache.sort(reverse = True)

max_cost_per_job = 25

while input_cache and cost <= max_cost_per_job:
    cache.append(input_cache.pop())

    cost += (float(abs(cache[-1].segment)) / 10000.0)**2

node.add_input_cache(cache)
node.set_name("lalburst_power_meas_likelihood_%s_%d_%d_%d" % (tag, int(seg[0]), int(abs(seg)), len(nodes)))
node.set_output("%s_%d" % (tag, len(nodes)))

node.set_name("lalburst_power_meas_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
for parent in nodes:
    node.add_parent(parent)
    node.add_input_cache(parent.get_output_cache())
del node.get_args()[:]
node.add_var_arg("--add-from-cache %s" % node.cache_name)
node.set_output(tag)

delete_cache = set(node.get_input_cache()) - set(node.get_output_cache())

rmnode.set_name("lalburst_power_meas_likelihood_rm_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
rmnode.add_parent(node)
rmnode.add_input_cache(delete_cache)
dag.add_node(rmnode)
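The cost-capped grouping used here (and again in the burca2 fragment below) is easier to see in isolation. The sketch works on plain durations instead of cache entries: entries are popped off a sorted worklist into a group until the group's accumulated "cost" passes the cap, where each entry contributes the square of its duration in units of 10000 s, so long stretches of data quickly fill a group while short stretches get batched together.

def group_by_cost(durations, max_cost_per_job = 25):
    # stand-alone illustration of the cost heuristic, not the module's code
    pending = sorted(durations, reverse = True)
    groups = []
    while pending:
        group = []
        cost = 0.0
        while pending and cost <= max_cost_per_job:
            duration = pending.pop()    # shortest remaining entry
            group.append(duration)
            cost += (float(duration) / 10000.0)**2
        groups.append(group)
    return groups

# two groups: the second 60000 s entry ends up alone
print(group_by_cost([60000, 60000, 30000, 10000, 5000]))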
coinc_cache = list(coinc_cache)
coinc_cache.sort(reverse = True)

likelihood_data_cache_filename = os.path.join(burca2job.cache_dir, "burca2_%s.cache" % tag)
likelihood_data_cache_file = open(likelihood_data_cache_filename, "w")
for cache_entry in [cache_entry for node in likelihood_parents for cache_entry in node.get_output_cache()]:
    print(str(cache_entry), file=likelihood_data_cache_file)

max_cost_per_job = 10

while coinc_cache and cost <= max_cost_per_job:
    cache.append(coinc_cache.pop())

    cost += (float(abs(cache[-1].segment)) / 10000.0)**2

node.set_name("lalburst_coinc2_%s_%d" % (tag, len(nodes)))
node.add_macro("macrocomment", tag)
node.add_var_arg("--likelihood-data-cache %s" % likelihood_data_cache_filename)
node.add_input_cache(cache)
for parent in likelihood_parents:
    node.add_parent(parent)
print("building ligo_data_find jobs ...", file=sys.stderr)

filled = seglists.copy().protract(datafind_pad / 2).contract(datafind_pad / 2)

segs = [(seg, instrument) for instrument, seglist in filled.items() for seg in seglist]

for seg, instrument in segs:

    print("making datafind job for %s spanning %s" % (instrument, seg), file=sys.stderr)
def make_power_segment_fragment(dag, datafindnodes, instrument, segment, tag, timing_params, psds_per_job, binjnodes = set(), verbose = False):
    Construct a DAG fragment for an entire segment, splitting the
    segment into multiple trigger generator jobs.

    [framecache] = [node.get_output() for node in datafindnodes]

    [simfile] = [cache_entry.path for node in binjnodes for cache_entry in node.get_output_cache()]
    injargs = {"injection-file": simfile}

    seglist = split_segment(timing_params, segment, psds_per_job)

    print("Segment split: " + str(seglist), file=sys.stderr)

    nodes |= make_power_fragment(dag, datafindnodes | binjnodes, instrument, seg, tag, framecache, injargs = injargs)
for instrument, seglist in seglistdict.items():

    print("generating %s fragment %s" % (instrument, str(seg)), file=sys.stderr)

    dfnodes = set([node for node in datafinds if (node.get_ifo() == instrument) and (seg in segments.segment(node.get_start(), node.get_end()))])
    if len(dfnodes) != 1:
        raise ValueError("error, not exactly 1 datafind is suitable for trigger generator job at %s in %s" % (str(seg), instrument))

    nodes += make_power_segment_fragment(dag, dfnodes, instrument, seg, tag, timing_params, psds_per_job, binjnodes = binjnodes, verbose = verbose)
if not offset_vectors:

print("Grouping jobs for coincidence analysis:", file=sys.stderr)

seglists, bins = cafe.ligolw_cafe([cache_entry for parent in parents for cache_entry in parent.get_output_cache()], offset_vectors, extentlimit = extentlimit, verbose = verbose)

caches = [frozenset(bin.objects) for bin in bins]
assert len(set(caches)) == len(caches)
segs = [cache_span(bin.objects) for bin in bins]

clipsegs = [None] * len(bins)
if extentlimit is not None:
    extents = [bin.extent for bin in bins]
    for i, extent in enumerate(extents):

        if i == 0 or extents[i - 1].disjoint(extent):
            lo = segments.NegInfinity

        if i >= len(extents) - 1 or extents[i + 1].disjoint(extent):
            hi = segments.PosInfinity

        if lo is not segments.NegInfinity or hi is not segments.PosInfinity:
            clipsegs[i] = segments.segment(lo, hi)

print("Matching jobs to caches ...", file=sys.stderr)

if verbose and unused:

    print("Notice: %d jobs (of %d) produce output that will not be used by a coincidence job" % (unused, len(parents)), file=sys.stderr)

return zip(segs, parent_groups, caches, clipsegs)
def __init__(self, config_parser)
def get_output_cache(self)
def add_file_arg(self, filename)
def add_input_cache(self, cache)
def __init__(self, *args)
def get_output_files(self)
def get_input_cache(self)
def __init__(self, config_parser)
def get_input_cache(self)
def add_input_cache(self, cache)
def add_file_arg(self, filename)
def get_output_files(self)
def get_output_cache(self)
def __init__(self, *args)
def set_name(self, *args)
def write_input_files(self, *args)
def __init__(self, config_parser)
def add_file_arg(self, filename)
def get_output_cache(self)
def get_input_cache(self)
def get_output_files(self)
def add_input_cache(self, cache)
def __init__(self, *args)
def __init__(self, config_parser)
def __init__(self, config_parser)
def __init__(self, *args)
def set_coincidence_segments(self, seglist)
def get_output_files(self)
def add_file_arg(self, filename)
def get_output_cache(self)
def add_input_cache(self, cache)
def get_input_cache(self)
def __init__(self, config_parser)
def set_name(self, *args)
def set_output(self, description)
def write_input_files(self, *args)
def add_input_cache(self, cache)
def add_file_arg(self, filename)
def get_input_cache(self)
def get_output_files(self)
def __init__(self, *args)
def get_output_cache(self)
A lalapps_binj job used by the power pipeline.
def get_output_files(self)
def set_time_slide_file(self, filename)
def set_start(self, start)
def get_output_cache(self)
Returns a LAL cache of the output file name.
def set_user_tag(self, tag)
def get_time_slide_file(self)
def write_input_files(self, *args)
def set_name(self, *args)
def get_output_cache(self)
def __update_output_cache(self, observatory=None, segment=None)
def set_output(self, path=None, observatory=None, segment=None)
def get_output_files(self)
def add_preserve_cache(self, cache)
def __init__(self, job, remove_input, *args)
def get_input_cache(self)
def add_input_cache(self, cache)
A lalapps_power job used by the power pipeline.
def get_output_files(self)
def set_mdccache(self, file)
Set the LAL frame cache of MDC frames to use.
def set_user_tag(self, tag)
def set_injection_file(self, file)
Set the name of the XML file from which to read a list of software injections.
def get_output_cache(self)
Returns a LAL cache of the output file name.
def set_ifo(self, instrument)
Load additional options from the per-instrument section in the config file.
def __init__(self, config_parser)
config_parser = ConfigParser object
def add_input_cache(self, cache)
def get_output_cache(self)
def __init__(self, config_parser)
def __init__(self, *args)
def add_input_cache(self, cache)
def get_output_cache(self)
def set_output(self, filename)
def get_input_cache(self)
def add_file_arg(self, filename)
def get_output_files(self)
A class to hold timing parameter values.
def __init__(self, config_parser)
def get_executable(config_parser, name)
def segment_ok(timing_params, segment)
Return True if the segment can be analyzed using lalapps_power.
def get_files_per_burca(config_parser)
def make_burca2_fragment(dag, coinc_cache, likelihood_parents, tag)
def get_universe(config_parser)
def get_out_dir(config_parser)
def make_cache_entry(input_cache, description, path)
def group_coinc_parents(parents, offset_vectors, extentlimit=None, verbose=False)
def make_burca_tailor_fragment(dag, input_cache, seg, tag)
def get_accounting_group(config_parser)
def get_files_per_bucut(config_parser)
def make_datafind_fragment(dag, instrument, seg)
def job_length_from_psds(timing_params, psds)
From the analysis parameters and a count of PSDs, return the length of the job in seconds.
def collect_output_caches(parents)
def split_segment(timing_params, segment, psds_per_job)
Split the data segment into correctly-overlapping segments.
def get_triggers_dir(config_parser)
def make_dir_if_not_exists(dir)
def get_files_per_binjfind(config_parser)
def make_datafind_stage(dag, seglists, verbose=False)
def make_bucluster_fragment(dag, parents, tag, verbose=False)
def get_files_per_bucluster(config_parser)
def make_binj_fragment(dag, seg, time_slides_cache_entry, tag, offset, flow=None, fhigh=None)
def make_binjfind_fragment(dag, parents, tag, verbose=False)
def make_power_segment_fragment(dag, datafindnodes, instrument, segment, tag, timing_params, psds_per_job, binjnodes=set(), verbose=False)
Construct a DAG fragment for an entire segment, splitting the segment into multiple trigger generator...
def get_cache_dir(config_parser)
def make_burca_fragment(dag, parents, tag, coincidence_segments=None, verbose=False)
def match_nodes_to_caches(nodes, caches)
For each cache, get the set of nodes whose output files it contains.
def make_sqlite_fragment(dag, parents, tag, verbose=False)
def remove_too_short_segments(seglistdict, timing_params)
Remove segments from seglistdict that are too short to analyze.
def write_output_cache(nodes, filename)
def init_job_types(config_parser, job_types=("datafind", "rm", "binj", "power", "lladd", "binjfind", "bucluster", "bucut", "burca", "burca2", "sqlite", "burcatailor"))
Construct definitions of the submit files.
def make_power_fragment(dag, parents, instrument, seg, tag, framecache, injargs={})
def make_lladd_fragment(dag, parents, tag, segment=None, input_cache=None, remove_input=False, preserve_cache=None, extra_input_cache=None)
def make_dag_directories(config_parser)
def make_single_instrument_stage(dag, datafinds, seglistdict, tag, timing_params, psds_per_job, binjnodes=set(), verbose=False)
def psds_from_job_length(timing_params, t)
Return the number of PSDs that can fit into a job of length t seconds.
def make_bucut_fragment(dag, parents, tag, verbose=False)