LALInference  4.1.6.1-89842e6
lalinference_pipe.py
1 # DAG generation code for running LALInference pipeline
2 # (C) 2012 John Veitch, Vivien Raymond
3 
4 from lalinference import lalinference_pipe_utils as pipe_utils
5 import numpy as np
6 from six.moves import configparser
7 from optparse import OptionParser
8 import sys
9 import os
10 import uuid
11 from glue import pipeline
12 from ligo.lw import ligolw
13 from ligo.lw import lsctables
14 from ligo.lw import utils as ligolw_utils
15 
16 usage=""" %prog [options] config.ini
17 Setup a Condor DAG file to run the LALInference pipeline based on
18 the config.ini file.
19 
20 The user can specify either an injection file to analyse, with the --inj option,
21 a list of SnglInspiralTable or CoincInspiralTable triggers with the --<x>-triggers options,
22 a GraceDB ID with the --gid option,
23 or an ASCII list of GPS times with the --gps-time-file option.
24 
25 If none of the above options are given, the pipeline will analyse the entire
26 stretch of time between gps-start-time and gps-end-time, specified in the config.ini file.
27 
28 The user must also specify an ini file which will contain the main analysis config.
29 
30 """
31 parser=OptionParser(usage)
32 parser.add_option("-r","--run-path",default=None,action="store",type="string",help="Directory to run pipeline in (default: $PWD)",metavar="RUNDIR")
33 parser.add_option("-p","--daglog-path",default=None,action="store",type="string",help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",metavar="LOGDIR")
34 parser.add_option("-g","--gps-time-file",action="store",type="string",default=None,help="Text file containing list of GPS times to analyse",metavar="TIMES.txt")
35 parser.add_option("--gid",action="store",type="string",default=None,help="GraceDB ID")
36 parser.add_option("--coinc",action="store",type="string",default=None,help="Path to coinc.xml from GraceDB")
37 parser.add_option("--psd",action="store",type="string",default=None,help="Path to psd.xml.gz from GraceDB")
38 parser.add_option("--service-url",action="store",type="string",default=None,help="GraceDB url from which xml files are downloaded")
39 parser.add_option("-I","--injections",action="store",type="string",default=None,help="List of injections to perform and analyse",metavar="INJFILE.xml")
40 parser.add_option("-B","--burst_injections",action="store",type="string",default=None,help="SimBurst table for LIB injections",metavar="INJFILE.xml")
41 parser.add_option("-P","--pipedown-db",action="store",type="string",default=None,help="Pipedown database to read and analyse",metavar="pipedown.sqlite")
42 parser.add_option("--condor-submit",action="store_true",default=False,help="Automatically submit the condor dag")
43 
44 (opts,args)=parser.parse_args()
45 
46 # We will store variations of runs to use here as a dictionary of
47 # ((section, option), ...) : [variation1, ...]
48 # Where (section, option) indicate the configparser option to vary
49 # and multiple section,option pairs are allowed
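# For example (illustrative values only), varying the waveform approximant over two
# models would be stored as
#   {('engine', 'approx'): ['IMRPhenomPv2', 'SEOBNRv4_ROM']}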
50 variations={}
51 
52 def mkdirs(path):
53  """
54  Helper function. Make the given directory, creating intermediate
55  dirs if necessary, and don't complain about it already existing.
56  """
57  if os.access(path,os.W_OK) and os.path.isdir(path): return
58  else: os.makedirs(path)
59 
60 def add_variations(cp, section, option, values=None, allowed_values=None):
61  """
62  Push some possible variations onto the stack.
63  If only one value is specified then just store it in cp as usual
64  cp : ConfigParser object to look in
65  section : [section] in cp
66  option : option in section
67  values: If given, use instead of cp's
68  allowed_values : if given, anything else will trigger an error
69  """
70  if not cp.has_option(section,option):
71  return {}
72 
73  try:
74  labels = cp.get('resultspage','label').split(',')
75  except (configparser.NoSectionError, configparser.NoOptionError):
76  labels = None
77  if values is not None:
78  vals = values
79  else:
80  vals = cp.get(section,option).split(',')
81  badvals = [v for v in vals if v not in (allowed_values or [])]
82  if allowed_values is not None and badvals:
83  raise ValueError("Unknown value for [{section}] {option}: {badvals}"
84  .format(section=section, option=option,
85  badvals=' '.join(badvals)
86  )
87  )
88  if len(vals) >1:
89  if labels is not None:
90  if len(labels) != len(vals):
91  raise ValueError(
92  "The number of labels does not match the "
93  "number of {}'s given".format(option))
94  return {(section,option): [[i,j] for i,j in zip(vals, labels)]}
95  return {(section,option): vals}
96  elif len(vals)==1:
97  cp.set(section, option, vals[0])
98  if labels is not None:
99  cp.set('resultspage', 'label', labels[0])
100  return {}
101  else:
102  print("Found no variations of [{section}] {option}".format(section=section,
103  option=option))
104  return {}
105 
107  """Check that the priors are compatible with the fixed parameters
108 
109  Parameters
110  ----------
111  cp: configparser.ConfigParser
112  an opened config parser object
113  """
114  fixed = {
115  key.split("fix-")[1]: float(item) for key, item in cp.items("engine") if
116  "fix-" in key
117  }
118  p_min = {
119  key.split("-min")[0]: float(item) for key, item in cp.items("engine") if
120  "-min" in key
121  }
122  p_max = {
123  key.split("-max")[0]: float(item) for key, item in cp.items("engine") if
124  "-max" in key
125  }
126 
127  condition = "comp" in p_min.keys() or "comp" in p_max.keys()
128  condition2 = "chirpmass" in fixed.keys() and ("q" in fixed.keys() or "eta" in fixed.keys())
129 
130  if condition and condition2:
131 
132  mchirp = float(fixed["chirpmass"])
133  if "q" in fixed.keys():
134  q = float(fixed["q"])
135 
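 # Reminder of the conversion used below: with chirp mass Mc and mass ratio
 # q = m2/m1, Mc = (m1*m2)**(3./5.)/(m1+m2)**(1./5.), so the two components are
 # Mc*(1+q)**(1./5.)*q**(2./5.) and Mc*(1+q)**(1./5.)*q**(-3./5.); the max()/min()
 # assignment further down labels the larger one mass1 whichever q convention is used.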
136  m1 = (q**(2./5.))*((1.0 + q)**(1./5.))*mchirp
137  m2 = (q**(-3./5.))*((1.0 + q)**(1./5.))*mchirp
138 
139  if "eta" in fixed.keys():
140  eta = float(fixed["eta"])
141 
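 # Reminder of the conversion used below: with symmetric mass ratio
 # eta = m1*m2/(m1+m2)**2, the total mass is M = Mc/eta**(3./5.) and the
 # components are m1,2 = M*(1 +/- sqrt(1 - 4*eta))/2.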
142  mtotal = mchirp / (eta**(3./5.))
143  m1 = 0.5 * mtotal * (1.0 + (1.0 - 4.0 * eta)**0.5)
144  m2 = 0.5 * mtotal * (1.0 - (1.0 - 4.0 * eta)**0.5)
145 
146  fixed["mass1"] = max(m1, m2)
147  fixed["mass2"] = min(m1, m2)
148  p_min["mass1"] = p_min["comp"]
149  p_min["mass2"] = p_min["comp"]
150  p_max["mass1"] = p_max["comp"]
151  p_max["mass2"] = p_max["comp"]
152 
153  condition = lambda i: (i in p_min.keys() and p_min[i] > fixed[i]) or \
154  (i in p_max.keys() and p_max[i] < fixed[i])
155 
156  for i in fixed.keys():
157 
158  if condition(i):
159  raise Exception(
160  "The fixed parameter %s lies outside of the bound for the prior. "
161  "Either change your prior bounds or change your fixed parameter"
162  % (i)
163  )
164 
165 def generate_variations(master_cp, variations):
166  """
167  Generate config parser objects for each of the variations
168  """
169  cur_basedir=master_cp.get('paths','basedir')
170  mkdirs(cur_basedir)
171  # save the master file
172  masterpath=os.path.join(cur_basedir,'config.ini')
173  with open(masterpath,'w') as cpfile:
174  master_cp.write(cpfile)
175 
176  cur_webdir=master_cp.get('paths','webdir')
177  cur_dagdir=master_cp.get('paths','daglogdir')
178  # If no variations, done
179  if not variations:
180  yield master_cp
181  return
182 
183  # Otherwise, vary over the next option
184  (section, opt), vals = variations.popitem()
185  for val in vals:
186  # Read file back in to get a new object
187  cp = configparser.ConfigParser()
188  cp.optionxform = str
189  cp.read(masterpath)
190  if type(val) == list:
191  cp.set(section,opt,val[0])
192  cp.set('resultspage','label',val[1])
193  val = val[0]
194  else:
195  cp.set(section,opt,val)
196  # Append to the paths
197  cp.set('paths','basedir',os.path.join(cur_basedir, \
198  '{val}'.format(opt=opt,val=val)))
199  cp.set('paths','webdir',os.path.join(cur_webdir, \
200  '{val}'.format(opt=opt,val=val)))
201  cp.set('paths','daglogdir',os.path.join(cur_dagdir, \
202  '{val}'.format(opt=opt,val=val)))
203  # recurse into remaining options
204  for sub_cp in generate_variations(cp,variations):
205  yield sub_cp
206 
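# Note on the layout produced by generate_variations() above (illustrative): varying
# [engine] approx over two values yields sub-directories <basedir>/<value1>/ and
# <basedir>/<value2>/, each with its own config.ini, DAG log directory and webdir.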
207 if len(args)!=1:
208  parser.print_help()
209  print('Error: must specify one ini file')
210  sys.exit(1)
211 
212 inifile=args[0]
213 
214 cp=configparser.SafeConfigParser()
215 fp=open(inifile)
216 cp.optionxform = str
217 cp.readfp(fp)
218 fp.close()
219 
220 # Set the base directory for the run
221 if opts.run_path is not None:
222  cp.set('paths','basedir',os.path.abspath(opts.run_path))
223 if not cp.has_option('paths','basedir'):
224  print("Warning: no run dir set, using current dir")
225  cp.set('paths','basedir',os.getcwd())
226 mkdirs(cp.get('paths','basedir'))
227 
228 if opts.daglog_path is not None:
229  cp.set('paths','daglogdir',os.path.abspath(opts.daglog_path))
230 else:
231  cp.set('paths','daglogdir',os.path.abspath(cp.get('paths','basedir')))
232 daglogdir=cp.get('paths','daglogdir')
233 mkdirs(daglogdir)
234 
235 
236 # Set up from all the various input options
237 if opts.gps_time_file is not None:
238  cp.set('input','gps-time-file',os.path.abspath(opts.gps_time_file))
239 
240 if opts.injections is not None:
241  cp.set('input','injection-file',os.path.abspath(opts.injections))
242 
243 if opts.burst_injections is not None:
244  if opts.injections is not None:
245  print("ERROR: cannot pass both inspiral and burst tables for injection\n")
246  sys.exit(1)
247  cp.set('input','burst-injection-file',os.path.abspath(opts.burst_injections))
248 
249 if opts.gid is not None:
250  cp.set('input','gid',opts.gid)
251 
252 if opts.coinc is not None:
253  cp.set('input', 'coinc-xml', os.path.abspath(opts.coinc))
254 
255 if opts.psd is not None:
256  cp.set('input', 'psd-xml-gz', os.path.abspath(opts.psd))
257 
258 if opts.service_url is not None:
259  cp.set('analysis','service-url',opts.service_url)
260 
261 if opts.pipedown_db is not None:
262  cp.set('input','pipedown-db',os.path.abspath(opts.pipedown_db))
263 
264 # Some sanity checking
265 approx='approx'
266 if not (cp.has_option('engine','approx') or cp.has_option('engine','approximant') ):
267  print("Error: was expecting an 'approx' filed in the [engine] section\n")
268  sys.exit(1)
269 
270 # Check the priors are compatible with fixed parameters as part of the sanity
271 # checking
272 check_priors_are_compatible(cp)
273 
274 # Build a list of allowed variations
275 variations.update(add_variations(cp, 'engine','approx'))
276 variations.update(add_variations(cp, 'analysis', 'engine'))
277 variations.update(add_variations(cp, 'analysis', 'roq'))
278 
279 roq_paths=[]
280 def setup_roq(cp):
281  """
282  Generates cp objects with the different ROQs applied
283  """
284  use_roq=False
285  if cp.has_option('paths','roq_b_matrix_directory') or cp.has_option('paths','computeroqweights'):
286  if not cp.has_option('analysis','roq'):
287  print("Warning: If you are attempting to enable ROQ by specifying roq_b_matrix_directory or computeroqweights,\n\
288  please use analysis.roq in your config file in future. Enabling ROQ.")
289  cp.set('analysis','roq',True)
290  if not cp.has_option('analysis','roq') or not cp.getboolean('analysis','roq'):
291  yield cp
292  return
293  path=cp.get('paths','roq_b_matrix_directory')
294  if not os.path.isdir(path):
295  print("The ROQ directory %s does not seem to exist\n"%path)
296  sys.exit(1)
297  use_roq=True
298  roq_paths=os.listdir(path)
299  roq_params={}
300  roq_force_flow = None
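 # Assumed on-disk layout (sub-directory names are illustrative): one directory per
 # ROQ basis, e.g. <roq_b_matrix_directory>/4s/, <roq_b_matrix_directory>/8s/, ...,
 # each providing a params.dat with flow, fhigh, seglen, qmax and compmin entries
 # that are read further below.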
301 
302  if cp.has_option('lalinference','roq_force_flow'):
303  roq_force_flow = cp.getfloat('lalinference','roq_force_flow')
304  print("WARNING: Forcing the f_low to ", str(roq_force_flow), "Hz")
305  print("WARNING: Overwriting user choice of flow, srate, seglen, and (mc_min, mc_max and q-min) or (mass1_min, mass1_max, mass2_min, mass2_max)")
306 
307  def key(item): # to order the ROQ bases
308  return float(item[1]['seglen'])
309 
310  coinc_xml_obj = None
311  row=None
312 
313  # Get file object of coinc.xml
314  if opts.gid is not None:
315  from ligo.gracedb.rest import GraceDb
316  gid=opts.gid
317  cwd=os.getcwd()
318  if cp.has_option('analysis', 'service-url'):
319  client = GraceDb(cp.get('analysis', 'service-url'))
320  else:
321  client = GraceDb()
322  coinc_xml_obj = ligolw_utils.load_fileobj(client.files(gid, "coinc.xml"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
323  elif cp.has_option('input', 'coinc-xml'):
324  coinc_xml_obj = ligolw_utils.load_fileobj(open(cp.get('input', 'coinc-xml'), "rb"), contenthandler = lsctables.use_in(ligolw.LIGOLWContentHandler))[0]
325 
326  # Get sim_inspiral from injection file
327  if cp.has_option('input','injection-file'):
328  print("Only 0-th event in the XML table will be considered while running with ROQ\n")
329  row = lsctables.SimInspiralTable.get_table(
330  ligolw_utils.load_filename(cp.get('input','injection-file'),contenthandler=lsctables.use_in(ligolw.LIGOLWContentHandler))
331  )[0]
332 
333  roq_bounds = pipe_utils.Query_ROQ_Bounds_Type(path, roq_paths)
334  if roq_bounds == 'chirp_mass_q':
335  print('ROQ has bounds in chirp mass and mass-ratio')
336  mc_priors, trigger_mchirp = pipe_utils.get_roq_mchirp_priors(
337  path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
338  )
339  elif roq_bounds == 'component_mass':
340  print('ROQ has bounds in component masses')
341  # get component mass bounds, then compute the chirp mass that can be safely covered
342  # further below we pass along the component mass bounds to the sampler, not the tighter chirp-mass, q bounds
343  m1_priors, m2_priors, trigger_mchirp = pipe_utils.get_roq_component_mass_priors(
344  path, roq_paths, roq_params, key, coinc_xml_obj=coinc_xml_obj, sim_inspiral=row
345  )
346  mc_priors = {}
347  for (roq,m1_prior), (roq2,m2_prior) in zip(m1_priors.items(), m2_priors.items()):
348  mc_priors[roq] = sorted([pipe_utils.mchirp_from_components(m1_prior[1], m2_prior[0]), pipe_utils.mchirp_from_components(m1_prior[0], m2_prior[1])])
349 
350  if cp.has_option('lalinference','trigger_mchirp'):
351  trigger_mchirp=float(cp.get('lalinference','trigger_mchirp'))
352  roq_mass_freq_scale_factor = pipe_utils.get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, roq_force_flow)
353  if roq_mass_freq_scale_factor != 1.:
354  print('WARNING: Rescaling ROQ basis, please ensure it is allowed with the model used.')
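 # The rescaling exploits the mass-frequency degeneracy of the waveforms: scaling
 # all masses by a factor s while dividing frequencies by s (and multiplying
 # durations by s) leaves the signal unchanged, which is why flow, srate and seglen
 # are rescaled by the same factor further below.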
355 
356  # If the true chirp mass is unknown, add variations over the mass bins
357  if opts.gid is not None or (opts.injections is not None or cp.has_option('input','injection-file')) or cp.has_option('lalinference','trigger_mchirp') or cp.has_option('input', 'coinc-xml'):
358 
359  for mc_prior in mc_priors:
360  mc_priors[mc_prior] = np.array(mc_priors[mc_prior])
361  # find mass bin containing the trigger
362  candidate_roq_paths = \
363  [roq for roq in roq_paths
364  if mc_priors[roq][0]*roq_mass_freq_scale_factor
365  <= trigger_mchirp <= mc_priors[roq][1]*roq_mass_freq_scale_factor]
366  if candidate_roq_paths:
367  margins = np.array(
368  [min(mc_priors[roq][1]*roq_mass_freq_scale_factor-trigger_mchirp,
369  trigger_mchirp-mc_priors[roq][0]*roq_mass_freq_scale_factor)
370  for roq in candidate_roq_paths])
371  best_path = candidate_roq_paths[np.argmax(margins)]
372  roq_paths = [best_path]
373  print('Prior in Mchirp will be [{}, {}] to contain the trigger Mchirp {}.'.format(
374  mc_priors[best_path][0]*roq_mass_freq_scale_factor,
375  mc_priors[best_path][1]*roq_mass_freq_scale_factor,
376  trigger_mchirp))
377  else:
378  print("No roq mass bins containing the trigger")
379  raise
380  else:
381  for mc_prior in mc_priors:
382  mc_priors[mc_prior] = np.array(mc_priors[mc_prior])*roq_mass_freq_scale_factor
383 
384  # write the master configparser
385  cur_basedir = cp.get('paths','basedir')
386  masterpath=os.path.join(cur_basedir,'config.ini')
387  with open(masterpath,'w') as cpfile:
388  cp.write(cpfile)
389 
390  for roq in roq_paths:
391  this_cp = configparser.ConfigParser()
392  this_cp.optionxform = str
393  this_cp.read(masterpath)
394  basedir = this_cp.get('paths','basedir')
395  for dirs in 'basedir','daglogdir','webdir':
396  val = this_cp.get('paths',dirs)
397  newval = os.path.join(val,roq)
398  mkdirs(newval)
399  this_cp.set('paths',dirs,newval)
400  this_cp.set('paths','roq_b_matrix_directory',os.path.join(cp.get('paths','roq_b_matrix_directory'),roq))
401  flow=roq_params[roq]['flow'] / roq_mass_freq_scale_factor
402  srate=2.*roq_params[roq]['fhigh'] / roq_mass_freq_scale_factor
403  #if srate > 8192:
404  # srate = 8192
405 
406  seglen=roq_params[roq]['seglen'] * roq_mass_freq_scale_factor
407  # params.dat uses the convention q>1 so our q_min is the inverse of their qmax
408  this_cp.set('engine','srate',str(srate))
409  this_cp.set('engine','seglen',str(seglen))
410  if this_cp.has_option('lalinference','flow'):
411  tmp=this_cp.get('lalinference','flow')
412  tmp=eval(tmp)
413  ifos=tmp.keys()
414  else:
415  tmp={}
416  ifos=eval(this_cp.get('analysis','ifos'))
417  for i in ifos:
418  tmp[i]=flow
419  this_cp.set('lalinference','flow',str(tmp))
420  if roq_bounds == 'chirp_mass_q':
421  mc_min=mc_priors[roq][0]*roq_mass_freq_scale_factor
422  mc_max=mc_priors[roq][1]*roq_mass_freq_scale_factor
423  # params.dat uses the convention q>1 so our q_min is the inverse of their qmax
424  q_min=1./float(roq_params[roq]['qmax'])
425  this_cp.set('engine','chirpmass-min',str(mc_min))
426  this_cp.set('engine','chirpmass-max',str(mc_max))
427  this_cp.set('engine','q-min',str(q_min))
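 # Component-mass bounds implied by the (chirp mass, q) bounds above: the smallest
 # component sits at (mc_min, q_min) via m2 = Mc*(1+q)**(1./5.)*q**(2./5.), the
 # largest at (mc_max, q_min) via m1 = Mc*(1+q)**(1./5.)*q**(-3./5.).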
428  this_cp.set('engine','comp-min', str(max(roq_params[roq]['compmin'] * roq_mass_freq_scale_factor, mc_min * pow(1+q_min, 1./5.) * pow(q_min, 2./5.))))
429  this_cp.set('engine','comp-max', str(mc_max * pow(1+q_min, 1./5.) * pow(q_min, -3./5.)))
430  elif roq_bounds == 'component_mass':
431  m1_min = m1_priors[roq][0]
432  m1_max = m1_priors[roq][1]
433  m2_min = m2_priors[roq][0]
434  m2_max = m2_priors[roq][1]
435  this_cp.set('engine','mass1-min',str(m1_min))
436  this_cp.set('engine','mass1-max',str(m1_max))
437  this_cp.set('engine','mass2-min',str(m2_min))
438  this_cp.set('engine','mass2-max',str(m2_max))
439  yield this_cp
440  return
441 
442 # Create an outer dag to wrap the sub-dags
443 outerdaglog=os.path.join(daglogdir,'lalinference_multi_'+str(uuid.uuid1())+'.log')
444 outerdag=pipeline.CondorDAG(outerdaglog)
445 outerdag.set_dag_file(os.path.join(cp.get('paths','basedir'),'multidag'))
446 
447 
448 master_cp=cp
449 # Iterate over variations and generate sub-dags
450 for cp in generate_variations(master_cp,variations):
451  basepath=cp.get('paths','basedir')
452  # Link injection file into place as paths outside basedir are inaccessible to containerised jobs
453  if cp.has_option('input','injection-file'):
454  injpath=cp.get('input','injection-file')
455  myinjpath=os.path.join(basepath,os.path.basename(injpath))
456  if os.path.abspath(myinjpath) != os.path.abspath(injpath):
457  # If the injection file does not exist in the run dir, link it into place
458  # Useful for singularity jobs which see only rundir
459  if os.path.lexists(myinjpath):
460  # If the path exists, see if it is a link to the current file
461  # and if so, just update the config
462  if os.path.islink(myinjpath) and os.path.realpath(myinjpath)==os.path.realpath(injpath):
463  cp.set('input','injection-file',myinjpath)
464  else:
465  # Do not over-write the injection file
466  print(("Error: File {0} exists in run directory, not over-writing with \
467  {1}. Remove the existing file or create a fresh run directory".format(myinjpath,injpath)
468  ))
469  sys.exit(1)
470  else:
471  # The link doesn't exist, so create it and update config
472  try:
473  os.link(os.path.abspath(injpath), myinjpath)
474  except OSError:
475  from shutil import copyfile
476  copyfile(injpath,myinjpath)
477  cp.set('input','injection-file',myinjpath)
478 
479  for this_cp in setup_roq(cp):
480  # Create the DAG from the configparser object
481  dag=pipe_utils.LALInferencePipelineDAG(this_cp)
482  dagjob=pipeline.CondorDAGManJob(os.path.join(this_cp.get('paths','basedir'),dag.get_dag_file()),
483  this_cp.get('paths','basedir'))
484  dagnode=pipeline.CondorDAGManNode(dagjob)
485  outerdag.add_node(dagnode)
486  dag.write_sub_files()
487  dag.write_dag()
488  dag.write_script()
489 
490 outerdag.write_sub_files()
491 outerdag.write_dag()
492 outerdag.write_script()
493 
494 # End of program
495 print('Successfully created DAG file.')
496 
497 if opts.condor_submit:
498  import subprocess
499  if cp.has_option('condor','notification'):
500  x = subprocess.Popen(['condor_submit_dag','-dont_suppress_notification',outerdag.get_dag_file()])
501  else:
502  x = subprocess.Popen(['condor_submit_dag',outerdag.get_dag_file()])
503  x.wait()
504  if x.returncode==0:
505  print('Submitted DAG file')
506  else:
507  print('Unable to submit DAG file')
508 else:
509  print('To submit, run:\n\tcondor_submit_dag {0}'.format(outerdag.get_dag_file()))