"""
Classes needed for the cosmic string analysis pipeline.
"""
from __future__ import print_function
import math
import os
import sys


import igwn_segments as segments
from lal import iterutils
from lal import LIGOTimeGPS
from lal import pipeline
# CacheEntry is used below; lal.utils is the assumed import location
from lal.utils import CacheEntry
from lalburst import cafe
from lalburst import power
__author__ = 'Xavier Siemens <siemens@gravity.phys.uwm.edu>'
__version__ = '$Revision$'
def get_files_per_meas_likelihood(config_parser):
	return config_parser.getint("pipeline", "files_per_meas_likelihood")


def get_files_per_calc_likelihood(config_parser):
	return config_parser.getint("pipeline", "files_per_calc_likelihood")


def get_files_per_run_sqlite(config_parser):
	return config_parser.getint("pipeline", "files_per_run_sqlite")
		pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_string_meas_likelihood"))
		self.set_sub_file("lalapps_string_meas_likelihood.sub")
		self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_meas_likelihood-$(cluster)-$(process).out"))
		self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_meas_likelihood-$(cluster)-$(process).err"))
		self.add_condor_cmd("getenv", "True")
		self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
		self.add_ini_opts(config_parser, "lalapps_string_meas_likelihood")
		raise ValueError("files_per_meas_likelihood < 1")
		pipeline.CondorDAGNode.__init__(self, *args)
		self._CondorDAGNode__macros["initialdir"] = os.getcwd()
		self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)
		self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)

		pipeline.CondorDAGNode.set_name(self, *args)
		self.add_var_opt("input-cache", self.cache_name)

		raise AttributeError("cannot change attributes after computing output cache")

		raise NotImplementedError

		raise AttributeError("cannot change attributes after computing output cache")
		cache_entry = power.make_cache_entry(self.input_cache, description, "")
		filename = os.path.join(self.output_dir, "%s-STRING_LIKELIHOOD_%s-%d-%d.xml.gz" % (cache_entry.observatory, cache_entry.description, int(cache_entry.segment[0]), int(abs(cache_entry.segment))))
		cache_entry.url = "file://localhost" + os.path.abspath(filename)
		self.add_var_opt("output", filename)

		raise AttributeError("must call set_output(description) first")

		print(str(c), file=f)
		pipeline.CondorDAGNode.write_input_files(self, *args)

		raise NotImplementedError

		raise NotImplementedError
		pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_string_calc_likelihood"))
		self.set_sub_file("lalapps_string_calc_likelihood.sub")
		self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_calc_likelihood-$(cluster)-$(process).out"))
		self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_string_calc_likelihood-$(cluster)-$(process).err"))
		self.add_condor_cmd("getenv", "True")
		self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
		self.add_ini_opts(config_parser, "lalapps_string_calc_likelihood")

		raise ValueError("files_per_calc_likelihood < 1")
		pipeline.CondorDAGNode.__init__(self, *args)
		self._CondorDAGNode__macros["initialdir"] = os.getcwd()
		self.cache_dir = os.path.join(os.getcwd(), self.job().cache_dir)

		pipeline.CondorDAGNode.set_name(self, *args)
		self.add_var_opt("input-cache", self.cache_name)

		self.add_output_file(c.path)

		raise NotImplementedError

		print(str(c), file=f)

		print(str(c), file=f)
		pipeline.CondorDAGNode.write_input_files(self, *args)

		raise NotImplementedError

		raise NotImplementedError

class StringJob(pipeline.CondorDAGJob, pipeline.AnalysisJob):
	"""
	A lalapps_StringSearch job used by the string pipeline. The static options
	are read from the section [lalapps_StringSearch] in the ini file. The
	stdout and stderr from the job are directed to the logs directory. The job
	runs in the universe specified in the ini file. The path to the executable
	is determined from the ini file.
	"""
	def __init__(self, config_parser):
		"""
		config_parser = ConfigParser object from which options are read.
		"""
		pipeline.CondorDAGJob.__init__(self, power.get_universe(config_parser), power.get_executable(config_parser, "lalapps_StringSearch"))
		pipeline.AnalysisJob.__init__(self, config_parser)
		self.add_ini_opts(config_parser, "lalapps_StringSearch")
		self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_StringSearch-$(cluster)-$(process).out"))
		self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_StringSearch-$(cluster)-$(process).err"))
		self.add_condor_cmd("getenv", "True")
		self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
		self.set_sub_file("lalapps_StringSearch.sub")

class StringNode(pipeline.CondorDAGNode, pipeline.AnalysisNode):
	"""
	A StringNode runs an instance of the cosmic string search code
	(lalapps_StringSearch) in a Condor DAG.
	"""
	def __init__(self, job):
		"""
		job = A CondorDAGJob that can run an instance of lalapps_StringSearch.
		"""
		pipeline.CondorDAGNode.__init__(self, job)
		pipeline.AnalysisNode.__init__(self)
		self.__usertag = job.get_config('pipeline', 'user_tag')
		self._CondorDAGNode__macros["initialdir"] = os.getcwd()
		self.output_dir = os.path.join(os.getcwd(), self.job().output_dir)
	def set_ifo(self, instrument):
		"""
		Load additional options from the per-instrument section in
		the config file.
		"""
		if self.output_cache:
			raise AttributeError("cannot change attributes after computing output cache")
		pipeline.AnalysisNode.set_ifo(self, instrument)
		for optvalue in self.job()._AnalysisJob__cp.items("lalapps_StringSearch_%s" % instrument):
			self.add_var_arg("--%s %s" % optvalue)
		raise AttributeError("cannot change attributes after computing output cache")

		self.add_var_opt("user-tag", self.__usertag)
	def get_output_cache(self):
		"""
		Returns a LAL cache of the output file name. Calling this
		method also induces the output name to get set, so it must
		be called at least once.
		"""
		if not self.output_cache:
			self.output_cache = [CacheEntry(self.get_ifo(), self.__usertag, segments.segment(LIGOTimeGPS(self.get_start()), LIGOTimeGPS(self.get_end())), "file://localhost" + os.path.abspath(self.get_output()))]
		return self.output_cache

		raise NotImplementedError
	def get_output(self):
		"""
		Returns the file name of output from the string search code. This must
		be kept synchronized with the name of the output file constructed by
		lalapps_StringSearch.
		"""
		if self._AnalysisNode__output is None:
			if None in (self.get_start(), self.get_end(), self.get_ifo(), self.__usertag):
				raise ValueError("start time, end time, ifo, or user tag has not been set")
			seg = segments.segment(LIGOTimeGPS(self.get_start()), LIGOTimeGPS(self.get_end()))
			self.set_output(os.path.join(self.output_dir, "%s-STRINGSEARCH_%s-%d-%d.xml.gz" % (self.get_ifo(), self.__usertag, int(self.get_start()), int(self.get_end()) - int(self.get_start()))))

		return self._AnalysisNode__output
	def set_injection_file(self, file):
		"""
		Set the name of the XML file from which to read a list of
		software injections.
		"""
		self.add_var_opt("injection-file", file)
		self.add_input_file(file)

	"""
	A lalapps_run_sqlite job used by the string pipeline. The static
	options are read from the [lalapps_run_sqlite] section in the ini
	file. The stdout and stderr from the job are directed to the logs
	directory. The job runs in the vanilla universe. The path to the
	executable is determined from the ini file.
	"""
	def __init__(self, config_parser):
		"""
		config_parser = ConfigParser object from which options are read.
		"""
		pipeline.CondorDAGJob.__init__(self, "vanilla", power.get_executable(config_parser, "lalapps_run_sqlite"))
		self.add_ini_opts(config_parser, "lalapps_run_sqlite")
		self.set_stdout_file(os.path.join(power.get_out_dir(config_parser), "lalapps_run_sqlite-$(cluster)-$(process).out"))
		self.set_stderr_file(os.path.join(power.get_out_dir(config_parser), "lalapps_run_sqlite-$(cluster)-$(process).err"))
		self.add_condor_cmd("getenv", "True")
		self.add_condor_cmd("accounting_group", power.get_accounting_group(config_parser))
		self.set_sub_file("lalapps_run_sqlite.sub")
		raise ValueError("files_per_run_sqlite < 1")

		pipeline.CondorDAGNode.__init__(self, *args)
		self._CondorDAGNode__macros["initialdir"] = os.getcwd()

		pipeline.CondorDAGNode.add_file_arg(self, filename)
		self.add_output_file(filename)

		self.add_var_opt("sql-file", filename)

def clip_segment_length(segment_length, pad, short_segment_duration):
	# trim segment_length so that the analysis duration it implies is
	# compatible with the short-segment tiling used by lalapps_StringSearch
	assert segment_length >= 2 * pad
	duration = segment_length - 2 * pad
	extra = (2 * duration + short_segment_duration) % (4 * short_segment_duration)

	segment_length -= extra

	# negative return values are not allowed
	assert segment_length >= 0
	return segment_length

def segment_ok(seg, min_segment_length, pad):
	"""
	Return True if the segment seg is long enough to be analyzed by
	lalapps_StringSearch.
	"""
	return float(abs(seg)) - 2 * pad >= min_segment_length

def remove_too_short_segments(seglists, min_segment_length, pad):
	"""
	Remove segments from the segmentlistdict seglists that are too short to
	analyze.

	CAUTION: this function modifies seglists in place.
	"""
	for seglist in seglists.values():
		iterutils.inplace_filter(lambda seg: segment_ok(seg, min_segment_length, pad), seglist)

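# Minimal sketch of the in-place filtering above (numbers hypothetical):
#
#	seglists = segments.segmentlistdict({
#		"H1": segments.segmentlist([segments.segment(0, 10000), segments.segment(20000, 20100)])
#	})
#	remove_too_short_segments(seglists, min_segment_length = 2048, pad = 8)
#	# the 100 s segment fails segment_ok() and is removed; the 10000 s segment survives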
def compute_segment_lists(seglists, offset_vectors, min_segment_length, pad):
	seglists = seglists.copy()

	# ignore offset vectors that involve instruments for which we have no
	# segments
	offset_vectors = [offset_vector for offset_vector in offset_vectors if set(offset_vector.keys()).issubset(set(seglists.keys()))]

	new = cafe.get_coincident_segmentlistdict(seglists, offset_vectors)

	# round the segment boundaries outward to integer GPS times
	for seglist in new.values():
		for i in range(len(seglist)):
			seglist[i] = segments.segment(int(math.floor(seglist[i][0])), int(math.ceil(seglist[i][1])))

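# For example (values hypothetical): the rounding above turns a coincident
# segment (966384015.25, 966387662.8) into (966384015, 966387663), so that the
# jobs built from it start and end on integer GPS seconds.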
meas_likelihoodjob = None
calc_likelihoodjob = None


def init_job_types(config_parser, job_types = ("string", "meas_likelihood", "calc_likelihood", "runsqlite")):
	"""
	Construct definitions of the submit files.
	"""
	global stringjob, meas_likelihoodjob, calc_likelihoodjob, runsqlitejob

	if "string" in job_types:

	if "meas_likelihood" in job_types:

	if "calc_likelihood" in job_types:

	if "runsqlite" in job_types:

def make_string_fragment(dag, parents, instrument, seg, tag, framecache, injargs = {}):
	node = StringNode(stringjob)
	node.set_name("lalapps_StringSearch_%s_%s_%d_%d" % (tag, instrument, int(seg[0]), int(abs(seg))))
	for parent in parents:
		node.add_parent(parent)
	node.set_cache(framecache)
	node.set_ifo(instrument)
	node.set_start(seg[0])
	node.set_user_tag(tag)
	for arg, value in injargs.items():
		node.add_var_arg("--%s %s" % (arg, value))

def split_segment(seg, min_segment_length, pad, overlap, short_segment_duration, max_job_length):
	# avoid an infinite loop in the while loop below
	if min_segment_length + 2 * pad <= overlap:
		raise ValueError("infinite loop: min_segment_length + 2 * pad must be > overlap")

	seglist = segments.segmentlist()
	while abs(seg) >= min_segment_length + 2 * pad:
		# try to use max_job_length each time
		if abs(seg) >= max_job_length:
			seglist.append(segments.segment(seg[0], seg[0] + max_job_length))
		else:
			seglist.append(segments.segment(seg[0], seg[0] + clip_segment_length(abs(seg), pad, short_segment_duration)))
		assert abs(seglist[-1]) != 0
		# job boundaries must be integer GPS times
		if abs((int(seglist[-1][0]) - seglist[-1][0]) / seglist[-1][0]) > 1e-14 or abs((int(seglist[-1][1]) - seglist[-1][1]) / seglist[-1][1]) > 1e-14:
			raise ValueError("segment %s does not have integer boundaries" % str(seglist[-1]))
		# advance to the next job, overlapping the previous one
		seg = segments.segment(seglist[-1][1] - overlap, seg[1])

	if not seglist:
		raise ValueError("unable to use segment %s" % str(seg))

	return seglist

def make_string_segment_fragment(dag, datafindnodes, instrument, seg, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = set(), verbose = False):
	"""
	Construct a DAG fragment for an entire segment, splitting the
	segment into multiple trigger generator jobs.
	"""
	# only use the injection jobs whose output overlaps this segment
	binjnodes = set(node for node in binjnodes if power.cache_span(node.get_output_cache()).intersects(seg))

	# there must be exactly one frame cache for this segment
	[framecache] = [node.get_output() for node in datafindnodes]

	[simfile] = [cache_entry.path for node in binjnodes for cache_entry in node.get_output_cache()]
	injargs = {"injection-file": simfile}

	seglist = split_segment(seg, min_segment_length, pad, overlap, short_segment_duration, max_job_length)
	if verbose:
		print("Segment split: " + str(seglist), file=sys.stderr)

	nodes |= make_string_fragment(dag, datafindnodes | binjnodes, instrument, seg, tag, framecache, injargs = injargs)

def make_single_instrument_stage(dag, datafinds, seglistdict, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = set(), verbose = False):
	for instrument, seglist in seglistdict.items():
		for seg in seglist:
			if verbose:
				print("generating %s fragment %s" % (instrument, str(seg)), file=sys.stderr)

			# find the datafind job this trigger generator job requires
			dfnodes = set([node for node in datafinds if (node.get_ifo() == instrument) and (seg in segments.segment(node.get_start(), node.get_end()))])
			if len(dfnodes) != 1:
				raise ValueError("error, not exactly 1 datafind is suitable for trigger generator job at %s in %s" % (str(seg), instrument))

			# build the DAG fragment for this segment
			nodes |= make_string_segment_fragment(dag, dfnodes, instrument, seg, tag, min_segment_length, pad, overlap, short_segment_duration, max_job_length, binjnodes = binjnodes, verbose = verbose)

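# Sketch of how a calling script might drive this stage (variable names are
# hypothetical; dag, datafinds and seglists come from earlier stages, and the
# return value is assumed to be the set of trigger-generator nodes):
#
#	remove_too_short_segments(seglists, min_segment_length, pad)
#	trigger_nodes = make_single_instrument_stage(dag, datafinds, seglists,
#		"STRING_SEARCH", min_segment_length, pad, overlap,
#		short_segment_duration, max_job_length, verbose = True)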
def write_clip_segment_sql_file(filename):
	# SQL used by lalapps_run_sqlite to delete triggers that lie outside the
	# interval actually searched, as recorded in the search_summary table
	code = """DELETE FROM
	sngl_burst
WHERE
	(end_time + 1e-9 * end_time_ns < (SELECT MIN(in_start_time + 1e-9 * in_start_time_ns) FROM search_summary NATURAL JOIN process WHERE program == 'StringSearch'))
	OR
	(start_time + 1e-9 * start_time_ns > (SELECT MAX(in_end_time + 1e-9 * in_end_time_ns) FROM search_summary NATURAL JOIN process WHERE program == 'StringSearch'));"""

	print(code, file = open(filename, "w"))

def make_run_sqlite_fragment(dag, parents, tag, sql_file, files_per_run_sqlite = None):
	if files_per_run_sqlite is None:
		files_per_run_sqlite = runsqlitejob.files_per_run_sqlite

	input_cache = power.collect_output_caches(parents)

	node.set_sql_file(sql_file)
	node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_run_sqlite]])
	for parent in set(parent for cache_entry, parent in input_cache[:files_per_run_sqlite]):
		node.add_parent(parent)
	del input_cache[:files_per_run_sqlite]
	seg = power.cache_span(node.get_output_cache())
	node.set_name("lalapps_run_sqlite_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))

def make_meas_likelihood_fragment(dag, parents, tag, files_per_meas_likelihood = None):
	if files_per_meas_likelihood is None:
		files_per_meas_likelihood = meas_likelihoodjob.files_per_meas_likelihood

	input_cache = power.collect_output_caches(parents)

	node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_meas_likelihood]])
	for parent in set(parent for cache_entry, parent in input_cache[:files_per_meas_likelihood]):
		node.add_parent(parent)
	del input_cache[:files_per_meas_likelihood]
	seg = power.cache_span(node.get_input_cache())
	node.set_name("lalapps_string_meas_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))

def make_calc_likelihood_fragment(dag, parents, likelihood_parents, tag, files_per_calc_likelihood = None, verbose = False):
	if files_per_calc_likelihood is None:
		files_per_calc_likelihood = calc_likelihoodjob.files_per_calc_likelihood

	input_cache = power.collect_output_caches(parents)
	likelihood_cache = power.collect_output_caches(likelihood_parents)

	node.add_input_cache([cache_entry for cache_entry, parent in input_cache[:files_per_calc_likelihood]])
	for parent in set(parent for cache_entry, parent in input_cache[:files_per_calc_likelihood]):
		node.add_parent(parent)
	del input_cache[:files_per_calc_likelihood]
	seg = power.cache_span(node.get_input_cache())
	node.set_name("lalapps_string_calc_likelihood_%s_%d_%d" % (tag, int(seg[0]), int(abs(seg))))
	for cache_entry, parent in likelihood_cache:
		node.add_parent(parent)
		node.add_likelihood_cache([cache_entry])
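# Hypothetical wiring of the likelihood stages (assuming each fragment
# function returns the set of nodes it adds to the DAG):
#
#	meas_nodes = make_meas_likelihood_fragment(dag, trigger_nodes, "STRING_SEARCH")
#	calc_nodes = make_calc_likelihood_fragment(dag, trigger_nodes, meas_nodes, "STRING_SEARCH")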