Running the omega pipeline with condor for an arbitrary number of sources
5__author__ =
"Jeroen Meidam"
6__credits__ = [
"Jeroen Meidam"]
7__maintainer__ =
"Jeroen Meidam"
8__email__ =
"jeroen.meidam@ligo.org"
11usage=
""" omegascans_dag.py config.ini [options]
  This script creates a DAG to run the omega pipeline for multiple detectors
  and multiple sources on a cluster.
  Framefiles are automatically looked up with gw_data_find

  It requires a config file and a sourcefile containing trigtimes
  and detector timeslides.

  Run with --example to create an example config and sourcefile.

  It is possible to include auxiliary channels in the scans only for H1 and L1.
  For Virgo this is only possible on one of the Virgo clusters
25###############################################################################
29###############################################################################
31from lal import pipeline
32from optparse import OptionParser
37from subprocess import Popen,PIPE
39from configparser import ConfigParser
41###############################################################################
45###############################################################################
47DefaultFrameTypes = {'L1':'L1_RDS_R_L1',#Includes auxiliary channels
49 'V1':
'T1300121_V1_EARLY_RECOLORED_V2'}
51DefaultConfigFiles = {
'L1':
'/archive/home/omega/configurations/S6/S6b/L0L1-RDS_R_L1-selected.txt',
52 'H1':
'/archive/home/omega/configurations/S6/S6b/H0H1-RDS_R_L1-selected.txt',
55RecoloredFrameTypes = {
'L1':
'T1200307_V4_EARLY_RECOLORED_V2',
56 'H1':
'T1200307_V4_EARLY_RECOLORED_V2',
57 'V1':
'T1300121_V1_EARLY_RECOLORED_V2'}
61# QScan configuration file
63# Scans H1 and L1 gravitational-wave channel data
64# from the S5 version 1 calibrated data set
67# shourov@ligo.caltech.edu
72[Parameters,Parameter Estimation]
76[Gravitational,Gravitational wave data]
79 channelName: 'V1:h_16384Hz'
80 frameType:
'T1300121_V1_EARLY_RECOLORED_V2'
83 searchFrequencyRange: [32 Inf]
84 searchQRange: [3.32 141]
85 searchMaximumEnergyLoss: 0.2
86 whiteNoiseFalseRate: 1e-3
88 searchWindowDuration: 0.5
89 plotTimeRanges: [1 8 100]
90 plotFrequencyRange: []
91 plotNormalizedEnergyRange: [0 25.5]
108[Parameters,Parameter Estimation]
112[Gravitational,Gravitational wave data]
115 channelName:
'L1:LDAS-STRAIN'
116 frameType:
'T1200307_V4_EARLY_RECOLORED_V2'
117 sampleFrequency: 4096
119 searchFrequencyRange: [32 Inf]
120 searchQRange: [3.32 141]
121 searchMaximumEnergyLoss: 0.2
122 whiteNoiseFalseRate: 1e-3
124 searchWindowDuration: 0.5
125 plotTimeRanges: [1 8 100]
126 plotFrequencyRange: []
127 plotNormalizedEnergyRange: [0 25.5]
144[Parameters,Parameter Estimation]
148[Gravitational,Gravitational wave data]
151 channelName:
'H1:LDAS-STRAIN'
152 frameType:
'T1200307_V4_EARLY_RECOLORED_V2'
153 sampleFrequency: 4096
155 searchFrequencyRange: [32 Inf]
156 searchQRange: [3.32 141]
157 searchMaximumEnergyLoss: 0.2
158 whiteNoiseFalseRate: 1e-3
160 searchWindowDuration: 0.5
161 plotTimeRanges: [1 8 100]
162 plotFrequencyRange: []
163 plotNormalizedEnergyRange: [0 25.5]
167ConfigsRecolored = {'L1':ConfigRecoloredL,
168 'H1':ConfigRecoloredH,
169 'V1':ConfigRecoloredV}
174#The basedir is where the dag and sub files end up.
175basedir=/home/jmeidam/tiger_runs/omegascans/example
177#Within the outdir you will have one folder for each trigtime
178# and within that, a resultsfolder (the resultspage)
for each ifo
179out=/home/jmeidam/public_html/omegascans/example
183sourcefile=/home/jmeidam/tiger_runs/omegascans/example/omegascanslist.txt
188executable=/home/omega/opt/omega/bin/wpipeline
195frametypes={
'L1':
'L1_RDS_R_L1',
'H1':
'H1_RDS_R_L1',
'V1':
'T1300121_V1_EARLY_RECOLORED_V2'}
197accounting_group=ligo.dev.o1.cbc.testgr.tiger
201oddslimit={
'low':20.0}
221webdir=https://ldas-jobs.ligo.caltech.edu/~jmeidam/omegascans/example
225"""trigtime H1 L1 V1 logodds
226966944343 0 1869348 593939 107.23
227970687764 0 -1963504 -903565 156.14
228969100904 0 -109089 -2545560 96.04
231###############################################################################
235###############################################################################
    Helper function. Make the given directory, creating intermediate
    dirs if necessary, and don't complain about it already existing.
242 if os.access(path,os.W_OK)
and os.path.isdir(path):
return
243 else: os.makedirs(path)
246 p = Popen([command], stdout=PIPE,stderr=PIPE, shell=
True)
247 out, err = p.communicate()
    Use gw_data_find to find the frame directory name which is used as
    an argument to wpipeline.
255 scantime = trigtime+timeslide
258 command=
"/usr/bin/gw_data_find --observatory=%s --url-type=file --type=%s --gps-start-time=%.3f --gps-end-time=%.3f"%(ifo[0],frametype,scantime,scantime)
259 datafind_stdout,datafind_stderr =
system_call(command)
260 if not datafind_stdout:
261 print(datafind_stderr)
262 exit(
"gw_data_find failed, exiting...")
263 gwf_file = datafind_stdout.replace(
"file://localhost",
"").strip()
264 gwf_dir = os.path.dirname(gwf_file).strip()
267 if not os.path.isfile(gwf_file):
268 exit(
"%s is not a file or does not exist, exiting..."%gwf_file)
    The omega pipeline executable is followed by "scan", which
    needs to be in front of all the other arguments and options.
    This function corrects the final subfile to this end.
    - It removes "scan" from the executable
    - Places it as the first item in "arguments"
281 with open(subfile,
'r')
as f:
282 lines = f.readlines()
287 if l.split()[0] ==
"executable":
289 if l.split()[0] ==
"arguments":
294 if lines[i_exec].strip().split()[-1] ==
"scan":
295 execitem = lines[i_exec].strip().split()[:-1]
296 lines[i_exec] =
' '.join(execitem)
297 lines[i_exec] +=
'\n'
301 lines[i_args] = x.replace(
'"',
'" scan',1)
304 with open(subfile,
'w')
as f:
    If the scan argument is not included in the executable,
    it will not show up in the sh file.
    This function corrects that.
315 scriptname = ".".join(dagfilename.split(
".")[:-1]) +
".sh"
316 with open(os.path.join(dagpath,scriptname),
'r')
as f:
317 lines = f.readlines()
319 if len(executable.split())>1:
320 if executable.split()[1] ==
'scan':
327 spl = l.strip().split()
329 if len(spl[0].split(
'/')) > 1:
339 with open(os.path.join(dagpath,scriptname),
'w')
as f:
359 pipeline.CondorDAG.__init__(self,self.
daglogfile)
360 if cp.has_option(
'paths',
'logdir'):
369 if cp.has_option(
'analysis',
'ifos'):
370 self.
ifos=ast.literal_eval(cp.get(
'analysis',
'ifos'))
372 self.
ifos=[
'H1',
'L1',
'V1']
378 self.
frametypes=ast.literal_eval(cp.get(
'analysis',
'frametypes'))
379 if cp.has_option(
'analysis',
'configfiles'):
380 self.
configfiles=ast.literal_eval(cp.get(
'analysis',
'configfiles'))
382 if cp.has_option(
'analysis',
'basicscan'):
385 for ifo
in self.
ifos:
388 f.write(ConfigsRecolored[ifo])
390 if 'V1' in self.
ifos:
392 DefaultConfigFiles[
'V1']=os.path.join(self.
basepath,
"basic_config_V1.txt")
393 with open(os.path.join(self.
basepath,DefaultConfigFiles[
'V1']),
'w')
as f:
394 f.write(ConfigsRecolored[
'V1'])
400 Nsources = len(info[
'trigtime'])
407 if self.
config.has_option(
'analysis',
'oddslimit'):
416 self.
oddslimit={
'low':float(
'-inf'),
'high':float(
'inf')}
423 print(
"calling gw_data_find for each node...")
424 for n
in range(Nsources):
427 if info[
'logodds'][n] > self.
oddslimit[
'low']
and info[
'logodds'][n] < self.
oddslimit[
'high']:
428 for ifo
in self.
ifos:
430 timeslides[ifo] = info[ifo][n]
434 for ifo
in self.
ifos:
436 timeslides[ifo] = info[ifo][n]
446 table_entry[ifo]=timeslides[ifo]
447 table_entry[
'trigtime']=trigtime
448 table_entry[
'logodds']=logodds
    Writes a CBC wiki type table with some information and links.
457 filename = os.path.join(self.outpath,"CBCwiki_table.txt")
458 print(
"writing CBC wiki table to \"%s\""%filename)
461 if self.
config.has_option(
'cbcwikitable',
'outlier'):
462 outlier=float(self.
config.get(
'cbcwikitable',
'outlier'))
464 fp = open(filename,
'w')
467 if self.
config.has_option(
'cbcwikitable',
'webdir'):
468 webdir=self.
config.get(
'cbcwikitable',
'webdir')
470 header=
"||'''trigtime'''"
471 for ifo
in self.
ifos:
472 header+=
"||'''timeslide %s'''"%ifo
473 for ifo
in self.
ifos:
474 header+=
"||'''injtime %s'''"%ifo
475 header+=
"||'''log odds'''||"
477 fp.write(header+
"\n")
482 if float(e[
'logodds']) > outlier:
483 entries_to_write.append(e)
487 for e
in entries_to_write:
488 string=
"||%d"%e[
'trigtime']
490 for ifo
in self.
ifos:
491 string+=
"||%d"%e[ifo]
493 for ifo
in self.
ifos:
494 time = float(e[
'trigtime'])+float(e[ifo])
496 link = os.path.join(webdir,
"scan_%d"%e[
'trigtime'],
"%s_%.2f"%(ifo,time))
497 string+=
"||[[%s|%d]]"%(link,time)
500 string+=
"||%.3f||"%e[
'logodds']
501 fp.write(string+
"\n")
    Reads in a file containing columns for at least trigtime, timeslide ifo1, timeslide ifo2 and timeslide ifo3 respectively.
513 oddslimit_set = False
514 if self.
config.has_option(
'analysis',
'oddslimit'):
515 oddstest = ast.literal_eval(self.
config.get(
'analysis',
'oddslimit'))
516 if oddstest.has_key(
'low')
or oddstest.has_key(
'high'):
526 header = data[0].strip().split()
527 Nsources = len(data)-1
533 for i
in range(len(header)):
534 headerdict[header[i]] = i
537 if not headerdict.has_key(
"timeslide_"+ifo)
and not headerdict.has_key(ifo):
538 sys.exit(
"ERROR: Not all ifos from config file are present in \""+self.
sourcefile+
"\"")
540 if not headerdict.has_key(
'trigtime')
and not headerdict.has_key(
'injtime'):
541 sys.exit(
"ERROR: The \""+self.
sourcefile+
"\" does not contain \"trigtime\" column.")
545 if not headerdict.has_key(
'logodds')
and not headerdict.has_key(
'logO'):
546 sys.exit(
"ERROR: An odds limit was set, but \""+self.
sourcefile+
"\" does not contain \"logodds\" column.")
550 if headerdict.has_key(
"timeslide_"+ifo):
551 val = headerdict[
"timeslide_"+ifo]
553 if headerdict.has_key(
"injtime"):
554 val = headerdict[
"injtime"]
555 headerdict[
"trigtime"]=val
557 if headerdict.has_key(
"logO"):
558 val = headerdict[
"logO"]
559 headerdict[
"logodds"]=val
564 for n
in range(Nsources):
565 linesplit = data[n].strip().split()
566 col = headerdict[ifo]
567 timeslist.append(float(linesplit[col]))
572 if oddslimit_set: logoddslist=[]
573 for n
in range(Nsources):
574 linesplit = data[n].strip().split()
575 coltrig = headerdict[
'trigtime']
576 trigtimeslist.append(float(linesplit[coltrig]))
578 colodds = headerdict[
'logodds']
579 logoddslist.append(float(linesplit[colodds]))
581 info[
'trigtime'] = trigtimeslist
583 info[
'logodds'] = logoddslist
591 pipeline.CondorDAGJob.__init__(self,
'vanilla',cp.get(
'omegapipe',
'executable'))
593 self.set_sub_file(os.path.abspath(submitFile))
596 self.add_condor_cmd(
'accounting_group',cp.get(
'analysis',
'accounting_group'))
597 self.add_condor_cmd(
'RequestMemory',
str(2000))
598 self.add_arg(
"--report")
599 self.set_stdout_file(os.path.join(logdir,
'omegascans-$(cluster)-$(process)-$(node).out'))
600 self.set_stderr_file(os.path.join(logdir,
'omegascans-$(cluster)-$(process)-$(node).err'))
604 def __init__(self,trigtime,timeslide,ifo,frametype,configfile,outdir,job,logodds=None):
606 pipeline.CondorDAGNode.__init__(self,job)
608 scantime = trigtime+timeslide
612 lockfile = os.path.join(outdir,
"scan_%.0f"%trigtime,
"%s_%.2f"%(ifo,scantime),
"lock.txt")
613 if os.path.isfile(lockfile):
614 print(
"WARNING: lock file found in output directory.\n Deleting it now, but check that you did not forget to stop some ongoing run.")
617 self.add_var_arg(
'%.3f'%scantime)
618 self.add_var_opt(
"outdir",os.path.join(outdir,
"scan_%.0f"%trigtime,
"%s_%.2f"%(ifo,scantime)))
619 self.add_var_opt(
"configuration",configfile)
620 self.add_var_opt(
"framecache",self.
framedir)
639 parser=OptionParser(usage)
640 parser.add_option(
"-e",
"--example",default=
False,dest=
"example",action=
"store_true",help=
"Create example config.ini and an example sourcefile")
641 (opts,args) = parser.parse_args()
644 with open(
"omega_config.ini",
"w")
as f:
645 f.write(ExampleConfig)
646 with open(
"omegascanslist.txt",
"w")
as f:
647 f.write(ExampleSourceFile)
649 print(
"Example files \"omega_config.ini\" and \"omegascanslist.txt\" are created")
654 sys.exit(
"ERROR: Must provide one config.ini")
659 cp.read_file(open(args[0]))
660 except AttributeError:
661 cp.readfp(open(args[0]))
665 dag.write_sub_files()
673 fix_scriptfile(cp.get(
'paths',
'basedir'),dag.get_dag_file(),cp.get(
'omegapipe',
'executable'))
675 print(
'Successfully created DAG file.')
676 fulldagpath=os.path.join(cp.get(
'paths',
'basedir'),dag.get_dag_file())
677 print(
'Now run condor_submit_dag %s\n'%(fulldagpath))
688if __name__ ==
"__main__":
def __init__(self, cp, submitFile, logdir)
def __init__(self, trigtime, timeslide, ifo, frametype, configfile, outdir, job, logodds=None)
def write_table(self)
writes a cbc wiki type table with some information and links
basepath
Setup some paths and filenames.
def add_table_entry(self, trigtime, timeslides, ifos, logodds)
def ReadInfoFromFile(self, ifos)
Reads in a file containing columns for at least trigtime, timeslide ifo1, timeslide ifo2 and timeslid...
def fix_scriptfile(dagpath, dagfilename, executable)
If the scan argument is not included in the executable, it will not show up in the sh file.
def get_framedir(trigtime, timeslide, ifo, frametype)
Use gw_data_find to find the frame directory name which is used as an argument to wpipeline.
def mkdirs(path)
FUNCTIONS.
def fix_subfile(subfile)
The omega pipeline executable is followed by "scan", which needs to be in front of all the other argu...