# lalinference_pipe_utils.py (LALInference 4.1.9.1-5e288d3)
# DAG Class definitions for LALInference Pipeline
# (C) 2012 John Veitch, Vivien Raymond, Kiersten Ruisard, Kan Wang

import itertools
from lal import pipeline
import igwn_segments as segments
from igwn_ligolw import lsctables
from igwn_ligolw import utils as ligolw_utils
import os
import socket
import uuid
import ast
from math import floor,ceil,log,pow
import sys
import random
from itertools import permutations
import numpy as np
from glob import glob
import math
from functools import reduce
try:
    from configparser import NoOptionError, NoSectionError
except ImportError:
    from ConfigParser import NoOptionError, NoSectionError
import numpy

# We use the GLUE pipeline utilities to construct classes for each
# type of job. Each class has inputs and outputs, which are used to
# join together types of jobs into a DAG.
def findSegmentsToAnalyze(ifo, frametype, state_vector_channel, bits, gpsstart, gpsend):
    """Return a list of segments whose data quality is good enough for PE. The
    data quality is examined with the state vector in frame files. If no frame
    files are found, return an empty list.

    Parameters
    ----------
    ifo: string
    frametype: string
    state_vector_channel: string
    bits: list of string
        List of bits. This function extracts the data taken when all of the
        bits in this list are "active", assuming such data is good enough for
        PE.
    gpsstart, gpsend: float
        GPS period to analyse
    """
    try:
        from glue.lal import Cache
        from gwdatafind import find_urls
        import gwpy
        import gwpy.timeseries
        import gwpy.segments
    except ImportError:
        print('Unable to import necessary modules. Querying science segments not possible. Please try installing gwdatafind and gwpy')
        raise
    # search for frame files and read their state-vector channel
    datacache = Cache.from_urls(find_urls(ifo[0], frametype, gpsstart, gpsend))
    if not datacache:
        return gwpy.segments.SegmentList([])
    state = gwpy.timeseries.StateVector.read(
        datacache, state_vector_channel, start=gpsstart, end=gpsend,
        pad=0  # pad the data so that gaps in the found frames do not raise errors
    )
    if not np.issubdtype(state.dtype, np.unsignedinteger):
        # if data are not unsigned integers, cast to them now so that
        # we can determine the bit content for the flags
        state = state.astype(
            "uint32",
            casting="unsafe",
            subok=True,
            copy=False,
        )
    flags = state.to_dqflags()
    # keep only the segments during which all of the requested bits are active
    segments = flags[bits[0]].active
    for bit in bits:
        segments -= ~flags[bit].active
    return segments
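
# A minimal usage sketch (hedged): the frame type and state-vector channel
# below are realistic but illustrative; the correct names depend on the
# detector and observing run.
#
#   good_segs = findSegmentsToAnalyze(
#       'H1', 'H1_HOFT_C00', 'H1:GDS-CALIB_STATE_VECTOR',
#       ['Bit 0', 'Bit 1', 'Bit 2'], 1187008482, 1187008882)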

def guess_url(fslocation):
    """
    Try to work out the web address of a given path
    """
    SERVER="localhost"
    USER=os.environ['USER']
    HOST=socket.getfqdn()
    if 'public_html' in fslocation:
        k='public_html/'
    elif 'WWW' in fslocation:
        k='WWW/'
    elif 'www_html' in fslocation:
        k='www_html/'
    else:
        k=None
    if k is not None:
        (a,b)=fslocation.split(k)
        webpath=os.path.join('~%s'%(USER),b)
        onweb=True
    else:
        (c,d)=fslocation.split(USER,1)
        for k in ['public_html','WWW','www_html']:
            trypath=c+os.environ['USER']+'/'+k+d
            #Follow symlinks
            if os.path.realpath(trypath)==os.path.normpath(fslocation):
                #(a,b)=trypath.split(k)
                webpath=os.path.join('~%s'%(USER),d)
                onweb=True
                break
        else:
            webpath=fslocation
            onweb=False
    if 'atlas' in HOST:
        url="https://atlas1.atlas.aei.uni-hannover.de/"
    elif 'ligo-wa' in HOST:
        url="https://ldas-jobs.ligo-wa.caltech.edu/"
    elif 'ligo-la' in HOST:
        url="https://ldas-jobs.ligo-la.caltech.edu/"
    elif 'cit' in HOST or 'caltech' in HOST:
        url="https://ldas-jobs.ligo.caltech.edu/"
    elif 'uwm' in HOST or 'nemo' in HOST:
        url="https://ldas-jobs.phys.uwm.edu/"
    elif 'phy.syr.edu' in HOST:
        url="https://sugar-jobs.phy.syr.edu/"
    elif 'arcca.cf.ac.uk' in HOST:
        url="https://geo2.arcca.cf.ac.uk/"
    elif 'vulcan' in HOST:
        url="https://galahad.aei.mpg.de/"
    else:
        if onweb:
            url="http://%s/"%(HOST)
        else:
            url=HOST+':'
    url=url+webpath
    return(url)
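
# Illustrative behaviour (path and user are hypothetical):
#   guess_url('/home/albert.einstein/public_html/LVC/results')
# would map to 'https://ldas-jobs.ligo.caltech.edu/~albert.einstein/LVC/results'
# when running on a Caltech (CIT) host.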

class Event():
    """
    Represents a unique event to run on
    """
    new_id=itertools.count()
    def __init__(self,trig_time=None,SimInspiral=None,SimBurst=None,SnglInspiral=None,CoincInspiral=None,event_id=None,timeslide_dict=None,GID=None,ifos=None, duration=None,srate=None,trigSNR=None,fhigh=None,horizon_distance=None):
        self.trig_time=trig_time
        self.injection=SimInspiral
        self.burstinjection=SimBurst
        self.sngltrigger=SnglInspiral
        if timeslide_dict is None:
            self.timeslides={}
        else:
            self.timeslides=timeslide_dict
        self.GID=GID
        self.coinctrigger=CoincInspiral
        if ifos is None:
            self.ifos = []
        else:
            self.ifos = ifos
        self.duration = duration
        self.srate = srate
        self.trigSNR = trigSNR
        self.fhigh = fhigh
        self.horizon_distance = horizon_distance
        if event_id is not None:
            self.event_id=event_id
        else:
            self.event_id=next(Event.new_id)
        if self.injection is not None:
            self.trig_time=self.injection.geocent_end_time + 1.0e-9*self.injection.geocent_end_time_ns
            if event_id is None: self.event_id=self.injection.simulation_id
        if self.burstinjection is not None:
            self.trig_time=self.burstinjection.time_geocent + 1.0e-9*self.burstinjection.time_geocent_ns
            if event_id is None: self.event_id=self.burstinjection.simulation_id
        if self.sngltrigger is not None:
            self.trig_time=self.sngltrigger.end_time + 1.0e-9 * self.sngltrigger.end_time_ns
            self.event_id=self.sngltrigger.event_id
        if self.coinctrigger is not None:
            self.trig_time=self.coinctrigger.end_time + 1.0e-9 * self.coinctrigger.end_time_ns
            self.event_id=self.coinctrigger.event_id
        if self.GID is not None:
            self.event_id=int(''.join(i for i in self.GID if i.isdigit()))
        self.engine_opts={}
    def set_engine_option(self,opt,val):
        """
        Can set event-specific options for the engine nodes
        using this option, e.g. ev.set_engine_option('time-min','1083759273')
        """
        self.engine_opts[opt]=val

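# A short sketch of building an analysis target by hand (values illustrative):
#
#   ev = Event(trig_time=1187008882.4, ifos=['H1','L1'], srate=4096)
#   ev.set_engine_option('time-min', '1187008880')
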
dummyCacheNames=['LALLIGO','LALVirgo','LALAdLIGO','LALAdVirgo']

def create_events_from_coinc_and_psd(
        coinc_xml_obj, psd_dict=None, gid=None, threshold_snr=None, flow=20.0,
        roq=False, use_gracedbpsd=False
):
    """This function calculates seglen, fhigh, srate and horizon distance from
    coinc.xml and psd.xml.gz from GraceDB and creates a list of Events as input
    to the pipeline. This function is based on Chris Pankow's script.

    Parameters
    ----------
    coinc_xml_obj: ligolw.ligolw.Document
        file object of coinc.xml
    psd_dict: dictionary of REAL8FrequencySeries
        PSDs of all the ifos
    threshold_snr: float
        snr threshold for detection
    flow: float
        lower frequency cutoff for overlap calculation
    roq: bool
        Whether the run uses ROQ or not
    use_gracedbpsd: bool
        Whether the gracedb PSD is used or not in PE
    """
    output=[]
    import lal
    from lalsimulation import SimInspiralChirpTimeBound, IMRPhenomDGetPeakFreq
    try:
        from gstlal import reference_psd
    except ImportError:
        reference_psd = None
    try:
        from gwpy.frequencyseries import FrequencySeries
        from gwpy.astro import inspiral_range
    except ImportError:
        inspiral_range = None
    coinc_events = lsctables.CoincInspiralTable.get_table(coinc_xml_obj)
    sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(coinc_xml_obj))
    ifos = sorted(coinc_events[0].instruments)
    trigSNR = coinc_events[0].snr
    # Parse PSD
    srate_psdfile=16384
    fhigh=None
    if psd_dict is not None and use_gracedbpsd:
        psd = list(psd_dict.values())[0]
        srate_psdfile = pow(
            2.0, ceil(log(psd.f0 + psd.deltaF * (psd.data.length - 1), 2))
        ) * 2
    coinc_map = lsctables.CoincMapTable.get_table(coinc_xml_obj)
    for coinc in coinc_events:
        these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
        dur=[]
        srate=[]
        horizon_distance=[]
        for e in these_sngls:
            if roq==False:
                chirplen = SimInspiralChirpTimeBound(flow, e.mass1 * lal.MSUN_SI, e.mass2 * lal.MSUN_SI, 0.0, 0.0)
                fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
                dur.append(pow(2.0, ceil( log(max(8.0, chirplen + 2.0), 2) ) ) )
                srate.append(pow(2.0, ceil( log(fstop, 2) ) ) * 2)
            # determine horizon distance
            if threshold_snr is not None:
                if e.eff_distance is not None and not math.isnan(e.eff_distance):
                    if e.snr > threshold_snr:
                        horizon_distance.append(e.eff_distance * e.snr / threshold_snr)
                    else:
                        horizon_distance.append(2 * e.eff_distance)
                else:
                    if psd_dict is not None:
                        # Calculate horizon distance from psd to determine
                        # upper limit of distance prior.
                        psd = psd_dict[e.ifo]
                        # If roq is not used, fstop has not been calculated up
                        # to this point.
                        if not roq==False:
                            fstop = IMRPhenomDGetPeakFreq(e.mass1, e.mass2, 0.0, 0.0)
                        # reference_psd.HorizonDistance is a more precise
                        # calculator of horizon distance than
                        # gwpy.astro.inspiral_range.
                        if reference_psd is not None:
                            HorizonDistanceObj = reference_psd.HorizonDistance(
                                f_min = flow, f_max = fstop, delta_f = 1.0 / 32.0,
                                m1 = e.mass1, m2 = e.mass2
                            )
                            horizon_distance.append(
                                HorizonDistanceObj(psd, snr = threshold_snr)[0]
                            )
                        # If reference_psd is not available, use
                        # gwpy.astro.inspiral_range.
                        elif inspiral_range is not None:
                            gwpy_psd = FrequencySeries(
                                psd.data.data, f0 = psd.f0, df = psd.deltaF
                            )
                            try:
                                horizon_distance.append(
                                    inspiral_range(
                                        gwpy_psd, threshold_snr, e.mass1, e.mass2,
                                        flow, fstop, True
                                    ).value
                                )
                            # If flow of psd is lower than f_ISCO, inspiral_range
                            # raises IndexError. In this case, nothing is
                            # appended to horizon_distance.
                            except IndexError:
                                pass
        if srate:
            if max(srate)<srate_psdfile:
                srate = max(srate)
            else:
                srate = srate_psdfile
                if psd_dict is not None and use_gracedbpsd:
                    fhigh = srate_psdfile/2.0 * 0.95 # Because of the drop-off near Nyquist of the PSD from gstlal
        else:
            srate = None
        if dur:
            duration = max(dur)
        else:
            duration = None
        horizon_distance = max(horizon_distance) if len(horizon_distance) > 0 else None
        ev=Event(CoincInspiral=coinc, GID=gid, ifos = ifos, duration = duration, srate = srate,
                 trigSNR = trigSNR, fhigh = fhigh, horizon_distance=horizon_distance)
        output.append(ev)

    print("Found %d coinc events in table." % len(coinc_events))
    return output
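
# Hedged sketch of calling this directly on a downloaded coinc.xml
# (the file name is a placeholder):
#
#   from igwn_ligolw import utils as ligolw_utils
#   coinc_xml_obj = ligolw_utils.load_filename('coinc.xml')
#   events = create_events_from_coinc_and_psd(coinc_xml_obj, threshold_snr=8.0, flow=20.0)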

def open_pipedown_database(database_filename,tmp_space):
    """
    Open the connection to the pipedown database
    """
    if not os.access(database_filename,os.R_OK):
        raise Exception('Unable to open input file: %s'%(database_filename))
    from igwn_ligolw import dbtables
    import sqlite3
    working_filename=dbtables.get_connection_filename(database_filename,tmp_path=tmp_space)
    connection = sqlite3.connect(working_filename)
    if tmp_space:
        dbtables.set_temp_store_directory(connection,tmp_space)
    #dbtables.DBTable_set_connection(connection)
    return (connection,working_filename)
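
# Hedged usage sketch (the file name is a placeholder):
#
#   connection, working_filename = open_pipedown_database('pipedown.sqlite', None)
#   events = get_zerolag_pipedown(connection, gpsstart=1126051217.0, gpsend=1137254417.0)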

def get_zerolag_lloid(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from the pipedown database. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number, or specify
    max_cfar to select by combined FAR
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:1'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if not coinc_id in output.keys():
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return output.values()

def get_zerolag_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1, min_cfar=-1):
    """
    Returns a list of Event objects
    from the pipedown database. Can dump some stats to dumpfile if given,
    and filter by gpsstart and gpsend to reduce the number, or specify
    max_cfar to select by combined FAR
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    # Get coincs
    get_coincs = "SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name=='sngl_inspiral' and coinc_event_map.event_id ==\
        sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) \
        join coinc_inspiral on (coinc_event.coinc_event_id==coinc_inspiral.coinc_event_id) \
        WHERE coinc_event.time_slide_id=='time_slide:time_slide_id:10049'\
        "
    if gpsstart is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+' and coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1.0e-9 < %f'%(gpsend)
    if max_cfar !=-1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far < %f'%(max_cfar)
    if min_cfar != -1:
        get_coincs=get_coincs+' and coinc_inspiral.combined_far > %f'%(min_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    extra={}
    for (sngl_time, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        if not coinc_id in output.keys():
            output[coinc_id]=Event(trig_time=sngl_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=0
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return output.values()

def get_timeslides_pipedown(database_connection, dumpfile=None, gpsstart=None, gpsend=None, max_cfar=-1):
    """
    Returns a list of Event objects
    with times and timeslide offsets
    """
    output={}
    if gpsstart is not None: gpsstart=float(gpsstart)
    if gpsend is not None: gpsend=float(gpsend)
    db_segments=[]
    sql_seg_query="SELECT search_summary.out_start_time, search_summary.out_end_time from search_summary join process on process.process_id==search_summary.process_id where process.program=='thinca'"
    db_out = database_connection.cursor().execute(sql_seg_query)
    for d in db_out:
        if d not in db_segments:
            db_segments.append(d)
    seglist=segments.segmentlist([segments.segment(d[0],d[1]) for d in db_segments])
    db_out_saved=[]
    # Get coincidences
    get_coincs="SELECT sngl_inspiral.end_time+sngl_inspiral.end_time_ns*1e-9,time_slide.offset,sngl_inspiral.ifo,coinc_event.coinc_event_id,sngl_inspiral.snr,sngl_inspiral.chisq,coinc_inspiral.combined_far \
        FROM sngl_inspiral join coinc_event_map on (coinc_event_map.table_name == 'sngl_inspiral' and coinc_event_map.event_id \
        == sngl_inspiral.event_id) join coinc_event on (coinc_event.coinc_event_id==coinc_event_map.coinc_event_id) join time_slide\
        on (time_slide.time_slide_id == coinc_event.time_slide_id and time_slide.instrument==sngl_inspiral.ifo)\
        join coinc_inspiral on (coinc_inspiral.coinc_event_id==coinc_event.coinc_event_id) where coinc_event.time_slide_id!='time_slide:time_slide_id:10049'"
    joinstr = ' and '
    if gpsstart is not None:
        get_coincs=get_coincs+ joinstr + ' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 > %f'%(gpsstart)
    if gpsend is not None:
        get_coincs=get_coincs+ joinstr+' coinc_inspiral.end_time+coinc_inspiral.end_time_ns*1e-9 <%f'%(gpsend)
    if max_cfar!=-1:
        get_coincs=get_coincs+joinstr+' coinc_inspiral.combined_far < %f'%(max_cfar)
    db_out=database_connection.cursor().execute(get_coincs)
    # Timeslide functionality requires obsolete pylal - will be removed
    from pylal import SnglInspiralUtils
    extra={}
    for (sngl_time, slide, ifo, coinc_id, snr, chisq, cfar) in db_out:
        coinc_id=int(coinc_id.split(":")[-1])
        seg=list(filter(lambda seg:sngl_time in seg,seglist))[0]
        slid_time = SnglInspiralUtils.slideTimeOnRing(sngl_time,slide,seg)
        if not coinc_id in output.keys():
            output[coinc_id]=Event(trig_time=slid_time,timeslide_dict={},event_id=int(coinc_id))
            extra[coinc_id]={}
        output[coinc_id].timeslides[ifo]=slid_time-sngl_time
        output[coinc_id].ifos.append(ifo)
        extra[coinc_id][ifo]={'snr':snr,'chisq':chisq,'cfar':cfar}
    if dumpfile is not None:
        fh=open(dumpfile,'w')
        for co in output.keys():
            for ifo in output[co].ifos:
                fh.write('%s %s %s %s %s %s %s\n'%(str(co),ifo,str(output[co].trig_time),str(output[co].timeslides[ifo]),str(extra[co][ifo]['snr']),str(extra[co][ifo]['chisq']),str(extra[co][ifo]['cfar'])))
        fh.close()
    return output.values()

def mkdirs(path):
    """
    Helper function. Make the given directory, creating intermediate
    dirs if necessary, and don't complain if it already exists.
    """
    if os.access(path,os.W_OK) and os.path.isdir(path): return
    else: os.makedirs(path)

def chooseEngineNode(name):
    if name=='lalinferencenest':
        return LALInferenceNestNode
    if name=='lalinferenceburst':
        return LALInferenceBurstNode
    if name=='lalinferencemcmc':
        return LALInferenceMCMCNode
    if name=='lalinferencedatadump':
        return LALInferenceDataDumpNode
    if name=='bayeswavepsd':
        return BayesWavePSDNode
    if name=='bayeswavepost':
        return BayesWavePostNode
    return EngineNode

def get_engine_name(cp):
    name=cp.get('analysis','engine')
    if name=='random':
        engine_list=['lalinferencenest','lalinferencemcmc']
        if cp.has_option('input','gid'):
            gid=cp.get('input','gid')
            engine_number=int(''.join(i for i in gid if i.isdigit())) % 2
        else:
            engine_number=random.randint(0,1)
        return engine_list[engine_number]
    else:
        return name

def scan_timefile(timefile):
    """
    Read a list of GPS times from an ASCII file, one per line,
    skipping lines that are not numbers and duplicate times.
    """
    import re
    p=re.compile(r'[\d.]+')
    times=[]
    timefilehandle=open(timefile,'r')
    for time in timefilehandle:
        if not p.match(time):
            continue
        if float(time) in times:
            print('Skipping duplicate time %s'%(time))
            continue
        print('Read time %s'%(time))
        times.append(float(time))
    timefilehandle.close()
    return times
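
# The expected input is plain ASCII, one GPS time per line, e.g.:
#
#   1126259462.39
#   1187008882.43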

def get_xml_psds(psdxml,ifos,outpath,end_time=None):
    """
    Get a psd.xml.gz file and:
    1) Read it
    2) Check that the psd file contains all the IFOs we want to analyze
    3) Write the PSDs into an ascii file for each IFO in psd.xml.gz. The name of the file contains the trigtime (if given) and the IFO name.
    Input:
      psdxml: psd.xml.gz file
      ifos: list of ifos used for the analysis
      outpath: path where the ascii PSD will be written to
      (end_time): trigtime for this event. Will be used as part of the PSD file name
    """
    try:
        from lal import series as lalseries
    except ImportError:
        print("ERROR, cannot import lal.series in bppu/get_xml_psds()\n")
        raise

    out={}
    if not os.path.isdir(outpath):
        os.makedirs(outpath)
    if end_time is not None:
        time=repr(float(end_time))
    else:
        time=''
    #check we don't already have ALL the psd files
    got_all=1
    for ifo in ifos:
        path_to_ascii_psd=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
        # Check we don't already have that ascii (e.g. because we are running parallel runs of the same event)
        if os.path.isfile(path_to_ascii_psd):
            got_all*=1
        else:
            got_all*=0
    if got_all==1:
        #print "Already have PSD files. Nothing to do...\n"
        for ifo in ifos:
            out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
        return out

    # We need to convert the PSD for one or more IFOs. Open the file
    if not os.path.isfile(psdxml):
        print("ERROR: impossible to open the psd file %s. Exiting...\n"%psdxml)
        sys.exit(1)
    xmlpsd = lalseries.read_psd_xmldoc(ligolw_utils.load_filename(psdxml))
    # Check the psd file contains all the IFOs we want to analyze
    for ifo in ifos:
        if not ifo in xmlpsd:
            print("ERROR. The PSD for the ifo %s does not seem to be contained in %s\n"%(ifo,psdxml))
            sys.exit(1)
    #loop over ifos in psd xml file
    for instrument in xmlpsd.keys():
        #name of the ascii file we are going to write the PSD into
        path_to_ascii_psd=os.path.join(outpath,instrument+'_psd_'+time+'.txt')
        # Check we don't already have that ascii (e.g. because we are running parallel runs of the same event)
        if os.path.isfile(path_to_ascii_psd):
            continue
        # get data for the IFO
        ifodata=xmlpsd[instrument]
        #check data is not empty
        if ifodata is None:
            continue
        # write the PSD into an ascii file
        combine = np.c_[ifodata.f0 + np.arange(ifodata.data.length) * ifodata.deltaF, ifodata.data.data]
        np.savetxt(path_to_ascii_psd,combine)
        # set node.psds dictionary with the path to the ascii files
        ifo=instrument
        out[ifo]=os.path.join(outpath,ifo+'_psd_'+time+'.txt')
    return out
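
# Hedged usage sketch (paths are placeholders):
#
#   psds = get_xml_psds('psd.xml.gz', ['H1','L1'], './PSDs', end_time=1187008882.4)
#   # -> {'H1': './PSDs/H1_psd_1187008882.4.txt', 'L1': './PSDs/L1_psd_1187008882.4.txt'}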

def get_trigger_chirpmass(coinc_xml_obj):
    coinc_events = lsctables.CoincInspiralTable.get_table(coinc_xml_obj)
    sngl_event_idx = dict((row.event_id, row) for row in lsctables.SnglInspiralTable.get_table(coinc_xml_obj))
    coinc_map = lsctables.CoincMapTable.get_table(coinc_xml_obj)
    mass1 = []
    mass2 = []
    for coinc in coinc_events:
        these_sngls = [sngl_event_idx[c.event_id] for c in coinc_map if c.coinc_event_id == coinc.coinc_event_id]
        for e in these_sngls:
            mass1.append(e.mass1)
            mass2.append(e.mass2)
    # check that trigger masses are identical in each IFO
    assert len(set(mass1)) == 1
    assert len(set(mass2)) == 1

    mchirp = (mass1[0]*mass2[0])**(3./5.) / ( (mass1[0] + mass2[0])**(1./5.) )

    return mchirp

def get_roq_mchirp_priors(path, roq_paths, roq_params, key, coinc_xml_obj=None, sim_inspiral=None):

    ## XML and GID cannot be given at the same time
    ## sim_inspiral must already point at the right row
    mc_priors = {}

    if coinc_xml_obj is not None and sim_inspiral is not None:
        print("Error in get_roq_mchirp_priors, cannot use both coinc.xml and sim_inspiral\n")
        sys.exit(1)

    for roq in roq_paths:
        params=os.path.join(path,roq,'params.dat')
        roq_params[roq]=np.genfromtxt(params,names=True)
        mc_priors[roq]=[float(roq_params[roq]['chirpmassmin']),float(roq_params[roq]['chirpmassmax'])]
    ordered_roq_paths=[item[0] for item in sorted(roq_params.items(), key=key)][::-1]
    # below is to construct non-overlapping mc priors for multiple roq mass-bin runs
    '''i=0
    for roq in ordered_roq_paths:
        if i>0:
            # change min, just set to the max of the previous one since we have already aligned it in the previous iteration of this loop
            #mc_priors[roq][0]+= (mc_priors[roq_lengths[i-1]][1]-mc_priors[roq][0])/2.
            mc_priors[roq][0]=mc_priors[ordered_roq_paths[i-1]][1]
        if i<len(roq_paths)-1:
            mc_priors[roq][1]-= (mc_priors[roq][1]- mc_priors[ordered_roq_paths[i+1]][0])/2.
        i+=1'''
    if coinc_xml_obj is not None:
        trigger_mchirp = get_trigger_chirpmass(coinc_xml_obj)
    elif sim_inspiral is not None:
        trigger_mchirp = sim_inspiral.mchirp
    else:
        trigger_mchirp = None

    return mc_priors, trigger_mchirp

def get_roq_component_mass_priors(path, roq_paths, roq_params, key, coinc_xml_obj=None, sim_inspiral=None):

    ## coinc_xml_obj and sim_inspiral cannot be given at the same time
    ## sim_inspiral must already point at the right row
    m1_priors = {}
    m2_priors = {}

    if coinc_xml_obj is not None and sim_inspiral is not None:
        print("Error in get_roq_component_mass_priors, cannot use both coinc.xml and sim_inspiral\n")
        sys.exit(1)

    for roq in roq_paths:
        params=os.path.join(path,roq,'params.dat')
        roq_params[roq]=np.genfromtxt(params,names=True)
        m1_priors[roq]=[float(roq_params[roq]['mass1min']),float(roq_params[roq]['mass1max'])]
        m2_priors[roq]=[float(roq_params[roq]['mass2min']),float(roq_params[roq]['mass2max'])]

    if coinc_xml_obj is not None:
        trigger_mchirp = get_trigger_chirpmass(coinc_xml_obj)
    elif sim_inspiral is not None:
        trigger_mchirp = sim_inspiral.mchirp
    else:
        trigger_mchirp = None

    return m1_priors, m2_priors, trigger_mchirp

def get_roq_mass_freq_scale_factor(mc_priors, trigger_mchirp, force_flow=None):
    mc_min = min([prange[0] for prange in mc_priors.values()])
    mc_max = max([prange[1] for prange in mc_priors.values()])
    scale_factor = 1.
    if force_flow is None and trigger_mchirp is not None:
        if trigger_mchirp >= mc_max:
            scale_factor = 2.**(floor(trigger_mchirp/mc_max))
        if trigger_mchirp <= mc_min:
            scale_factor = (2./3.2)**(ceil(trigger_mchirp/mc_min))
    elif force_flow is not None:
        scale_factor = 20./force_flow
    return scale_factor
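
# Illustrative check (numbers hypothetical): with ROQ bins spanning chirp
# masses [1.4, 23] and a trigger at mchirp = 50 >= mc_max, the code above
# returns scale_factor = 2**floor(50/23) = 4; with force_flow=10.0 it
# returns 20/10 = 2 instead.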


def mchirp_from_components(m1, m2):
    return (m1*m2)**(3.0/5.0) / (m1+m2)**(1.0/5.0)
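
# Quick sanity check of the chirp-mass formula
# Mc = (m1*m2)**(3/5) / (m1+m2)**(1/5):
#
#   mchirp_from_components(1.4, 1.4)  # ~1.22, the canonical BNS value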

def Query_ROQ_Bounds_Type(path, roq_paths):
    # Assume that the parametrization of ROQ bounds is independent of seglen; just look at the first one
    roq = roq_paths[0]
    params = os.path.join(path,roq,'params.dat')
    roq_params0 = np.genfromtxt(params,names=True)
    roq_names_set = set(roq_params0.dtype.names)
    component_mass_bounds_set = set(['mass1min', 'mass1max', 'mass2min', 'mass2max'])
    chirp_mass_q_bounds_set = set(['chirpmassmin', 'chirpmassmax', 'qmin', 'qmax'])
    if roq_names_set.issuperset(component_mass_bounds_set):
        roq_bounds = 'component_mass'
    elif roq_names_set.issuperset(chirp_mass_q_bounds_set):
        roq_bounds = 'chirp_mass_q'
    else:
        print('Invalid bounds for ROQ. Either (m1,m2) or (mc,q) bounds are supported.')
        sys.exit(1)
    return roq_bounds

def extract_approx(cp):
    approximant = cp.get("engine", "approx")
    if "pseudo" in approximant:
        approximant = approximant.split("pseudo")[0]
    return approximant

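# For example, approx=IMRPhenomPv2pseudoFourPN in the [engine] section
# yields 'IMRPhenomPv2' here, since everything from 'pseudo' onwards is a
# PN-order tag rather than part of the waveform family name.
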
class LALInferencePipelineDAG(pipeline.CondorDAG):
    def __init__(self,cp):
        self.subfiles=[]
        self.config=cp
        self.engine=get_engine_name(cp)
        self.EngineNode=chooseEngineNode(self.engine)
        if cp.has_option('paths','basedir'):
            self.basepath=cp.get('paths','basedir')
        else:
            self.basepath=os.getcwd()
            print('No basepath specified, using current directory: %s'%(self.basepath))
        mkdirs(self.basepath)
        print("Generating LALInference DAG in {0}".format(self.basepath))
        self.posteriorpath=os.path.join(self.basepath,'posterior_samples')
        mkdirs(self.posteriorpath)
        daglogdir=cp.get('paths','daglogdir')
        mkdirs(daglogdir)
        self.daglogfile=os.path.join(daglogdir,'lalinference_pipeline-'+str(uuid.uuid1())+'.log')
        super(LALInferencePipelineDAG,self).__init__(self.daglogfile)
        if cp.has_option('paths','cachedir'):
            self.cachepath=cp.get('paths','cachedir')
        else:
            self.cachepath=os.path.join(self.basepath,'caches')
        mkdirs(self.cachepath)
        if cp.has_option('paths','logdir'):
            self.logpath=cp.get('paths','logdir')
        else:
            self.logpath=os.path.join(self.basepath,'log')
        mkdirs(self.logpath)
        if cp.has_option('analysis','ifos'):
            self.ifos=ast.literal_eval(cp.get('analysis','ifos'))
        else:
            self.ifos=['H1','L1','V1']
        self.segments={}
        if cp.has_option('datafind','veto-categories'):
            self.veto_categories=cp.get('datafind','veto-categories')
        else: self.veto_categories=[]
        for ifo in self.ifos:
            self.segments[ifo]=[]
        self.dq={}
        self.frtypes=ast.literal_eval(cp.get('datafind','types'))
        self.channels=ast.literal_eval(cp.get('data','channels'))
        if cp.has_option("paths","webdir"):
            self.webdir=cp.get('paths','webdir')
        if cp.has_option("paths","existing_webdir"):
            self.webdir=cp.get('paths','existing_webdir')
        if cp.has_option('analysis','dataseed'):
            self.dataseed=cp.getint('analysis','dataseed')
        else:
            self.dataseed=None
        if cp.has_option('analysis','randomseed'):
            self.randomseed=cp.getint('analysis','randomseed')
        else:
            self.randomseed=random.randint(1,2**31)
        # Set up necessary job files.
        self.prenodes={}
        self.datafind_job = pipeline.LSCDataFindJob(self.cachepath,self.logpath,self.config)
        self.datafind_job.add_opt('url-type','file')
        # If running on OSG use its datafind server
        if cp.has_option('analysis','osg') and cp.getboolean('analysis','osg'):
            self.datafind_job.add_opt('server','datafind.ligo.org:443')
        if cp.has_option('condor','accounting_group'):
            self.datafind_job.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
        if cp.has_option('condor','accounting_group_user'):
            self.datafind_job.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
        self.datafind_job.set_sub_file(os.path.abspath(os.path.join(self.basepath,'datafind.sub')))
        self.preengine_job = EngineJob(self.config, os.path.join(self.basepath,'prelalinference.sub'),self.logpath,engine='lalinferencedatadump',ispreengine=True,sharedfs=True)
        self.preengine_job.set_universe('vanilla')
        if self.config.getboolean('analysis','roq'):
            self.computeroqweights_job = ROMJob(self.config,os.path.join(self.basepath,'computeroqweights.sub'),self.logpath)
        if self.config.has_option('condor','bayesline'):
            self.bayesline_job = BayesLineJob(self.config,os.path.join(self.basepath,'bayesline.sub'),self.logpath)
        self.bayeswavepsd_job={}
        self.bayeswavepost_job={}
        if self.config.has_option('condor','bayeswave'):
            for ifo in self.ifos:
                self.bayeswavepsd_job[ifo] = BayesWavePSDJob(self.config,os.path.join(self.basepath,'bayeswavepsd_%s.sub'%(ifo)),self.logpath)
                self.bayeswavepost_job[ifo] = BayesWavePostJob(self.config,os.path.join(self.basepath,'bayeswavepost_%s.sub'%(ifo)),self.logpath)
        # Need to create a job file for each IFO combination
        self.engine_jobs={}
        ifocombos=[]
        for N in range(1,len(self.ifos)+1):
            for a in permutations(self.ifos,N):
                ifocombos.append(a)
        for ifos in ifocombos:
            self.engine_jobs[ifos] = EngineJob(self.config, os.path.join(self.basepath,'engine_%s.sub'%(reduce(lambda x,y:x+y, map(str,ifos)))),self.logpath,engine=self.engine)
        if "summarypages" in self.config.get('condor','resultspage'):
            self.results_page_job = PESummaryResultsPageJob(self.config, os.path.join(self.basepath,'resultspage.sub'),self.logpath)
        else:
            self.results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspage.sub'),self.logpath)
            self.cotest_results_page_job = ResultsPageJob(self.config,os.path.join(self.basepath,'resultspagecoherent.sub'),self.logpath)
        if self.config.has_section('spin_evol'):
            self.evolve_spins_job=EvolveSamplesJob(self.config, os.path.join(self.basepath,'evolve_spins.sub'),self.logpath)
        if self.engine=='lalinferencemcmc':
            self.combine_job = CombineMCMCJob(self.config,os.path.join(self.basepath,'combine_files.sub'),self.logpath)
            self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,engine='mcmc')
        else:
            self.merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs.sub'),self.logpath,engine='nest')
        self.coherence_test_job = CoherenceTestJob(self.config,os.path.join(self.basepath,'coherence_test.sub'),self.logpath)
        self.gracedbjob = GraceDBJob(self.config,os.path.join(self.basepath,'gracedb.sub'),self.logpath)
        self.mapjob = SkyMapJob(cp, os.path.join(self.basepath,'skymap.sub'), self.logpath)
        self.plotmapjob = PlotSkyMapJob(cp, os.path.join(self.basepath,'plotskymap.sub'),self.logpath)
        self.postruninfojob=PostRunInfoJob(self.config,os.path.join(self.basepath,'postrungdbinfo.sub'),self.logpath)
        # Process the input to build the list of analyses to do
        self.events=self.setup_from_inputs()

        # Sanity checking
        if len(self.events)==0:
            print('No input events found, please check your config if you expect some events')
        self.times=[e.trig_time for e in self.events]

        # Set up the segments
        if not (self.config.has_option('input','gps-start-time') and self.config.has_option('input','gps-end-time')) and len(self.times)>0:
            (mintime,maxtime)=self.get_required_data(self.times)
            if not self.config.has_option('input','gps-start-time'):
                self.config.set('input','gps-start-time',str(int(floor(mintime))))
            if not self.config.has_option('input','gps-end-time'):
                self.config.set('input','gps-end-time',str(int(ceil(maxtime))))

        # Save the final configuration that is being used
        # first to the run dir
        conffilename=os.path.join(self.basepath,'config.ini')
        with open(conffilename,'w') as conffile:
            self.config.write(conffile)
        if self.config.has_option('paths','webdir'):
            mkdirs(self.config.get('paths','webdir'))
            with open(os.path.join(self.config.get('paths','webdir'),'config.ini'),'w') as conffile:
                self.config.write(conffile)

        # Generate the DAG according to the config given
        for event in self.events: self.add_full_analysis(event)
        if self.config.has_option('analysis','upload-to-gracedb'):
            if self.config.getboolean('analysis','upload-to-gracedb'):
                self.add_gracedb_FITSskymap_upload(self.events[0],engine=self.engine)
        if self.config.has_option('condor','gdbinfo') and self.config.has_option('analysis','ugid') and self.config.getboolean('analysis','upload-to-gracedb'):
            if self.config.has_option('gracedbinfo','server'):
                gdb_srv=self.config.get('gracedbinfo','server')
            else:
                gdb_srv=None

            self.add_gracedb_info_node(None,event.GID,analysis='LIB',issky=True,server=gdb_srv)

        self.dagfilename="lalinference_%s-%s"%(self.config.get('input','gps-start-time'),self.config.get('input','gps-end-time'))
        self.set_dag_file(os.path.join(self.basepath,self.dagfilename))

        # Write the wrapper script for file transfers
        self.write_wrapper_script(os.path.join(self.basepath,'lalinf_touch_output'))

    def write_wrapper_script(self, path):
        script = """#!/usr/bin/env bash
        echo "Making placeholder output files"
        IFS=','
        for f in $@; do
            touch $f;
            echo "created $f";
        done;
        """
        with open(path,'w') as scr:
            scr.write(script)
        import stat
        os.chmod(path, stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IXOTH | stat.S_IROTH)

    def add_full_analysis(self,event):
        if self.engine=='lalinferencenest' or self.engine=='lalinferenceburst':
            result=self.add_full_analysis_lalinferencenest(event)
        elif self.engine=='lalinferencemcmc':
            result=self.add_full_analysis_lalinferencemcmc(event)
        else:
            raise Exception('Unknown engine {0}'.format(self.engine))
        return result

    def get_required_data(self,times):
        """
        Calculate the data that will be needed to process all events
        """
        #psdlength = self.config.getint('input','max-psd-length')
        padding=self.config.getint('input','padding')
        if self.config.has_option('engine','seglen') or self.config.has_option('lalinference','seglen'):
            if self.config.has_option('engine','seglen'):
                seglen = int(np.ceil(self.config.getfloat('engine','seglen')))
            if self.config.has_option('lalinference','seglen'):
                seglen = self.config.getint('lalinference','seglen')
            try:
                use_gracedbpsd = (not self.config.getboolean('input','ignore-gracedb-psd'))
            except (NoOptionError, NoSectionError):
                use_gracedbpsd = True
            if (use_gracedbpsd and os.path.isfile(os.path.join(self.basepath,'psd.xml.gz'))) or self.config.has_option('condor','bayesline') or self.config.has_option('condor','bayeswave'):
                psdlength = 0
                padding = 0
                self.config.set('input','padding',str(padding))
                if self.config.has_option('condor','bayeswave'):
                    if (np.log2(seglen)%1):
                        seglen = np.power(2., np.ceil(np.log2(seglen)))
            else:
                psdlength = 32*seglen
        else:
            seglen = max(e.duration for e in self.events)
            try:
                use_gracedbpsd = (not self.config.getboolean('input','ignore-gracedb-psd'))
            except (NoOptionError, NoSectionError):
                use_gracedbpsd = True
            if (use_gracedbpsd and os.path.isfile(os.path.join(self.basepath,'psd.xml.gz'))) or self.config.has_option('condor','bayesline') or self.config.has_option('condor','bayeswave'):
                psdlength = 0
                padding = 0
                self.config.set('input','padding',str(padding))
                if self.config.has_option('condor','bayeswave'):
                    if (np.log2(seglen)%1):
                        seglen = np.power(2., np.ceil(np.log2(seglen)))
            else:
                psdlength = 32*seglen
        # Assume that the data interval is (end_time - seglen - padding, end_time + psdlength + padding)
        # -> change to (trig_time - seglen - padding - psdlength + 2, trig_time + padding + 2) to estimate the psd before the trigger for online follow-up.
        # Also require padding before start time
        return (min(times)-padding-seglen-psdlength+2,max(times)+padding+2)
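
    # Worked example of the returned interval (values illustrative): with
    # seglen=8, padding=16 and psdlength=256, a single trigger at
    # t=1187008882 needs data spanning
    # (1187008882-16-8-256+2, 1187008882+16+2) = (1187008604, 1187008900).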

    def setup_from_times(self,times):
        """
        Generate a DAG from a list of times
        """
        for time in self.times:
            self.add_full_analysis(Event(trig_time=time))

    def select_events(self):
        """
        Read events from the config parser. Understands both ranges and comma-separated events, or combinations,
        e.g. events=[0,1,5:10,21] adds to the analysis the events: 0,1,5,6,7,8,9,10 and 21
        """
        events=[]
        times=[]
        raw_events=self.config.get('input','events').replace('[','').replace(']','').split(',')
        for raw_event in raw_events:
            if ':' in raw_event:
                limits=raw_event.split(':')
                if len(limits) != 2:
                    print("Error: in event config option; ':' must separate two numbers.")
                    exit(0)
                low=int(limits[0])
                high=int(limits[1])
                if low>high:
                    events.extend(range(int(high),int(low)+1))
                elif high>low:
                    events.extend(range(int(low),int(high)+1))
            else:
                events.append(int(raw_event))
        return events

    def setup_from_inputs(self):
        """
        Scan the list of inputs, i.e.
        gps-time-file, injection-file, sngl-inspiral-file, coinc-inspiral-file, pipedown-database
        in the [input] section of the ini file,
        and process the events found therein
        """
        events=[]
        gpsstart=None
        gpsend=None
        if self.config.has_option('input','gps-start-time'):
            gpsstart=self.config.getfloat('input','gps-start-time')
        if self.config.has_option('input','gps-end-time'):
            gpsend=self.config.getfloat('input','gps-end-time')
        inputnames=['gps-time-file','burst-injection-file','injection-file','coinc-xml','pipedown-db','gid','gstlal-db']
        ReadInputFromList=sum([ 1 if self.config.has_option('input',name) else 0 for name in inputnames])
        # If no input events given, just return an empty list (e.g. for PP pipeline)
        if ReadInputFromList!=1 and (gpsstart is None or gpsend is None):
            return []
        # Review: Clean up this section
        if self.config.has_option('input','events'):
            selected_events=self.config.get('input','events')
            print('Selected events %s'%(str(selected_events)))

            if selected_events=='all':
                selected_events=None
            else:
                selected_events=self.select_events()
        else:
            selected_events=None

        if(self.config.has_option('engine','correlatedGaussianLikelihood') or
           self.config.has_option('engine','bimodalGaussianLikelihood') or
           self.config.has_option('engine','rosenbrockLikelihood')):
            analytic_test = True
        else:
            analytic_test = False

        # No input file given, analyse the entire time stretch between gpsstart and gpsend
        if self.config.has_option('input','analyse-all-time') and self.config.getboolean('input','analyse-all-time')==True:
            print('Setting up for analysis of continuous time stretch %f - %f'%(gpsstart,gpsend))
            if self.config.has_option('engine','seglen'):
                seglen=self.config.getfloat('engine','seglen')
            else:
                print('ERROR: seglen must be specified in [engine] section when running without input file')
                sys.exit(1)
            if(self.config.has_option('input','segment-overlap')):
                overlap=self.config.getfloat('input','segment-overlap')
            else:
                overlap=32.
            if(overlap>seglen):
                print('ERROR: segment-overlap is greater than seglen')
                sys.exit(1)
            # Now divide gpsstart - gpsend into jobs of seglen - overlap length
            t=gpsstart
            events=[]
            while(t<gpsend):
                ev=Event(trig_time=t+seglen-2)
                ev.set_engine_option('segment-start',str(t-overlap))
                if not analytic_test:
                    ev.set_engine_option('time-min',str(t))
                tMax=t + seglen - overlap
                if tMax>=gpsend:
                    tMax=gpsend
                if not analytic_test:
                    ev.set_engine_option('time-max',str(tMax))
                events.append(ev)
                t=tMax
            return events
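
        # Illustrative example of the chunking above: gpsstart=1000000000,
        # gpsend=1000000096, seglen=32, overlap=8 yields chunks starting at
        # t = 1000000000, 1000000024, 1000000048, 1000000072, each analysed
        # with a trigger time of t+seglen-2.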

        # ASCII list of GPS times
        if self.config.has_option('input','gps-time-file'):
            times=scan_timefile(self.config.get('input','gps-time-file'))
            if self.config.has_option('input','timeslides-ascii'):
                # The timeslides-ascii file contains one row per trigtime, and a column per IFO
                # Note: the IFO order is the same as given in the ifos section of the [analysis] tag
                print("Reading timeslides from ascii file. Column order is understood as follows:")
                for this_ifo,ifo in enumerate(self.ifos):
                    print("Column %d"%this_ifo + "= %s "%(ifo))
                dest=self.config.get('input','timeslides-ascii')
                if not os.path.isfile(dest):
                    print("ERROR: the ascii file %s containing the timeslides does not exist\n"%dest)
                    exit(1)
                else:
                    from numpy import loadtxt
                    data=loadtxt(dest).reshape(-1,len(self.ifos))
                    if len(self.ifos)!= len(data[0,:]):
                        print("ERROR: ascii timeslide file must contain a column for each IFO used in the analysis!\n")
                        exit(1)
                    if len(times)!=len(data[:,0]):
                        print('ERROR: ascii timeslide file must contain a row for each trigtime. Exiting...\n')
                        exit(1)
                    timeslides={}
                    for this_time,time in enumerate(times):
                        timeslides[this_time]={}
                        for this_ifo,ifo in enumerate(self.ifos):
                            timeslides[this_time][ifo]=data[this_time,this_ifo]
                    events=[Event(trig_time=time,timeslide_dict=timeslides[i_time]) for i_time,time in enumerate(times)]
            else:
                events=[Event(trig_time=time) for time in times]
        # SimInspiral table
        if self.config.has_option('input','injection-file'):
            injTable = lsctables.SimInspiralTable.get_table(
                ligolw_utils.load_filename(self.config.get('input','injection-file')))
            events=[Event(SimInspiral=inj) for inj in injTable]
        # SimBurst table
        if self.config.has_option('input','burst-injection-file'):
            injfile=self.config.get('input','burst-injection-file')
            injTable=lsctables.SimBurstTable.get_table(ligolw_utils.load_filename(injfile))
            events=[Event(SimBurst=inj) for inj in injTable]
        # LVAlert CoincInspiral Table
        gid = None
        if self.config.has_option('input','gid') or self.config.has_option('input', 'coinc-xml'):
            flow=20.0
            if self.config.has_option('lalinference','flow'):
                flow=min(ast.literal_eval(self.config.get('lalinference','flow')).values())
            threshold_snr = None
            if not self.config.has_option('engine','distance-max') and self.config.has_option('input','threshold-snr'):
                threshold_snr=self.config.getfloat('input','threshold-snr')

            # get coinc object and psd object
            from lal import series as lalseries
            psd_file_obj = None
            if self.config.has_option('input', 'gid'):
                from ligo.gracedb.rest import GraceDb, HTTPError
                gid = self.config.get('input', 'gid')
                if self.config.has_option('analysis','service-url'):
                    client = GraceDb(
                        service_url=self.config.get('analysis', 'service-url')
                    )
                else:
                    client = GraceDb()
                print("Download %s coinc.xml" % gid)
                coinc_file_obj = client.files(gid, "coinc.xml")
                print("Download %s psd.xml.gz" % gid)
                try:
                    psd_file_obj = client.files(gid, "psd.xml.gz")
                except HTTPError:
                    print("Failed to download %s psd.xml.gz. lalinference will estimate the psd itself." % gid)
            else:
                coinc_file_obj = open(self.config.get('input', 'coinc-xml'), "rb")
                try:
                    psd_file_obj = open(self.config.get('input', 'psd-xml-gz'), "rb")
                except:
                    print("lalinference will estimate the psd itself.")

            # write the objects to files
            coinc_xml_obj = ligolw_utils.load_fileobj(coinc_file_obj)[0]
            ligolw_utils.write_filename(
                coinc_xml_obj, os.path.join(self.basepath, "coinc.xml")
            )
            if psd_file_obj is not None:
                path_to_psd = os.path.join(self.basepath, "psd.xml.gz")
                psd_xml_obj = ligolw_utils.load_fileobj(psd_file_obj)[0]
                psd_dict = lalseries.read_psd_xmldoc(psd_xml_obj)
                ligolw_utils.write_filename(psd_xml_obj, path_to_psd)
                ifos = sorted(
                    lsctables.CoincInspiralTable.get_table(
                        coinc_xml_obj
                    )[0].instruments
                )
                get_xml_psds(
                    os.path.realpath(path_to_psd), ifos,
                    os.path.realpath(os.path.join(self.basepath, "PSDs")),
                    end_time=None
                )
            else:
                psd_dict = None

            try:
                use_gracedbpsd = (not self.config.getboolean('input','ignore-gracedb-psd'))
            except (NoOptionError, NoSectionError):
                use_gracedbpsd = True
            events = create_events_from_coinc_and_psd(
                coinc_xml_obj, psd_dict=psd_dict, gid=gid,
                threshold_snr=threshold_snr, flow=flow,
                roq=self.config.getboolean('analysis','roq'),
                use_gracedbpsd=use_gracedbpsd
            )

        # pipedown-database
        if self.config.has_option('input','gstlal-db'):
            queryfunc=get_zerolag_lloid
            dbname=self.config.get('input','gstlal-db')
        elif self.config.has_option('input','pipedown-db'):
            queryfunc=get_zerolag_pipedown
            dbname=self.config.get('input','pipedown-db')
        else: dbname=None
        if dbname:
            db_connection = open_pipedown_database(dbname,None)[0]
            # Timeslides
            if self.config.has_option('input','time-slide-dump'):
                timeslidedump=self.config.get('input','time-slide-dump')
            else:
                timeslidedump=None
            if self.config.has_option('input','min-cfar'):
                mincfar=self.config.getfloat('input','min-cfar')
            else:
                mincfar=-1
            if self.config.has_option('input','max-cfar'):
                maxcfar=self.config.getfloat('input','max-cfar')
            else:
                maxcfar=-1
            if self.config.get('input','timeslides').lower()=='true':
                events=get_timeslides_pipedown(db_connection, gpsstart=gpsstart, gpsend=gpsend,dumpfile=timeslidedump,max_cfar=maxcfar)
            else:
                events=queryfunc(db_connection, gpsstart=gpsstart, gpsend=gpsend, dumpfile=timeslidedump,max_cfar=maxcfar,min_cfar=mincfar)
        if(selected_events is not None):
            used_events=[]
            for i in selected_events:
                e=events[i]
                e.event_id=i
                used_events.append(e)
            events=used_events
        if gpsstart is not None:
            events = list(filter(lambda e: not e.trig_time<gpsstart, events))
        if gpsend is not None:
            events = list(filter(lambda e: not e.trig_time>gpsend, events))
        return events

    # Check whether to add spin evolution job
    def spin_evol_checks(self):
        if self.config.has_section('spin_evol'):
            from lalsimulation import SimInspiralGetApproximantFromString, SimInspiralGetSpinSupportFromApproximant

            tidal_run_tests = self.config.has_option('engine', 'tidal') or self.config.has_option('engine', 'tidalT')

            nonprecessing_run_tests = self.config.has_option('engine', 'disable-spin') or self.config.has_option('engine', 'aligned-spin')

            approx_num = SimInspiralGetApproximantFromString(extract_approx(self.config))

            precessing_wf_test = (SimInspiralGetSpinSupportFromApproximant(approx_num) == 3) # 3 corresponds to LAL_SIM_INSPIRAL_PRECESSINGSPIN

            if tidal_run_tests:
                print("\n****** Note: Spin evolution will not be performed because tidal parameters are turned on ******\n")
                spin_evol_flag = 0
            elif precessing_wf_test and not nonprecessing_run_tests:
                spin_evol_flag = 1
            else:
                print("\n****** Note: Spin evolution will not be performed because this is not a precessing run ******\n")
                spin_evol_flag = 0
        else:
            spin_evol_flag = 0

        return spin_evol_flag

    def add_full_analysis_lalinferencenest(self,event):
        """
        Generate an end-to-end analysis of a given event (Event class)
        for the lalinferencenest engine. Uses parallel runs if specified.
        """
        evstring=str(event.event_id)
        if event.trig_time is not None:
            evstring=str(event.trig_time)+'-'+str(event.event_id)
        if self.config.has_option('analysis','nparallel'):
            Npar=self.config.getint('analysis','nparallel')
        else:
            Npar=4
        # Set up the parallel engine nodes
        enginenodes=[]
        bwpsdnodes={}
        bwpostnodes={}
        for i in range(Npar):
            n,bwpsdnodes,bwpostnodes=self.add_engine_node(event,bwpsd=bwpsdnodes,bwpost=bwpostnodes)
            if n is not None:
                if i>0:
                    n.add_var_arg('--dont-dump-extras')
                enginenodes.append(n)
        if len(enginenodes)==0:
            return False
        myifos=enginenodes[0].get_ifos()
        # Merge the results together
        pagedir=os.path.join(self.webdir,evstring,myifos)
        #pagedir=os.path.join(self.basepath,evstring,myifos)
        mkdirs(pagedir)
        mergenode=MergeNode(self.merge_job,parents=enginenodes,engine='nest')
        mergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(myifos,evstring)))
        self.add_node(mergenode)
        # Add spin evolution, if requested, setting the parent of the results page to the spin evolution if that is performed and to the merge if not
        if self.spin_evol_checks():
            evolve_spins_node = EvolveSamplesNode(self.evolve_spins_job, posfile = mergenode.get_pos_file(), parent = mergenode)
            self.add_node(evolve_spins_node)

            respage_parent = evolve_spins_node
        else:
            respage_parent = mergenode

        # Call finalize to build final list of available data
        enginenodes[0].finalize()
        enginenodes[0].set_psd_files()
        enginenodes[0].set_snr_file()
        if self.config.getboolean('analysis','coherence-test') and len(enginenodes[0].ifos)>1:
            if "summarypages" not in self.config.get('condor','resultspage'):
                respagenode=self.add_results_page_node(resjob=self.cotest_results_page_job,outdir=pagedir,parent=respage_parent,ifos=enginenodes[0].ifos)
                respagenode.set_psd_files(enginenodes[0].get_psd_files())
                respagenode.set_snr_file(enginenodes[0].get_snr_file())
                if os.path.exists(self.basepath+'/coinc.xml'):
                    try:
                        gid = self.config.get('input','gid')
                    except:
                        gid = None
                    respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)

                par_mergenodes=[]
                for ifo in enginenodes[0].ifos:
                    co_merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs_%s.sub'%(ifo)),self.logpath,engine='nest')
                    cotest_nodes=[]
                    for i in range(Npar):
                        cot_node,bwpsdnodes,bwpostnodes=self.add_engine_node(event,bwpsd=bwpsdnodes,bwpost=bwpostnodes,ifos=[ifo],co_test=True)
                        if cot_node is not None:
                            if i>0:
                                cot_node.add_var_arg('--dont-dump-extras')
                            cotest_nodes.append(cot_node)
                    if len(cotest_nodes)==0:
                        return False
                    for co in cotest_nodes:
                        co.set_psdstart(enginenodes[0].GPSstart)
                        co.set_psdlength(enginenodes[0].psdlength)
                        if co!=cotest_nodes[0]:
                            co.add_var_arg('--dont-dump-extras')
                        else:
                            co.set_psd_files()
                            co.set_snr_file()
                    pmergenode=MergeNode(co_merge_job,parents=cotest_nodes,engine='nest')
                    pmergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(ifo,evstring)))
                    self.add_node(pmergenode)
                    par_mergenodes.append(pmergenode)
                    presultsdir=os.path.join(pagedir,ifo)
                    mkdirs(presultsdir)

                    mkdirs(os.path.join(self.basepath,'coherence_test'))
                    subresnode=self.add_results_page_node(outdir=presultsdir,parent=pmergenode,ifos=ifo)
                    subresnode.set_psd_files(cotest_nodes[0].get_psd_files())
                    subresnode.set_snr_file(cotest_nodes[0].get_snr_file())
                    if os.path.exists(self.basepath+'/coinc.xml'):
                        try:
                            gid = self.config.get('input','gid')
                        except:
                            gid = None
                        subresnode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
                    if self.config.has_option('input','injection-file') and event.event_id is not None:
                        subresnode.set_injection(self.config.get('input','injection-file'),event.event_id)
                    elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
                        subresnode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
                coherence_node=CoherenceTestNode(self.coherence_test_job,outfile=os.path.join(self.basepath,'coherence_test','coherence_test_%s_%s.dat'%(myifos,evstring)))
                coherence_node.add_coherent_parent(mergenode)
                for parmergenode in par_mergenodes:
                    coherence_node.add_incoherent_parent(parmergenode)
                self.add_node(coherence_node)
                respagenode.add_parent(coherence_node)
                respagenode.set_bayes_coherent_incoherent(coherence_node.outfile)

1313 elif "summarypages" in self.config.get('condor','resultspage'):
1314 par_mergenodes=[]
1315 for ifo in enginenodes[0].ifos:
1316 co_merge_job = MergeJob(self.config,os.path.join(self.basepath,'merge_runs_%s.sub'%(ifo)),self.logpath,engine='nest')
1317 cotest_nodes=[]
1318 for i in range(Npar):
1319 cot_node,bwpsdnodes,bwpostnodes=self.add_engine_node(event,bwpsd=bwpsdnodes,bwpost=bwpostnodes,ifos=[ifo],co_test=True)
1320 if cot_node is not None:
1321 if i>0:
1322 cot_node.add_var_arg('--dont-dump-extras')
1323 cotest_nodes.append(cot_node)
1324 if len(cotest_nodes)==0:
1325 return False
1326 for co in cotest_nodes:
1327 co.set_psdstart(enginenodes[0].GPSstart)
1328 co.set_psdlength(enginenodes[0].psdlength)
1329 if co!=cotest_nodes[0]:
1330 co.add_var_arg('--dont-dump-extras')
1331 else:
1332 co.set_psd_files()
1333 co.set_snr_file()
1334 pmergenode=MergeNode(co_merge_job,parents=cotest_nodes,engine='nest')
1335 pmergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(ifo,evstring)))
1336 self.add_node(pmergenode)
1337 par_mergenodes.append(pmergenode)
1338 presultsdir=os.path.join(pagedir,ifo)
1339 mkdirs(presultsdir)
1340
1341
1342 respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=respage_parent,gzip_output=None,ifos=enginenodes[0].ifos,
1343 evstring=evstring, coherence=True)
1344 respagenode.set_psd_files(enginenodes[0].ifos, enginenodes[0].get_psd_files())
1345 try:
1346 cachefiles = self.config.get('resultspage','plot-strain-data')
1347 cachefiles_option = True
1348 except:
1349 cachefiles_option = False
1350 if cachefiles_option:
1351 respagenode.set_cache_files(enginenodes[0].channels, enginenodes[0].cachefiles)
1352
1353 try:
1354 labels = self.config.get('resultspage','label')
1355 except (NoOptionError, NoSectionError):
1356 labels = None
1357 respagenode.set_labels(labels)
1358
1359 try:
1360 gid = self.config.get('input','gid')
1361 except (NoOptionError, NoSectionError):
1362 gid = None
1363 respagenode.set_gid(gid)
1364 if os.path.exists(self.basepath+'/coinc.xml'):
1365 respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
1366
1367 else:
1368
1369 if "summarypages" in self.config.get('condor','resultspage'):
1370 respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=respage_parent,gzip_output=None,ifos=enginenodes[0].ifos,
1371 evstring=evstring, coherence=False)
1372 respagenode.set_psd_files(enginenodes[0].ifos, enginenodes[0].get_psd_files())
1373 try:
1374 cachefiles = self.config.get('resultspage','plot-strain-data')
1375 cachefiles_option = True
1376 except (NoOptionError, NoSectionError):
1377 cachefiles_option = False
1378 if cachefiles_option:
1379 respagenode.set_cache_files(enginenodes[0].channels, enginenodes[0].cachefiles)
1380 try:
1381 labels = self.config.get('resultspage','label')
1382 except (NoOptionError, NoSectionError):
1383 labels = None
1384 respagenode.set_labels(labels)
1385
1386 try:
1387 gid = self.config.get('input','gid')
1388 except (NoOptionError, NoSectionError):
1389 gid = None
1390 respagenode.set_gid(gid)
1391 if os.path.exists(self.basepath+'/coinc.xml'):
1392 respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
1393 else:
1394 respagenode=self.add_results_page_node(outdir=pagedir,parent=respage_parent,gzip_output=None,ifos=enginenodes[0].ifos)
1395 respagenode.set_psd_files(enginenodes[0].get_psd_files())
1396 respagenode.set_snr_file(enginenodes[0].get_snr_file())
1397 if os.path.exists(self.basepath+'/coinc.xml'):
1398 try:
1399 gid = self.config.get('input','gid')
1400 except (NoOptionError, NoSectionError):
1401 gid = None
1402 respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
1403 if self.config.has_option('input','injection-file') and event.event_id is not None:
1404 respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
1405 elif self.config.has_option('input','burst-injection-file') and event.event_id is not None:
1406 respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
1407
1408 if self.config.has_option('analysis','upload-to-gracedb'):
1409 if self.config.has_option('gracedbinfo','server'):
1410 gdb_srv=self.config.get('gracedbinfo','server')
1411 else:
1412 gdb_srv=None
1413
1414 if self.config.getboolean('analysis','upload-to-gracedb') and event.GID is not None:
1415 self.add_gracedb_start_node(event.GID,'LALInference',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
1416 self.add_gracedb_log_node(respagenode,event.GID,server=gdb_srv)
1417 elif self.config.has_option('analysis','ugid'):
1418 # LIB needs to upload info to GraceDB, but if we pass the gid in the usual way the pipeline
1419 # will try to pull inspiral-only XML tables from the GraceDB page and fail.
1420 # To avoid that, LIB reads the GraceDB id to upload to from an ugid=ID option
1421 # in the analysis section.
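# Illustrative config sketch (values are placeholders, not from this file):
# supplying the GraceDB id as ugid in [analysis] makes the pipeline take this
# LIB-specific branch instead of the usual gid-based upload path.
#
#   [analysis]
#   ugid = G123456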
1422 ugid=self.config.get('analysis','ugid')
1423 event.GID=ugid
1424 self.add_gracedb_start_node(ugid,'LIB',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
1425 self.add_gracedb_log_node(respagenode,ugid,burst=True,server=gdb_srv)
1426 if self.config.has_option('resultspage','email'):
1427 emailto=self.config.get('resultspage','email')
1428 else:
1429 emailto=None
1430 if self.config.has_option('gracedbinfo','server'):
1431 gdb_srv=self.config.get('gracedbinfo','server')
1432 else:
1433 gdb_srv=None
1434
1435 temp_node=self.add_gracedb_info_node(respagenode,ugid,analysis='LIB',email=emailto,server=gdb_srv)
1436 if self.config.has_option('condor','ligo-skymap-plot') and self.config.has_option('condor','ligo-skymap-from-samples'):
1437 if self.engine=='lalinferenceburst': prefix='LIB'
1438 else: prefix='LALInference'
1439 mapnode = SkyMapNode(self.mapjob, posfile = mergenode.get_pos_file(), parent=mergenode,
1440 prefix= prefix, outdir=pagedir, ifos=self.ifos)
1441 plotmapnode = PlotSkyMapNode(self.plotmapjob, parent=mapnode, inputfits = mapnode.outfits, output=os.path.join(pagedir,'skymap.png'))
1442 self.add_node(mapnode)
1443 self.add_node(plotmapnode)
1444 return True
1445
1446 def add_full_analysis_lalinferencemcmc(self,event):
1447 """
1448 Generate an end-to-end analysis of a given event
1449 for LALInferenceMCMC.
1450 """
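# Usage sketch (illustrative; assumes a configured pipeline DAG instance
# ``dag`` and an Event object ``event``, neither defined in this excerpt):
#
#   ok = dag.add_full_analysis_lalinferencemcmc(event)
#   if not ok:
#       print('no engine nodes could be set up for this event')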
1451 evstring=str(event.event_id)
1452 if event.trig_time is not None:
1453 evstring=str(event.trig_time)+'-'+str(event.event_id)
1454 if self.config.has_option('analysis','nparallel'):
1455 Npar=self.config.getint('analysis','nparallel')
1456 else:
1457 Npar=2
1458 enginenodes=[]
1459 bwpsdnodes={}
1460 bwpostnodes={}
1461 for i in range(Npar):
1462 n,bwpsdnodes,bwpostnodes=self.add_engine_node(event,bwpsd=bwpsdnodes,bwpost=bwpostnodes)
1463 if n is not None:
1464 if i>0:
1465 n.add_var_arg('--dont-dump-extras')
1466 enginenodes.append(n)
1467 if len(enginenodes)==0:
1468 return False
1469 myifos=enginenodes[0].get_ifos()
1470 enginenodes[0].set_psd_files()
1471 enginenodes[0].set_snr_file()
1472 pagedir=os.path.join(self.webdir,evstring,myifos)
1473 mkdirs(pagedir)
1474 combinenodes=[]
1475 for i in range(Npar):
1476 combinenodes.append(CombineMCMCNode(self.combine_job,parents=[enginenodes[i]]))
1477 input_file = combinenodes[i].get_parent_posfile(enginenodes[i])
1478 input_file_split_index = input_file.find('lalinferencemcmc-')
1479 combinenodes[i].set_pos_output_file(input_file[:input_file_split_index]+'combine_'+input_file[input_file_split_index:])
1480 combinenodes[i].add_file_arg(input_file)
1481 number_of_mpi_jobs = self.config.getint('mpi','mpi_task_count')
1482 for j in range(1,number_of_mpi_jobs):
1483 combinenodes[i].add_file_arg(input_file+".%02d" % j)
1484 self.add_node(combinenodes[i])
1485 mergenode=MergeNode(self.merge_job,parents=combinenodes,engine='mcmc')
1486 mergenode.set_pos_output_file(os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(myifos,evstring)))
1487 if self.config.has_option('resultspage','deltaLogP'):
1488 mergenode.add_var_arg('--deltaLogP '+str(self.config.getfloat('resultspage','deltaLogP')))
1489 if self.config.has_option('resultspage','downsample'):
1490 mergenode.add_var_arg('--downsample '+str(self.config.getint('resultspage','downsample')))
1491 if self.config.has_option('resultspage','fixedBurnin'):
1492 mergenode.add_var_arg('--fixedBurnin '+str(self.config.getint('resultspage','fixedBurnin')))
1493 self.add_node(mergenode)
1494
1495 # Add spin evolution, if requested, setting the parent of the results page to the spin evolution node if that is performed, and to the merge node if not
1496 if self.spin_evol_checks():
1497 evolve_spins_node = EvolveSamplesNode(self.evolve_spins_job, posfile = mergenode.get_pos_file(), parent = mergenode)
1498 self.add_node(evolve_spins_node)
1499
1500 respage_parent = evolve_spins_node
1501 else:
1502 respage_parent = mergenode
1503
1504 if "summarypages" in self.config.get('condor','resultspage'):
1505 respagenode=self.add_results_page_node_pesummary(outdir=pagedir,parent=respage_parent,gzip_output=None,ifos=enginenodes[0].ifos, evstring=evstring, coherence=self.config.getboolean('analysis','coherence-test'))
1506 respagenode.set_psd_files(enginenodes[0].ifos, enginenodes[0].get_psd_files())
1507 try:
1508 cachefiles = self.config.get('resultspage','plot-strain-data')
1509 cachefiles_option = True
1510 except (NoOptionError, NoSectionError):
1511 cachefiles_option = False
1512 if cachefiles_option:
1513 respagenode.set_cache_files(enginenodes[0].channels, enginenodes[0].cachefiles)
1514
1515
1516 try:
1517 labels = self.config.get('resultspage','label')
1518 except (NoOptionError, NoSectionError):
1519 labels = None
1520 respagenode.set_labels(labels)
1521
1522 if os.path.exists(self.basepath+'/coinc.xml'):
1523 try:
1524 gid = self.config.get('input','gid')
1525 except (NoOptionError, NoSectionError):
1526 gid = None
1527 respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
1528 else:
1529 respagenode=self.add_results_page_node(outdir=pagedir,parent=respage_parent,gzip_output=None,ifos=enginenodes[0].ifos)
1530 respagenode.set_psd_files(enginenodes[0].get_psd_files())
1531 respagenode.set_snr_file(enginenodes[0].get_snr_file())
1532 if os.path.exists(self.basepath+'/coinc.xml'):
1533 try:
1534 gid = self.config.get('input','gid')
1535 except (NoOptionError, NoSectionError):
1536 gid = None
1537 respagenode.set_coinc_file(os.path.join(self.basepath, 'coinc.xml'), gid)
1538 if self.config.has_option('input','injection-file') and event.event_id is not None:
1539 respagenode.set_injection(self.config.get('input','injection-file'),event.event_id)
1540 if self.config.has_option('input','burst-injection-file') and event.event_id is not None:
1541 respagenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
1542 if event.GID is not None:
1543 if self.config.has_option('analysis','upload-to-gracedb'):
1544 if self.config.getboolean('analysis','upload-to-gracedb'):
1545 if self.config.has_option('gracedbinfo','server'):
1546 gdb_srv=self.config.get('gracedbinfo','server')
1547 else:
1548 gdb_srv=None
1549 self.add_gracedb_start_node(event.GID,'LALInference',[sciseg.get_df_node() for sciseg in enginenodes[0].scisegs.values()],server=gdb_srv)
1550 self.add_gracedb_log_node(respagenode,event.GID,server=gdb_srv)
1551 if self.config.has_option('condor','ligo-skymap-plot') and self.config.has_option('condor','ligo-skymap-from-samples'):
1552 mapnode = SkyMapNode(self.mapjob, posfile = mergenode.get_pos_file(), parent=mergenode,
1553 prefix= 'LALInference', outdir=pagedir, ifos=self.ifos)
1554 plotmapnode = PlotSkyMapNode(self.plotmapjob, parent=mapnode, inputfits = mapnode.outfits, output=os.path.join(pagedir,'skymap.png'))
1555 self.add_node(mapnode)
1556 self.add_node(plotmapnode)
1557
1558 def add_science_segments(self):
1559 # Query the segment database for science segments and
1560 # add them to the pool of segments
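# Config sketch (assumed values) for the [input] options read below; with
# ignore-state-vector = True the full GPS range is used without checking
# the state vector, otherwise findSegmentsToAnalyze is queried per IFO.
#
#   [input]
#   gps-start-time = 1126259442
#   gps-end-time = 1126259478
#   ignore-state-vector = True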
1561 start=self.config.getfloat('input','gps-start-time')
1562 end=self.config.getfloat('input','gps-end-time')
1563 if self.config.has_option('input','ignore-state-vector'):
1564 if self.config.getboolean('input','ignore-state-vector'):
1565 i=0
1566 for ifo in self.ifos:
1567 sciseg=pipeline.ScienceSegment((i,start,end,end-start))
1568 df_node=self.get_datafind_node(ifo,self.frtypes[ifo],int(sciseg.start()),int(sciseg.end()))
1569 sciseg.set_df_node(df_node)
1570 self.segments[ifo].append(sciseg)
1571 i+=1
1572 return
1573 # Read state vector
1574 for ifo in self.ifos:
1575 print('Read state vector for {0} data between {1} and {2}'.format(ifo,start,end))
1576 segs = findSegmentsToAnalyze(
1577 ifo, self.frtypes[ifo], ast.literal_eval(self.config.get('statevector', 'state-vector-channel'))[ifo],
1578 ast.literal_eval(self.config.get('statevector', 'bits')), start, end
1579 )
1580 segs.coalesce()
1581 for seg in segs:
1582 sciseg=pipeline.ScienceSegment((segs.index(seg),seg[0],seg[1],seg[1]-seg[0]))
1583 df_node=self.get_datafind_node(ifo,self.frtypes[ifo],int(sciseg.start()),int(sciseg.end()))
1584 sciseg.set_df_node(df_node)
1585 self.segments[ifo].append(sciseg)
1586
1587 def get_datafind_node(self,ifo,frtype,gpsstart,gpsend):
1588 node=pipeline.LSCDataFindNode(self.datafind_job)
1589 node.set_observatory(ifo[0])
1590 node.set_type(frtype)
1591 node.set_start(gpsstart)
1592 node.set_end(gpsend)
1593 #self.add_node(node)
1594 return node
1595
1596 def add_engine_node(self,event,bwpsd={},bwpost={},ifos=None,co_test=False,extra_options=None):
1597 """
1598 Add an engine node to the dag. Will find the appropriate cache files automatically.
1599 Will determine the data to be read and the output file.
1600 Will use all IFOs known to the DAG, unless otherwise specified as a list of strings
1601 """
1602 if ifos is None and self.config.has_option('analysis','ifos'):
1603 ifos = ast.literal_eval(self.config.get('analysis','ifos'))
1604 if ifos is None and len(event.ifos)>0:
1605 ifos=event.ifos
1606 if ifos is None:
1607 ifos=self.ifos
1608 end_time=event.trig_time
1609 if self.config.has_option('lalinference','seglen'):
1610 seglen=self.config.getfloat('lalinference','seglen')
1611 elif self.config.has_option('engine','seglen'):
1612 seglen=self.config.getfloat('engine','seglen')
1613 else:
1614 seglen=event.duration
1615 segstart=end_time+2-seglen
1616 segend=segstart+seglen
1617 # check whether lalinference psd is used or not
1618 try:
1619 use_gracedbpsd = (not self.config.getboolean('input','ignore-gracedb-psd'))
1620 except (NoOptionError, NoSectionError):
1621 use_gracedbpsd = True
1622 use_lalinference_psd = not ((use_gracedbpsd and os.path.isfile(os.path.join(self.basepath,'psd.xml.gz')))
1623 or self.config.has_option('condor','bayesline')
1624 or self.config.has_option('condor','bayeswave'))
1625 # if lalinference psd is used and minimum_realizations_number is specified, lengthen the required science segment.
1626 if use_lalinference_psd and self.config.has_option('input', 'minimum_realizations_number'):
1627 psdstart = segstart - self.config.getint('input','padding') - \
1628 self.config.getint('input', 'minimum_realizations_number') * seglen
1629 else:
1630 psdstart = segstart
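# Worked example (assumed numbers): with padding = 16, seglen = 8 and
# minimum_realizations_number = 8, the PSD stretch is lengthened by
# 16 + 8*8 = 80 s, i.e. psdstart = segstart - 80.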
1631 myifos=set([])
1632 for ifo in ifos:
1633 for seg in self.segments[ifo]:
1634 if psdstart >= seg.start() and segend <= seg.end():
1635 myifos.add(ifo)
1636 ifos=myifos
1637 if len(ifos)==0:
1638 print('No data found for time %f - %f, skipping'%(segstart,segend))
1639 return
1640
1641 computeroqweightsnode={}
1642 bayeslinenode={}
1643 bayeswavepsdnode=bwpsd
1644 bayeswavepostnode=bwpost
1645 prenode=LALInferenceDataDumpNode(self.preengine_job)
1646 node=self.EngineNode(self.engine_jobs[tuple(ifos)])
1647 roqeventpath=os.path.join(self.preengine_job.roqpath,str(event.event_id)+'/')
1648 if self.config.has_option('condor','bayesline') or self.config.has_option('condor','bayeswave') or self.config.getboolean('analysis','roq'):
1649 mkdirs(roqeventpath)
1650 node.set_trig_time(end_time)
1651 prenode.set_trig_time(end_time)
1652 node.set_seed(self.randomseed)
1653 prenode.set_seed(self.randomseed)
1654 original_srate=0
1655 srate=0
1656 if event.srate:
1657 original_srate=event.srate
1658 if self.config.has_option('lalinference','srate'):
1659 original_srate=self.config.getfloat('lalinference','srate')
1660 elif self.config.has_option('engine','srate'):
1661 original_srate=self.config.getfloat('engine','srate')
1662 if (np.log2(original_srate)%1):
1663 print('The srate given, '+str(original_srate)+' Hz, is not a power of 2.')
1664 print('For data handling purposes, the srate used will be '+str(np.power(2., np.ceil(np.log2(original_srate))))+' Hz')
1665 print('The inner products will, however, still be integrated up to '+str(original_srate/2.)+' Hz')
1666 srate = np.power(2., np.ceil(np.log2(original_srate)))
1667 else:
1668 srate = original_srate
1669 if srate != 0:
1670 node.set_srate(int(np.ceil(srate)))
1671 prenode.set_srate(int(np.ceil(srate)))
1672 if original_srate != 0:
1673 for ifo in ifos:
1674 node.fhighs[ifo]=str(original_srate/2.-1./seglen)
1675 prenode.fhighs[ifo]=str(original_srate/2.-1./seglen)
1676 node.set_srate(srate)
1677 prenode.set_srate(srate)
1678 if event.trigSNR:
1679 node.set_trigSNR(event.trigSNR)
1680 if event.horizon_distance:
1681 node.set_horizon_distance(event.horizon_distance)
1682 if self.dataseed:
1683 node.set_dataseed(self.dataseed+event.event_id)
1684 prenode.set_dataseed(self.dataseed+event.event_id)
1685 gotdata=0
1686 for ifo in ifos:
1687 if ifo in event.timeslides:
1688 slide=event.timeslides[ifo]
1689 else:
1690 slide=0
1691 for seg in self.segments[ifo]:
1692 if segstart >= seg.start() and segend <= seg.end():
1693 if not self.config.has_option('lalinference','fake-cache'):
1694 if self.config.has_option('condor','bayesline') or self.config.getboolean('analysis','roq'):
1695 prenode.add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
1696 gotdata+=node.add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
1697 else:
1698 fakecachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
1699 if self.config.has_option('condor','bayesline') or self.config.getboolean('analysis','roq'):
1700 prenode.add_fake_ifo_data(ifo,seg,fakecachefiles[ifo],self.channels[ifo],timeslide=slide)
1701 gotdata+=node.add_fake_ifo_data(ifo,seg,fakecachefiles[ifo],self.channels[ifo],timeslide=slide)
1702 if self.config.has_option('lalinference','psd-xmlfile'):
1703 psdpath=os.path.realpath(self.config.get('lalinference','psd-xmlfile'))
1704 node.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=end_time)
1705 prenode.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=end_time)
1706 if len(ifos)==0:
1707 node.ifos=node.cachefiles.keys()
1708 prenode.ifos=prenode.cachefiles.keys()
1709 else:
1710 node.ifos=ifos
1711 prenode.ifos=ifos
1712 gotdata=1
1713 try:
1714 use_gracedbpsd = (not self.config.getboolean('input','ignore-gracedb-psd'))
1715 except (NoOptionError, NoSectionError):
1716 use_gracedbpsd = True
1717 if use_gracedbpsd:
1718 if os.path.isfile(os.path.join(self.basepath,'psd.xml.gz')):
1719 psdpath=os.path.join(self.basepath,'psd.xml.gz')
1720 node.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=None)
1721 prenode.psds=get_xml_psds(psdpath,ifos,os.path.join(self.basepath,'PSDs'),end_time=None)
1722 for ifo in ifos:
1723 prenode.flows[ifo]=str(20.0)
1724 if self.config.has_option('lalinference','flow'):
1725 node.flows=ast.literal_eval(self.config.get('lalinference','flow'))
1726 prenode.flows=ast.literal_eval(self.config.get('lalinference','flow'))
1727 if event.fhigh:
1728 for ifo in ifos:
1729 node.fhighs[ifo]=str(event.fhigh)
1730 prenode.fhighs[ifo]=str(event.fhigh)
1731 if self.config.has_option('lalinference','fhigh'):
1732 node.fhighs=ast.literal_eval(self.config.get('lalinference','fhigh'))
1733 prenode.fhighs=ast.literal_eval(self.config.get('lalinference','fhigh'))
1734 prenode.set_max_psdlength(self.config.getint('input','max-psd-length'))
1735 prenode.set_padding(self.config.getint('input','padding'))
1736 #prenode[ifo].set_output_file('/dev/null')
1737 prenode.add_file_opt('outfile',roqeventpath+'data-dump',file_is_output_file=True)
1738 prenode.add_var_arg('--data-dump')
1739 if self.config.has_option('lalinference','seglen'):
1740 p_seglen=self.config.getfloat('lalinference','seglen')
1741 elif self.config.has_option('engine','seglen'):
1742 p_seglen=self.config.getfloat('engine','seglen')
1743 else:
1744 p_seglen=event.duration
1745 prenode.set_seglen(p_seglen)
1746 if self.config.has_option('condor','bayeswave'):
1747 prenode.set_psdlength(p_seglen)
1748 if self.config.has_option('lalinference','seglen'):
1749 bw_seglen = self.config.getfloat('lalinference','seglen')
1750 elif self.config.has_option('engine','seglen'):
1751 bw_seglen = self.config.getfloat('engine','seglen')
1752 else:
1753 bw_seglen = event.duration
1754 if (np.log2(bw_seglen)%1):
1755 print('BayesWave only supports seglengths which are powers of 2; you have specified a seglength of '+str(bw_seglen)+' seconds.')
1756 print('Instead, a seglength of '+str(np.power(2., np.ceil(np.log2(bw_seglen))))+' s will be used for the BayesWave PSD estimation.')
1757 print('The main LALInference job will still use a seglength of '+str(bw_seglen)+' seconds.')
1758 bw_seglen = np.power(2., np.ceil(np.log2(bw_seglen)))
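# Worked example: a requested seglen of 6 s has log2(6) ~ 2.585, so the
# BayesWave PSD jobs round up to 2**ceil(log2(6)) = 8 s, while the main
# LALInference job keeps the 6 s seglen.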
1759 if self.config.has_option('condor','bayeswave'):
1760 if self.config.has_option('bayeswave','bw_srate'):
1761 bw_srate = self.config.getfloat('bayeswave','bw_srate')
1762 print('BayesWave will generate PSDs using a srate of '+str(bw_srate)+' Hz.')
1763 print('The main LALInference job will still use a srate of '+str(srate)+' Hz.')
1764 elif (np.log2(srate)%1):
1765 print('BayesWave only supports srates which are powers of 2; you have specified a srate of '+str(srate)+' Hz.')
1766 print('Instead, a srate of '+str(np.power(2., np.ceil(np.log2(srate))))+' Hz will be used for the BayesWave PSD estimation.')
1767 print('The main LALInference job will still use a srate of '+str(srate)+' Hz.')
1768 bw_srate = np.power(2., np.ceil(np.log2(srate)))
1769 else:
1770 bw_srate = srate
1771 # Add the nodes it depends on
1772 for ifokey, seg in node.scisegs.items():
1773 dfnode=seg.get_df_node()
1774
1775 if True: # always runs; kept so the indentation of the legacy block below is unchanged
1776 if self.config.has_option('condor','bayeswave') and not co_test:
1777 for ifo in ifos:
1778 bwPSDpath = os.path.join(roqeventpath,'BayesWave_PSD_'+ifo+'/')
1779 if not os.path.isdir(bwPSDpath):
1780 mkdirs(bwPSDpath)
1781 if ifo not in bayeswavepsdnode:
1782 bayeswavepsdnode[ifo]=self.add_bayeswavepsd_node(ifo)
1783 bayeswavepsdnode[ifo].add_var_arg('--bayesLine')
1784 bayeswavepsdnode[ifo].add_var_arg('--cleanOnly')
1785 bayeswavepsdnode[ifo].add_var_arg('--checkpoint')
1786 bayeswavepsdnode[ifo].set_output_dir(bwPSDpath)
1787 bayeswavepsdnode[ifo].set_trig_time(end_time)
1788 bayeswavepsdnode[ifo].set_seglen(bw_seglen)
1789 bayeswavepsdnode[ifo].set_psdlength(bw_seglen)
1790 bayeswavepsdnode[ifo].set_srate(bw_srate)
1791 if ifo in event.timeslides:
1792 slide=event.timeslides[ifo]
1793 else:
1794 slide=0
1795 for seg in self.segments[ifo]:
1796 if segstart >= seg.start() and segend <= seg.end():
1797 if not self.config.has_option('lalinference','fake-cache'):
1798 bayeswavepsdnode[ifo].add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
1799 else:
1800 fakecachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
1801 bayeswavepsdnode[ifo].add_fake_ifo_data(ifo,seg,fakecachefiles[ifo],self.channels[ifo],timeslide=slide)
1802 if self.config.has_option('lalinference','flow'):
1803 bayeswavepsdnode[ifo].flows[ifo]=np.power(2,np.floor(np.log2(ast.literal_eval(self.config.get('lalinference','flow'))[ifo])))
1804 print('BayesWave requires f_low to be a power of 2, so f_low for '+ifo+' has been changed from '+str(ast.literal_eval(self.config.get('lalinference','flow'))[ifo])+' to '+str(np.power(2,np.floor(np.log2(ast.literal_eval(self.config.get('lalinference','flow'))[ifo]))))+' Hz (for the BayesWave job only; in the main LALInference jobs f_low will still be '+str(ast.literal_eval(self.config.get('lalinference','flow'))[ifo])+' Hz)')
1805 bayeswavepsdnode[ifo].set_seed(self.randomseed)
1806 bayeswavepsdnode[ifo].set_chainseed(self.randomseed+event.event_id)
1807 if self.dataseed:
1808 bayeswavepsdnode[ifo].set_dataseed(self.dataseed+event.event_id)
1809 else:
1810 bayeswavepsdnode[ifo].set_dataseed(self.randomseed+event.event_id)
1811 if ifo not in bayeswavepostnode:
1812 bayeswavepostnode[ifo]=self.add_bayeswavepost_node(ifo,parent=bayeswavepsdnode[ifo])
1813 bayeswavepostnode[ifo].add_var_arg('--0noise')
1814 bayeswavepostnode[ifo].add_var_arg('--lite')
1815 bayeswavepostnode[ifo].add_var_arg('--bayesLine')
1816 bayeswavepostnode[ifo].add_var_arg('--cleanOnly')
1817 #bayeswavepostnode[ifo].add_var_arg('--checkpoint')
1818 bayeswavepostnode[ifo].set_output_dir(bwPSDpath)
1819 #bayeswavepostnode[ifo].add_var_arg('--runName BayesWave_PSD')
1820 #bayeswavepostnode[ifo].add_output_file(os.path.join(bwPSDpath, 'post/clean/glitch_median_PSD_forLI_'+ifo+'.dat'))
1821 bayeswavepostnode[ifo].set_trig_time(end_time)
1822 bayeswavepostnode[ifo].set_seglen(bw_seglen)
1823 bayeswavepostnode[ifo].set_psdlength(bw_seglen)
1824 bayeswavepostnode[ifo].set_srate(bw_srate)
1825 bayeswavepost_fakecache = 'interp:'+bwPSDpath+ifo+'_fairdraw_asd.dat'
1826
1827 if ifo in event.timeslides:
1828 slide=event.timeslides[ifo]
1829 else:
1830 slide=0
1831 for seg in self.segments[ifo]:
1832 if segstart >= seg.start() and segend <= seg.end():
1833 #if not self.config.has_option('lalinference','fake-cache'):
1834 # bayeswavepostnode[ifo].add_ifo_data(ifo,seg,self.channels[ifo],timeslide=slide)
1835 #else:
1836 #fakecachefiles=ast.literal_eval(self.config.get('lalinference','fake-cache'))
1837 bayeswavepostnode[ifo].add_fake_ifo_data(ifo,seg,bayeswavepost_fakecache,self.channels[ifo],timeslide=slide)
1838
1839 if self.config.has_option('lalinference','flow'):
1840 bayeswavepostnode[ifo].flows[ifo]=np.power(2,np.floor(np.log2(ast.literal_eval(self.config.get('lalinference','flow'))[ifo])))
1841 bayeswavepostnode[ifo].set_seed(self.randomseed)
1842 bayeswavepostnode[ifo].set_chainseed(self.randomseed+event.event_id)
1843 if self.dataseed:
1844 bayeswavepostnode[ifo].set_dataseed(self.dataseed+event.event_id)
1845 else:
1846 bayeswavepostnode[ifo].set_dataseed(self.randomseed+event.event_id)
1847 if self.config.has_option('condor','bayesline') or self.config.getboolean('analysis','roq'):
1848 if gotdata and event.event_id not in self.prenodes.keys():
1849 if prenode not in self.get_nodes():
1850 self.add_node(prenode)
1851 for ifo in ifos:
1852 if self.config.getboolean('analysis','roq'):
1853 computeroqweightsnode[ifo]=self.add_rom_weights_node(ifo,prenode)
1854 #self.add_node(computeroqweightsnode[ifo])
1855 if self.config.has_option('input','injection-file'):
1856 freqDataFile=os.path.join(roqeventpath,'data-dump'+ifo+'-freqDataWithInjection.dat')
1857 else:
1858 freqDataFile=os.path.join(roqeventpath,'data-dump'+ifo+'-freqData.dat')
1859 prenode.add_output_file(freqDataFile)
1860 prenode.add_output_file(os.path.join(roqeventpath,'data-dump'+ifo+'-PSD.dat'))
1861 if self.config.has_option('condor','bayesline'):
1862 bayeslinenode[ifo]=self.add_bayesline_node(ifo,prenode)
1863 bayeslinenode[ifo].add_var_arg('-i '+freqDataFile)
1864 bayeslinenode[ifo].add_input_file(freqDataFile)
1865 bayeslinenode[ifo].add_var_arg('-o '+os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat'))
1866 bayeslinenode[ifo].add_output_file(os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat'))
1867 if self.config.getboolean('analysis','roq'):
1868 computeroqweightsnode[ifo].add_var_arg('--fHigh '+str(prenode.fhighs[ifo]))
1869 computeroqweightsnode[ifo].add_file_opt('data',freqDataFile)
1870 computeroqweightsnode[ifo].add_file_opt('psd',os.path.join(roqeventpath,'data-dump'+ifo+'-PSD.dat'))
1871 computeroqweightsnode[ifo].add_file_opt('out',roqeventpath,file_is_output_file=True)
1872 computeroqweightsnode[ifo].add_output_file(os.path.join(roqeventpath,'weights_quadratic_'+ifo+'.dat'))
1873 #self.prenodes[seg.id()]=(prenode,computeroqweightsnode)
1874 if self.config.has_option('condor','bayesline'):
1875 self.prenodes[event.event_id]=(prenode,bayeslinenode)
1876 if self.config.getboolean('analysis','roq'):
1877 self.prenodes[event.event_id]=(prenode,computeroqweightsnode)
1878
1879 if self.config.has_option('condor','bayesline') or self.config.getboolean('analysis','roq'):
1880 #node.add_parent(self.prenodes[seg.id()][1][ifokey])
1881 node.add_parent(self.prenodes[event.event_id][1][ifokey])
1882
1883 if dfnode is not None and dfnode not in self.get_nodes():
1884 if not self.config.has_option('lalinference','fake-cache'):
1885 self.add_node(dfnode)
1886
1887 if gotdata:
1888 self.add_node(node)
1889 else:
1890 print('no data found for time %f'%(end_time))
1891 return None, bayeswavepsdnode, bayeswavepostnode
1892 if extra_options is not None:
1893 for opt in extra_options.keys():
1894 node.add_var_arg('--'+opt+' '+extra_options[opt])
1895 # Add control options
1896 if self.config.has_option('input','injection-file'):
1897 node.set_injection(self.config.get('input','injection-file'),event.event_id)
1898 prenode.set_injection(self.config.get('input','injection-file'),event.event_id)
1899 if self.config.has_option('condor','bayeswave') and bayeswavepsdnode and bayeswavepostnode:
1900 for ifo in ifos:
1901 bayeswavepsdnode[ifo].set_injection(self.config.get('input','injection-file'),event.event_id)
1902 bayeswavepostnode[ifo].set_injection(self.config.get('input','injection-file'),event.event_id)
1903 if self.config.has_option('input','burst-injection-file'):
1904 node.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
1905 prenode.set_injection(self.config.get('input','burst-injection-file'),event.event_id)
1906 if self.config.has_option('lalinference','seglen'):
1907 node.set_seglen(self.config.getfloat('lalinference','seglen'))
1908 elif self.config.has_option('engine','seglen'):
1909 node.set_seglen(self.config.getfloat('engine','seglen'))
1910 else:
1911 node.set_seglen(event.duration)
1912 if self.config.has_option('condor','bayeswave') and bayeswavepsdnode and bayeswavepostnode:
1913 node.set_psdlength(bw_seglen)
1914 if self.config.has_option('input','psd-length'):
1915 node.set_psdlength(self.config.getint('input','psd-length'))
1916 prenode.set_psdlength(self.config.getint('input','psd-length'))
1917 if self.config.has_option('condor','bayeswave') and bayeswavepsdnode and bayeswavepostnode:
1918 for ifo in ifos:
1919 bayeswavepsdnode[ifo].set_psdlength(self.config.getint('input','psd-length'))
1920 bayeswavepostnode[ifo].set_psdlength(self.config.getint('input','psd-length'))
1921 if self.config.has_option('input','psd-start-time'):
1922 node.set_psdstart(self.config.getfloat('input','psd-start-time'))
1923 prenode.set_psdstart(self.config.getfloat('input','psd-start-time'))
1924 if self.config.has_option('condor','bayeswave') and bayeswavepsdnode and bayeswavepostnode:
1925 for ifo in ifos:
1926 bayeswavepsdnode[ifo].set_psdstart(self.config.getfloat('input','psd-start-time'))
1927 bayeswavepostnode[ifo].set_psdstart(self.config.getfloat('input','psd-start-time'))
1928 node.set_max_psdlength(self.config.getint('input','max-psd-length'))
1929 prenode.set_max_psdlength(self.config.getint('input','max-psd-length'))
1930 node.set_padding(self.config.getint('input','padding'))
1931 prenode.set_padding(self.config.getint('input','padding'))
1932 if self.config.has_option('condor','bayeswave') and bayeswavepsdnode and bayeswavepostnode:
1933 for ifo in ifos:
1934 bayeswavepsdnode[ifo].set_max_psdlength(self.config.getint('input','max-psd-length'))
1935 bayeswavepsdnode[ifo].set_padding(self.config.getint('input','padding'))
1936 bayeswavepostnode[ifo].set_max_psdlength(self.config.getint('input','max-psd-length'))
1937 bayeswavepostnode[ifo].set_padding(self.config.getint('input','padding'))
1938 out_dir=os.path.join(self.basepath,'engine')
1939 mkdirs(out_dir)
1940 node.set_output_file(os.path.join(out_dir,node.engine+'-'+str(event.event_id)+'-'+node.get_ifos()+'-'+str(node.get_trig_time())+'-'+str(node.id)))
1941 if self.config.getboolean('analysis','roq'):
1942 for ifo in ifos:
1943 node.add_file_opt(ifo+'-roqweightsLinear',os.path.join(roqeventpath,'weights_linear_'+ifo+'.dat'))
1944 node.add_file_opt(ifo+'-roqweightsQuadratic',os.path.join(roqeventpath,'weights_quadratic_'+ifo+'.dat'))
1945 node.add_file_opt('roqtime_steps',os.path.join(roqeventpath,'roq_sizes.dat'))
1946 node.add_file_opt('roq-times',os.path.join(roqeventpath,'tcs.dat'))
1947 node.add_file_opt('roqnodesLinear',os.path.join(roqeventpath,'fnodes_linear.dat'))
1948 node.add_file_opt('roqnodesQuadratic',os.path.join(roqeventpath,'fnodes_quadratic.dat'))
1949 if self.config.has_option('condor','bayesline'):
1950 for ifo in ifos:
1951 node.psds[ifo]=os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat')
1952 node.add_input_file(os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat'))
1953 prenode.psds[ifo]=os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat')
1954 prenode.add_input_file(os.path.join(roqeventpath,'BayesLine_PSD_'+ifo+'.dat'))
1955 if self.config.has_option('condor','bayeswave') and bayeswavepostnode:
1956 for ifo in ifos:
1957 node.psds[ifo]=os.path.join(roqeventpath,'BayesWave_PSD_'+ifo+'/post/clean/glitch_median_PSD_forLI_'+ifo+'.dat')
1958 node.add_input_file(os.path.join(roqeventpath,'BayesWave_PSD_'+ifo+'/post/clean/glitch_median_PSD_forLI_'+ifo+'.dat'))
1959 prenode.psds[ifo]=os.path.join(roqeventpath,'BayesWave_PSD_'+ifo+'/post/clean/glitch_median_PSD_forLI_'+ifo+'.dat')
1960 prenode.add_input_file(os.path.join(roqeventpath,'BayesWave_PSD_'+ifo+'/post/clean/glitch_median_PSD_forLI_'+ifo+'.dat'))
1961 for (opt,arg) in event.engine_opts.items():
1962 node.add_var_opt(opt,arg)
1963 if self.config.has_option('condor','bayeswave') and self.engine not in ['bayeswave','bayeswavepost']:
1964 for ifo in ifos:
1965 node.add_parent(bayeswavepostnode[ifo])
1966 prenode.add_parent(bayeswavepostnode[ifo])
1967 return node,bayeswavepsdnode,bayeswavepostnode
1968
1969 def add_results_page_node_pesummary(self, resjob=None, outdir=None, parent=None, extra_options=None, gzip_output=None, ifos=None, evstring=None, coherence=False):
1970 if resjob is None:
1971 resjob=self.results_page_job
1972 node=PESummaryResultsPageNode(resjob)
1973 if parent is not None:
1974 node.add_parent(parent)
1975 if coherence:
1976 infiles = [os.path.join(self.posteriorpath,'posterior_%s_%s.hdf5'%(ifo,evstring)) for ifo in ifos]
1977 infiles.append(parent.get_pos_file())
1978 else:
1979 infiles = [parent.get_pos_file()]
1980 conffile = os.path.join(self.config.get('paths','webdir'),'config.ini')
1981 node.add_input_file(conffile)
1982 inifiles = [conffile]*len(infiles)
1983 node.add_var_opt("samples", " ".join(infiles))
1984 for f in infiles:
1985 node.add_input_file(f)
1986 node.add_file_opt("config", " ".join(inifiles))
1987 approximant = extract_approx(self.config)
1988 node.add_var_opt('approximant', " ".join([approximant]*len(infiles)))
1989 calibration = []
1990 for ifo in ifos:
1991 try:
1992 calibration_envelope = self.config.get("engine", "%s-spcal-envelope" %(ifo))
1993 node.add_input_file(calibration_envelope)
1994 calibration.append("%s:%s" %(ifo, calibration_envelope))
1995 except (NoOptionError, NoSectionError):
1996 pass
1997 if len(calibration) > 0:
1998 node.add_var_opt("calibration", " ".join(calibration))
1999 node.set_output_path(outdir)
2000 self.add_node(node)
2001 return node
2002
2003 def add_results_page_node(self,resjob=None,outdir=None,parent=None,extra_options=None,gzip_output=None,ifos=None):
2004 if resjob is None:
2005 resjob=self.results_page_job
2006 node=ResultsPageNode(resjob)
2007 if parent is not None:
2008 node.add_parent(parent)
2009 infile=parent.get_pos_file()
2010 node.add_file_arg(infile)
2011 node.append_in_files(infile)
2012
2013 node.set_output_path(outdir)
2014 if gzip_output is not None:
2015 node.set_gzip_output(gzip_output)
2016 if ifos is not None:
2017 if isinstance(ifos,list):
2018 pass
2019 else:
2020 ifos=[ifos]
2021 node.set_ifos(ifos)
2022
2023 self.add_node(node)
2024 return node
2025
2026 def add_gracedb_start_node(self,gid,name='',parent=None,server=None):
2027
2028 node=GraceDBNode(self.gracedbjob,parent=parent,gid=gid,command='create log',tag='pe',server=server)
2029 node.set_message(name+' online parameter estimation started.')
2030 self.add_node(node)
2031 return node
2032
2033 def add_gracedb_log_node(self,respagenode,gid,burst=False,server=None):
2034 nodes=[]
2035 node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='create log',tag='pe',server=server)
2036 resurl=respagenode.webpath.replace(self.gracedbjob.basepath,self.gracedbjob.baseurl)
2037 #node.set_message('online parameter estimation results: '+resurl+'/posplots.html')
2038 node.set_message("LALInference online parameter estimation finished. <a href="+resurl+"/posplots.html>results</a>")
2039 self.add_node(node)
2040 nodes.append(node)
2041
2042 tag='pe'
2043 if self.config.has_option('analysis','add-lvem-tag'):
2044 if self.config.getboolean('analysis','add-lvem-tag'):
2045 tag='pe,lvem'
2046 if burst is False:
2047 node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag=tag)
2048 node.set_filename(respagenode.webpath+'/corner/extrinsic.png')
2049 self.add_node(node)
2050 nodes.append(node)
2051
2052 node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
2053 node.set_filename(respagenode.webpath+'/corner/intrinsic.png')
2054 self.add_node(node)
2055 nodes.append(node)
2056
2057 node=GraceDBNode(self.gracedbjob,parent=respagenode,gid=gid,command='upload',tag='pe')
2058 node.set_filename(respagenode.webpath+'/corner/sourceFrame.png')
2059 self.add_node(node)
2060 nodes.append(node)
2061
2062 return nodes
2063
2064 def add_gracedb_FITSskymap_upload(self,event,engine=None):
2065 gid=event.GID
2066 if gid is None:
2067 return
2068 if engine=='lalinferenceburst':
2069 prefix='LIB'
2070 elif engine is None:
2071 prefix="skymap"
2072 else:
2073 prefix='LALInference'
2074 nodes=None
2075 if self.config.has_option('condor','skyarea'):
2076 if self.config.has_option('analysis','upload-to-gracedb'):
2077 if self.config.getboolean('analysis','upload-to-gracedb'):
2078 tag='sky_loc'
2079 if self.config.has_option('analysis','add-lvem-tag'):
2080 if self.config.getboolean('analysis','add-lvem-tag'):
2081 tag='sky_loc,lvem'
2082 skynodes=filter(lambda x: isinstance(x,SkyMapNode) ,self.get_nodes())
2083 nodes=[]
2084 for sk in skynodes:
2085 if len(sk.ifos)>1:
2086 node=GraceDBNode(self.gracedbjob,parent=sk,gid=gid,tag=tag)
2087 #for p in sk.__parents:
2088 # if isinstance(p,ResultPageNode):
2089 # resultpagenode=p
2090 node.set_filename(sk.outfits)
2091 node.set_message('%s FITS sky map'%prefix)
2092 self.add_node(node)
2093 nodes.append(node)
2094 return nodes
2095
2096 def add_gracedb_info_node(self,respagenode,gid,analysis='LALInference',issky=False,prefix="LIB",email=None,server=None):
2097
2098 # if issky=True, this node will upload the FITS file into GDB. BCI and BSN will be used to decide which tags to use
2099 # Otherwise, this node will upload information about parameters and bayes factors
2100
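# Usage sketch (illustrative; mirrors the LIB call site earlier in this file):
#
#   node = self.add_gracedb_info_node(respagenode, ugid, analysis='LIB',
#                                     email=emailto, server=gdb_srv)
#
# With issky=True and respagenode=None, the results page node is looked up
# from the DAG and the FITS sky map is uploaded instead.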
2101 if respagenode is not None:
2102 samples=respagenode.posfile
2103 hdf5samples=respagenode.get_in_files()[0] # This should only be called by LIB, which only uses one file
2104 else:
2105 # Try to find it
2106 resnodes=filter(lambda x: isinstance(x,ResultsPageNode) ,self.get_nodes())
2107 for rs in resnodes:
2108 if len(rs.ifos)>1:
2109 respagenode=rs
2110 samples=respagenode.posfile
2111 hdf5samples=respagenode.get_in_files()[0] # This should only be called by LIB, which only uses one file
2112
2113 if self.postruninfojob.isdefined is False:
2114 return None
2115 if issky is False:
2116 node=PostRunInfoNode(self.postruninfojob,parent=respagenode,gid=gid,samples=hdf5samples,server=server)
2117 if email is not None:
2118 node.set_email(email)
2119 else:
2120 skynodes=filter(lambda x: isinstance(x,SkyMapNode) ,self.get_nodes())
2121 for sk in skynodes:
2122 skymap=sk.outdir+'/%s.fits'%prefix
2123 message=' %s FITS sky map'%prefix
2124 node=PostRunInfoNode(self.postruninfojob,parent=sk,gid=gid,samples=None,server=server)
2125 node.set_skymap(skymap)
2126 node.set_message(message)
2127
2128 bci=respagenode.get_bcifile()
2129 if bci is not None:
2130 node.set_bci(bci)
2131 bsn=respagenode.get_bsnfile()
2132 if bsn is not None:
2133 node.set_bsn(bsn)
2134 node.set_analysis(analysis)
2135 node.finalize()
2136 self.add_node(node)
2137 return node
2138
2139 def add_rom_weights_node(self,ifo,parent=None):
2140 #try:
2141 #node=self.computeroqweightsnodes[ifo]
2142 #except KeyError:
2143 node=ROMNode(self.computeroqweights_job,ifo,parent.seglen,parent.flows[ifo])
2144 self.computeroqweightsnode[ifo]=node
2145 if parent is not None:
2146 node.add_parent(parent)
2147 self.add_node(node)
2148 return node
2149
2150 def add_bayesline_node(self,ifo,parent=None):
2151 node=BayesLineNode(self.bayesline_job)
2152 self.bayeslinenode[ifo]=node
2153 if parent is not None:
2154 node.add_parent(parent)
2155 self.add_node(node)
2156 return node
2157
2158 def add_bayeswavepsd_node(self,ifo,parent=None):
2159 node=BayesWavePSDNode(self.bayeswavepsd_job[ifo])
2160 self.bayeswavepsdnode[ifo]=node
2161 if parent is not None:
2162 node.add_parent(parent)
2163 self.add_node(node)
2164 return node
2165
2166 def add_bayeswavepost_node(self,ifo,parent=None):
2167 node=BayesWavePostNode(self.bayeswavepost_job[ifo])
2168 self.bayeswavepostnode[ifo]=node
2169 if parent is not None:
2170 node.add_parent(parent)
2171 self.add_node(node)
2172 return node
2173
2174
2175class LALInferenceDAGJob(pipeline.CondorDAGJob):
2176 """
2177 Class to define DAG Jobs for lalinference pipeline.
2178 Handles some common condor settings like requirements, accounting groups,
2179 and queues.
2180 Parameters:
2181
2182 cp : configparser object
2183 sharedfs: If False, will map files to local paths on execute node
2184 """
2185 def __init__(self, cp=None, sharedfs=False, requires_frames=False):
2186 self.transfer_files=not sharedfs
2187 self.requirements=[]
2188 self.requires_frames=requires_frames
2189 self.x509path=None
2190 if not cp:
2191 # Create dummy empty config
2192 from configparser import ConfigParser
2193 cp = ConfigParser()
2194 # If the user has specified sharedfs=True, disable the file transfer
2195 if cp.has_option('condor','sharedfs'):
2196 if cp.getboolean('condor','sharedfs'):
2197 self.transfer_files = False
2198 if cp.has_option('analysis','osg'):
2199 # We must allow file transfer for OSG running
2200 if self.transfer_files:
2201 self.osg=cp.getboolean('analysis','osg')
2202 else:
2203 self.osg=False
2204 else:
2205 self.osg=False
2206 # 500 MB should be enough for anyone
2207 self.add_condor_cmd('request_disk','500M')
2208 self.add_condor_cmd('getenv','True')
2209 # Only remove the job from the queue if it completed successfully
2210 # self.add_condor_cmd('on_exit_remove','(ExitBySignal == False) && (ExitCode == 0)')
2211 # Unless max_retries is exceeded
2212 self.add_condor_cmd('max_retries','5')
2213 # Add requirements from the configparser condor section
2214 if cp.has_option('condor','requirements'):
2215 self.add_requirement(cp.get('condor','requirements'))
2216 # Add accounting group information if present
2217 if cp.has_option('condor','accounting_group'):
2218 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
2219 if cp.has_option('condor','accounting_group_user'):
2220 self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
2221 if cp.has_option('condor','queue'):
2222 self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
2223 # The following line means the jobs will run ONLY on the specified queue
2224 # Disabled to allow priority_pe jobs to also run on general resources
2225 # self.add_requirement('(TARGET.'+cp.get('condor','queue')+' =?= True)')
2226 if self.transfer_files:
2227 self.add_condor_cmd('transfer_executable','False')
2228 self.add_condor_cmd('transfer_input_files','$(macroinput)')
2229 self.add_condor_cmd('transfer_output_files','$(macrooutput)')
2230 self.add_condor_cmd('transfer_output_remaps','"$(macrooutputremaps)"')
2231 # Add data transfer options
2232 self.add_condor_cmd('should_transfer_files','YES')
2233 self.add_condor_cmd('when_to_transfer_output','ON_EXIT_OR_EVICT')
2234 # Wrapper script to create files to transfer back
2235 self.add_condor_cmd('+PreCmd','"lalinf_touch_output"')
2236 self.add_condor_cmd('+PreArguments', '"$(macrooutput)"')
2237 # Sync logs back to run directory
2238 self.add_condor_cmd('stream_output', True)
2239 self.add_condor_cmd('stream_error', True)
2240 else:
2241 self.add_condor_cmd('should_transfer_files','NO')
2242
2243 if self.osg: # Job will run on OSG nodes
2244 # Remote site must have LIGO frames
2245 # This line forces the job to run on OSG (remove after testing is finished, to allow local running)
2246 # self.add_requirement('IS_GLIDEIN=?=True')
2247 pass
2248 else: # Job will run on local cluster
2249 # DESIRED_Sites="nogrid" forces to not run on OSG
2250 self.add_condor_cmd('+DESIRED_Sites','"nogrid"')
2251 # FlockLocal allows running on local cluster if submitted from an ldas-osg node
2252 self.add_condor_cmd('+flock_local',True)
2253
2254 def set_x509path(self, path):
2255 """
2256 Copy the x509 proxy file to the path given, and
2257 use it in the DAG. Requires ligo-proxy-init to be run
2258 before generating DAG.
2259 """
2260 from shutil import copyfile
2261 from subprocess import check_output, CalledProcessError
2262
2263 try:
2264 res = check_output(['grid-proxy-info','-path'])
2265 except CalledProcessError as e:
2266 print(e.output)
2267 print('Error: x509proxy file not found. Please run ligo-proxy-init before creating DAG')
2268 sys.exit(1)
2269 tmp_path = res.strip()
2270 print('Using x509 proxy from ',tmp_path)
2271 copyfile(tmp_path, path)
2272 self.x509path = path
2273
2274 def add_requirement(self,requirement):
2275 """
2276 Add a requirement to the condor submit file
2277 """
2278 # Check that string isn't empty
2279 if requirement:
2280 self.requirements.append(requirement)
2281
2282 def write_sub_file(self):
2283 """
2284 Over-load CondorDAGJob.write_sub_file to write the requirements
2285 """
2286 if self.requires_frames and self.osg:
2287 self.add_requirement('HAS_LIGO_FRAMES =?= True')
2288 # If using frames, set the x509 path to a permanent location
2289 self.set_x509path(os.path.join(
2290 self.get_config('paths','basedir'),'x509proxy')
2291 )
2292 if self.x509path is not None:
2293 self.add_condor_cmd('use_x509userproxy','True')
2294 self.add_condor_cmd('x509userproxy',self.x509path)
2295 if self.requirements:
2296 self.add_condor_cmd('requirements','&&'.join('({0})'.format(r) for r in self.requirements))
2297
2298 # Call the parent method to do the rest
2299 super(LALInferenceDAGJob,self).write_sub_file()
2300
2301class LALInferenceDAGSharedFSJob(LALInferenceDAGJob):
2302 def __init__(self, *args, **kwargs):
2303 kwargs['sharedfs']=True
2304 super(LALInferenceDAGSharedFSJob, self).__init__(*args, **kwargs)
2305
2306class LALInferenceDAGNode(pipeline.CondorDAGNode):
2307 """
2308 Node for LALInference DAG jobs. Will perform filename mapping
2309 to run in condor's local filesystem
2310 """
2311 def add_output_file(self,filename):
2312 fname=filename
2313 if self.job().transfer_files:
2314 fname=os.path.basename(
2315 os.path.relpath(fname,
2316 start=self.job().get_config('paths','basedir')))
2317 self.add_output_macro(fname)
2318 super(LALInferenceDAGNode,self).add_output_file(filename)
2319 def add_input_file(self,filename):
2320 filename=os.path.relpath(filename,start=self.job().get_config('paths','basedir'))
2321 self.add_input_macro(filename)
2322 super(LALInferenceDAGNode,self).add_input_file(filename)
2323 def add_checkpoint_file(self,filename):
2324 filename=os.path.relpath(filename,start=self.job().get_config('paths','basedir'))
2325 self.add_checkpoint_macro(filename)
2326 super(LALInferenceDAGNode,self).add_checkpoint_file(filename)
2327 def add_file_opt(self, opt, filename, file_is_output_file=False):
2328 # The code option needs the path relative to basedir
2329 relfile=os.path.relpath(filename,start=self.job().get_config('paths','basedir'))
2330 # Use the basename here if working in local filesystem mode
2331 if self.job().transfer_files:
2332 self.add_var_opt(opt,os.path.join('.',os.path.basename(relfile)))
2333 else:
2334 self.add_var_opt(opt, relfile)
2335 if file_is_output_file:
2336 self.add_output_file(filename)
2337 else:
2338 self.add_input_file(filename)
2339
2340 def finalize(self):
2341 # Set up file mapping
2342 outputremaps = ';'.join( ['{0}={1}'.format(os.path.basename(f), f)
2343 for f in self.get_output_files()]
2344 )
2345 if self.job().transfer_files:
2346 self.add_macro('macrooutputremaps', outputremaps)
2347 self.add_input_file(os.path.join(self.job().get_config('paths','basedir'),'lalinf_touch_output'))
2348 for f in self.job().get_input_files():
2349 self.add_input_file(f)
2350 super(LALInferenceDAGNode, self).finalize()
2351
2352
2353class EngineJob(LALInferenceDAGJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
2354 def __init__(self,cp,submitFile,logdir,engine,ispreengine=False,sharedfs=False,
2355 requires_frames=False, *args, **kwargs):
2356 self.ispreengine=ispreengine
2357 self.engine=engine
2358 basepath=cp.get('paths','basedir')
2359 universe = 'vanilla'
2360 if ispreengine is True:
2361 roqpath=os.path.join(basepath,'ROQdata')
2362 self.roqpath=roqpath
2363 mkdirs(roqpath)
2364 exe=cp.get('condor',self.engine)
2365 else:
2366 if self.engine=='lalinferencemcmc':
2367 exe=cp.get('condor','mpiwrapper')
2368 elif self.engine=='lalinferencenest' or self.engine=='lalinferenceburst':
2369 exe=cp.get('condor',self.engine)
2370 else:
2371 print('LALInferencePipe: Unknown engine node type %s!'%(self.engine))
2372 sys.exit(1)
2373
2374 pipeline.CondorDAGJob.__init__(self,universe,exe)
2375 pipeline.AnalysisJob.__init__(self,cp)
2376 LALInferenceDAGJob.__init__(self, cp, sharedfs=sharedfs, requires_frames=requires_frames)
2377
2378 if cp.has_option('engine','resume'):
2379 self.resume=True
2380 # These are taken from James's example
2381 self.add_condor_cmd('+SuccessCheckpointExitCode','77')
2382 self.add_condor_cmd('+WantFTOnCheckpoint','True')
2383 self.add_opt('checkpoint-exit-code','77')
2384 else:
2385 self.resume=False
2386
2387 # Set the options which are always used
2388 self.set_sub_file(os.path.abspath(submitFile))
2389 if self.engine=='lalinferencemcmc':
2390 self.binary=cp.get('condor',self.engine.replace('mpi',''))
2391 self.mpirun=cp.get('condor','mpirun')
2392 if cp.has_section('mpi'):
2393 if ispreengine is False:
2394 self.machine_count=cp.get('mpi','machine-count')
2395 self.machine_memory=cp.get('mpi','machine-memory')
2396 if cp.has_option('mpi','mpi_task_count'):
2397 self.mpi_task_count=cp.get('mpi','mpi_task_count')
2398 else:
2399 self.mpi_task_count=self.machine_count
2400 else:
2401 self.machine_count=str(1)
2402 self.machine_memory=cp.get('mpi','machine-memory')
2403 self.mpi_task_count=self.machine_count
2404 else:
2405 self.machine_count=str(1)
2406 self.machine_memory=str(1024) # default value if the user did not specify something
2407 self.mpi_task_count=self.machine_count
2408
2409 self.add_condor_cmd('request_cpus',self.machine_count)
2410 self.add_condor_cmd('environment','OMP_NUM_THREADS='+str(int(ceil(int(self.machine_count)/int(self.mpi_task_count)))))
2411 self.add_condor_cmd('request_memory',str(float(self.machine_count)*float(self.machine_memory)))
2412 if self.engine=='lalinferencenest':
2413 self.add_condor_cmd('request_memory','4000') # 4GB RAM for high SNR BNS
2414
2415 self.add_condor_cmd('stream_output', True)
2416 self.add_condor_cmd('stream_error', True)
2417
2418 if cp.has_section(self.engine):
2419 if not ispreengine:
2420 self.add_ini_opts(cp,self.engine)
2421
2422 # Add [engine] options, converting file-related ones to local paths
2423 for k in cp.options('engine'):
2424 arg=cp.get('engine',k)
2425 if os.path.exists(arg) and self.transfer_files:
2426 self.add_input_file(arg)
2427 self.add_opt(k, os.path.join('.', os.path.basename(arg)))
2428 else:
2429 self.add_opt(k,arg)
2430
2431 self.set_stdout_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).out'))
2432 self.set_stderr_file(os.path.join(logdir,'lalinference-$(cluster)-$(process)-$(node).err'))
2433 # For LALInferenceNest demand only 1 thread (to be tuned later)
2434 if self.engine=='lalinferencenest':
2435 self.add_condor_cmd('environment','OMP_NUM_THREADS=1')
2436 if cp.has_option('condor','notification'):
2437 self.set_notification(cp.get('condor','notification'))
2438 if cp.has_option('resultspage','email'):
2439 self.add_condor_cmd('notify_user',cp.get('resultspage','email'))
2440
2441
2442class EngineNode(LALInferenceDAGNode):
2443 new_id = itertools.count()
2444 def __init__(self,li_job):
2445 super(EngineNode,self).__init__(li_job)
2446 self.ifos=[]
2447 self.scisegs={}
2448 self.channels={}
2449 self.psds={}
2450 self.flows={}
2451 self.fhighs={}
2452 self.timeslides={}
2453 self.seglen=None
2454 self.psdlength=None
2455 self.padding=None
2456 self.maxlength=None
2457 self.psdstart=None
2458 self.snrfile=None
2459 self.psdfiles=None
2460 self.cachefiles={}
2462 self.id=next(EngineNode.new_id)
2463 self.__finaldata=False
2464 self.fakedata=False
2465 self.lfns=[] # Local file names (for frame files and pegasus)
2466
2467 def set_seglen(self,seglen):
2468 self.seglen=seglen
2469
2470 def set_psdlength(self,psdlength):
2471 self.psdlength=psdlength
2472
2473 def set_max_psdlength(self,psdlength):
2474 self.maxlength=psdlength
2475
2476 def set_padding(self,padding):
2477 self.padding=padding
2478
2479 def set_psdstart(self,psdstart):
2480 self.psdstart=psdstart
2481
2482 def set_seed(self,seed):
2483 self.add_var_opt('randomseed',str(int(seed)+self.id))
2484
2485 def set_srate(self,srate):
2486 self.add_var_opt('srate',str(srate))
2487
2488 def set_trigSNR(self,trigSNR):
2489 self.add_var_opt('trigger-snr',str(trigSNR))
2490
2491 def set_horizon_distance(self,horizon_distance):
2492 self.add_var_opt('distance-max',str(horizon_distance))
2493
2494 def set_dataseed(self,seed):
2495 self.add_var_opt('dataseed',str(seed))
2496
2497 def set_chainseed(self,seed):
2498 self.add_var_opt('chainseed',str(seed))
2499
2500 def get_ifos(self):
2501 return ''.join(map(str,self.ifos))
2502
2503 def set_psd_files(self):
2504 pathroot=self.posfile
2505 if pathroot[-3:]=='.00':
2506 pathroot=pathroot[:-3]
2507 st=""
2508 for i in self.ifos:
2509 tmpst="%s%s-PSD.dat,"%(pathroot,i)
2510 st+=tmpst
2511 self.add_output_file(tmpst[:-1])
2512 st=st[:-1]
2513 self.psdfiles=st
2514
2515 def get_psd_files(self):
2516 return self.psdfiles
2517
2518 def set_snr_file(self):
2519 try:
2520 # bambi stores the outfile in fileroot
2521 pathroot=self.fileroot
2522 except AttributeError:
2523 # mcmc and nest store it in posfile
2524 pathroot=self.posfile
2525 if pathroot[-3:]=='.00':
2526 pathroot=pathroot[:-3]
2527 st="%s_snr.txt"%pathroot
2528 self.add_output_file(st)
2529 self.snrfile=st
2530
2531 def get_snr_file(self):
2532 return self.snrfile
2533
2534 def set_trig_time(self,time):
2535 """
2536 Set the end time of the signal for the centre of the prior in time
2537 """
2538 self.__trigtime=float(time)
2539 self.add_var_opt('trigtime','{:.9f}'.format(float(time)))
2540
2541 def set_event_number(self,event):
2542 """
2543 Set the event number in the injection XML.
2544 """
2545 if event is not None:
2546 self.__event=int(event)
2547 self.add_var_opt('event',str(event))
2548
2549 def set_injection(self,injfile,event):
2550 """
2551 Set a software injection to be performed.
2552 """
2553 self.add_file_opt('inj',injfile)
2554 self.set_event_number(event)
2555
2556 def get_trig_time(self): return self.__trigtime
2557
2558 def add_fake_ifo_data(self,ifo,sciseg,fake_cache_name,fake_channel_name,timeslide=0):
2559 """
2560 Dummy method to set up fake data without needing to run datafind
2561 """
2562 self.ifos.append(ifo)
2563 self.scisegs[ifo]=sciseg
2564 self.cachefiles[ifo]=fake_cache_name
2565 self.timeslides[ifo]=timeslide
2566 self.channels[ifo]=fake_channel_name
2567 self.fakedata=True
2568 return 1
2569
2570 def add_ifo_data(self,ifo,sciseg,channelname,timeslide=0):
2571 if ifo not in self.ifos:
2572 self.ifos.append(ifo)
2573 self.scisegs[ifo]=sciseg
2574 parent=sciseg.get_df_node()
2575 if parent is not None:
2576 self.add_parent(parent)
2577 df_output=parent.get_output()
2578 self.set_cache(df_output,ifo)
2579 self.timeslides[ifo]=timeslide
2580 self.channels[ifo]=channelname
2581 self.job().requires_frames=True
2582 return 1
2583 else: return 0
2584
2585 def set_cache(self,filename,ifo):
2586 """
2587 Add a cache file from LIGODataFind. Based on same method from pipeline.AnalysisNode
2588 """
2589 self.cachefiles[ifo]=filename
2590 self.add_input_file(filename)
2591
2592 def finalize(self):
2593 if not self.__finaldata:
2594 self._finalize_ifo_data()
2595
2596 super(EngineNode, self).finalize()
2597
2598 def _finalize_ifo_data(self):
2599 """
2600 Add final list of IFOs and data to analyse to command line arguments.
2601 """
2602 for ifo in self.ifos:
2603 self.add_var_arg('--ifo '+ifo)
2604 if self.fakedata:
2605 self.add_var_opt('%s-cache'%(ifo),self.cachefiles[ifo])
2606 elif not self.lfns:
2607 self.add_file_opt('%s-cache'%(ifo),self.cachefiles[ifo])
2608 self.add_var_opt('%s-channel'%(ifo),self.channels[ifo])
2609 if self.flows: self.add_var_opt('%s-flow'%(ifo),self.flows[ifo])
2610 if self.fhighs: self.add_var_opt('%s-fhigh'%(ifo),self.fhighs[ifo])
2611 if self.psds:
2612 if os.path.exists(self.psds[ifo]):
2613 self.add_file_opt('%s-psd'%(ifo),self.psds[ifo])
2614 else:
2615 self.add_var_opt('%s-psd'%(ifo),self.psds[ifo])
2616 if any(self.timeslides): self.add_var_opt('%s-timeslide'%(ifo),self.timeslides[ifo])
2617
2618 """ The logic here is the following:
2619 The CBC code starts from the earliest commont time, but that means that if you run on *the same trigtime* the PSD start and PSDlength you'll get will be different, depending on wheather you are running on only one event or several, and the exact position of the event you are interested in in the list of times.
2620 Instead for each event (including single IFO runs) we do:
2621 a) get its trigtime
2622 b) set PSDlengh=maxPSD (requested by the user or equal to 32seglen)
2623 c) go define GPSstart= trigtime - (PSDlength + seglen + padding - 2)
2624
2625 By definition this means that GPSstart+ PSDlengh with never overlap with trigtime. Furthermore running on the same event will lead to the same PSDstart and lenght, no matter of whether that is a one-event or multi-event run.
2626 We should check that the PSDstart so obtained is in science mode. This is what the while loop 9 lines below is meant for.
2627 """
2628 trig_time=self.get_trig_time()
2629 maxLength=self.maxlength
2630 offset=(maxLength+self.seglen-2.+self.padding)
2631 self.GPSstart=trig_time-offset
2632 self.__GPSend=0
2633 length=maxLength
2634 dt=self.seglen/4.
2635 # First find appropriate start time
2636 while ((self.GPSstart+length>=trig_time) or any( [self.GPSstart < seg.start()+self.padding for seg in self.scisegs.values() ] )) and length>0:
2637 self.GPSstart+=dt
2638 length-=dt
2639 # Now adjust so length fits inside segment
2640 while any([self.GPSstart+length>=seg.end()-self.padding for seg in self.scisegs.values()]) and length>0:
2641 length-=dt
2642 if length<=0:
2643 print('Unable to find data for event {0}'.format(trig_time))
2644 if length<maxLength:
2645 print('Trimmed analysis data for trigger {}. Analysing {} -- {} ({}s of data)'.format(trig_time, self.GPSstart, self.GPSstart+length,length))
2646
2647 if self.psdstart is not None:
2648 self.GPSstart=self.psdstart
2649 #print 'Over-riding start time to user-specified value %f'%(self.GPSstart)
2650 #if self.GPSstart<starttime or self.GPSstart>endtime:
2651 # print 'ERROR: Over-ridden time lies outside of science segment!'
2652 # raise Exception('Bad psdstart specified')
2653 self.add_var_opt('psdstart','{:.9f}'.format(self.GPSstart))
2654 if self.psdlength is None:
2655 self.psdlength=length
2656 if(self.psdlength>self.maxlength):
2657 self.psdlength=self.maxlength
2658 self.add_var_opt('psdlength',self.psdlength)
2659 self.add_var_opt('seglen',self.seglen)
2660
2661 self.__finaldata=True
2662
2663class LALInferenceNestNode(EngineNode):
2664 def __init__(self,li_job):
2665 super(LALInferenceNestNode,self).__init__(li_job)
2666 self.engine='lalinferencenest'
2667 self.outfilearg='outfile'
2668
2669 def set_output_file(self,filename):
2670 self.nsfile=filename+'.hdf5'
2671 self.posfile=self.nsfile
2672 self.add_file_opt(self.outfilearg,self.nsfile,file_is_output_file=True)
2673
2674 def get_ns_file(self):
2675 return self.nsfile
2676
2677class LALInferenceBurstNode(LALInferenceNestNode):
2678 def __init__(self,li_job):
2679 super(LALInferenceBurstNode,self).__init__(li_job)
2680 self.engine='lalinferenceburst'
2682 def set_injection(self,injfile,event):
2683 """
2684 Set a software injection to be performed.
2685 """
2686 self.add_file_opt('binj',injfile)
2687 self.set_event_number(event)
2688
2689class LALInferenceMCMCNode(EngineNode):
2690 def __init__(self,li_job):
2691 super(LALInferenceMCMCNode,self).__init__(li_job)
2692 self.engine='lalinferencemcmc'
2693 self.outfilearg='outfile'
2694 self.add_var_opt('mpirun',li_job.mpirun)
2695 self.add_var_opt('np',str(li_job.mpi_task_count))
2696 # The MCMC exe itself should not be transferred, as it will break conda linking
2697 self.add_var_opt('executable',li_job.binary)
2698
2699 def set_output_file(self,filename):
2700 self.posfile=filename+'.hdf5'
2701 self.add_file_opt(self.outfilearg,self.posfile,file_is_output_file=True)
2702 if self.job().resume:
2703 self.add_output_file(self.posfile+'.resume')
2704 # Should also take care of the higher temperature output files with
2705 # self.add_output_file, getting the number of files from machine_count
2706 for i in range(1,int(self.job().mpi_task_count)):
2707 tempfile = self.posfile + '.' + '{:d}'.format(i).zfill(2)
2708 self.add_output_file(tempfile)
2709 if self.job().resume:
2710 self.add_output_file(tempfile+'.resume')
2711
2712 def get_pos_file(self):
2713 return self.posfile
2714
2715class LALInferenceDataDumpNode(EngineNode):
2716 def __init__(self,li_job):
2717 super(LALInferenceDataDumpNode,self).__init__(li_job)
2718 self.engine='lalinferencedatadump'
2719 self.outfilearg='outfile'
2720 def set_output_file(self,filename):
2721 pass
2722
2723class BayesWavePSDJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
2724 """
2725 Class for a BayesWave job
2726
2727 Make sure all necessary commands are given for O3 BayesWave
2728 """
2729 def __init__(self,cp,submitFile,logdir):
2730 exe=cp.get('condor','bayeswave')
2731 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
2732 pipeline.AnalysisJob.__init__(self,cp)
2733 LALInferenceDAGSharedFSJob.__init__(self, cp)
2734 if cp.has_section('bayeswave'):
2735 self.add_ini_opts(cp,'bayeswave')
2736 self.set_sub_file(submitFile)
2737 self.set_stdout_file(os.path.join(logdir,'bayeswavepsd-$(cluster)-$(process).out'))
2738 self.set_stderr_file(os.path.join(logdir,'bayeswavepsd-$(cluster)-$(process).err'))
2739 # Bayeswave actually runs on node filesystem via these commands
2740 self.add_condor_cmd('transfer_executable','False')
2741 self.add_condor_cmd('getenv','True')
2742 self.add_condor_cmd('request_memory',cp.get('condor','bayeswave_request_memory'))
2743 self.ispreengine = False
2744 self.add_condor_cmd('stream_output', True)
2745 self.add_condor_cmd('stream_error', True)
2746 self.add_condor_cmd('+SuccessCheckpointExitCode', 77)
2747 self.add_condor_cmd('+WantFTOnCheckpoint', True)
2748 self.add_condor_cmd('should_transfer_files', 'YES')
2749 self.add_condor_cmd('when_to_transfer_output', 'ON_EXIT_OR_EVICT')
2750 self.add_condor_cmd('transfer_input_files','$(macroinput),/usr/bin/mkdir,caches')
2751 self.add_condor_cmd('transfer_output_files','$(macrooutput)')
2752 self.add_condor_cmd('+PreCmd','"mkdir"')
2753 self.add_condor_cmd('+PreArguments','"-p $(macrooutputDir)"')
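# For orientation only: the add_condor_cmd calls above translate into submit
# file lines of roughly the following form (illustrative sketch; HTCondor
# expands the $(macro...) variables per node):
#   transfer_executable = False
#   +SuccessCheckpointExitCode = 77
#   +WantFTOnCheckpoint = True
#   should_transfer_files = YES
#   when_to_transfer_output = ON_EXIT_OR_EVICT
#   transfer_input_files = $(macroinput),/usr/bin/mkdir,caches
#   +PreCmd = "mkdir"
#   +PreArguments = "-p $(macrooutputDir)"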
2754
2755def topdir(path):
2756 """
2757 Returns the top directory in a path, e.g.
2758 topdir('a/b/c') -> 'a'
2759 """
2760 a,b=os.path.split(path)
2761 if a:
2762 return topdir(a)
2763 else:
2764 return b
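# Example behaviour (follows directly from the os.path.split recursion):
#   topdir('a/b/c')  -> 'a'
#   topdir('a/b/c/') -> 'a'   (a trailing separator yields an empty tail)
#   topdir('a')      -> 'a'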
2765
2766
2767class BayesWavePSDNode(EngineNode):
2768 def __init__(self,bayeswavepsd_job):
2769 super(BayesWavePSDNode,self).__init__(bayeswavepsd_job)
2770 self.engine='bayeswave'
2771 self.outfilearg='outfile'
2772
2773 def set_output_file(self,filename):
2774 pass
2775
2776 def set_output_dir(self, dirname):
2777 path = os.path.relpath(dirname,
2778 start=self.job().get_config('paths','basedir'))
2779 self.add_var_opt('outputDir',path)
2780 # BayesWave reads and writes to its directory;
2781 # the output path is a set of nested dirs, so if we tell condor to
2782 # transfer the top level it should copy back into place
2783 self.add_output_file(topdir(path))
2784
2785class BayesWavePostJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
2786 """
2787 Class for a BayesWavePost job
2788
2789 Make sure all necessary commands are given for O3 BayesWavePost
2790 """
2791 def __init__(self,cp,submitFile,logdir):
2792 exe=cp.get('condor','bayeswavepost')
2793 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
2794 pipeline.AnalysisJob.__init__(self,cp)
2795 LALInferenceDAGSharedFSJob.__init__(self, cp)
2796 if cp.has_section('bayeswave'):
2797 self.add_ini_opts(cp,'bayeswave')
2798 self.set_sub_file(submitFile)
2799 self.add_condor_cmd('transfer_executable','False')
2800 self.set_stdout_file(os.path.join(logdir,'bayeswavepost-$(cluster)-$(process).out'))
2801 self.set_stderr_file(os.path.join(logdir,'bayeswavepost-$(cluster)-$(process).err'))
2802 self.add_condor_cmd('getenv','True')
2803 self.add_condor_cmd('request_memory',cp.get('condor','bayeswavepost_request_memory'))
2804 self.add_condor_cmd('stream_output', True)
2805 self.add_condor_cmd('stream_error', True)
2806 self.add_condor_cmd('+SuccessCheckpointExitCode', 77)
2807 self.add_condor_cmd('+WantFTOnCheckpoint', True)
2808 self.add_condor_cmd('should_transfer_files', 'YES')
2809 self.add_condor_cmd('when_to_transfer_output', 'ON_EXIT_OR_EVICT')
2810 self.add_condor_cmd('transfer_input_files','$(workdir)')
2811 self.add_condor_cmd('transfer_output_files','$(workdir)')
2812 self.ispreengine = False
2813
2814class BayesWavePostNode(EngineNode):
2815 def __init__(self,bayeswavepost_job):
2816 super(BayesWavePostNode,self).__init__(bayeswavepost_job)
2817 self.engine='bayeswavepost'
2818 self.outfilearg='outfile'
2819
2820 def set_output_file(self,filename):
2821 pass
2822
2823 def set_output_dir(self, dirname):
2824 path = os.path.relpath(dirname,
2825 start=self.job().get_config('paths','basedir'))
2826 self.add_var_opt('outputDir',path)
2827 # BWPost reads and writes to its directory
2828 self.add_macro('workdir',topdir(path))
2829
2830class PESummaryResultsPageJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
2831 """Class to handle the creation of the summary page job using `PESummary`
2832 """
2833
2834 def __init__(self, cp, submitFile, logdir):
2835 exe=cp.get('condor','resultspage')
2836 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
2837 pipeline.AnalysisJob.__init__(self,cp) # Job always runs locally
2838 LALInferenceDAGSharedFSJob.__init__(self,cp)
2839 self.set_sub_file(os.path.abspath(submitFile))
2840 self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
2841 self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
2842 self.add_condor_cmd('getenv','True')
2843 try:
2844 self.add_condor_cmd('request_memory', cp.get('condor', 'resultspage_memory'))
2845 except NoOptionError:
2846 self.add_condor_cmd('request_memory', '2000')
2847
2848
2849class ResultsPageJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
2850 def __init__(self,cp,submitFile,logdir):
2851 exe=cp.get('condor','resultspage')
2852 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
2853 pipeline.AnalysisJob.__init__(self,cp) # Job always runs locally
2854 LALInferenceDAGSharedFSJob.__init__(self, cp)
2855 self.set_sub_file(os.path.abspath(submitFile))
2856 self.set_stdout_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).out'))
2857 self.set_stderr_file(os.path.join(logdir,'resultspage-$(cluster)-$(process).err'))
2858 self.add_condor_cmd('getenv','True')
2859 try:
2860 self.add_condor_cmd('request_memory', cp.get('condor', 'resultspage_memory'))
2861 except NoOptionError:
2862 self.add_condor_cmd('request_memory', '2000')
2863 self.add_ini_opts(cp,'resultspage')
2864
2865 if cp.has_option('results','skyres'):
2866 self.add_opt('skyres',cp.get('results','skyres'))
2867
2868class PESummaryResultsPageNode(LALInferenceDAGNode):
2870 def __init__(self, results_page_job, outpath=None):
2871 super(PESummaryResultsPageNode,self).__init__(results_page_job)
2872 if outpath is not None:
2873 self.set_output_path(outpath)
2874
2875 @staticmethod
2876 def determine_webdir_or_existing_webdir(path):
2877 entries = glob(path+"/*")
2878 if "%s/home.html" %(path) in entries:
2879 return "existing"
2880 return "new"
2881
2882 def set_output_path(self,path):
2883 self.webpath=path
2884 option = self.determine_webdir_or_existing_webdir(path)
2885 if option == "existing":
2886 self.add_var_opt('existing_webdir',path)
2887 else:
2888 self.add_var_opt('webdir',path)
2889
2890 def get_output_path(self):
2891 return self.webpath
2892
2893 def set_injection(self,injfile,eventnumber):
2894 self.injfile=injfile
2895 self.add_file_opt('inj',injfile)
2896 self.set_event_number(eventnumber)
2897
2898 def get_injection(self):
2899 return self.injfile
2900
2901 def set_event_number(self,event):
2902 """
2903 Set the event number in the injection XML.
2904 """
2905 if event is not None:
2906 self.__event=int(event)
2907 self.add_var_opt('eventnum',str(event))
2908
2909 def get_event_number(self):
2910 return self.__event
2911
2912 def set_psd_files(self,ifos,st):
2913 if st is None:
2914 return
2915 psds = ""
2916 st = st.split(",")
2917 for num, i in enumerate(ifos):
2918 psds += " %s:%s" %(i, st[num])
2919 self.add_var_opt('psd',psds)
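# e.g. ifos=['H1','L1'] and st='h1_psd.dat,l1_psd.dat' (hypothetical file
# names) build the option string ' H1:h1_psd.dat L1:l1_psd.dat', which is
# passed to PESummary via its psd option.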
2920
2921 def set_cache_files(self, channels, cachefiles):
2922 if cachefiles == {}:
2923 return
2924 if channels is None:
2925 return
2926
2927 gwdata = ""
2928 for i in channels.keys():
2929 gwdata += " %s:%s" % (channels[i], cachefiles[i])
2930 self.add_input_file(cachefiles[i]) # register each cache file as a job input (inside the loop)
2931 self.add_var_opt('gwdata',gwdata)
2932
2933 def set_calibration_files(self,ifos,st):
2934 if st is None:
2935 return
2936 calibration = ""
2937 st = st.split(",")
2938 for num, i in enumerate(ifos):
2939 calibration += " %s:%s" %(i, st[num])
2940 self.add_var_opt('calibration',calibration)
2941
2942 def set_labels(self, labels):
2943 if labels is None:
2944 return
2945 l = labels.split(",")
2946 self.add_var_opt('labels'," ".join(l))
2947
2948 def set_snr_file(self,st):
2949 if st is None:
2950 return
2951
2952 def set_coinc_file(self,coinc,gid=None):
2953 if gid:
2954 self.__event=gid
2955 if coinc is None:
2956 return
2957 self.add_file_opt('trigfile',coinc)
2958
2959 def add_engine_parent(self,node):
2960 """
2961 Add a parent node which is one of the engine nodes,
2962 and automatically set options accordingly.
2963 """
2964 self.add_parent(node)
2965 self.add_file_arg(node.get_pos_file())
2966 self.infiles.append(node.get_pos_file())
2967
2968 def get_pos_file(self):
2969 return self.posfile
2970
2971 def set_gid(self, gid):
2972 if gid is None:
2973 return
2974 self.add_var_opt('gracedb',gid)
2975
2976 def set_bayes_coherent_incoherent(self,bcifile):
2977 self.add_file_opt('bci',bcifile)
2978
2979 def set_bayes_coherent_noise(self,bsnfile):
2980 self.add_file_opt('bsn',bsnfile)
2981
2982 def set_header_file(self,headerfile):
2983 self.add_file_opt('header',headerfile)
2984
2985 def set_ifos(self,ifos):
2986 self.ifos=ifos
2987
2988
2989
2990class ResultsPageNode(LALInferenceDAGNode):
2991 def __init__(self,results_page_job,outpath=None):
2992 super(ResultsPageNode,self).__init__(results_page_job)
2993 if outpath is not None:
2994 self.set_output_path(outpath)
2995 self.__event=0
2996 self.ifos=None
2997 self.injfile=None
2998 self.bcifile=None
2999 self.bsnfile=None
3000 self.infiles=[]
3001
3002 def set_gzip_output(self,path):
3003 self.add_file_opt('archive',path,file_is_output_file=True)
3004
3005 def set_output_path(self,path):
3006 self.webpath=path
3007 #self.add_file_opt('outpath',path,file_is_output_file=True)
3008 self.add_var_opt('outpath',path)
3009 #self.add_file_opt('archive','results.tar.gz',file_is_output_file=True)
3010 mkdirs(path)
3011 self.posfile=os.path.join(path,'posterior_samples.dat')
3012 self.add_output_file(self.posfile)
3013
3014 def get_output_path(self):
3015 return self.webpath
3016
3017 def set_injection(self,injfile,eventnumber):
3018 self.injfile=injfile
3019 self.add_file_opt('inj',injfile)
3020 self.set_event_number(eventnumber)
3021
3022 def get_injection(self):
3023 return self.injfile
3024
3025 def set_event_number(self,event):
3026 """
3027 Set the event number in the injection XML.
3028 """
3029 if event is not None:
3030 self.__event=int(event)
3031 self.add_var_arg('--eventnum '+str(event))
3032
3033 def get_event_number(self):
3034 return self.__event
3035
3036 def set_psd_files(self,st):
3037 if st is None:
3038 return
3039 for i in st.split(','):
3040 self.add_input_file(i)
3041 self.add_var_arg('--psdfiles %s'%st)
3042
3043 def set_snr_file(self,st):
3044 if st is None:
3045 return
3046 self.add_file_opt('snr',st)
3047
3048 def set_coinc_file(self,coinc,gid=None):
3049 if gid:
3050 self.__event=gid
3051 if coinc is None:
3052 return
3053 self.add_var_arg('--trig '+coinc)
3054
3055 def append_in_files(self,this_file):
3056
3057 self.infiles.append(this_file)
3058
3059 def add_engine_parent(self,node):
3060 """
3061 Add a parent node which is one of the engine nodes,
3062 and automatically set options accordingly.
3063 """
3064 self.add_parent(node)
3065 self.add_file_arg(node.get_pos_file())
3066 self.append_in_files(node.get_pos_file())
3067
3068 def get_pos_file(self): return self.posfile
3069
3070 def get_in_files(self): return self.infiles
3071
3072 def set_bayes_coherent_incoherent(self,bcifile):
3073 self.add_file_opt('bci',bcifile)
3074 self.bcifile=bcifile
3075
3076 def get_bcifile(self):
3077 return self.bcifile
3078
3079 def set_bayes_coherent_noise(self,bsnfile):
3080 self.add_file_opt('bsn',bsnfile)
3081 self.bsnfile=bsnfile
3082 def get_bsnfile(self):
3083 return self.bsnfile
3084
3085 def set_header_file(self,headerfile):
3086 self.add_file_opt('header',headerfile)
3087
3088 def set_ifos(self,ifos):
3089 self.ifos=ifos
3090
3091class CoherenceTestJob(LALInferenceDAGJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
3092 """
3093 Class defining the coherence test job to be run as part of a pipeline.
3094 """
3095 def __init__(self,cp,submitFile,logdir):
3096 exe=cp.get('condor','coherencetest')
3097 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3098 pipeline.AnalysisJob.__init__(self,cp)
3099 LALInferenceDAGJob.__init__(self, cp)
3100 self.add_opt('new-coherent-incoherent-noise','')
3101 self.add_condor_cmd('getenv','True')
3102 self.set_stdout_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).out'))
3103 self.set_stderr_file(os.path.join(logdir,'coherencetest-$(cluster)-$(process).err'))
3104 self.set_sub_file(os.path.abspath(submitFile))
3105
3107 """
3108 Class defining the node for the coherence test
3109 """
3110 def __init__(self,coherencetest_job,outfile=None):
3111 super(CoherenceTestNode,self).__init__(coherencetest_job)
3112 self.coherent_parent=None
3113 self.incoherent_parents=[]
3114 self.finalized=False
3115 self.outfile=None
3116 if outfile is not None:
3117 self.outfile=outfile
3118 self.add_file_opt('outfile',outfile,file_is_output_file=True)
3119
3120 def add_coherent_parent(self,node):
3121 """
3122 Add a parent node which is an engine node, and process its output files
3123 """
3124 self.coherent_parent=node
3125 self.add_parent(node)
3126 def add_incoherent_parent(self,node):
3127 """
3128 Add a parent node which provides one of the single-ifo evidence values
3129 """
3130 self.incoherent_parents.append(node)
3131 self.add_parent(node)
3132 def finalize(self):
3133 """
3134 Construct command line
3135 """
3136 if self.finalized==True: return
3137 self.finalized=True
3138 self.add_file_arg(self.coherent_parent.get_pos_file())
3139 for inco in self.incoherent_parents:
3140 self.add_file_arg(inco.get_pos_file())
3141 super(CoherenceTestNode, self).finalize()
3142
3143class MergeJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
3144 """
3145 Class defining a job which merges several parallel nested sampling or MCMC jobs into a single file
3146 Input arguments:
3147 cp - A configparser object containing the setup of the analysis
3148 submitFile - Path to store the submit file
3149 logdir - A directory to hold the stderr, stdout files of the merge runs
3150 engine - Set to either 'nest' or 'mcmc' for the appropriate behaviour
3151 """
3152 def __init__(self,cp,submitFile,logdir,engine='nest'):
3153 if engine == 'mcmc':
3154 exe=cp.get('condor','mergeMCMCscript')
3155 else:
3156 exe=cp.get('condor','mergeNSscript')
3157 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3158 pipeline.AnalysisJob.__init__(self,cp)
3159 LALInferenceDAGSharedFSJob.__init__(self, cp)
3160 self.set_sub_file(os.path.abspath(submitFile))
3161 self.set_stdout_file(os.path.join(logdir,'merge-$(cluster)-$(process).out'))
3162 self.set_stderr_file(os.path.join(logdir,'merge-$(cluster)-$(process).err'))
3163 self.add_condor_cmd('getenv','True')
3164 if cp.has_option('merge','npos') and engine == 'nest':
3165 self.add_opt('npos',cp.get('merge','npos'))
3166
3167
3169 """
3170 Class defining the DAG node for a NS merge job
3171 Input arguments:
3172 merge_job = A MergeJob object
3173 parents = iterable of parent engine nodes (must have a get_ns_file() or get_pos_file() method, for 'nest' or 'mcmc' respectively)
3174 engine - Set to either 'nest' or 'mcmc' for the appropriate behaviour
3175 """
3176 def __init__(self,merge_job,parents=None,engine='nest'):
3177 super(MergeNode,self).__init__(merge_job)
3178 if parents is not None:
3179 for parent in parents:
3180 if engine == 'nest':
3181 self.add_engine_parent(parent)
3182 else:
3183 self.add_combine_parent(parent)
3184
3185 def add_engine_parent(self,parent):
3186 self.add_parent(parent)
3187 self.add_file_arg(parent.get_ns_file())
3188
3189 def add_combine_parent(self,parent):
3190 self.add_parent(parent)
3191 self.add_file_arg(parent.get_pos_file())
3192
3193 def set_pos_output_file(self,file):
3194 self.add_file_opt('pos',file,file_is_output_file=True)
3195 self.posfile=file
3196
3197 def get_pos_file(self): return self.posfile
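# Minimal usage sketch (hypothetical config object, paths and node list, not
# part of the pipeline source): merge the output of several parallel
# nested-sampling runs into one posterior file.
#   mjob = MergeJob(cp, 'merge.sub', 'log', engine='nest')
#   mnode = MergeNode(mjob, parents=ns_nodes, engine='nest')
#   mnode.set_pos_output_file('posterior_samples.hdf5')
#   dag.add_node(mnode)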
3198
3199class CombineMCMCJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
3200 """
3201 Class defining a job which combines several parallel MCMC chains into a single hdf5 file
3202 Input arguments:
3203 cp - A configparser object containing the setup of the analysis
3204 submitFile - Path to store the submit file
3205 logdir - A directory to hold the stderr, stdout files of the merge runs
3206 """
3207 def __init__(self,cp,submitFile,logdir):
3208 exe=cp.get('condor','combinePTMCMCh5script')
3209 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3210 pipeline.AnalysisJob.__init__(self,cp)
3211 LALInferenceDAGSharedFSJob.__init__(self, cp)
3212 self.set_sub_file(os.path.abspath(submitFile))
3213 self.set_stdout_file(os.path.join(logdir,'combine-$(cluster)-$(process).out'))
3214 self.set_stderr_file(os.path.join(logdir,'combine-$(cluster)-$(process).err'))
3215 self.add_condor_cmd('getenv','True')
3216
3218 """
3219 Class defining the DAG node for a MCMC combine job
3220 Input arguments:
3221 combine_job = A CombineMCMCJob object
3222 parents = iterable of parent LALInferenceMCMC nodes (must have get_pos_file() method)
3223 """
3224 def __init__(self,combine_job,parents=None):
3225 super(CombineMCMCNode,self).__init__(combine_job)
3226 if parents is not None:
3227 for parent in parents:
3228 self.add_engine_parent(parent)
3229
3230 def add_engine_parent(self,parent):
3231 self.add_parent(parent)
3232 self.add_file_arg(parent.get_pos_file())
3233
3234 def get_parent_posfile(self,parent):
3235 return parent.get_pos_file()
3236
3237 def set_pos_output_file(self,file):
3238 self.add_file_opt('outfile',file,file_is_output_file=True)
3239 self.posfile=file
3240
3241 def get_pos_file(self): return self.posfile
3242
3243class GraceDBJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
3244 """
3245 Class for a gracedb job
3246 """
3247 def __init__(self,cp,submitFile,logdir):
3248 exe=cp.get('condor','gracedb')
3249 pipeline.CondorDAGJob.__init__(self,"scheduler",exe)
3250 pipeline.AnalysisJob.__init__(self,cp)
3251 LALInferenceDAGSharedFSJob.__init__(self, cp)
3252 self.set_sub_file(os.path.abspath(submitFile))
3253 self.set_stdout_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).out'))
3254 self.set_stderr_file(os.path.join(logdir,'gracedb-$(cluster)-$(process).err'))
3255 self.add_condor_cmd('getenv','True')
3256 self.basepath=cp.get('paths','webdir')
3258
3260 """
3261 Run the gracedb executable to report the results
3262 """
3263 def __init__(self,gracedb_job,gid=None,parent=None,message=None,upfile=None,command='upload',tag=None,server=None):
3264 # message needs to be a string
3265 # upfile is the full path of the file to be uploaded
3266 super(GraceDBNode,self).__init__(gracedb_job)
3267 if gid: self.set_gid(gid)
3268 if parent:
3269 if isinstance(parent, list):
3270 for p in parent:
3271 self.add_parent(p)
3272 else:
3273 self.add_parent(parent)
3274 self.message=message
3275 self.filename=upfile
3276 self.command=command
3277 self.tag=tag
3278 self.server=server
3279 self.__finalized=False
3280
3281 def set_gid(self,gid):
3282 """
3283 Set the GraceDB ID to log to
3284 """
3285 self.gid=gid
3286
3287 def set_message(self,message):
3288 self.message=message
3289
3290 def set_filename(self,filename):
3291 self.filename=filename
3292
3293 def finalize(self):
3294 if self.__finalized:
3295 return
3296 self.add_var_arg(self.command)
3297 if self.tag:
3298 self.add_var_arg('--tag-name='+self.tag)
3299 self.add_var_arg(str(self.gid))
3300 if self.filename:
3301 self.add_var_arg(self.filename+' ')
3302 if self.message:
3303 self.add_var_arg("'{}'".format(self.message))
3304 if self.server:
3305 self.add_var_arg("--service-url %s"%self.server)
3306
3307 self.__finalized=True
3308 super(GraceDBNode, self).finalize()
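# e.g. command='upload', tag='pe', gid='G123456', upfile='samples.hdf5' and
# message='PE results' (hypothetical values) assemble the argument list
#   upload --tag-name=pe G123456 samples.hdf5 'PE results'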
3309
3310# Needs to access shared filesystem as it uses directory
3311class ROMJob(LALInferenceDAGSharedFSJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
3312 """
3313 Class for a ROM compute weights job
3314 """
3315 def __init__(self,cp,submitFile,logdir):
3316 time_step=0.000172895418228
3317 #This ensures that, for SNR < 100, a signal with bandwidth 20-4096 Hz will
3318 #have a resolved time posterior assuming a chirp-like frequency evolution
3319 #and aLIGO_ZDHP PSD
3320 dt=0.1
3321 exe=cp.get('condor','computeroqweights')
3322 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3323 pipeline.AnalysisJob.__init__(self,cp)
3324 LALInferenceDAGSharedFSJob.__init__(self, cp)
3325 self.set_sub_file(submitFile)
3326 self.set_stdout_file(os.path.join(logdir,'computeroqweights-$(cluster)-$(process).out'))
3327 self.set_stderr_file(os.path.join(logdir,'computeroqweights-$(cluster)-$(process).err'))
3328 self.add_condor_cmd('getenv','True')
3329 self.add_arg('-B '+str(cp.get('paths','roq_b_matrix_directory')))
3330 if cp.has_option('engine','dt'):
3331 dt=cp.getfloat('engine','dt')
3332 self.add_arg('-t '+str(dt))
3333 if cp.has_option('engine','time_step'):
3334 time_step=cp.get('engine','time_step')
3335 self.add_arg('-T '+str(time_step))
3336 if cp.has_option('engine', 'approx'):
3337 self.add_arg('-a ' + str(cp.get('engine', 'approx')))
3338 if cp.has_option('condor','computeroqweights_memory'):
3339 required_memory = str(cp.get('condor','computeroqweights_memory'))
3340 else:
3341 roq_dir = cp.get('paths','roq_b_matrix_directory')
3342 params = np.genfromtxt(os.path.join(roq_dir, 'params.dat'), names=True)
3343 flow, fhigh, seglen = params['flow'], params['fhigh'], params['seglen']
3344 if os.path.exists(os.path.join(roq_dir, 'B_linear.npy')) and os.path.exists(os.path.join(roq_dir, 'B_quadratic.npy')):
3345 linear_basis_size = os.path.getsize(os.path.join(roq_dir, 'B_linear.npy'))
3346 quadratic_basis_size = os.path.getsize(os.path.join(roq_dir, 'B_quadratic.npy'))
3347 elif os.path.exists(os.path.join(roq_dir, 'selected_params_linear.npy')) and \
3348 os.path.exists(os.path.join(roq_dir, 'selected_params_quadratic.npy')):
3349 linear_basis_size = 32 * (fhigh - flow) * seglen * \
3350 len(np.load(os.path.join(roq_dir, 'selected_params_linear.npy')))
3351 quadratic_basis_size = 32 * (fhigh - flow) * seglen * \
3352 len(np.load(os.path.join(roq_dir, 'selected_params_quadratic.npy')))
3353 roq_weights_size = 3 * ((fhigh - flow) * seglen) * \
3354 (float(dt + 0.05) * 2 / float(time_step)) * 2 * 8
3355 # add 4 GB of memory due to how matrix-copying is handled in
3356 # lalapps_compute_roq_weights.py/numpy
3357 required_memory = int(
3358 (linear_basis_size + quadratic_basis_size + roq_weights_size)
3359 / (1024 * 1024) + 4096)
3360 print('Requesting {} MB of memory for '
3361 'computeroqweights.'.format(required_memory))
3362 self.add_condor_cmd('request_memory', str(required_memory))
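# Illustrative sizing (hypothetical ROQ basis, not from the source): for
# flow=20 Hz, fhigh=1024 Hz, seglen=4 s, dt=0.1 and the default time_step,
# roq_weights_size = 3 * 4016 * (0.3/time_step) * 16 bytes, about 320 MB;
# with roughly 1 GB of basis files, request_memory comes out near
# (320 + 1024) + 4096, i.e. about 5400 MB.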
3363
3365 """
3366 Run the ROM compute weights script
3367 """
3368 def __init__(self,computeroqweights_job,ifo,seglen,flow):
3369 super(ROMNode,self).__init__(computeroqweights_job)
3370 self.__finalized=False
3371 self.add_var_arg('--seglen '+str(seglen))
3372 self.add_var_arg('--fLow '+str(flow))
3373 self.add_var_arg('--ifo '+ifo)
3374
3375class BayesLineJob(LALInferenceDAGSharedFSJob,pipeline.CondorDAGJob,pipeline.AnalysisJob):
3376 """
3377 Class for a BayesLine job
3378 """
3379 def __init__(self,cp,submitFile,logdir):
3380 exe=cp.get('condor','bayesline')
3381 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3382 pipeline.AnalysisJob.__init__(self,cp)
3383 LALInferenceDAGSharedFSJob.__init__(self, cp)
3384 self.set_sub_file(submitFile)
3385 self.set_stdout_file(os.path.join(logdir,'bayesline-$(cluster)-$(process).out'))
3386 self.set_stderr_file(os.path.join(logdir,'bayesline-$(cluster)-$(process).err'))
3387 self.add_condor_cmd('getenv','True')
3388
3390 """
3391 Run the BayesLine code
3392 """
3393 def __init__(self,bayesline_job):
3394 super(BayesLineNode,self).__init__(bayesline_job)
3395 self.__finalized=False
3396
3397
3398class SkyMapNode(pipeline.CondorDAGNode):
3399 def __init__(self, skymap_job, posfile=None, parent=None, objid=None, prefix=None, outdir=None, ifos=None):
3400 self.prefix=prefix
3401 super(SkyMapNode, self).__init__(skymap_job)
3402 self.objid=None
3403 self.outdir=None
3404 self.finalized=False
3405 self.ifos=None
3406 if parent:
3407 self.add_parent(parent)
3408 if posfile:
3409 self.set_posfile(posfile)
3410 if objid:
3411 self.set_objid(objid)
3412 if outdir:
3413 self.set_outdir(outdir)
3414 if ifos:
3415 self.ifos=ifos
3416 def set_outdir(self, outdir):
3417 self.outdir=outdir
3418 if self.prefix:
3419 name = self.prefix+'.fits'
3420 else:
3421 name = 'skymap.fits'
3422 self.outfits = os.path.join(outdir,name)
3423 def set_posfile(self, posfile):
3424 self.posfile=posfile
3425 def get_outdir(self):
3426 return self.outdir
3427 def set_objid(self,objid):
3428 """
3429 Object ID for the fits file
3430 """
3431 self.objid = objid
3432 def finalize(self):
3433 """
3434 Construct command line
3435 """
3436 if self.finalized==True: return
3437 self.finalized=True
3438 self.add_file_opt('samples',self.posfile)
3439 self.add_file_opt('fitsoutname',self.outfits, file_is_output_file=True)
3440 self.add_file_opt('outdir',self.outdir, file_is_output_file=True)
3441 if self.objid:
3442 self.add_var_opt('objid',self.objid)
3443 if self.ifos:
3444 self.add_var_opt('instruments',' '.join(self.ifos))
3445 super(SkyMapNode,self).finalize()
3446
3447class SkyMapJob(LALInferenceDAGSharedFSJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
3448 """
3449 Node to run ligo-skymap-from-samples
3450 """
3451 def __init__(self, cp, submitFile, logdir):
3452 exe=cp.get('condor','ligo-skymap-from-samples')
3453 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3454 pipeline.AnalysisJob.__init__(self,cp)
3455 LALInferenceDAGSharedFSJob.__init__(self, cp)
3456 self.set_sub_file(submitFile)
3457 self.set_stdout_file(os.path.join(logdir,'samples2map-$(cluster)-$(process).out'))
3458 self.set_stderr_file(os.path.join(logdir,'samples2map-$(cluster)-$(process).err'))
3459 # The user environment PYTHONPATH may be set to python2.7 version of lalsuite, so this is disabled
3460 #self.add_condor_cmd('getenv','True')
3461 # Add user-specified options from ini file
3462 self.add_ini_opts(cp,'ligo-skymap-from-samples')
3463 if cp.has_option('engine','margdist') or cp.has_option('engine','margdist-comoving'):
3464 self.add_opt('disable-distance-map','')
3465 self.add_opt('enable-multiresolution','')
3466
3467
3468class PlotSkyMapJob(LALInferenceDAGSharedFSJob, pipeline.CondorDAGJob, pipeline.AnalysisJob):
3469 """
3470 Job to run ligo-skymap-plot
3471 """
3472 def __init__(self, cp, submitFile, logdir):
3473 exe=cp.get('condor','ligo-skymap-plot')
3474 pipeline.CondorDAGJob.__init__(self, "vanilla", exe)
3475 pipeline.AnalysisJob.__init__(self, cp)
3476 LALInferenceDAGSharedFSJob.__init__(self, cp)
3477 self.set_sub_file(submitFile)
3478 self.set_stdout_file(os.path.join(logdir,'plotskymap-$(cluster)-$(process).out'))
3479 self.set_stderr_file(os.path.join(logdir,'plotskymap-$(cluster)-$(process).err'))
3480 # The user environment PYTHONPATH may be set to python2.7 version of lalsuite, so this is disabled
3481 # self.add_condor_cmd('getenv','True')
3482 # Add user-specified options from ini file
3483 self.add_ini_opts(cp,'ligo-skymap-plot')
3484
3485class PlotSkyMapNode(LALInferenceDAGNode):
3486 def __init__(self, plotskymap_job, parent=None, inputfits=None, output=None):
3487 super(PlotSkyMapNode, self).__init__(plotskymap_job)
3488 if parent:
3489 self.add_parent(parent)
3490 if inputfits:
3491 self.set_input_fits(inputfits)
3492 if output:
3493 self.set_output(output)
3494 self.finalized=False
3495 def set_output(self, outfile):
3496 self.outfile=outfile
3497 def set_input_fits(self, fitsfile):
3498 self.fitsfile=fitsfile
3499 def get_output(self):
3500 return self.outfile
3501 def finalize(self):
3502 """
3503 Construct command line
3504 """
3505 if self.finalized==True: return
3506 self.finalized=True
3507 self.add_input_file(self.fitsfile)
3508 self.add_file_arg(self.fitsfile)
3509 # This is a var opt because the option name "output" results in the file
3510 # being added to macrooutput twice if add_file_opt is used.
3511 self.add_var_opt('output',self.outfile)
3512 super(PlotSkyMapNode,self).finalize()
3513
3514class PostRunInfoJob(LALInferenceDAGSharedFSJob, pipeline.CondorDAGJob,pipeline.AnalysisJob):
3515 def __init__(self,cp,submitFile,logdir):
3516
3517 self.isdefined=True
3518 if not cp.has_option('condor','gdbinfo'):
3519 self.isdefined=False
3520 return
3521 exe=cp.get('condor','gdbinfo')
3522 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3523 pipeline.AnalysisJob.__init__(self,cp) # Job always runs locally
3524 LALInferenceDAGSharedFSJob.__init__(self, cp)
3525 self.set_sub_file(os.path.abspath(submitFile))
3526 self.set_stdout_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).out'))
3527 self.set_stderr_file(os.path.join(logdir,'gdbinfo-$(cluster)-$(process).err'))
3528 self.add_condor_cmd('getenv','True')
3529 self.add_condor_cmd('RequestMemory','1000')
3530
3531class PostRunInfoNode(LALInferenceDAGNode):
3532 def __init__(self,post_run_info_job,gid=None,parent=None,samples=None,server=None):
3533 super(PostRunInfoNode, self).__init__(post_run_info_job)
3534 self.bci=None
3535 self.bsn=None
3536 self.analysis='LALInference'
3537 self.__finalized=False
3538 self.set_samples(samples)
3539 self.set_parent(parent)
3540 self.set_gid(gid)
3541 self.server=None
3542 if server is not None:
3543 self.set_server(server)
3544
3545 def finalize(self):
3546 self.add_var_opt('analysis',self.analysis)
3547 super(PostRunInfoNode, self).finalize()
3548 def set_parent(self,parentnode):
3549 self.add_parent(parentnode)
3550 def set_samples(self,samples):
3551 if samples is not None:
3552 self.add_var_arg('--samples %s'%samples)
3553 def set_skymap(self,skymap):
3554 self.add_var_arg('--skymap %s'%skymap)
3555 def set_message(self,message):
3556 self.add_var_arg("--message '%s'"%message)
3557 def set_email(self,email):
3558 self.add_var_arg('--email %s'%email)
3559 def set_gid(self,gid):
3560 self.add_var_opt('gid',gid)
3561 def set_bci(self,bci):
3562 self.add_file_opt('bci',bci)
3563 def set_bsn(self,bsn):
3564 self.add_file_opt('bsn',bsn)
3565 def set_analysis(self,analysis):
3566 self.analysis=analysis
3567 def set_server(self,server):
3568 self.server=server
3569 if server is not None:
3570 self.add_var_arg('--server %s'%self.server)
3571
3572class EvolveSamplesNode(pipeline.CondorDAGNode):
3573 """
3574 Node to evolve spins of posterior samples
3575 """
3576 def __init__(self,evolve_sample_job,parent=None,posfile=None):
3577 pipeline.CondorDAGNode.__init__(self,evolve_sample_job)
3578 if parent:
3579 self.add_parent(parent)
3580 if posfile:
3581 self.posfile=posfile
3582 self.set_posfile(posfile)
3583
3584 def set_posfile(self,posfile):
3585 self.add_var_arg('--sample_file %s'%posfile)
3586
3587 def get_pos_file(self):
3588 return self.posfile
3589
3590class EvolveSamplesJob(pipeline.CondorDAGJob,pipeline.AnalysisJob):
3591 """
3592 Class for evolving the spins of posterior samples
3593 """
3594 def __init__(self,cp,submitFile,logdir):
3595 exe=cp.get('condor','evolve_spins')
3596 pipeline.CondorDAGJob.__init__(self,"vanilla",exe)
3597 pipeline.AnalysisJob.__init__(self,cp)
3598 if cp.has_option('condor','accounting_group'):
3599 self.add_condor_cmd('accounting_group',cp.get('condor','accounting_group'))
3600 if cp.has_option('condor','accounting_group_user'):
3601 self.add_condor_cmd('accounting_group_user',cp.get('condor','accounting_group_user'))
3602 requirements=''
3603 if cp.has_option('condor','queue'):
3604 self.add_condor_cmd('+'+cp.get('condor','queue'),'True')
3605 requirements='(TARGET.'+cp.get('condor','queue')+' =?= True)'
3606 if cp.has_option('condor','Requirements'):
3607 if requirements!='':
3608 requirements=requirements+' && '
3609 requirements=requirements+cp.get('condor','Requirements')
3610 if requirements!='':
3611 self.add_condor_cmd('Requirements',requirements)
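# e.g. queue=Online and Requirements=(HAS_LIGO_FRAMES =?= True) in the
# [condor] section (hypothetical values) yield the submit-file line
#   Requirements = (TARGET.Online =?= True) && (HAS_LIGO_FRAMES =?= True)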
3612 self.set_sub_file(submitFile)
3613 self.set_stdout_file(os.path.join(logdir,'evolve_spins-$(cluster)-$(process).out'))
3614 self.set_stderr_file(os.path.join(logdir,'evolve_spins-$(cluster)-$(process).err'))
3615 self.add_condor_cmd('getenv','True')
3616 if cp.has_option('spin_evol','vfinal'):
3617 self.add_opt('vfinal', cp.get('spin_evol','vfinal'))