4 from lalinference
import lalinference_pipe_utils
as pipe_utils
6 from six.moves
import configparser
7 from optparse
import OptionParser
11 from glue
import pipeline
12 from ligo.lw
import ligolw
13 from ligo.lw
import lsctables
14 from ligo.lw
import utils
as ligolw_utils
16 usage=
""" %prog [options] config.ini
17 Setup a Condor DAG file to run the LALInference pipeline based on
20 The user can specify either an injection file to analyse, with the --inj option,
21 a list of SnglInspiralTable or CoincInspiralTable triggers with the --<x>-triggers options,
22 a GraceDB ID with the --gid option,
23 or an ASCII list of GPS times with the --gps-time-file option.
25 If none of the above options are given, the pipeline will analyse the entire
26 stretch of time between gps-start-time and gps-end-time, specified in the config.ini file.
28 The user must also specify and ini file which will contain the main analysis config.
# Command-line interface for the pipeline setup script.  All run-specific
# inputs (times, triggers, injections, GraceDB id) are optional overrides
# that get folded into the ini-file configuration further down.
parser = OptionParser(usage)
parser.add_option("-r", "--run-path", default=None, action="store", type="string",
                  help="Directory to run pipeline in (default: $PWD)",
                  metavar="RUNDIR")
parser.add_option("-p", "--daglog-path", default=None, action="store", type="string",
                  help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",
                  metavar="LOGDIR")
parser.add_option("-g", "--gps-time-file", action="store", type="string", default=None,
                  help="Text file containing list of GPS times to analyse",
                  metavar="TIMES.txt")
parser.add_option("--gid", action="store", type="string", default=None,
                  help="GraceDB ID")
parser.add_option("--coinc", action="store", type="string", default=None,
                  help="Path to coinc.xml from GraceDB")
parser.add_option("--psd", action="store", type="string", default=None,
                  help="Path to psd.xml.gz from GraceDB")
parser.add_option("--service-url", action="store", type="string", default=None,
                  help="GraceDB url from which xml files are downloaded")
parser.add_option("-I", "--injections", action="store", type="string", default=None,
                  help="List of injections to perform and analyse",
                  metavar="INJFILE.xml")
parser.add_option("-B", "--burst_injections", action="store", type="string", default=None,
                  help="SimBurst table for LIB injections",
                  metavar="INJFILE.xml")
parser.add_option("-P", "--pipedown-db", action="store", type="string", default=None,
                  help="Pipedown database to read and analyse",
                  metavar="pipedown.sqlite")
parser.add_option("--condor-submit", action="store_true", default=False,
                  help="Automatically submit the condor dag")

(opts, args) = parser.parse_args()
def mkdirs(path):
    """
    Helper function. Make the given directory, creating intermediate
    dirs if necessary, and don't complain about it already existing.
    """
    # Short-circuit only when the path already exists as a writable
    # directory; otherwise defer to os.makedirs (which, as in the
    # original, raises if the path exists but is not a writable dir).
    if os.access(path, os.W_OK) and os.path.isdir(path):
        return
    os.makedirs(path)
# NOTE(review): garbled extraction -- statements are split across lines and
# several original source lines (61, 69, 71-73, 75-76, 78-79, 85-88, 91, 96,
# 100-101) are missing; surviving code text is left byte-identical.
60 def add_variations(cp, section, option, values=None, allowed_values=None):
# Purpose: collect comma-separated variation values for [section] option;
# return a dict of variations, or store a single value directly in cp.
62 Push some possible variations onto the stack.
63 If only one value is specified then just store it in cp as usual
64 cp : ConfigParser object to look in
65 section : [section] in cp
66 option : option in section
67 values: If given, use instead of cp's
68 allowed_values : if given, anything else will trigger an error
# Guard: nothing to do when the section/option is absent (guard body lost
# in extraction -- original lines 71-73 missing).
70 if not cp.has_section(section)
and not cp.has_option(section,option):
# One resultspage label per comma-separated variation value.
74 labels = cp.get(
'resultspage',
'label').split(
',')
77 if values
is not None:
# Split the configured value into candidate variations.
80 vals = cp.get(section,option).split(
',')
81 badvals = [v
for v
in vals
if v
not in (allowed_values
or [])]
82 if allowed_values
is not None and badvals:
# NOTE(review): positional .format() against a {section}-style template
# -- presumably broken; confirm upstream (original lines 85-88 missing).
83 raise ValueError(
"Unknown value for {section} . {option} {value}"
84 .format(section, option, \
89 if labels
is not None:
# A label/value count mismatch is a user configuration error.
90 if len(labels) != len(vals):
92 "The number of labels does not match the "
93 "number of {}'s given".format(option))
# Multiple variations: map (section, option) to [value, label] pairs,
# or to the bare values when no labels were supplied.
94 return {(section,option): [[i,j]
for i,j
in zip(vals, labels)]}
95 return {(section,option): vals}
# Single value: store it (and its label) directly; no variations returned.
97 cp.set(section, option, vals[0])
98 if labels
is not None:
99 cp.set(
'resultspage',
'label', labels[0])
102 print((
"Found no variations of [{section}] {option}".format(section=section,
# NOTE(review): garbled extraction -- the def line and the dict-comprehension
# filter conditions (original lines ~106, 108-110, 113-114, 116-118, 120-122,
# 124-126, 129, 131, 135, 138, 141, 145, 152, 155, 157-159) are missing;
# surviving code text is left byte-identical.
107 """Check that the priors are compatible with the fixed parameters
111 cp: configparser.ConfigParser
112 an opened config parser object
# Fixed parameters: [engine] options named "fix-<param>" (filter lost).
115 key.split(
"fix-")[1]: float(item)
for key, item
in cp.items(
"engine")
if
# Lower prior bounds: [engine] options named "<param>-min" (filter lost).
119 key.split(
"-min")[0]: float(item)
for key, item
in cp.items(
"engine")
if
# Upper prior bounds: [engine] options named "<param>-max" (filter lost).
123 key.split(
"-max")[0]: float(item)
for key, item
in cp.items(
"engine")
if
# True when component-mass bounds are configured.
127 condition =
"comp" in p_min.keys()
or "comp" in p_max.keys()
# NOTE(review): precedence suspect -- this parses as
# ("chirpmass" in fixed and "q") or ("eta" in fixed); "q" is a truthy
# string, so condition2 is effectively just ("chirpmass" in fixed).
# Confirm the intended condition upstream.
128 condition2 =
"chirpmass" in fixed.keys()
and "q" or "eta" in fixed.keys()
130 if condition
and condition2:
# Convert a fixed (chirpmass, q) or (chirpmass, eta) pair into component
# masses so the comp-min/comp-max bounds can be checked against them.
132 mchirp = float(fixed[
"chirpmass"])
133 if "q" in fixed.keys():
134 q = float(fixed[
"q"])
136 m1 = (q**(2./5.))*((1.0 + q)**(1./5.))*mchirp
137 m2 = (q**(-3./5.))*((1.0 + q)**(1./5.))*mchirp
139 if "eta" in fixed.keys():
140 eta = float(fixed[
"eta"])
142 mtotal = mchirp / (eta**(3./5.))
143 m1 = 0.5 * mtotal * (1.0 + (1.0 - 4.0 * eta)**0.5)
144 m2 = 0.5 * mtotal * (1.0 - (1.0 - 4.0 * eta)**0.5)
# Convention: mass1 >= mass2; both inherit the component-mass bounds.
146 fixed[
"mass1"] =
max(m1, m2)
147 fixed[
"mass2"] = min(m1, m2)
148 p_min[
"mass1"] = p_min[
"comp"]
149 p_min[
"mass2"] = p_min[
"comp"]
150 p_max[
"mass1"] = p_max[
"comp"]
151 p_max[
"mass2"] = p_max[
"comp"]
# Predicate: parameter i violates either of its prior bounds.
153 condition =
lambda i: i
in p_min.keys()
and p_min[i] > fixed[i]
or \
154 i
in p_max.keys()
and p_max[i] < fixed[i]
156 for i
in fixed.keys():
# Error-raising body lost in extraction (original lines 157-159 missing).
160 "The fixed parameter %s lies outside of the bound for the prior. "
161 "Either change your prior bounds or change your fixed parameter"
# NOTE(review): garbled extraction -- the def line and the loop headers
# (original lines ~162-166, 168, 170-171, 175, 178-183, 185-186, 188-189,
# 193-194, 196, 203+) are missing; surviving code text is left byte-identical.
167 Generate config parser objects for each of the variations
# Remember the master paths so per-variation subdirectories can be derived.
169 cur_basedir=master_cp.get(
'paths',
'basedir')
# Persist the master config in the base directory for provenance.
172 masterpath=os.path.join(cur_basedir,
'config.ini')
173 with open(masterpath,
'w')
as cpfile:
174 master_cp.write(cpfile)
176 cur_webdir=master_cp.get(
'paths',
'webdir')
177 cur_dagdir=master_cp.get(
'paths',
'daglogdir')
# Take one (section, option) variation off the stack and fan out one
# fresh ConfigParser per value.
184 (section, opt), vals = variations.popitem()
187 cp = configparser.ConfigParser()
# A [value, label] pair carries a resultspage label alongside the value.
190 if type(val) == list:
191 cp.set(section,opt,val[0])
192 cp.set(
'resultspage',
'label',val[1])
195 cp.set(section,opt,val)
# Point basedir/webdir/daglogdir at per-variation subdirectories.
# NOTE(review): '{val}'.format(opt=opt,val=val) ignores the opt kwarg --
# the template was presumably '{opt}_{val}'; confirm upstream.
197 cp.set(
'paths',
'basedir',os.path.join(cur_basedir, \
198 '{val}'.format(opt=opt,val=val)))
199 cp.set(
'paths',
'webdir',os.path.join(cur_webdir, \
200 '{val}'.format(opt=opt,val=val)))
201 cp.set(
'paths',
'daglogdir',os.path.join(cur_dagdir, \
202 '{val}'.format(opt=opt,val=val)))
# --- Top level: read the ini file and fold in command-line overrides --------
# NOTE(review): garbled extraction -- several original lines (205-208,
# 210-213, 215-220, 227, 230, 233-236, 239, 242, 246, 248, 251, 254, 257,
# 260, 263-265) are missing; surviving code text is left byte-identical.
209 print(
'Error: must specify one ini file')
# SafeConfigParser is a py2-era alias (file uses six); deprecated in py3.
214 cp=configparser.SafeConfigParser()
# --run-path overrides [paths] basedir; fall back to the current dir.
221 if opts.run_path
is not None:
222 cp.set(
'paths',
'basedir',os.path.abspath(opts.run_path))
223 if cp.get(
'paths',
'basedir')
is None:
224 print(
"Warning: no run dir set, using current dir")
# NOTE(review): os.path.getcwd does not exist -- os.getcwd was presumably
# intended; this fallback branch would raise AttributeError if reached.
225 cp.set(
'paths',
'basedir',os.path.getcwd())
226 mkdirs(cp.get(
'paths',
'basedir'))
# DAG log dir: explicit --daglog-path wins, otherwise default to basedir.
228 if opts.daglog_path
is not None:
229 cp.set(
'paths',
'daglogdir',os.path.abspath(opts.daglog_path))
231 cp.set(
'paths',
'daglogdir',os.path.abspath(cp.get(
'paths',
'basedir')))
232 daglogdir=cp.get(
'paths',
'daglogdir')
# Fold each remaining command-line option into the config; paths are made
# absolute so the generated DAG can run from any directory.
237 if opts.gps_time_file
is not None:
238 cp.set(
'input',
'gps-time-file',os.path.abspath(opts.gps_time_file))
240 if opts.injections
is not None:
241 cp.set(
'input',
'injection-file',os.path.abspath(opts.injections))
243 if opts.burst_injections
is not None:
# Inspiral and burst injection tables are mutually exclusive.
244 if opts.injections
is not None:
245 print(
"ERROR: cannot pass both inspiral and burst tables for injection\n")
247 cp.set(
'input',
'burst-injection-file',os.path.abspath(opts.burst_injections))
249 if opts.gid
is not None:
250 cp.set(
'input',
'gid',opts.gid)
252 if opts.coinc
is not None:
253 cp.set(
'input',
'coinc-xml', os.path.abspath(opts.coinc))
255 if opts.psd
is not None:
256 cp.set(
'input',
'psd-xml-gz', os.path.abspath(opts.psd))
258 if opts.service_url
is not None:
259 cp.set(
'analysis',
'service-url',opts.service_url)
261 if opts.pipedown_db
is not None:
262 cp.set(
'input',
'pipedown-db',os.path.abspath(opts.pipedown_db))
# Sanity check: an approximant must be configured in [engine].
# NOTE(review): "filed" is a typo for "field" in this user-facing message;
# it is runtime text so it is left unchanged here.
266 if not (cp.has_option(
'engine',
'approx')
or cp.has_option(
'engine',
'approximant') ):
267 print(
"Error: was expecting an 'approx' filed in the [engine] section\n")
# --- setup_roq: derive one config object per applicable ROQ basis -----------
# NOTE(review): garbled extraction -- the def line and many original source
# lines are missing throughout this region; surviving code text is left
# byte-identical.
282 Generates cp objects with the different ROQs applied
# Legacy flags roq_b_matrix_directory / computeroqweights imply ROQ.
285 if cp.has_option(
'paths',
'roq_b_matrix_directory')
or cp.has_option(
'paths',
'computeroqweights'):
286 if not cp.has_option(
'analysis',
'roq'):
287 print(
"Warning: If you are attempting to enable ROQ by specifying roq_b_matrix_directory or computeroqweights,\n\
288 please use analysis.roq in your config file in future. Enabling ROQ.")
# NOTE(review): ConfigParser.set() requires a string value -- True here
# should presumably be 'True'; confirm against upstream.
289 cp.set(
'analysis',
'roq',
True)
290 if not cp.getboolean(
'analysis',
'roq'):
# Each subdirectory of the B-matrix directory holds one ROQ basis.
293 path=cp.get(
'paths',
'roq_b_matrix_directory')
294 if not os.path.isdir(path):
295 print(
"The ROQ directory %s does not seem to exist\n"%path)
298 roq_paths=os.listdir(path)
# Optional forced lower frequency cutoff applied to all bases.
300 roq_force_flow =
None
302 if cp.has_option(
'lalinference',
'roq_force_flow'):
303 roq_force_flow = cp.getfloat(
'lalinference',
'roq_force_flow')
304 print(
"WARNING: Forcing the f_low to ",
str(roq_force_flow),
"Hz")
305 print(
"WARNING: Overwriting user choice of flow, srate, seglen, and (mc_min, mc_max and q-min) or (mass1_min, mass1_max, mass2_min, mass2_max)")
# Sort-key fragment: order bases by their 'seglen' parameter
# (enclosing def lost in extraction -- original lines 306-307 missing).
308 return float(item[1][
'seglen'])
# Locate the trigger: from GraceDB (--gid), a local coinc.xml, or an
# injection table.  Used below to select the matching mass bin.
314 if opts.gid
is not None:
315 from ligo.gracedb.rest
import GraceDb
318 if cp.has_option(
'analysis',
'service-url'):
319 client = GraceDb(cp.get(
'analysis',
'service-url'))
322 coinc_xml_obj = ligolw_utils.load_fileobj(client.files(gid,
"coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
323 elif cp.has_option(
'input',
'coinc-xml'):
324 coinc_xml_obj = ligolw_utils.load_fileobj(open(cp.get(
'input',
'coinc-xml'),
"rb"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
327 if cp.has_option(
'input',
'injection-file'):
328 print(
"Only 0-th event in the XML table will be considered while running with ROQ\n")
329 row = lsctables.SimInspiralTable.get_table(
330 ligolw_utils.load_filename(cp.get(
'input',
'injection-file'),contenthandler=lsctables.use_in(ligolw.LIGOLWContentHandler))
# ROQ bases are parameterised either in (chirp mass, q) or in component
# masses; the component-mass case is normalised to chirp-mass bins below.
333 roq_bounds = pipe_utils.Query_ROQ_Bounds_Type(path, roq_paths)
334 if roq_bounds ==
'chirp_mass_q':
335 print(
'ROQ has bounds in chirp mass and mass-ratio')
336 mc_priors, trigger_mchirp = pipe_utils.get_roq_mchirp_priors(
337 path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
339 elif roq_bounds ==
'component_mass':
340 print(
'ROQ has bounds in component masses')
343 m1_priors, m2_priors, trigger_mchirp = pipe_utils.get_roq_component_mass_priors(
344 path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
# Derive an equivalent chirp-mass range from each component-mass box.
347 for (roq,m1_prior), (roq2,m2_prior)
in zip(m1_priors.items(), m2_priors.items()):
348 mc_priors[roq] = sorted([pipe_utils.mchirp_from_components(m1_prior[1], m2_prior[0]), pipe_utils.mchirp_from_components(m1_prior[0], m2_prior[1])])
# Manual trigger_mchirp override from the config file.
350 if cp.has_option(
'lalinference',
'trigger_mchirp'):
351 trigger_mchirp=float(cp.get(
'lalinference',
'trigger_mchirp'))
352 roq_mass_freq_scale_factor = pipe_utils.get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, roq_force_flow)
353 if roq_mass_freq_scale_factor != 1.:
354 print(
'WARNING: Rescaling ROQ basis, please ensure it is allowed with the model used.')
# When a trigger is known, keep only the best-matching mass bin.
357 if opts.gid
is not None or (opts.injections
is not None or cp.has_option(
'input',
'injection-file'))
or cp.has_option(
'lalinference',
'trigger_mchirp')
or cp.has_option(
'input',
'coinc-xml'):
359 for mc_prior
in mc_priors:
360 mc_priors[mc_prior] = np.array(mc_priors[mc_prior])
362 candidate_roq_paths = \
363 [roq
for roq
in roq_paths
364 if mc_priors[roq][0]*roq_mass_freq_scale_factor
365 <= trigger_mchirp <= mc_priors[roq][1]*roq_mass_freq_scale_factor]
366 if candidate_roq_paths:
# Prefer the candidate bin with the largest margin on both sides of
# the trigger chirp mass (margins assignment split by extraction).
368 [min(mc_priors[roq][1]*roq_mass_freq_scale_factor-trigger_mchirp,
369 trigger_mchirp-mc_priors[roq][0]*roq_mass_freq_scale_factor)
370 for roq
in candidate_roq_paths])
371 best_path = candidate_roq_paths[np.argmax(margins)]
372 roq_paths = [best_path]
373 print(
'Prior in Mchirp will be [{}, {}] to contain the trigger Mchirp {}.'.format(
374 mc_priors[best_path][0]*roq_mass_freq_scale_factor,
375 mc_priors[best_path][1]*roq_mass_freq_scale_factor,
378 print(
"No roq mass bins containing the trigger")
# No trigger available: keep every bin, rescaled.
381 for mc_prior
in mc_priors:
382 mc_priors[mc_prior] = np.array(mc_priors[mc_prior])*roq_mass_freq_scale_factor
# Persist the master config, then derive one config per ROQ basis.
385 cur_basedir = cp.get(
'paths',
'basedir')
386 masterpath=os.path.join(cur_basedir,
'config.ini')
387 with open(masterpath,
'w')
as cpfile:
390 for roq
in roq_paths:
391 this_cp = configparser.ConfigParser()
# Preserve option-name case (default optionxform lower-cases keys).
392 this_cp.optionxform = str
393 this_cp.read(masterpath)
394 basedir = this_cp.get(
'paths',
'basedir')
# Give each ROQ run its own basedir/daglogdir/webdir subdirectory.
395 for dirs
in 'basedir',
'daglogdir',
'webdir':
396 val = this_cp.get(
'paths',dirs)
397 newval = os.path.join(val,roq)
399 this_cp.set(
'paths',dirs,newval)
400 this_cp.set(
'paths',
'roq_b_matrix_directory',os.path.join(cp.get(
'paths',
'roq_b_matrix_directory'),roq))
# Scale this basis' frequency/duration parameters by the rescale factor.
401 flow=roq_params[roq][
'flow'] / roq_mass_freq_scale_factor
402 srate=2.*roq_params[roq][
'fhigh'] / roq_mass_freq_scale_factor
406 seglen=roq_params[roq][
'seglen'] * roq_mass_freq_scale_factor
408 this_cp.set(
'engine',
'srate',
str(srate))
409 this_cp.set(
'engine',
'seglen',
str(seglen))
410 if this_cp.has_option(
'lalinference',
'flow'):
411 tmp=this_cp.get(
'lalinference',
'flow')
# NOTE(review): eval() of a config value -- assumes a trusted ini file.
416 ifos=eval(this_cp.get(
'analysis',
'ifos'))
419 this_cp.set(
'lalinference',
'flow',
str(tmp))
# Write the mass prior bounds appropriate to this basis' bound type.
420 if roq_bounds ==
'chirp_mass_q':
421 mc_min=mc_priors[roq][0]*roq_mass_freq_scale_factor
422 mc_max=mc_priors[roq][1]*roq_mass_freq_scale_factor
# Minimum mass ratio from the basis' maximum mass ratio.
424 q_min=1./float(roq_params[roq][
'qmax'])
425 this_cp.set(
'engine',
'chirpmass-min',
str(mc_min))
426 this_cp.set(
'engine',
'chirpmass-max',
str(mc_max))
427 this_cp.set(
'engine',
'q-min',
str(q_min))
428 this_cp.set(
'engine',
'comp-min',
str(
max(roq_params[roq][
'compmin'] * roq_mass_freq_scale_factor, mc_min * pow(1+q_min, 1./5.) * pow(q_min, 2./5.))))
429 this_cp.set(
'engine',
'comp-max',
str(mc_max * pow(1+q_min, 1./5.) * pow(q_min, -3./5.)))
430 elif roq_bounds ==
'component_mass':
431 m1_min = m1_priors[roq][0]
432 m1_max = m1_priors[roq][1]
433 m2_min = m2_priors[roq][0]
434 m2_max = m2_priors[roq][1]
435 this_cp.set(
'engine',
'mass1-min',
str(m1_min))
436 this_cp.set(
'engine',
'mass1-max',
str(m1_max))
437 this_cp.set(
'engine',
'mass2-min',
str(m2_min))
438 this_cp.set(
'engine',
'mass2-max',
str(m2_max))
# --- Build the outer multi-DAG wrapping each per-config sub-DAG -------------
# NOTE(review): garbled extraction -- several original lines (446-450, 452,
# 457-461, 464-465, 468-472, 474, 478-480, 487-494, 496, 498, 501, 503-504,
# 506, 508) are missing; surviving code text is left byte-identical.
# Unique log name so concurrent submissions don't collide.
443 outerdaglog=os.path.join(daglogdir,
'lalinference_multi_'+
str(uuid.uuid1())+
'.log')
444 outerdag=pipeline.CondorDAG(outerdaglog)
445 outerdag.set_dag_file(os.path.join(cp.get(
'paths',
'basedir'),
'multidag'))
# Bring the injection file into the run directory (link, else copy) so the
# DAG is self-contained, refusing to clobber an unrelated existing file.
451 basepath=cp.get(
'paths',
'basedir')
453 if cp.has_option(
'input',
'injection-file'):
454 injpath=cp.get(
'input',
'injection-file')
455 myinjpath=os.path.join(basepath,os.path.basename(injpath))
456 if os.path.abspath(myinjpath) != os.path.abspath(injpath):
459 if os.path.lexists(myinjpath):
# An existing symlink to the same file is fine -- reuse it.
462 if os.path.islink(myinjpath)
and os.path.realpath(myinjpath)==os.path.realpath(injpath):
463 cp.set(
'input',
'injection-file',myinjpath)
466 print((
"Error: File {0} exists in run directory, not over-writing with \
467 {1}. Remove the existing file or create a fresh run directory".format(myinjpath,injpath)
# Hard-link when possible; fall back to a copy (e.g. cross-device link).
473 os.link(os.path.abspath(injpath), myinjpath)
475 from shutil
import copyfile
476 copyfile(injpath,myinjpath)
477 cp.set(
'input',
'injection-file',myinjpath)
# One sub-DAG per generated config, each managed by condor_dagman.
481 dag=pipe_utils.LALInferencePipelineDAG(this_cp)
482 dagjob=pipeline.CondorDAGManJob(os.path.join(this_cp.get(
'paths',
'basedir'),dag.get_dag_file()),
483 this_cp.get(
'paths',
'basedir'))
484 dagnode=pipeline.CondorDAGManNode(dagjob)
485 outerdag.add_node(dagnode)
486 dag.write_sub_files()
490 outerdag.write_sub_files()
492 outerdag.write_script()
495 print(
'Successfully created DAG file.')
# Optionally hand the outer DAG straight to condor_submit_dag.
497 if opts.condor_submit:
499 if cp.has_option(
'condor',
'notification'):
500 x = subprocess.Popen([
'condor_submit_dag',
'-dont_suppress_notification',outerdag.get_dag_file()])
502 x = subprocess.Popen([
'condor_submit_dag',outerdag.get_dag_file()])
505 print(
'Submitted DAG file')
507 print(
'Unable to submit DAG file')
509 print(
'To submit, run:\n\tcondor_submit_dag {0}'.format(outerdag.get_dag_file()))
# API index (auto-extracted function summaries):
#   add_variations(cp, section, option, values=None, allowed_values=None)
#       Push some possible variations onto the stack.
#   generate_variations(master_cp, variations)
#       Generate config parser objects for each of the variations.
#   check_priors_are_compatible(cp)
#       Check that the priors are compatible with the fixed parameters.
#   mkdirs(path)
#       Helper function: make the given directory, creating intermediate
#       dirs if necessary, without complaining if it already exists.
#   setup_roq(cp)
#       Generate cp objects with the different ROQs applied.