5from __future__
import print_function
10from optparse
import OptionParser
12from configparser
import ConfigParser
14from lal
import pipeline
16from lalinference
import lalinference_pipe_utils
as pipe_utils
# Usage banner handed to OptionParser; %prog is expanded by optparse.
usage=""" %prog [options] config.ini
Setup a DAG to run an end-to-end lalinference test:
 1) Generate samples from prior
 2) Analyse a set of injections drawn from the prior
 3) Run P vs P test on results
"""

# Command-line interface: where to run, where DAG logs go, an optional
# pre-made injection file, and how many prior samples to analyse.
parser = OptionParser(usage)
parser.add_option(
    "-r", "--run-path",
    action="store",
    type="string",
    default='./',
    metavar="RUNDIR",
    help="Directory to run pipeline in (default: $PWD)",
)
parser.add_option(
    "-p", "--daglog-path",
    action="store",
    type="string",
    default=None,
    metavar="LOGDIR",
    help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",
)
parser.add_option(
    "-I", "--injections",
    action="store",
    type="string",
    default=None,
    metavar="injections.xml",
    help="Path to injection file, will bypass the prior-sampling stage",
)
parser.add_option(
    '-N', '--trials',
    action='store',
    type='int',
    metavar='NUM',
    help='Number of prior samples to analyse',
)

opts, args = parser.parse_args()
def _load_config(path):
    """Parse the ini file at *path* into a fresh ConfigParser and return it.

    The file is read inside a ``with`` block so the handle is closed as soon
    as parsing is done rather than being left open.
    """
    cp = ConfigParser()
    # Replace the default option-name transform (which lower-cases names)
    # so option names keep the case used in the ini file.
    cp.optionxform = str
    with open(path) as ini:
        # read_file() is the current API; fall back to the older readfp()
        # on ConfigParser implementations that do not provide it.
        if hasattr(cp, 'read_file'):
            cp.read_file(ini)
        else:
            cp.readfp(ini)
    return cp


# Two independent parsers over the same ini file: one is customised for the
# prior-sampling run, the other for the main injection analysis.
prior_cp = _load_config(inifile)
main_cp = _load_config(inifile)
# Absolute path of the directory the whole pipeline is laid out in.
rundir = os.path.abspath(opts.run_path)

# DAG logs go under --daglog-path when it was supplied, otherwise under the
# run directory. The prior and main configurations each get their own
# subdirectory of that location.
if opts.daglog_path is None:
    daglogdir = os.path.abspath(opts.run_path)
else:
    daglogdir = os.path.abspath(opts.daglog_path)
prior_cp.set('paths', 'daglogdir', os.path.join(daglogdir, 'prior'))
main_cp.set('paths', 'daglogdir', os.path.join(daglogdir, 'main'))
# Output locations: the web root is read from [ppanalysis] webdir, and the
# prior / main runs each get a working directory under rundir and a web
# directory under that root.
# NOTE(review): this lookup is unconditional, whereas the later ppanalysis
# setup guards the same option with has_option() -- confirm which is intended.
webdir = main_cp.get('ppanalysis', 'webdir')
priordir = os.path.join(rundir, 'prior')
maindir = os.path.join(rundir, 'main')
priorwebdir = os.path.join(webdir, 'prior')
mainwebdir = os.path.join(webdir, 'injections')

# Record the per-run directories in the [paths] section of each config.
for _option, _prior_value, _main_value in (
        ('basedir', priordir, maindir),
        ('webdir', priorwebdir, mainwebdir),
):
    prior_cp.set('paths', _option, _prior_value)
    main_cp.set('paths', _option, _main_value)

# Log directory used by the jobs attached directly to the outer DAG.
outerlogdir = os.path.join(daglogdir, 'log')
# Adjust the [engine] section of the prior run according to the sampler
# named in [analysis] engine. Every recognised sampler gets an empty
# 'zeroLogLike' option; the other options scale with --trials. An
# unrecognised engine leaves the configuration untouched.
_prior_engine = prior_cp.get('analysis', 'engine')
if _prior_engine in ('lalinferencenest', 'lalinferenceburst'):
    _nsamples = str(20 * opts.trials)
    _engine_options = (
        ('sampleprior', _nsamples),
        ('zeroLogLike', ''),
        ('nlive', _nsamples),
    )
elif _prior_engine == 'lalinferencemcmc':
    _engine_options = (
        ('Neff', str(opts.trials)),
        ('zeroLogLike', ''),
    )
elif _prior_engine in ('lalinferencebambi', 'lalinferencebambimpi'):
    _engine_options = (
        ('zeroLogLike', ''),
        ('nlive', str(opts.trials)),
    )
else:
    _engine_options = ()

for _option, _value in _engine_options:
    prior_cp.set('engine', _option, _value)
# Outer DAG that ties the prior run, the conversion step, the main run and
# the P-P analysis together. uuid1() makes the log file name unique.
_outerlogname = 'lalinference_injection_test_' + str(uuid.uuid1()) + '.log'
outerdaglog = os.path.join(daglogdir, _outerlogname)
outerdag = pipeline.CondorDAG(outerdaglog)
outerdag.set_dag_file(os.path.join(rundir, 'priortest'))
# Placeholder event at the chosen trigger time.
fake_event = pipe_utils.Event(trig_time=trig_time)

# Write the trigger time to time.txt in the run directory. The context
# manager guarantees the data is flushed and the handle closed before the
# path is handed to the prior configuration below.
tfpath = os.path.join(rundir, 'time.txt')
with open(tfpath, 'w') as tfile:
    print('%i\n' % (trig_time), file=tfile)

# Point the prior run at that file as its GPS time list.
prior_cp.set('input', 'gps-time-file', tfpath)
# Build the prior-sampling DAG and wrap it as a DAGMan job/node so it can be
# run as a sub-DAG of the outer DAG.
priordag = pipe_utils.LALInferencePipelineDAG(prior_cp)
priordag.set_dag_file(os.path.join(priordir, 'lalinference_priorsample'))
priordagjob = pipeline.CondorDAGManJob(priordag.get_dag_file(), dir=priordir)
priordagnode = pipeline.CondorDAGManNode(priordagjob)

# Pick the first results-page node of the prior DAG. On Python 3 filter()
# returns an iterator that cannot be indexed, so build a real list; indexing
# it still raises IndexError when the DAG has no results-page node.
_prior_pagenodes = [
    _node for _node in priordag.get_nodes()
    if isinstance(_node, pipe_utils.ResultsPageNode)
]
pagenode = _prior_pagenodes[0]
priorfile = pagenode.get_pos_file()
# Condor submit file and stderr/stdout templates for the job that converts
# prior samples into an injection file.
convertsub = os.path.join(rundir, 'samples2injections.sub')
converterr = os.path.join(outerlogdir, 'samples2injection-$(cluster)-$(process)-$(node).err')
convertout = os.path.join(outerlogdir, 'samples2injection-$(cluster)-$(process)-$(node).out')

# Use the injection file given on the command line when there is one;
# otherwise the conversion job writes priorsamples.xml in the run directory.
# NOTE(review): branch condition inferred from the later
# `if not opts.injections` check -- confirm against the full file.
if opts.injections:
    injfile = os.path.abspath(opts.injections)
else:
    injfile = os.path.join(rundir, 'priorsamples.xml')

approx = prior_cp.get('engine', 'approx')
prior2injexe = prior_cp.get('condor', 'pos_to_sim_burst')

# Vanilla-universe job running the executable named by [condor] pos_to_sim_burst.
prior2injjob = pipeline.CondorDAGJob('vanilla', prior2injexe)
if main_cp.has_option('analysis', 'accounting_group'):
    _accounting_group = main_cp.get('analysis', 'accounting_group')
    prior2injjob.add_condor_cmd('accounting_group', _accounting_group)
prior2injjob.set_sub_file(convertsub)
prior2injjob.set_stderr_file(converterr)
prior2injjob.set_stdout_file(convertout)
prior2injjob.add_condor_cmd('getenv', 'True')

# The node takes the prior samples file as its argument and must wait for
# the prior-sampling sub-DAG to finish.
prior2injnode = pipeline.CondorDAGNode(prior2injjob)
for _option, _value in (
        ('output', injfile),
        ('num-of-injs', str(opts.trials)),
        ('approx', approx),
):
    prior2injnode.add_var_opt(_option, _value)
prior2injnode.add_var_arg(priorfile)
prior2injnode.add_parent(priordagnode)
# The main analysis is given a GPS span reaching 1000 either side of the
# trigger time.
_gps_pad = 1000
main_cp.set('input', 'gps-start-time', str(trig_time - _gps_pad))
main_cp.set('input', 'gps-end-time', str(trig_time + _gps_pad))

# Build the main analysis DAG and wrap it as a DAGMan job/node.
maindag = pipe_utils.LALInferencePipelineDAG(main_cp)
maindag.set_dag_file(os.path.join(maindir, 'lalinference_pipeline'))
maindagjob = pipeline.CondorDAGManJob(maindag.get_dag_file(), dir=maindir)
maindagnode = pipeline.CondorDAGManNode(maindagjob)

# Tell the main DAG which injection file to use, then add one full analysis
# per trial; the event id is the trial index.
maindag.config.set('input', 'burst-injection-file', injfile)
for i in range(int(opts.trials)):
    ev = pipe_utils.Event(trig_time=trig_time, event_id=i)
    e = maindag.add_full_analysis(ev)
outerdag.add_node(maindagnode)

# Without a user-supplied injection file the prior run and the conversion
# step are part of the workflow, and the main run has to wait for the
# conversion to finish.
if not opts.injections:
    for _node in (priordagnode, prior2injnode):
        outerdag.add_node(_node)
    maindagnode.add_parent(prior2injnode)


def _is_results_page(node):
    """Return True when *node* is a ResultsPageNode."""
    return isinstance(node, pipe_utils.ResultsPageNode)


# Collect the posterior file of every results-page node in the main DAG.
# On Python 3 filter() yields a one-shot iterator, which the comprehension
# below consumes.
resultspagenodes = filter(_is_results_page, maindag.get_nodes())
posteriorfiles = [node.get_pos_file() for node in resultspagenodes]
# Condor submit file and stderr/stdout templates for the P-P analysis job.
ppsub = os.path.join(rundir, 'ppanalysis.sub')
pperr = os.path.join(outerlogdir, 'ppanalysis-$(cluster)-$(process)-$(node).err')
ppout = os.path.join(outerlogdir, 'ppanalysis-$(cluster)-$(process)-$(node).out')

# Vanilla-universe job running the executable named by [condor] ppanalysis.
ppexe = prior_cp.get('condor', 'ppanalysis')
ppjob = pipeline.CondorDAGJob('vanilla', ppexe)
ppjob.set_sub_file(ppsub)
ppjob.set_stderr_file(pperr)
ppjob.set_stdout_file(ppout)
ppjob.add_condor_cmd('getenv', 'True')
if main_cp.has_option('analysis', 'accounting_group'):
    _accounting_group = main_cp.get('analysis', 'accounting_group')
    ppjob.add_condor_cmd('accounting_group', _accounting_group)

ppnode = pipeline.CondorDAGNode(ppjob)
ppnode.add_var_opt('injXML', injfile)

# Results go to [ppanalysis] webdir when configured, else to a 'ppanalysis'
# directory inside the run directory.
if main_cp.has_option('ppanalysis', 'webdir'):
    outdir = main_cp.get('ppanalysis', 'webdir')
else:
    outdir = os.path.join(rundir, 'ppanalysis')
ppnode.add_var_opt('outdir', outdir)

# Every posterior file from the main run is passed as a positional argument.
for f in posteriorfiles:
    ppnode.add_var_arg(f)

# The P-P analysis runs only after the main DAG has completed.
ppnode.add_parent(maindagnode)
outerdag.add_node(ppnode)
# Write out the submit files and script for each DAG: always the outer and
# main DAGs, and the prior DAG only when it is part of the workflow.
# NOTE(review): no write_dag() call is visible in this section -- confirm
# the DAG files themselves are written somewhere.
_dags_to_write = [outerdag]
if not opts.injections:
    _dags_to_write.append(priordag)
_dags_to_write.append(maindag)

for _dag in _dags_to_write:
    _dag.write_sub_files()
    _dag.write_script()

# Tell the user how to launch the workflow.
print('Successfully created DAG file.')
print('Now run condor_submit_dag %s\n' % (outerdag.get_dag_file()))
def mkdirs(path)
Helper function.