LALInference 4.1.9.1-5e288d3
lalinference_pipe.py
1##python
2# DAG generation code for running LALInference pipeline
3# (C) 2012 John Veitch, Vivien Raymond
4
5from lalinference import lalinference_pipe_utils as pipe_utils
6import numpy as np
7import configparser
8from optparse import OptionParser
9import sys
10import os
11import uuid
12from lal import pipeline
13from igwn_ligolw import lsctables
14from igwn_ligolw import utils as ligolw_utils
15
16usage=""" %prog [options] config.ini
17Setup a Condor DAG file to run the LALInference pipeline based on
18the config.ini file.
19
20 The user can specify an injection file to analyse with the --injections option,
21 a GraceDB ID with the --gid option, a coinc.xml file with the --coinc option,
22 a pipedown database with the --pipedown-db option,
23 or an ASCII list of GPS times with the --gps-time-file option.
24
25If none of the above options are given, the pipeline will analyse the entire
26stretch of time between gps-start-time and gps-end-time, specified in the config.ini file.
27
28 The user must also specify an ini file which will contain the main analysis config.
29
30"""
31parser=OptionParser(usage)
32parser.add_option("-r","--run-path",default=None,action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
33parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
34parser.add_option("-g","--gps-time-file",action="store",type="string",default=None,help="Text file containing list of GPS times to analyse",metavar="TIMES.txt")
35parser.add_option("--gid",action="store",type="string",default=None,help="GraceDB ID")
36parser.add_option("--coinc",action="store",type="string",default=None,help="Path to coinc.xml from GraceDB")
37parser.add_option("--psd",action="store",type="string",default=None,help="Path to psd.xml.gz from GraceDB")
38parser.add_option("--service-url",action="store",type="string",default=None,help="GraceDB url from which xml files are downloaded")
39parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
40parser.add_option("-B","--burst_injections",action="store",type="string",default=None,help="SimBurst table for LIB injections",metavar="INJFILE.xml")
41parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
42parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
43
44(opts,args)=parser.parse_args()
45
46# We will store variations of runs to use here as a dictionary of
47# ((section, option), ...) : [variation1, ...]
48# Where (section, option) indicate the configparser option to vary
49# and multiple section,option pairs are allowed
50variations={}
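# For example (hypothetical entries), after the add_variations() calls further below this
# dictionary could hold
#   {('engine', 'approx'): ['IMRPhenomPv2', 'SEOBNRv4_ROM'],
#    ('analysis', 'engine'): ['lalinferencenest', 'lalinferencemcmc']}
# and generate_variations() will then set up one sub-run per combination of values.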
51
52def mkdirs(path):
53 """
54 Helper function. Make the given directory, creating intermediate
55 dirs if necessary, and don't complain about it already existing.
56 """
57 if os.access(path,os.W_OK) and os.path.isdir(path): return
58 else: os.makedirs(path)
59
60def add_variations(cp, section, option, values=None, allowed_values=None):
61 """
62 Push some possible variations onto the stack.
63 If only one value is specified then just store it in cp as usual
64 cp : ConfigParser object to look in
65 section : [section] in cp
66 option : option in section
67 values: If given, use instead of cp's
68 allowed_values : if given, anything else will trigger an error
69 """
70 if values is None and not cp.has_option(section, option):
71 return {}
72
73 try:
74 labels = cp.get('resultspage','label').split(',')
75 except (configparser.NoSectionError, configparser.NoOptionError):
76 labels = None
77 if values is not None:
78 vals = values
79 else:
80 vals = cp.get(section,option).split(',')
81 badvals = [v for v in vals if v not in (allowed_values or [])]
82 if allowed_values is not None and badvals:
83 raise ValueError("Unknown value(s) for [{section}] {option}: {value}"
84 .format(section=section, option=option,
85 value=' '.join(badvals)
86 )
87 )
88 if len(vals) >1:
89 if labels is not None:
90 if len(labels) != len(vals):
91 raise ValueError(
92 "The number of labels does not match the "
93 "number of {}'s given".format(option))
94 return {(section,option): [[i,j] for i,j in zip(vals, labels)]}
95 return {(section,option): vals}
96 elif len(vals)==1:
97 cp.set(section, option, vals[0])
98 if labels is not None:
99 cp.set('resultspage', 'label', labels[0])
100 return {}
101 else:
102 print(("Found no variations of [{section}] {option}".format(section=section,
103 option=option)))
104 return {}
105
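# Illustrative sketch of add_variations() with labels (config values are hypothetical): given
#   [engine]      approx = IMRPhenomPv2,SEOBNRv4_ROM
#   [resultspage] label  = PhenomP,SEOB
# the call add_variations(cp, 'engine', 'approx') returns
#   {('engine', 'approx'): [['IMRPhenomPv2', 'PhenomP'], ['SEOBNRv4_ROM', 'SEOB']]}
# whereas a single comma-free value is written straight back into cp and {} is returned.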
107 """Check that the priors are compatible with the fixed parameters
108
109 Parameters
110 ----------
111 cp: configparser.ConfigParser
112 an opened config parser object
113 """
114 fixed = {
115 key.split("fix-")[1]: float(item) for key, item in cp.items("engine") if
116 "fix-" in key
117 }
118 p_min = {
119 key.split("-min")[0]: float(item) for key, item in cp.items("engine") if
120 "-min" in key
121 }
122 p_max = {
123 key.split("-max")[0]: float(item) for key, item in cp.items("engine") if
124 "-max" in key
125 }
126
127 condition = "comp" in p_min.keys() or "comp" in p_max.keys()
128 condition2 = "chirpmass" in fixed.keys() and ("q" in fixed.keys() or "eta" in fixed.keys())
129
130 if condition and condition2:
131
132 mchirp = float(fixed["chirpmass"])
133 if "q" in fixed.keys():
134 q = float(fixed["q"])
135
136 m1 = (q**(2./5.))*((1.0 + q)**(1./5.))*mchirp
137 m2 = (q**(-3./5.))*((1.0 + q)**(1./5.))*mchirp
138
139 if "eta" in fixed.keys():
140 eta = float(fixed["eta"])
141
142 mtotal = mchirp / (eta**(3./5.))
143 m1 = 0.5 * mtotal * (1.0 + (1.0 - 4.0 * eta)**0.5)
144 m2 = 0.5 * mtotal * (1.0 - (1.0 - 4.0 * eta)**0.5)
145
146 fixed["mass1"] = max(m1, m2)
147 fixed["mass2"] = min(m1, m2)
148 p_min["mass1"] = p_min["comp"]
149 p_min["mass2"] = p_min["comp"]
150 p_max["mass1"] = p_max["comp"]
151 p_max["mass2"] = p_max["comp"]
152
153 condition = lambda i: (i in p_min.keys() and p_min[i] > fixed[i]) or \
154 (i in p_max.keys() and p_max[i] < fixed[i])
155
156 for i in fixed.keys():
157
158 if condition(i):
159 raise Exception(
160 "The fixed parameter %s lies outside of the bound for the prior. "
161 "Either change your prior bounds or change your fixed parameter"
162 % (i)
163 )
164
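# Worked example of the conversion used in check_priors_are_compatible() (numbers are
# illustrative only): with fix-chirpmass = 10 and fix-q = 0.5,
#   q**(2./5.)*(1.0 + q)**(1./5.)*mchirp  ~  8.22
#   q**(-3./5.)*(1.0 + q)**(1./5.)*mchirp ~ 16.44
# so mass1 = max(m1, m2) ~ 16.44 and mass2 = min(m1, m2) ~ 8.22; a prior such as
# comp-max = 15 would then raise the exception above, since mass1 exceeds the upper bound.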
165def generate_variations(master_cp, variations):
166 """
167 Generate config parser objects for each of the variations
168 """
169 cur_basedir=master_cp.get('paths','basedir')
170 mkdirs(cur_basedir)
171 # save the master file
172 masterpath=os.path.join(cur_basedir,'config.ini')
173 with open(masterpath,'w') as cpfile:
174 master_cp.write(cpfile)
175
176 cur_webdir=master_cp.get('paths','webdir')
177 cur_dagdir=master_cp.get('paths','daglogdir')
178 # If no variations, done
179 if not variations:
180 yield master_cp
181 return
182
183 # Otherwise, vary over the next option
184 (section, opt), vals = variations.popitem()
185 for val in vals:
186 # Read file back in to get a new object
187 cp = configparser.ConfigParser()
188 cp.optionxform = str
189 cp.read(masterpath)
190 if type(val) == list:
191 cp.set(section,opt,val[0])
192 cp.set('resultspage','label',val[1])
193 val = val[0]
194 else:
195 cp.set(section,opt,val)
196 # Append to the paths
197 cp.set('paths','basedir',os.path.join(cur_basedir, \
198 '{val}'.format(opt=opt,val=val)))
199 cp.set('paths','webdir',os.path.join(cur_webdir, \
200 '{val}'.format(opt=opt,val=val)))
201 cp.set('paths','daglogdir',os.path.join(cur_dagdir, \
202 '{val}'.format(opt=opt,val=val)))
203 # recurse into remaining options
204 for sub_cp in generate_variations(cp,variations):
205 yield sub_cp
206
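# Example of the layout produced by generate_variations() (paths and values are hypothetical):
# with [paths] basedir = /home/albert/run1 and
# variations = {('engine', 'approx'): ['IMRPhenomPv2', 'SEOBNRv4_ROM']},
# the generator yields two config objects with basedir
#   /home/albert/run1/IMRPhenomPv2
#   /home/albert/run1/SEOBNRv4_ROM
# respectively (webdir and daglogdir are extended in the same way), and a copy of the
# parent config.ini is written at each level of the recursion.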
207if len(args)!=1:
208 parser.print_help()
209 print('Error: must specify one ini file')
210 sys.exit(1)
211
212inifile=args[0]
213
214cp=configparser.ConfigParser()
215fp=open(inifile)
216cp.optionxform = str
217try:
218 cp.read_file(fp)
219except AttributeError:
220 cp.readfp(fp)
221fp.close()
222
223# Set the base directory for the run
224if opts.run_path is not None:
225 cp.set('paths','basedir',os.path.abspath(opts.run_path))
226if not cp.has_option('paths','basedir') or not cp.get('paths','basedir'):
227 print("Warning: no run dir set, using current dir")
228 cp.set('paths','basedir',os.getcwd())
229mkdirs(cp.get('paths','basedir'))
230
231if opts.daglog_path is not None:
232 cp.set('paths','daglogdir',os.path.abspath(opts.daglog_path))
233else:
234 cp.set('paths','daglogdir',os.path.abspath(cp.get('paths','basedir')))
235daglogdir=cp.get('paths','daglogdir')
236mkdirs(daglogdir)
237
238
239# Set up from all the various input options
240if opts.gps_time_file is not None:
241 cp.set('input','gps-time-file',os.path.abspath(opts.gps_time_file))
242
243if opts.injections is not None:
244 cp.set('input','injection-file',os.path.abspath(opts.injections))
245
246if opts.burst_injections is not None:
247 if opts.injections is not None:
248 print("ERROR: cannot pass both inspiral and burst tables for injection\n")
249 sys.exit(1)
250 cp.set('input','burst-injection-file',os.path.abspath(opts.burst_injections))
251
252if opts.gid is not None:
253 cp.set('input','gid',opts.gid)
254
255if opts.coinc is not None:
256 cp.set('input', 'coinc-xml', os.path.abspath(opts.coinc))
257
258if opts.psd is not None:
259 cp.set('input', 'psd-xml-gz', os.path.abspath(opts.psd))
260
261if opts.service_url is not None:
262 cp.set('analysis','service-url',opts.service_url)
263
264if opts.pipedown_db is not None:
265 cp.set('input','pipedown-db',os.path.abspath(opts.pipedown_db))
266
267# Some sanity checking
268approx='approx'
269if not (cp.has_option('engine','approx') or cp.has_option('engine','approximant') ):
270 print("Error: was expecting an 'approx' filed in the [engine] section\n")
271 sys.exit(1)
272
273# Check the priors are compatible with fixed parameters as part of the sanity
274# checking
275check_priors_are_compatible(cp)
276
277# Build a list of allowed variations
278variations.update(add_variations(cp, 'engine','approx'))
279variations.update(add_variations(cp, 'analysis', 'engine'))
280variations.update(add_variations(cp, 'analysis', 'roq'))
281
282roq_paths=[]
283def setup_roq(cp):
284 """
285 Generates cp objects with the different ROQs applied
286 """
287 use_roq=False
288 if cp.has_option('paths','roq_b_matrix_directory') or cp.has_option('paths','computeroqweights'):
289 if not cp.has_option('analysis','roq'):
290 print("Warning: If you are attempting to enable ROQ by specifying roq_b_matrix_directory or computeroqweights,\n\
291 please use analysis.roq in your config file in future. Enabling ROQ.")
292 cp.set('analysis','roq','True')
293 if not cp.getboolean('analysis','roq'):
294 yield cp
295 return
296 path=cp.get('paths','roq_b_matrix_directory')
297 if not os.path.isdir(path):
298 print("The ROQ directory %s does not seem to exist\n"%path)
299 sys.exit(1)
300 use_roq=True
301 roq_paths=os.listdir(path)
302 roq_params={}
303 roq_force_flow = None
304
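# Note: roq_b_matrix_directory is expected to contain one subdirectory per ROQ basis
# (hypothetical example: <path>/basis_A, <path>/basis_B); os.listdir() above collects their
# names into roq_paths, and the pipe_utils helpers called below are expected to fill
# roq_params with per-basis settings such as 'flow', 'fhigh', 'seglen', 'qmax' and 'compmin'.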
305 if cp.has_option('lalinference','roq_force_flow'):
306 roq_force_flow = cp.getfloat('lalinference','roq_force_flow')
307 print("WARNING: Forcing the f_low to ", str(roq_force_flow), "Hz")
308 print("WARNING: Overwriting user choice of flow, srate, seglen, and (mc_min, mc_max and q-min) or (mass1_min, mass1_max, mass2_min, mass2_max)")
309
310 def key(item): # to order the ROQ bases
311 return float(item[1]['seglen'])
312
313 coinc_xml_obj = None
314 row=None
315
316 # Get file object of coinc.xml
317 if opts.gid is not None:
318 from ligo.gracedb.rest import GraceDb
319 gid=opts.gid
320 cwd=os.getcwd()
321 if cp.has_option('analysis', 'service-url'):
322 client = GraceDb(cp.get('analysis', 'service-url'))
323 else:
324 client = GraceDb()
325 coinc_xml_obj = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"))[0]
326 elif cp.has_option('input', 'coinc-xml'):
327 coinc_xml_obj = ligolw_utils.load_fileobj(open(cp.get('input', 'coinc-xml'), "rb"))[0]
328
329 # Get sim_inspiral from injection file
330 if cp.has_option('input','injection-file'):
331 print("Only 0-th event in the XML table will be considered while running with ROQ\n")
332 row = lsctables.SimInspiralTable.get_table(
333 ligolw_utils.load_filename(cp.get('input','injection-file'))
334 )[0]
335
336 roq_bounds = pipe_utils.Query_ROQ_Bounds_Type(path, roq_paths)
337 if roq_bounds == 'chirp_mass_q':
338 print('ROQ has bounds in chirp mass and mass-ratio')
339 mc_priors, trigger_mchirp = pipe_utils.get_roq_mchirp_priors(
340 path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
341 )
342 elif roq_bounds == 'component_mass':
343 print('ROQ has bounds in component masses')
344 # get component mass bounds, then compute the chirp mass that can be safely covered
345 # further below we pass along the component mass bounds to the sampler, not the tighter chirp-mass, q bounds
346 m1_priors, m2_priors, trigger_mchirp = pipe_utils.get_roq_component_mass_priors(
347 path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
348 )
349 mc_priors = {}
350 for (roq,m1_prior), (roq2,m2_prior) in zip(m1_priors.items(), m2_priors.items()):
351 mc_priors[roq] = sorted([pipe_utils.mchirp_from_components(m1_prior[1], m2_prior[0]), pipe_utils.mchirp_from_components(m1_prior[0], m2_prior[1])])
352
353 if cp.has_option('lalinference','trigger_mchirp'):
354 trigger_mchirp=float(cp.get('lalinference','trigger_mchirp'))
355 roq_mass_freq_scale_factor = pipe_utils.get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, roq_force_flow)
356 if roq_mass_freq_scale_factor != 1.:
357 print('WARNING: Rescaling ROQ basis, please ensure it is allowed with the model used.')
358
359 # If the true chirp mass is unknown, add variations over the mass bins
360 if opts.gid is not None or (opts.injections is not None or cp.has_option('input','injection-file')) or cp.has_option('lalinference','trigger_mchirp') or cp.has_option('input', 'coinc-xml'):
361
362 for mc_prior in mc_priors:
363 mc_priors[mc_prior] = np.array(mc_priors[mc_prior])
364 # find mass bin containing the trigger
365 candidate_roq_paths = \
366 [roq for roq in roq_paths
367 if mc_priors[roq][0]*roq_mass_freq_scale_factor
368 <= trigger_mchirp <= mc_priors[roq][1]*roq_mass_freq_scale_factor]
369 if candidate_roq_paths:
370 margins = np.array(
371 [min(mc_priors[roq][1]*roq_mass_freq_scale_factor-trigger_mchirp,
372 trigger_mchirp-mc_priors[roq][0]*roq_mass_freq_scale_factor)
373 for roq in candidate_roq_paths])
374 best_path = candidate_roq_paths[np.argmax(margins)]
375 roq_paths = [best_path]
376 print('Prior in Mchirp will be [{}, {}] to contain the trigger Mchirp {}.'.format(
377 mc_priors[best_path][0]*roq_mass_freq_scale_factor,
378 mc_priors[best_path][1]*roq_mass_freq_scale_factor,
379 trigger_mchirp))
380 else:
381 print("No roq mass bins containing the trigger")
382 raise
383 else:
384 for mc_prior in mc_priors:
385 mc_priors[mc_prior] = np.array(mc_priors[mc_prior])*roq_mass_freq_scale_factor
386
387 # write the master configparser
388 cur_basedir = cp.get('paths','basedir')
389 masterpath=os.path.join(cur_basedir,'config.ini')
390 with open(masterpath,'w') as cpfile:
391 cp.write(cpfile)
392
393 for roq in roq_paths:
394 this_cp = configparser.ConfigParser()
395 this_cp.optionxform = str
396 this_cp.read(masterpath)
397 basedir = this_cp.get('paths','basedir')
398 for dirs in 'basedir','daglogdir','webdir':
399 val = this_cp.get('paths',dirs)
400 newval = os.path.join(val,roq)
401 mkdirs(newval)
402 this_cp.set('paths',dirs,newval)
403 this_cp.set('paths','roq_b_matrix_directory',os.path.join(cp.get('paths','roq_b_matrix_directory'),roq))
404 flow=roq_params[roq]['flow'] / roq_mass_freq_scale_factor
405 srate=2.*roq_params[roq]['fhigh'] / roq_mass_freq_scale_factor
406 #if srate > 8192:
407 # srate = 8192
408
409 seglen=roq_params[roq]['seglen'] * roq_mass_freq_scale_factor
410 # Set the srate and seglen implied by this ROQ basis (rescaled by roq_mass_freq_scale_factor above)
411 this_cp.set('engine','srate',str(srate))
412 this_cp.set('engine','seglen',str(seglen))
413 if this_cp.has_option('lalinference','flow'):
414 tmp=this_cp.get('lalinference','flow')
415 tmp=eval(tmp)
416 ifos=tmp.keys()
417 else:
418 tmp={}
419 ifos=eval(this_cp.get('analysis','ifos'))
420 for i in ifos:
421 tmp[i]=flow
422 this_cp.set('lalinference','flow',str(tmp))
423 if roq_bounds == 'chirp_mass_q':
424 mc_min=mc_priors[roq][0]*roq_mass_freq_scale_factor
425 mc_max=mc_priors[roq][1]*roq_mass_freq_scale_factor
426 # params.dat uses the convention q>1 so our q_min is the inverse of their qmax
427 q_min=1./float(roq_params[roq]['qmax'])
428 this_cp.set('engine','chirpmass-min',str(mc_min))
429 this_cp.set('engine','chirpmass-max',str(mc_max))
430 this_cp.set('engine','q-min',str(q_min))
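# With the q = m2/m1 <= 1 convention used here, m1 = mc*(1+q)**(1./5.)*q**(-3./5.) and
# m2 = mc*(1+q)**(1./5.)*q**(2./5.), so the largest allowed primary mass comes from
# (mc_max, q_min) and the smallest allowed secondary mass from (mc_min, q_min);
# these give the comp-min/comp-max bounds set below.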
431 this_cp.set('engine','comp-min', str(max(roq_params[roq]['compmin'] * roq_mass_freq_scale_factor, mc_min * pow(1+q_min, 1./5.) * pow(q_min, 2./5.))))
432 this_cp.set('engine','comp-max', str(mc_max * pow(1+q_min, 1./5.) * pow(q_min, -3./5.)))
433 elif roq_bounds == 'component_mass':
434 m1_min = m1_priors[roq][0]
435 m1_max = m1_priors[roq][1]
436 m2_min = m2_priors[roq][0]
437 m2_max = m2_priors[roq][1]
438 this_cp.set('engine','mass1-min',str(m1_min))
439 this_cp.set('engine','mass1-max',str(m1_max))
440 this_cp.set('engine','mass2-min',str(m2_min))
441 this_cp.set('engine','mass2-max',str(m2_max))
442 yield this_cp
443 return
444
445# Create an outer dag to wrap the sub-dags
446outerdaglog=os.path.join(daglogdir,'lalinference_multi_'+str(uuid.uuid1())+'.log')
447outerdag=pipeline.CondorDAG(outerdaglog)
448outerdag.set_dag_file(os.path.join(cp.get('paths','basedir'),'multidag'))
449
450
451master_cp=cp
452# Iterate over variations and generate sub-dags
453for cp in generate_variations(master_cp,variations):
454 basepath=cp.get('paths','basedir')
455 # Link injection file into place as paths outside basedir are inaccessible to containerised jobs
456 if cp.has_option('input','injection-file'):
457 injpath=cp.get('input','injection-file')
458 myinjpath=os.path.join(basepath,os.path.basename(injpath))
459 if os.path.abspath(myinjpath) != os.path.abspath(injpath):
460 # If the injection file does not exist in the run dir, link it into place
461 # Useful for singularity jobs which see only rundir
462 if os.path.lexists(myinjpath):
463 # If the path exists, see if it is a link to the current file
464 # and if so, just update the config
465 if os.path.islink(myinjpath) and os.path.realpath(myinjpath)==os.path.realpath(injpath):
466 cp.set('input','injection-file',myinjpath)
467 else:
468 # Do not over-write the injection file
469 print(("Error: File {0} exists in run directory, not over-writing with \
470 {1}. Remove the existing file or create a fresh run directory".format(myinjpath,injpath)
471 ))
472 sys.exit(1)
473 else:
474 # The link doesn't exist, so create it and update the config
475 try:
476 os.link(os.path.abspath(injpath), myinjpath)
477 except OSError:
478 from shutil import copyfile
479 copyfile(injpath,myinjpath)
480 cp.set('input','injection-file',myinjpath)
481
482 for this_cp in setup_roq(cp):
483 # Create the DAG from the configparser object
484 dag=pipe_utils.LALInferencePipelineDAG(this_cp)
485 dagjob=pipeline.CondorDAGManJob(os.path.join(this_cp.get('paths','basedir'),dag.get_dag_file()),
486 this_cp.get('paths','basedir'))
487 dagnode=pipeline.CondorDAGManNode(dagjob)
488 outerdag.add_node(dagnode)
489 dag.write_sub_files()
490 dag.write_dag()
491 dag.write_script()
492
493outerdag.write_sub_files()
494outerdag.write_dag()
495outerdag.write_script()
496
497# End of program
498print('Successfully created DAG file.')
499
500if opts.condor_submit:
501 import subprocess
502 if cp.has_option('condor','notification'):
503 x = subprocess.Popen(['condor_submit_dag','-dont_suppress_notification',outerdag.get_dag_file()])
504 else:
505 x = subprocess.Popen(['condor_submit_dag',outerdag.get_dag_file()])
506 x.wait()
507 if x.returncode==0:
508 print('Submitted DAG file')
509 else:
510 print('Unable to submit DAG file')
511else:
512 print('To submit, run:\n\tcondor_submit_dag {0}'.format(outerdag.get_dag_file()))