from __future__ import print_function

import os
import uuid
from configparser import ConfigParser
from optparse import OptionParser

from lal import pipeline
from lalinference import lalinference_pipe_utils as pipe_utils
# Usage banner printed by --help; optparse expands %prog to the script name.
usage = """ %prog [options] config.ini
Setup a DAG to run an end-to-end lalinference test:
 1) Generate samples from prior
 2) Analyse a set of injections drawn from the prior
 3) Run P vs P test on results
"""

# Command-line interface: one positional ini file plus the options below.
parser = OptionParser(usage)
parser.add_option(
    "-r", "--run-path",
    default='./',
    action="store",
    type="string",
    help="Directory to run pipeline in (default: $PWD)",
    metavar="RUNDIR",
)
parser.add_option(
    "-p", "--daglog-path",
    default=None,
    action="store",
    type="string",
    help="Path to directory to contain DAG log file. SHOULD BE LOCAL TO SUBMIT NODE",
    metavar="LOGDIR",
)
parser.add_option(
    "-I", "--injections",
    default=None,
    action="store",
    type="string",
    help="Path to injection file, will bypass the prior-sampling stage",
    metavar="injections.xml",
)
# No default is given here, so opts.trials is None when -N is omitted.
parser.add_option(
    '-N', '--trials',
    action='store',
    type='int',
    metavar='NUM',
    help='Number of prior samples to analyse',
)
def _load_config(path):
    """Parse the ini file at *path* into a new ConfigParser.

    Option names keep their original case. The file handle is closed as soon
    as parsing finishes.
    """
    cp = ConfigParser()
    # The default optionxform lower-cases option names; str leaves them as is.
    cp.optionxform = str
    with open(path) as handle:
        # read_file() is the current API; readfp() is only a fallback for old
        # parsers that lack it (readfp() itself is gone from Python 3.12 on).
        reader = getattr(cp, 'read_file', None) or cp.readfp
        reader(handle)
    return cp


(opts, args) = parser.parse_args()

# The usage line promises exactly one positional argument: the ini file.
if len(args) != 1:
    parser.error('must specify exactly one ini file')
# --trials has no default and the rest of the script multiplies and int()s it.
if opts.trials is None:
    parser.error('the number of trials (-N/--trials) must be given')
inifile = args[0]

# The prior-sampling and main sub-DAGs each get their own copy of the same
# configuration, so that they can be modified independently below.
prior_cp = _load_config(inifile)
main_cp = _load_config(inifile)
# Drop any GPS window that came from the ini file for the main analysis.
# has_option() is checked first because remove_option() raises when the
# 'input' section itself is missing.
for stale_option in ('gps-start-time', 'gps-end-time'):
    if not main_cp.has_option('input', stale_option):
        continue
    main_cp.remove_option('input', stale_option)
# Absolute directory the whole pipeline runs in.
rundir = os.path.abspath(opts.run_path)

# DAG logs go under --daglog-path when it is given, otherwise under the run
# directory. Only one of the two locations may be applied: setting both in
# sequence would always discard the user's --daglog-path.
if opts.daglog_path is not None:
    daglogdir = os.path.abspath(opts.daglog_path)
else:
    daglogdir = os.path.abspath(opts.run_path)

# Each sub-DAG logs into its own subdirectory of the chosen location.
prior_cp.set('paths', 'daglogdir', os.path.join(daglogdir, 'prior'))
main_cp.set('paths', 'daglogdir', os.path.join(daglogdir, 'main'))
# Top-level web output directory, taken from the [ppanalysis] section.
webdir = main_cp.get('ppanalysis', 'webdir')

# Working directories of the two sub-DAGs, below the run directory.
priordir = os.path.join(rundir, 'prior')
maindir = os.path.join(rundir, 'main')

# Web output directories of the two sub-DAGs, below the shared webdir.
priorwebdir = os.path.join(webdir, 'prior')
mainwebdir = os.path.join(webdir, 'injections')

# Point each sub-DAG configuration at its own base and web directories.
for _cp, _basedir, _webdir in (
    (prior_cp, priordir, priorwebdir),
    (main_cp, maindir, mainwebdir),
):
    _cp.set('paths', 'basedir', _basedir)
    _cp.set('paths', 'webdir', _webdir)

# Log directory used by the jobs of the outer DAG.
outerlogdir = os.path.join(daglogdir, 'log')
# Adjust the [engine] section of the prior run according to the sampler named
# in [analysis]/engine. Every supported sampler gets an empty 'zeroLogLike'
# option (presumably a value-less flag that disables the likelihood so the
# prior is sampled - confirm against the engine documentation).
prior_engine = prior_cp.get('analysis', 'engine')

if prior_engine == 'lalinferencenest':
    oversampled = str(20 * opts.trials)
    prior_cp.set('engine', 'sampleprior', oversampled)
    prior_cp.set('engine', 'zeroLogLike', '')
    prior_cp.set('engine', 'nlive', oversampled)
elif prior_engine == 'lalinferencemcmc':
    # Never ask for fewer than 1000 effective samples.
    prior_cp.set('engine', 'neff', str(max(opts.trials, 1000)))
    prior_cp.set('engine', 'zeroLogLike', '')
elif prior_engine in ('lalinferencebambi', 'lalinferencebambimpi'):
    # Both BAMBI variants are configured identically.
    prior_cp.set('engine', 'zeroLogLike', '')
    prior_cp.set('engine', 'nlive', str(opts.trials))
# Strip the marginalisation options from the prior run's [engine] section.
# The existence check keeps remove_option() from raising on a missing section.
for marg_option in ('margphi', 'margtime', 'margtimephi'):
    if not prior_cp.has_option('engine', marg_option):
        continue
    prior_cp.remove_option('engine', marg_option)
# The outer DAG ties the prior run, the conversion step, the main run and the
# P-P analysis together. A uuid in the log name keeps repeated runs apart.
outerdaglog = os.path.join(
    daglogdir,
    'lalinference_injection_test_%s.log' % uuid.uuid1(),
)
outerdag = pipeline.CondorDAG(outerdaglog)

outerdag_basename = os.path.join(rundir, 'priortest')
outerdag.set_dag_file(outerdag_basename)
# NOTE(review): trig_time is not assigned anywhere in the visible part of the
# script - confirm where it is defined before relying on this section.
fake_event = pipe_utils.Event(trig_time=trig_time)

# Write the trigger time to a one-entry GPS time file for the prior run.
# The with-block guarantees the file is flushed and closed before the prior
# pipeline is built from the configuration that points at it.
tfpath = os.path.join(rundir, 'time.txt')
with open(tfpath, 'w') as tfile:
    print('%i\n' % (trig_time), file=tfile)

prior_cp.set('input', 'gps-time-file', tfpath)
# Build the prior-sampling sub-DAG and wrap it as a node of the outer DAG.
priordag = pipe_utils.LALInferencePipelineDAG(prior_cp)
priordag.set_dag_file(os.path.join(priordir, 'lalinference_priorsample'))
priordagjob = pipeline.CondorDAGManJob(priordag.get_dag_file(), dir=priordir)
priordagnode = pipeline.CondorDAGManNode(priordagjob)

# Locate the first results-page node of the prior run; its posterior file
# holds the prior samples. A list is built explicitly because filter()
# returns a non-subscriptable iterator on Python 3. An IndexError here means
# the prior DAG contains no results-page node.
pagenode = [
    node for node in priordag.get_nodes()
    if isinstance(node, pipe_utils.ResultsPageNode)
][0]
priorfile = pagenode.get_pos_file()
# Submit file and stdout/stderr locations of the samples-to-injections job.
# The $(...) placeholders are left verbatim for Condor to expand.
convertsub = os.path.join(rundir, 'samples2injections.sub')
converterr = os.path.join(outerlogdir, 'samples2injection-$(cluster)-$(process)-$(node).err')
convertout = os.path.join(outerlogdir, 'samples2injection-$(cluster)-$(process)-$(node).out')

# A user-supplied injection file takes precedence; otherwise the injections
# are written to priorsamples.xml in the run directory. The two assignments
# must be alternatives, or --injections would always be overwritten.
if opts.injections:
    injfile = os.path.abspath(opts.injections)
else:
    injfile = os.path.join(rundir, 'priorsamples.xml')
# Job that converts the prior samples into an injection file, using the
# executable configured as [condor]/pos_to_sim_inspiral.
approx = prior_cp.get('engine', 'approx')
prior2injexe = prior_cp.get('condor', 'pos_to_sim_inspiral')
prior2injjob = pipeline.CondorDAGJob('vanilla', prior2injexe)
prior2injjob.set_sub_file(convertsub)
prior2injjob.set_stderr_file(converterr)
prior2injjob.set_stdout_file(convertout)
prior2injjob.add_condor_cmd('getenv', 'True')
if main_cp.has_option('condor', 'accounting_group'):
    prior2injjob.add_condor_cmd(
        'accounting_group',
        main_cp.get('condor', 'accounting_group'),
    )

# Single node of that job: options first, then the samples file as argument.
prior2injnode = pipeline.CondorDAGNode(prior2injjob)
prior2injnode.add_var_opt('output', injfile)
prior2injnode.add_var_opt('num-of-injs', str(opts.trials))
prior2injnode.add_var_opt('approx', approx)
# Pass the amplitude order only when the ini file sets it. Without this guard
# 'amporder' would be an unbound name for configurations that omit it; in
# that case the converter is left to use its own default.
if prior_cp.has_option('engine', 'amporder'):
    amporder = prior_cp.get('engine', 'amporder')
    prior2injnode.add_var_opt('amporder', amporder)
prior2injnode.add_var_arg(priorfile)
# The conversion can only start once the prior sub-DAG has finished.
prior2injnode.add_parent(priordagnode)
# Give the main analysis a GPS window of 1000 s either side of the trigger.
for _gps_key, _gps_offset in (('gps-start-time', -1000), ('gps-end-time', 1000)):
    main_cp.set('input', _gps_key, str(trig_time + _gps_offset))

# Build the main sub-DAG and wrap it as a node of the outer DAG.
maindag = pipe_utils.LALInferencePipelineDAG(main_cp)
maindag_basename = os.path.join(maindir, 'lalinference_pipeline')
maindag.set_dag_file(maindag_basename)
maindagjob = pipeline.CondorDAGManJob(maindag.get_dag_file(), dir=maindir)
maindagnode = pipeline.CondorDAGManNode(maindagjob)

# The injection file is set on the DAG's own config after construction.
maindag.config.set('input', 'injection-file', injfile)

# One full analysis per trial; all events share the trigger time and differ
# only in their event_id.
n_trials = int(opts.trials)
for event_id in range(n_trials):
    ev = pipe_utils.Event(trig_time=trig_time, event_id=event_id)
    maindag.add_full_analysis(ev)
# The main analysis is always part of the outer DAG.
outerdag.add_node(maindagnode)

# Without a user-supplied injection file the injections come from the prior
# run, so the main analysis has to wait for the conversion step.
generate_injections = not opts.injections
if generate_injections:
    for _stage in (priordagnode, prior2injnode):
        outerdag.add_node(_stage)
    maindagnode.add_parent(prior2injnode)
# Collect the results-page nodes of the main DAG and their posterior files.
# A real list is used instead of filter(): on Python 3 filter() yields a
# one-shot iterator that would be left exhausted after the first pass.
resultspagenodes = [
    node for node in maindag.get_nodes()
    if isinstance(node, pipe_utils.ResultsPageNode)
]
posteriorfiles = [node.get_pos_file() for node in resultspagenodes]
# Submit file and stdout/stderr locations of the P-P analysis job.
# The $(...) placeholders are left verbatim for Condor to expand.
ppsub = os.path.join(rundir, 'ppanalysis.sub')
pperr = os.path.join(outerlogdir, 'ppanalysis-$(cluster)-$(process)-$(node).err')
ppout = os.path.join(outerlogdir, 'ppanalysis-$(cluster)-$(process)-$(node).out')

# Job running the executable configured as [condor]/ppanalysis.
ppexe = prior_cp.get('condor', 'ppanalysis')
ppjob = pipeline.CondorDAGJob('vanilla', ppexe)
ppjob.set_sub_file(ppsub)
ppjob.set_stderr_file(pperr)
ppjob.set_stdout_file(ppout)
ppjob.add_condor_cmd('getenv', 'True')
if main_cp.has_option('condor', 'accounting_group'):
    ppjob.add_condor_cmd(
        'accounting_group',
        main_cp.get('condor', 'accounting_group'),
    )

ppnode = pipeline.CondorDAGNode(ppjob)
ppnode.add_var_opt('injXML', injfile)

# Results go to [ppanalysis]/webdir when configured, otherwise to a
# 'ppanalysis' directory under the run directory. These must be alternatives,
# or the configured webdir would always be overwritten.
if main_cp.has_option('ppanalysis', 'webdir'):
    outdir = main_cp.get('ppanalysis', 'webdir')
else:
    outdir = os.path.join(rundir, 'ppanalysis')
ppnode.add_var_opt('outdir', outdir)

# Every posterior file of the main run is passed as a positional argument.
for f in posteriorfiles:
    ppnode.add_var_arg(f)

# The P-P analysis runs only after the whole main sub-DAG has completed.
ppnode.add_parent(maindagnode)
outerdag.add_node(ppnode)
# Write the submit files, the DAG file and the script for each DAG.
# write_dag() is needed for the DAG file itself: the closing message tells the
# user to submit outerdag.get_dag_file(), which must therefore exist on disk.
outerdag.write_sub_files()
outerdag.write_dag()
outerdag.write_script()

# The prior sub-DAG is only part of the workflow when the injections are
# generated from the prior rather than supplied by the user.
if not opts.injections:
    priordag.write_sub_files()
    priordag.write_dag()
    priordag.write_script()

maindag.write_sub_files()
maindag.write_dag()
maindag.write_script()

print('Successfully created DAG file.')
print('Now run condor_submit_dag %s\n' % (outerdag.get_dag_file()))
def mkdirs(path)
Helper function.