# Copyright (C) 2013--2014 Kipp Cannon, Chad Hanna
# Copyright (C) 2019 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
##
# @file
#
# A file that contains the inspiral_pipe module code; used to construct condor dags
#
##
# @package inspiral_pipe
#
# A module that contains the inspiral_pipe module code; used to construct condor dags
#
# ### Review Status
#
# | Names | Hash | Date | Diff to Head of Master |
# | ------------------------------------------- | ------------------------------------------- | ---------- | --------------------------- |
# | Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e | 2014-06-18 | <a href="@gstlal_inspiral_cgit_diff/python/inspiral_pipe.py?id=HEAD&id2=8a6ea41398be79c00bdc27456ddeb1b590b0f68e">inspiral_pipe.py</a> |
#
# #### Actions
#
# - In inspiral_pipe.py Fix the InspiralJob.___init___: fix the arguments
# - On line 201, fix the comment or explain what the comment is meant to be
#
# imports
#
from collections import defaultdict
import copy
import doctest
import functools
import itertools
import os
import socket
import stat
import warnings
import lal.series
from lal.utils import CacheEntry
from ligo import segments
from ligo.lw import lsctables, ligolw
from ligo.lw import utils as ligolw_utils
from gstlal import dagparts
from gstlal import datasource
from gstlal import inspiral
from gstlal import svd_bank
from gstlal.datafind import DataType
# Emitted at import time to flag this module as deprecated.  The two
# string literals are concatenated by the parser, so the first one must
# end with a space to keep "and" and "gstlal.dags..." apart.
warnings.warn(
    "all functionality within this module has been captured in gstlal.segments and "
    "gstlal.dags.layers.inspiral, and will be removed from gstlal in the future",
    DeprecationWarning,
)
#
# LIGOLW initialization
#
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
    """Module-local LIGO_LW content handler.

    The class adds nothing of its own; it is handed to
    lsctables.use_in() immediately after its definition.
    """


lsctables.use_in(LIGOLWContentHandler)
#
# DAG layers
#
def online_inspiral_layer(dag, jobs, options):
    """Add the online inspiral nodes, one per bank group, to the DAG.

    A 'gstlalInspiral' node is created for every bank group built from
    options.bank_cache.  When the zero-padded node number is also a key
    of options.inj_channel_dict, a 'gstlalInspiralInj' node is created
    for that bank group as well, tagged with the node number plus 1000.

    Returns a (job_tags, inj_job_tags) tuple holding the job tags given
    to the non-injection and the injection nodes, respectively.

    Raises ValueError if the number of likelihood files does not equal
    the number of bank groups.
    """
    job_tags = []
    inj_job_tags = []

    # skipping the svd bank read saves its cost when the linear ht-gate
    # threshold is not in use
    if options.ht_gate_threshold_linear is None:
        template_mchirp_dict = None
    else:
        template_mchirp_dict = get_svd_bank_params_online(list(options.bank_cache.values())[0])

    bank_groups = list(build_bank_groups(options.bank_cache, [1], options.max_jobs - 1))
    if len(options.likelihood_files) != len(bank_groups):
        raise ValueError("Likelihood files must correspond 1:1 with bank files")

    per_node_inputs = zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)
    for node_number, (svd_banks, likefile, zerolikefile) in enumerate(per_node_inputs):
        svd_bank_string = ",".join(":".join([ifo, bank_files[0]]) for ifo, bank_files in svd_banks.items())
        job_tag = "%04d" % node_number
        job_tags.append(job_tag)

        # Calculate the appropriate ht-gate-threshold value
        threshold_values = get_threshold_values(template_mchirp_dict, [job_tag], [svd_bank_string], options)

        # Data source dag options
        if options.data_source == "framexmit":
            datasource_opts = {
                "framexmit-addr": datasource.framexmit_list_from_framexmit_dict(options.framexmit_dict),
                "framexmit-iface": options.framexmit_iface,
            }
        else:
            # both shared-memory flavours carry the same block-size and
            # assumed-duration options; only the first key differs
            if options.data_source == "devshm":
                datasource_opts = {
                    "shared-memory-dir": datasource.pipeline_channel_list_from_channel_dict(options.shm_dir_dict, opt = "shared-memory-dir"),
                }
            else:
                datasource_opts = {
                    "shared-memory-partition": datasource.pipeline_channel_list_from_channel_dict(options.shm_part_dict, opt = "shared-memory-partition"),
                }
            datasource_opts["shared-memory-block-size"] = options.shared_memory_block_size
            datasource_opts["shared-memory-assumed-duration"] = options.shared_memory_assumed_duration

        common_opts = {
            "psd-fft-length": options.psd_fft_length,
            "reference-psd": options.reference_psd,
            "ht-gate-threshold": threshold_values,
            "channel-name": datasource.pipeline_channel_list_from_channel_dict(options.channel_dict),
            "state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.state_channel_dict, opt = "state-channel-name"),
            "dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.dq_channel_dict, opt = "dq-channel-name"),
            "state-vector-on-bits": options.state_vector_on_bits,
            "state-vector-off-bits": options.state_vector_off_bits,
            "dq-vector-on-bits": options.dq_vector_on_bits,
            "dq-vector-off-bits": options.dq_vector_off_bits,
            "svd-bank": svd_bank_string,
            "tmp-space": dagparts.condor_scratch_space(),
            "track-psd": "",
            "control-peak-time": options.control_peak_time,
            "coincidence-threshold": options.coincidence_threshold,
            "fir-stride": options.fir_stride,
            "data-source": options.data_source,
            "gracedb-far-threshold": options.gracedb_far_threshold,
            "delay-uploads": options.delay_uploads,
            "gracedb-group": options.gracedb_group,
            "gracedb-pipeline": options.gracedb_pipeline,
            "gracedb-search": options.gracedb_search,
            "gracedb-label": options.gracedb_label,
            "gracedb-service-url": options.gracedb_service_url,
            "job-tag": job_tag,
            "likelihood-snapshot-interval": options.likelihood_snapshot_interval,
            "far-trials-factor": options.far_trials_factor,
            "min-instruments": options.min_instruments,
            "time-slide-file": options.time_slide_file,
            "output-kafka-server": options.output_kafka_server,
            **datasource_opts,
        }

        # If providing an activation counts file provide it.
        if options.activation_counts_file:
            common_opts["activation-counts-file"] = options.activation_counts_file

        # add ranking stat compression options, if requested
        if options.compress_ranking_stat:
            common_opts["compress-ranking-stat"] = ""
            common_opts["compress-ranking-stat-threshold"] = options.compress_ranking_stat_threshold

        # disable service discovery if using singularity
        if options.singularity_image:
            common_opts["disable-service-discovery"] = ""

        dagparts.DAGNode(
            jobs['gstlalInspiral'],
            dag,
            [],
            opts = common_opts,
            input_files = {
                "ranking-stat-input": [likefile],
                "ranking-stat-pdf": options.marginalized_likelihood_file,
            },
            output_files = {
                "output": "/dev/null",
                "ranking-stat-output": likefile,
                "zerolag-rankingstat-pdf": zerolikefile,
            },
        )

        if job_tag not in options.inj_channel_dict:
            continue

        # FIXME The node number for injection jobs currently follows the same
        # numbering system as non-injection jobs, except instead of starting at
        # 0000 the numbering starts at 1000. There is probably a better way to
        # do this in the future, this system was just the simplest to start
        # with
        inj_job_tag = "%04d" % (node_number + 1000)
        inj_job_tags.append(inj_job_tag)

        # the injection settings are written over the same dictionary
        # that was handed to the non-injection node above
        common_opts.update({
            "channel-name": datasource.pipeline_channel_list_from_channel_dict_with_node_range(options.inj_channel_dict, node = job_tag),
            "state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_state_channel_dict, opt = "state-channel-name"),
            "dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_dq_channel_dict, opt = "dq-channel-name"),
            "state-vector-on-bits": options.inj_state_vector_on_bits,
            "state-vector-off-bits": options.inj_state_vector_off_bits,
            "dq-vector-on-bits": options.inj_dq_vector_on_bits,
            "dq-vector-off-bits": options.inj_dq_vector_off_bits,
            "gracedb-far-threshold": options.inj_gracedb_far_threshold,
            "gracedb-group": options.inj_gracedb_group,
            "gracedb-pipeline": options.inj_gracedb_pipeline,
            "gracedb-search": options.inj_gracedb_search,
            "gracedb-service-url": options.inj_gracedb_service_url,
            "job-tag": inj_job_tag,
            "likelihood-snapshot-interval": options.likelihood_snapshot_interval,
            "far-trials-factor": options.far_trials_factor,
            "min-instruments": options.min_instruments,
            "time-slide-file": options.time_slide_file,
        })
        dagparts.DAGNode(
            jobs['gstlalInspiralInj'],
            dag,
            [],
            opts = common_opts,
            input_files = {
                "ranking-stat-input": [likefile],
                "ranking-stat-pdf": options.marginalized_likelihood_file,
            },
            output_files = {
                "output": "/dev/null",
            },
        )

    return job_tags, inj_job_tags
def event_upload_layer(dag, jobs, options, job_tags):
    """Create the 'eventUploader' node and return it.

    The node has no parents.  Its "num-jobs" option is the number of
    entries in job_tags; the remaining options are copied from the
    kafka, gracedb and event-aggregator settings on options.
    """
    uploader_opts = {"kafka-server": options.output_kafka_server}

    # gracedb destination
    uploader_opts["gracedb-group"] = options.gracedb_group
    uploader_opts["gracedb-pipeline"] = options.gracedb_pipeline
    uploader_opts["gracedb-search"] = options.gracedb_search
    uploader_opts["gracedb-service-url"] = options.gracedb_service_url

    # upload policy
    uploader_opts["far-threshold"] = options.event_aggregator_far_threshold
    uploader_opts["far-trials-factor"] = options.event_aggregator_far_trials_factor
    uploader_opts["upload-cadence-type"] = options.event_aggregator_upload_cadence_type
    uploader_opts["upload-cadence-factor"] = options.event_aggregator_upload_cadence_factor

    # fixed settings
    uploader_opts["num-jobs"] = len(job_tags)
    uploader_opts["input-topic"] = "events"
    uploader_opts["rootdir"] = "event_uploader"
    uploader_opts["verbose"] = ""

    uploader_node = dagparts.DAGNode(jobs['eventUploader'], dag, [], opts = uploader_opts)
    return uploader_node
def event_plotter_layer(dag, jobs, options):
    """Create the 'eventPlotter' node and return it.

    The node has no parents and receives the kafka server and the
    gracedb settings found on options.
    """
    plotter_opts = {"kafka-server": options.output_kafka_server}
    plotter_opts["gracedb-group"] = options.gracedb_group
    plotter_opts["gracedb-pipeline"] = options.gracedb_pipeline
    plotter_opts["gracedb-search"] = options.gracedb_search
    plotter_opts["gracedb-service-url"] = options.gracedb_service_url
    plotter_opts["verbose"] = ""

    plotter_node = dagparts.DAGNode(jobs['eventPlotter'], dag, [], opts = plotter_opts)
    return plotter_node
def aggregator_layer(dag, jobs, options, job_tags):
    """Add the aggregation, trigger-counting and trigger-aggregation nodes.

    Time-series routes and segment routes are split into groups; for
    each group one 'aggLeader' node is created, followed by one 'agg'
    node for every further block of 1000 job tags.  A 'trigcount' node
    and a 'trigagg' node are then added.

    Returns the 'trigagg' node.
    """
    # set up common settings for aggregation jobs
    agg_options = {
        "config": options.scald_config,
        "data-type": "timeseries",
        "uri": "kafka://{}@{}".format(options.analysis_tag, options.output_kafka_server),
    }

    # define routes used for aggregation jobs
    ifos = options.channel_dict
    latency_stages = ["datasource", "whitening", "snrSlice"]
    snr_routes = ["%s_snr_history" % ifo for ifo in ifos]
    network_routes = ["likelihood_history", "snr_history", "latency_history"]
    state_routes = ["%s_strain_dropped" % ifo for ifo in ifos]
    usage_routes = ["ram_history"]
    instrument_latency_routes = ["%s_%s_latency" % pair for pair in itertools.product(ifos, latency_stages)]
    pipeline_latency_routes = ["all_%s_latency" % stage for stage in ["itacac"]]
    agg_routes = list(itertools.chain(
        snr_routes,
        network_routes,
        usage_routes,
        state_routes,
        instrument_latency_routes,
        pipeline_latency_routes,
    ))

    gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")]
    seg_routes = ["%s_%s" % pair for pair in itertools.product(ifos, gates)]

    # FIXME don't hard code the 1000
    max_agg_jobs = 1000
    agg_job_bounds = list(range(0, len(job_tags), max_agg_jobs))

    def add_aggregators(route_groups):
        # one leader plus (len(agg_job_bounds) - 1) followers per group
        for routes in route_groups:
            these_options = dict(agg_options)
            these_options["topic"] = routes
            for position in range(len(agg_job_bounds)):
                if position == 0:
                    ### elect first aggregator per route as leader
                    # NOTE(review): "across-jobs" is never removed, so
                    # the same dictionary, still holding the key, is
                    # what the following 'agg' nodes receive -- confirm
                    # this is intended
                    these_options["across-jobs"] = ""
                    job = jobs['aggLeader']
                else:
                    job = jobs['agg']
                dagparts.DAGNode(job, dag, [], input_files = {"": "aggregate"}, opts = these_options)

    # analysis-based aggregation jobs
    agg_route_groups = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(job_tags)), 1)))
    add_aggregators(agg_route_groups + ["far_history"])

    # segment-based jobs
    seg_route_groups = list(dagparts.groups(seg_routes, max(max_agg_jobs // (4 * len(job_tags)), 1)))
    add_aggregators(seg_route_groups)

    # Trigger counting
    dagparts.DAGNode(
        jobs['trigcount'],
        dag,
        [],
        opts = {
            "uri": "kafka://{}@{}".format(options.analysis_tag, options.output_kafka_server),
            "gracedb-search": options.gracedb_search,
            "gracedb-pipeline": options.gracedb_pipeline,
            "output-period": 300,
        },
    )

    # Trigger aggregation
    trigagg_options = {
        "config": options.scald_config,
        "data-type": "triggers",
        "topic": "coinc",
        "uri": "kafka://{}@{}".format(options.analysis_tag, options.output_kafka_server),
    }
    return dagparts.DAGNode(jobs['trigagg'], dag, [], input_files = {"": "aggregate"}, opts = trigagg_options)
def dq_monitor_layer(dag, jobs, options):
    """Add one 'dq' node per instrument in options.channel_dict.

    Every node is given single-instrument channel lists, the state and
    DQ vector bit masks, the data source options matching
    options.data_source and the output path "aggregator".

    Returns the list of created nodes, in channel_dict iteration order.
    """
    outpath = 'aggregator'
    channel_list = datasource.pipeline_channel_list_from_channel_dict
    ll_dq_jobs = []

    for ifo in options.channel_dict:
        # Data source dag options
        if options.data_source == "framexmit":
            datasource_opts = {
                "framexmit-addr": datasource.framexmit_list_from_framexmit_dict({ifo: options.framexmit_dict[ifo]}),
                "framexmit-iface": options.framexmit_iface,
            }
        else:
            if options.data_source == "devshm":
                datasource_opts = {"shared-memory-dir": channel_list({ifo: options.shm_dir_dict[ifo]})}
            else:
                datasource_opts = {"shared-memory-partition": channel_list({ifo: options.shm_part_dict[ifo]})}
            datasource_opts["shared-memory-block-size"] = options.shared_memory_block_size
            datasource_opts["shared-memory-assumed-duration"] = options.shared_memory_assumed_duration

        node_opts = {
            "psd-fft-length": options.psd_fft_length,
            "channel-name": channel_list({ifo: options.channel_dict[ifo]}),
            "state-channel-name": channel_list({ifo: options.state_channel_dict[ifo]}, opt = "state-channel-name"),
            "dq-channel-name": channel_list({ifo: options.dq_channel_dict[ifo]}, opt = "dq-channel-name"),
            "state-vector-on-bits": options.state_vector_on_bits,
            "state-vector-off-bits": options.state_vector_off_bits,
            "dq-vector-on-bits": options.dq_vector_on_bits,
            "dq-vector-off-bits": options.dq_vector_off_bits,
            "data-source": options.data_source,
            "scald-config": options.scald_config,
            "out-path": outpath,
            **datasource_opts,
        }

        dq_node = dagparts.DAGNode(jobs['dq'], dag, [], opts = node_opts)
        ll_dq_jobs.append(dq_node)

    return ll_dq_jobs
def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
    """Add one 'refPSD' node per (instrument set, segment) pair.

    Besides creating the nodes, the file 'reference_psd.cache' is
    (re)written in the current working directory with one cache entry
    per reference PSD output file.

    Returns a dictionary mapping (ifos, seg) to the corresponding node.
    """
    psd_nodes = {}

    for ifos, seglist in segsdict.items():
        # restrict the channels to the instruments of this combination
        this_channel_dict = {ifo: channel_dict[ifo] for ifo in ifos if ifo in channel_dict}

        for seg in seglist:
            # output directory is named after the first five digits of
            # the integer GPS start time
            psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]])

            node_opts = {
                "gps-start-time": int(seg[0]),
                "gps-end-time": int(seg[1]),
                "data-source": "frames",
                "channel-name": datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos),
                "psd-fft-length": options.psd_fft_length,
                "frame-segments-name": options.frame_segments_name,
            }
            node_inputs = {
                "frame-cache": options.frame_cache,
                "frame-segments-file": options.frame_segments_file,
            }
            node_outputs = {
                "write-psd": dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path),
            }

            psd_nodes[(ifos, seg)] = dagparts.DAGNode(
                jobs['refPSD'],
                dag,
                parent_nodes = parent_nodes,
                opts = node_opts,
                input_files = node_inputs,
                output_files = node_outputs,
            )

    # Make the reference PSD cache
    # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
    with open('reference_psd.cache', "w") as output_cache_file:
        for psd_node in psd_nodes.values():
            psd_url = "file://localhost%s" % os.path.abspath(psd_node.output_files["write-psd"])
            output_cache_file.write("%s\n" % CacheEntry.from_T050017(psd_url))

    return psd_nodes
def injection_template_match_layer(dag, jobs, parent_nodes, options, instruments):
    """Add the 'injTmpltMatch' nodes, one per split injection file.

    For each entry of options.injection_file (only the text after the
    last ':' is used) and each of the options.num_split_inj_files
    splits, a node is created that takes the split injection file from
    the 'injSplitter' output path together with options.template_bank.

    Returns a nested dictionary: sim_name -> split number -> node.
    """
    splitter_dir = jobs['injSplitter'].output_path
    inj_tmplt_match_nodes = {}

    for inj_spec in options.injection_file:
        inj_file = inj_spec.split(":")[-1]
        sim_name = sim_tag_from_inj_file(inj_file)
        nodes_for_sim = inj_tmplt_match_nodes[sim_name] = {}

        for nsplit in range(options.num_split_inj_files):
            inj_filename = DataType.SPLIT_INJECTIONS.filename("H1K1L1V1", segments.segment(0, 0), f"{nsplit:04d}", sim_name)
            match_output = "%s/%s-INJECTION_TEMPLATE_MATCH_%s_%04d.xml.gz"%(jobs['injTmpltMatch'].output_path, instruments, sim_name, nsplit)
            nodes_for_sim[nsplit] = dagparts.DAGNode(
                jobs['injTmpltMatch'],
                dag,
                parent_nodes = parent_nodes,
                input_files = {
                    "injection-file": os.path.join(splitter_dir, inj_filename),
                    "template-bank": options.template_bank,
                },
                output_files = {"output": match_output},
            )

    return inj_tmplt_match_nodes
def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, output_dir, template_mchirp_dict):
    """Add one 'svd' node per (instrument, svd bin) and a 'plotBanks' node.

    bank_cache maps each instrument to a list of cache files; each line
    of such a file names a further cache file listing the split template
    banks that make up one svd bin.  Bins are numbered consecutively
    across the cache files of an instrument.

    Each entry of options.autocorrelation_length is parsed as
    "mchirp_min:mchirp_max:length".  NOTE: options.autocorrelation_length
    is overwritten inside the loop with the length selected for the
    current bin, so it no longer holds the original list on return.

    Returns (svd_nodes, new_template_mchirp_dict, svd_dtdphi_map):
    svd_nodes maps instrument -> list of svd nodes,
    new_template_mchirp_dict maps the zero-padded bin number to the
    (min, max) chirp-mass interval collected from template_mchirp_dict,
    and svd_dtdphi_map maps the zero-padded bin number to the entry of
    options.dtdphi_file for the cache file the bin came from.

    Raises KeyError if a bin has no chirp-mass interval, i.e. none of
    its template bank files is a key of template_mchirp_dict.
    """
    svd_nodes = {}
    new_template_mchirp_dict = {}
    svd_dtdphi_map = {}

    # (mchirp_min, mchirp_max) -> autocorrelation length
    autocorrelation_dict = {}
    for autocorrelation in options.autocorrelation_length:
        min_chirp_mass, max_chirp_mass, autocorrelation_length = autocorrelation.split(':')
        min_chirp_mass, max_chirp_mass, autocorrelation_length = float(min_chirp_mass), float(max_chirp_mass), int(autocorrelation_length)
        autocorrelation_dict[(min_chirp_mass, max_chirp_mass)] = autocorrelation_length

    for ifo, list_of_svd_caches in bank_cache.items():
        bin_offset = 0
        for j, svd_cache_name in enumerate(list_of_svd_caches):
            # read the cache inside a context manager so the file is
            # closed as soon as its entries have been parsed
            with open(svd_cache_name) as cache_file:
                svd_caches = list(map(CacheEntry, cache_file))

            for i, individual_svd_cache_name in enumerate(ce.path for ce in svd_caches):
                bin_key = '%04d' % (i + bin_offset,)

                # First sort out the clipleft, clipright options
                clipleft = []
                clipright = []
                ids = []
                mchirp_interval = (float("inf"), 0)
                with open(individual_svd_cache_name) as cache_file:
                    individual_svd_cache = list(map(CacheEntry, cache_file))
                for n, f in enumerate(ce.path for ce in individual_svd_cache):
                    # handle template bank clipping
                    clipleft.append(options.overlap[j] // 2)
                    clipright.append(options.overlap[j] // 2)
                    ids.append("%d_%d" % (i + bin_offset, n))
                    if f in template_mchirp_dict:
                        # widen the bin's interval to include this bank
                        mchirp_interval = (
                            min(mchirp_interval[0], template_mchirp_dict[f][0]),
                            max(mchirp_interval[1], template_mchirp_dict[f][1]),
                        )
                    svd_dtdphi_map[bin_key] = options.dtdphi_file[j]

                svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i + bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path)
                # only the first instrument that provides a valid
                # interval for a bin records it
                if bin_key not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0):
                    new_template_mchirp_dict[bin_key] = mchirp_interval
                # pick the autocorrelation length whose chirp-mass range
                # contains the bin's maximum chirp mass
                for key, value in autocorrelation_dict.items():
                    if key[0] <= new_template_mchirp_dict[bin_key][1] < key[1]:
                        options.autocorrelation_length = value

                svdnode = dagparts.DAGNode(
                    jobs['svd'],
                    dag,
                    parent_nodes = parent_nodes,
                    opts = {
                        "svd-tolerance": options.tolerance,
                        "flow": options.flow[j],
                        "sample-rate": options.sample_rate,
                        "clipleft": clipleft,
                        "clipright": clipright,
                        "samples-min": options.samples_min[j],
                        "samples-max-256": options.samples_max_256,
                        "samples-max-64": options.samples_max_64,
                        "samples-max": options.samples_max,
                        "autocorrelation-length": options.autocorrelation_length,
                        "bank-id": ids,
                        "identity-transform": options.identity_transform,
                        "ortho-gate-fap": 0.5,
                    },
                    input_files = {"reference-psd": psd},
                    input_cache_files = {"template-bank-cache": [ce.path for ce in individual_svd_cache]},
                    input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"),
                    output_files = {"write-svd": svd_bank_name},
                )
                if new_template_mchirp_dict[bin_key][1] < options.mchirp_threshold:
                    svdnode.add_var_arg("--append-time-reversed-template")

                # impose a priority to help with depth first submission
                svdnode.set_priority(99)
                svd_nodes.setdefault(ifo, []).append(svdnode)

            # the next cache file continues the bin numbering
            bin_offset += i + 1

    # Plot template/svd bank jobs
    # NOTE(review): primary_ifo is not used below; the lookup is kept
    # because it raises IndexError when bank_cache is empty
    primary_ifo = list(bank_cache.keys())[0]
    dagparts.DAGNode(
        jobs['plotBanks'],
        dag,
        parent_nodes = sum(list(svd_nodes.values()), []),
        opts = {"plot-template-bank": "", "output-dir": output_dir},
        input_files = {"template-bank-file": options.template_bank},
    )
    return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map
def inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict):
    """Add the non-injection and injection inspiral nodes to the DAG.

    For every instrument combination and segment in segsdict the svd
    bank strings are processed in chunks of 50.  Each chunk becomes one
    non-injection node, unless the non-injection job is None
    (injection-only run).  For every injection file, one injection node
    is created per chunk of the background bins that were not ignored
    for that file.

    Each entry of options.injection_file is split into
    "mchirp_min:mchirp_max:filename"; before returning,
    options.injection_file is replaced by the list of bare file names.

    Returns (inspiral_nodes, lloid_output, lloid_diststats).
    inspiral_nodes maps (ifos, sim_name or None) to a dictionary of
    segment -> list of nodes; the other two values are whatever
    adapt_gstlal_inspiral_output() returns.
    """
    common_opts = {
        "psd-fft-length": options.psd_fft_length,
        "frame-segments-name": options.frame_segments_name,
        "tmp-space": dagparts.condor_scratch_space(),
        "track-psd": "",
        "control-peak-time": options.control_peak_time,
        "coincidence-threshold": options.coincidence_threshold,
        "singles-threshold": options.singles_threshold,
        "fir-stride": options.fir_stride,
        "data-source": "frames",
        "local-frame-caching": "",
        "min-instruments": options.min_instruments,
        "reference-likelihood-file": options.reference_likelihood_file,
    }

    # disable service discovery if using singularity
    if options.singularity_image:
        common_opts["disable-service-discovery"] = ""

    inspiral_nodes = {}
    for ifos in segsdict:
        # FIXME: handles more than 3 ifos with same cpu/memory requests
        capped_ifo_count = min(len(ifos), 3)
        inspiral_name = 'gstlalInspiral%dIFO' % capped_ifo_count
        inspiral_inj_name = 'gstlalInspiralInj%dIFO' % capped_ifo_count

        # setup dictionaries to hold the inspiral nodes
        inspiral_nodes[(ifos, None)] = {}
        ignore = {}
        injection_files = []
        for injection_spec in options.injection_file:
            min_chirp_mass, max_chirp_mass, injections = injection_spec.split(':')
            injection_files.append(injections)
            min_chirp_mass = float(min_chirp_mass)
            max_chirp_mass = float(max_chirp_mass)
            inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {}
            # background bins whose chirp-mass bounds lie entirely above
            # or entirely below the injection range are skipped
            # FIXME: confirm that this sort works as intended:
            # NOTE stopping at the first bin above the injection range
            # would assume that the min chirp mass in a subbank
            # increases with bin number, i.e. XXXX+1 has a greater
            # minimum chirpmass than XXXX, for all XXXX. Note that the
            # reverse is not true, bin XXXX+1 may have a lower max
            # chirpmass than bin XXXX.
            ignore[injections] = [
                int(bgbin_index)
                for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda item: int(item[0]))
                if max_chirp_mass <= bounds[0] or min_chirp_mass > bounds[1]
            ]

        # FIXME choose better splitting?
        numchunks = 50

        # only use a channel dict with the relevant channels
        this_channel_dict = {ifo: channel_dict[ifo] for ifo in ifos if ifo in channel_dict}

        # get the svd bank strings
        svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = list(this_channel_dict.keys()))

        # get a mapping between chunk counter and bgbin for setting priorities
        bgbin_chunk_map = {}

        for seg in segsdict[ifos]:
            if injection_files:
                output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]])

            if jobs[inspiral_name] is not None:
                output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]])
                for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)):
                    chunk_start = numchunks * chunk_counter
                    chunk_stop = numchunks * (chunk_counter + 1)
                    bgbin_indices = ['%04d' % (position + chunk_start,) for position, _ in enumerate(svd_bank_strings)]

                    # setup output names
                    output_paths = [subdir_path([output_seg_path, bgbin_index]) for bgbin_index in bgbin_indices]
                    output_names = [
                        dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path)
                        for idx, path in zip(bgbin_indices, output_paths)
                    ]
                    dist_stat_names = [
                        dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path)
                        for idx, path in zip(bgbin_indices, output_paths)
                    ]

                    # remember the first chunk each bin was seen in
                    for bgbin in bgbin_indices:
                        bgbin_chunk_map.setdefault(bgbin, chunk_counter)

                    # Calculate the appropriate ht-gate-threshold values according to the scale given
                    threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)

                    # non injection options
                    noninj_opts = {
                        "ht-gate-threshold": threshold_values,
                        "gps-start-time": int(seg[0]),
                        "gps-end-time": int(seg[1]),
                        "channel-name": datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
                    }
                    noninj_opts.update(common_opts)

                    # non injection node; its parents are the svd nodes
                    # of this chunk for every instrument
                    chunk_svd_parents = sum((svd_node_list[chunk_start:chunk_stop] for svd_node_list in svd_nodes.values()), [])
                    noninjnode = dagparts.DAGNode(
                        jobs[inspiral_name],
                        dag,
                        parent_nodes = chunk_svd_parents,
                        opts = noninj_opts,
                        input_files = {
                            "time-slide-file": options.time_slide_file,
                            "frame-cache": options.frame_cache,
                            "frame-segments-file": options.frame_segments_file,
                            "reference-psd": psd_nodes[(ifos, seg)].output_files["write-psd"],
                            "blind-injections": options.blind_injections,
                            "veto-segments-file": options.vetoes,
                        },
                        input_cache_files = {"svd-bank-cache": svd_bank_cache_maker(svd_bank_strings)},
                        output_cache_files = {
                            "output-cache": output_names,
                            "ranking-stat-output-cache": dist_stat_names,
                        },
                    )

                    # Set a post script to check for file integrity
                    if options.gzip_test:
                        noninjnode.set_post_script("gzip_test.sh")
                        noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names))

                    # impose a priority to help with depth first submission
                    noninjnode.set_priority(chunk_counter + 15)

                    inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode)
            else:
                # injection-only run
                inspiral_nodes[(ifos, None)].setdefault(seg, [None])

            # process injections
            for injections in injection_files:
                # setup output names
                sim_name = sim_tag_from_inj_file(injections)

                # pair every bin label with its svd bank string and drop
                # the positions ignored for this injection file
                labelled_banks = zip(sorted(list(template_mchirp_dict.keys())), svd_bank_strings_full)
                bgbin_svd_bank_strings = [
                    bgbin_svdbank
                    for position, bgbin_svdbank in enumerate(labelled_banks)
                    if position not in ignore[injections]
                ]

                for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)):
                    bgbin_indices, svd_bank_strings = zip(*bgbin_list)
                    output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices]
                    output_names = [
                        dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path)
                        for idx, path in zip(bgbin_indices, output_paths)
                    ]
                    svd_names = list(svd_bank_cache_maker(svd_bank_strings, injection = True))
                    try:
                        reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"]
                        parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices]
                    except AttributeError:
                        ### injection-only run
                        reference_psd = psd_nodes[(ifos, seg)]
                        parents = []

                    svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names]
                    input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name)

                    # Calculate the appropriate ht-gate-threshold values according to the scale given
                    threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options)

                    # injection options
                    inj_opts = {
                        "ht-gate-threshold": threshold_values,
                        "gps-start-time": int(seg[0]),
                        "gps-end-time": int(seg[1]),
                        "channel-name": datasource.pipeline_channel_list_from_channel_dict(this_channel_dict),
                    }
                    inj_opts.update(common_opts)

                    # setup injection node
                    # FIXME: handles more than 3 ifos with same cpu/memory requests
                    injnode = dagparts.DAGNode(
                        jobs[inspiral_inj_name],
                        dag,
                        parent_nodes = parents,
                        opts = inj_opts,
                        input_files = {
                            "time-slide-file": options.inj_time_slide_file,
                            "frame-cache": options.frame_cache,
                            "frame-segments-file": options.frame_segments_file,
                            "reference-psd": reference_psd,
                            "veto-segments-file": options.vetoes,
                            "injections": injections,
                        },
                        input_cache_files = {"svd-bank-cache": svd_names},
                        input_cache_file_name = input_cache_name,
                        output_cache_files = {"output-cache": output_names},
                    )

                    # Set a post script to check for file integrity
                    if options.gzip_test:
                        injnode.set_post_script("gzip_test.sh")
                        injnode.add_post_script_arg(" ".join(output_names))

                    # impose a priority to help with depth first submission
                    if bgbin_chunk_map:
                        priority = bgbin_chunk_map[bgbin_indices[-1]] + 1
                    else:
                        priority = chunk_counter + 1
                    injnode.set_priority(priority)

                    inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode)

    # Replace mchirplo:mchirphi:inj.xml with inj.xml
    options.injection_file = [inj.split(':')[-1] for inj in options.injection_file]

    # NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis
    lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict)

    return inspiral_nodes, lloid_output, lloid_diststats
def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs):
    """Add the injection splitting, SNR, merge and sqlite nodes.

    For each entry of options.injection_file (only the text after the
    last ':' is used) the chain is: one 'injSplitter' node, then
    num_split_inj_snr_jobs 'gstlalInjSnr' nodes (one per split file),
    then a 'ligolwAdd' node writing to the injection file's base name,
    then a 'lalappsRunSqlite' node run on that output.

    Returns the list of 'lalappsRunSqlite' nodes, one per injection file.
    """
    splitter_dir = jobs['injSplitter'].output_path
    ligolw_add_nodes = []

    for inj in options.injection_file:
        inj_path = inj.split(":")[-1]
        sim_tag = sim_tag_from_inj_file(inj_path)

        inj_splitter_node = dagparts.DAGNode(
            jobs['injSplitter'],
            dag,
            parent_nodes = [],
            opts = {
                "output-path": splitter_dir,
                "usertag": sim_tag,
                "nsplit": num_split_inj_snr_jobs,
            },
            input_files = {"": inj_path},
        )
        inj_splitter_node.set_priority(98)

        # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
        inj_snr_nodes = []
        injection_files = []
        for split_number in range(num_split_inj_snr_jobs):
            inj_filename = DataType.SPLIT_INJECTIONS.filename("H1K1L1V1", segments.segment(0, 0), f"{split_number:04d}", sim_tag)
            injection_file = os.path.join(splitter_dir, inj_filename)
            # FIXME somehow choose the actual flow based on mass?
            # max(flow) is chosen for performance not
            # correctness hopefully though it will be good
            # enough
            snr_node = dagparts.DAGNode(
                jobs['gstlalInjSnr'],
                dag,
                parent_nodes = ref_psd_parent_nodes + [inj_splitter_node],
                opts = {"flow": max(options.flow), "fmax": options.fmax},
                input_files = {
                    "injection-file": injection_file,
                    "reference-psd-cache": "reference_psd.cache",
                },
            )
            snr_node.set_priority(98)
            inj_snr_nodes.append(snr_node)
            injection_files.append(injection_file)

        addnode = dagparts.DAGNode(
            jobs['ligolwAdd'],
            dag,
            parent_nodes = inj_snr_nodes,
            input_files = {"": ' '.join(injection_files)},
            output_files = {"output": os.path.basename(inj_path)},
        )

        sqlite_node = dagparts.DAGNode(
            jobs['lalappsRunSqlite'],
            dag,
            parent_nodes = [addnode],
            opts = {"sql-file": options.injection_proc_sql_file, "tmp-space": dagparts.condor_scratch_space()},
            input_files = {"": addnode.output_files["output"]},
        )
        ligolw_add_nodes.append(sqlite_node)

    return ligolw_add_nodes
def snrchi2_pdf_plot_layer(dag, jobs, marg_nodes, output_dir):
    """Add the 'plotBinBackground' node that plots the snrchi2 PDFs.

    The node depends on every node in marg_nodes and takes their
    "output" files as input; plots are written to output_dir.
    Nothing is returned.
    """
    parents = list(marg_nodes.values())
    marg_outputs = [node.output_files["output"] for node in marg_nodes.values()]

    dagparts.DAGNode(
        jobs['plotBinBackground'],
        dag,
        parent_nodes = parents,
        input_files = {"": marg_outputs},
        output_files = {"output-dir": output_dir},
    )
def summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb, output_dir):
    """Add the summary, sensitivity and background plotting nodes.

    Every node created here has farnode as its only parent.  The user
    tag of the per-injection plots is the second '-'-separated field of
    the injection database name with ".sqlite" removed.

    Returns the list of created nodes in creation order.
    """
    plotnodes = []

    ### common plot options
    common_plot_opts = {
        "segments-name": options.frame_segments_name,
        "tmp-space": dagparts.condor_scratch_space(),
        "output-dir": output_dir,
        "likelihood-file": "post_marginalized_likelihood.xml.gz",
        "shrink-data-segments": 32.0,
        "extend-veto-segments": 8.,
    }

    sensitivity_opts = {
        "output-dir": output_dir,
        "tmp-space": dagparts.condor_scratch_space(),
        "veto-segments-name": "vetoes",
        "bin-by-source-type": "",
        "dist-bins": 200,
        "data-segments-name": "datasegments",
    }

    def merged(specific, shared):
        # the job-specific options come first, then the shared ones
        combined = dict(specific)
        combined.update(shared)
        return combined

    def add_plot(job_name, opts, input_files):
        plotnodes.append(dagparts.DAGNode(jobs[job_name], dag, parent_nodes = [farnode], opts = opts, input_files = input_files))

    ### plot summary
    add_plot(
        'plotSummary',
        merged({"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""}, common_plot_opts),
        {"": [noninjdb] + injdbs},
    )

    ### isolated precession plot summary
    add_plot(
        'plotSummaryIsolatePrecession',
        merged({"user-tag": "PRECESSION_LLOID_COMBINED", "isolate-precession": "", "plot-group": 1}, common_plot_opts),
        {"": [noninjdb] + injdbs},
    )

    for injdb in injdbs:
        ### individual injection plot summary
        add_plot(
            'plotSnglInjSummary',
            merged({"user-tag": injdb.replace(".sqlite","").split("-")[1], "remove-precession": "", "plot-group": 1}, common_plot_opts),
            {"": [noninjdb, injdb]},
        )

        ### isolated precession injection plot summary
        add_plot(
            'plotSnglInjSummaryIsolatePrecession',
            merged({"user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "isolate-precession": "", "plot-group": 1}, common_plot_opts),
            {"": [noninjdb, injdb]},
        )

    ### sensitivity plots
    add_plot(
        'plotSensitivity',
        merged({"user-tag": "ALL_LLOID_COMBINED"}, sensitivity_opts),
        {"zero-lag-database": noninjdb, "": injdbs},
    )

    for injdb in injdbs:
        add_plot(
            'plotSensitivity',
            merged({"user-tag": injdb.replace(".sqlite","").split("-")[1]}, sensitivity_opts),
            {"zero-lag-database": noninjdb, "": injdb},
        )

    ### background plots
    add_plot(
        'plotBackground',
        {"user-tag": "ALL_LLOID_COMBINED", "output-dir": output_dir},
        {"": "post_marginalized_likelihood.xml.gz", "database": noninjdb},
    )

    return plotnodes
def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete):
    """clean intermediate merger products

    Adds one 'rm' job per file, databases first and marginalized files
    after, each of them a child of all of plotnodes.  Returns None.
    """
    for stale_file in itertools.chain(dbs_to_delete, margfiles_to_delete):
        dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes,
            input_files = {"": stale_file}
        )
    return None
def inj_psd_layer(segsdict, options):
    """look up the reference PSD file for every analysis segment

    The cache named by options.psd_cache is read and its entries are grouped
    by instrument set.  For each (ifos, seg) in segsdict the selected file is
    the first one, in segment order, whose start time (truncated to int) is
    not earlier than the integer start of seg.

    Returns (psd_nodes, ref_psd_parent_nodes), where psd_nodes maps
    (ifos, seg) to a PSD file path and ref_psd_parent_nodes is always an
    empty list.

    Raises KeyError if the cache has no entry for an instrument set in
    segsdict, and IndexError if no cached PSD starts at or after a segment.
    """
    psd_nodes = {}
    psd_cache_files = {}
    # close the cache file as soon as it has been read instead of leaking
    # the handle
    with open(options.psd_cache) as psd_cache:
        for ce in map(CacheEntry, psd_cache):
            ifos = frozenset(lsctables.instrumentsproperty.get(ce.observatory))
            psd_cache_files.setdefault(ifos, []).append((ce.segment, ce.path))

    for ifos in segsdict:
        # FIXME: confirm that this sort works
        reference_psd_files = sorted(psd_cache_files[ifos], key = lambda s : s[0])
        ref_psd_file_num = 0
        for seg in segsdict[ifos]:
            # the index is never reset, so the segments of one instrument
            # set are expected in increasing start-time order
            while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]):
                ref_psd_file_num += 1
            psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1]

    ref_psd_parent_nodes = []
    return psd_nodes, ref_psd_parent_nodes
def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd):
    """mass model node

    When options.mass_model_file is set it is handed back untouched together
    with an empty node list.  Otherwise a 'model' job is added to the DAG and
    the return value is ([model_node], name_of_the_file_it_writes).
    """
    if options.mass_model_file is not None:
        # a pre-computed model was supplied, nothing to run
        return [], options.mass_model_file

    model_job = jobs['model']
    model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = model_job.output_path)
    model_node = dagparts.DAGNode(model_job, dag,
        input_files = {"template-bank": options.template_bank, "reference-psd": psd},
        opts = {"model":options.mass_model},
        output_files = {"output": model_file_name},
        parent_nodes = parent_nodes
    )
    return [model_node], model_file_name
def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None):
    """merge and cluster from sqlite database

    A 'toSqlite' job builds db from the files listed in db_cache (plus
    input_files, when given); a 'lalappsRunSqlite' job then runs sqlfile on
    db.  The second node is returned.
    """
    extra_inputs = {"": input_files} if input_files else {}
    cache_name = os.path.basename(db).replace('.sqlite','.cache')

    # Merge database into chunks
    merge_node = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes,
        opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
        input_files = extra_inputs,
        input_cache_files = {"input-cache": db_cache},
        output_files = {"database":db},
        input_cache_file_name = cache_name
    )

    # cluster database
    cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
        opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()},
        input_files = {"": db}
    )
    return cluster_node
def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map, idq_file = None):
    """create the prior and time-marginalized ranking statistic jobs per bin

    For every bin of the non-injection output (lloid_output[None]) a
    'createPriorDistStats' job is added, followed by a 'marginalize' job
    that combines the prior with the bin's dist stats files.

    Returns a dictionary mapping bin key to its 'marginalize' node.
    """
    instruments = "".join(sorted(instrument_set))
    margnodes = {}

    # NOTE! we rely on there being identical templates in each instrument,
    # so only the node list of one instrument is used, keyed by the
    # zero-padded bin number.
    # FIXME, the svd nodes list has to be the same as the sorted keys of
    # lloid_output.  svd nodes should be made into a dictionary much
    # earlier in the code to prevent a mishap
    if svd_nodes:
        one_ifo_svd_nodes = {}
        for n, node in enumerate(list(svd_nodes.values())[0]):
            one_ifo_svd_nodes["%04d" % n] = node

    # FIXME - this is broken for injection dags right now because of marg nodes
    # first non-injections, which will get skipped if this is an injections-only run
    noninj_output = lloid_output[None]
    for bin_key in sorted(noninj_output.keys()):
        outputs = noninj_output[bin_key]
        diststats = lloid_diststats[bin_key]
        inputs = [o[0] for o in outputs]
        parents = dagparts.flatten([o[1] for o in outputs])
        rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)

        if not svd_nodes:
            # no svd jobs in this dag: point at where the svd file is
            # expected under the analysis path
            parent_nodes = model_node
            svd_path = os.path.join(options.analysis_path, jobs['svd'].output_path)
            svd_file = dagparts.T050017_filename(instrument_set[0], '%s_SVD' % bin_key, boundary_seg, '.xml.gz', path = svd_path)
        else:
            svd_node = one_ifo_svd_nodes[bin_key]
            parent_nodes = [svd_node] + model_node
            svd_file = svd_node.output_files["write-svd"]

        # FIXME we keep this here in case we someday want to have a
        # mass bin dependent prior, but it really doesn't matter for
        # the time being.
        prior_input_files = {
            "svd-file": svd_file,
            "mass-model-file": model_file,
            "dtdphi-file": svd_dtdphi_map[bin_key],
            "psd-xml": ref_psd
        }
        if idq_file is not None:
            prior_input_files["idq-file"] = idq_file

        prior_job = jobs['createPriorDistStats']
        priornode = dagparts.DAGNode(prior_job, dag,
            parent_nodes = parent_nodes,
            opts = {
                "instrument": instrument_set,
                "background-prior": 1,
                "min-instruments": options.min_instruments,
                "coincidence-threshold":options.coincidence_threshold,
                "df": "bandwidth"
            },
            input_files = prior_input_files,
            output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=prior_job)}
        )

        # Create a file that has the priors *and* all of the diststats
        # for a given bin marginalized over time. This is all that will
        # be needed to compute the likelihood
        marg_job = jobs['marginalize']
        margnodes[bin_key] = dagparts.DAGNode(marg_job, dag,
            parent_nodes = [priornode] + parents,
            opts = {"marginalize": "ranking-stat"},
            input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]},
            output_files = {"output": rankfile('MARG_DIST_STATS', job=marg_job)},
            input_cache_file_name = rankfile('MARG_DIST_STATS')
        )

    return margnodes
def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set, with_zero_lag = False):
    """add the ranking statistic PDF jobs, one per bin of marg_nodes

    Bins are visited in sorted key order.  Each bin gets a 'calcRankPDFs'
    job and, when with_zero_lag is set, a 'calcRankPDFsWithZerolag' job as
    well; both read the output of the bin's marginalize node.

    Returns (rankpdf_nodes, rankpdf_zerolag_nodes); the second list stays
    empty unless with_zero_lag is set.
    """
    instruments = "".join(sorted(instrument_set))
    rankpdf_nodes = []
    rankpdf_zerolag_nodes = []

    for bin_key in sorted(marg_nodes.keys()):
        marg_node = marg_nodes[bin_key]
        rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key)

        rankpdf_nodes.append(dagparts.DAGNode(jobs['calcRankPDFs'], dag,
            parent_nodes = [marg_node],
            opts = {"ranking-stat-samples":options.ranking_stat_samples},
            input_files = {"": marg_node.output_files["output"]},
            output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])},
        ))

        if not with_zero_lag:
            continue

        rankpdf_zerolag_nodes.append(dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag,
            parent_nodes = [marg_node],
            opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples},
            input_files = {"": marg_node.output_files["output"]},
            output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])},
        ))

    return rankpdf_nodes, rankpdf_zerolag_nodes
def lnlrcdf_signal_layer(dag, jobs, parent_nodes, inj_tmplt_match_nodes, options, boundary_seg, instrument_set):
    """add the 'lnlrcdfSignal' jobs for every injection file

    The bins of parent_nodes are dealt round-robin into at most
    options.ngroup groups.  For each injection file, each group and each of
    the options.num_split_inj_files splits, one job is created.

    Returns a dictionary mapping the injection sim tag to its list of nodes.
    """
    ngroup = min(options.ngroup, len(parent_nodes))
    all_bin_keys = list(parent_nodes)
    lnlrcdf_signal_nodes = {}

    for inj in options.injection_file:
        sim_name = sim_tag_from_inj_file(inj.split(":")[-1])
        sim_nodes = lnlrcdf_signal_nodes[sim_name] = []

        for group in range(ngroup):
            # every ngroup-th bin, starting at this group's offset
            bin_keys = all_bin_keys[group::ngroup]
            instruments = "".join(sorted(instrument_set))
            rankfiles = dict((bin_key, functools.partial(get_rank_file, instruments, boundary_seg, bin_key)) for bin_key in bin_keys)

            for nsplit in range(options.num_split_inj_files):
                match_node = inj_tmplt_match_nodes[sim_name][nsplit]
                node = dagparts.DAGNode(
                    jobs['lnlrcdfSignal'],
                    dag,
                    parent_nodes = [parent_nodes[bin_key] for bin_key in bin_keys] + [match_node],
                    # FIXME somehow choose the actual flow based on mass?
                    # max(flow) is chosen for performance not
                    # correctness hopefully though it will be good
                    # enough
                    opts = {"f-low":max(options.flow)},
                    input_files = {
                        "injection-template-match-file": match_node.output_files["output"],
                        "likelihood-url": [rankfile('MARG_DIST_STATS', jobs['marginalize']) for rankfile in rankfiles.values()]
                    },
                    # the output is named after the smallest bin key of the group
                    output_files = {"output-file": rankfiles[min(bin_keys)]('%s_%04d'%(sim_name, nsplit), jobs['lnlrcdfSignal'])},
                )
                sim_nodes.append(node)

    return lnlrcdf_signal_nodes
def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set):
    """collect, per (sim tag, bin), what the likelihood assignment needs

    No DAG nodes are created here.  The result maps (sim_tag, bin_key), with
    sim_tag None for the non-injection run, to a tuple
    (input files, likelihood url, parent nodes).  The likelihood url is the
    output of the bin's marginalize node when there is one, otherwise the
    first dist stats file of the bin.
    """
    likelihood_nodes = {}
    instruments = "".join(sorted(instrument_set))

    def url_for(bin_key):
        # marginalized file if available, first dist stats file otherwise
        if bin_key in marg_nodes:
            return marg_nodes[bin_key].output_files["output"]
        return lloid_diststats[bin_key][0]

    # non-injection jobs
    noninj_nodes = lloid_output[None]
    for bin_key in sorted(noninj_nodes.keys()):
        outputs = noninj_nodes[bin_key]
        diststats = lloid_diststats[bin_key]
        inputs = [o[0] for o in outputs]
        # here only the marginalize node, if any, is a parent
        parents = [marg_nodes[bin_key]] if bin_key in marg_nodes else []
        likelihood_nodes[None, bin_key] = (inputs, url_for(bin_key), parents)

    # injection jobs
    for inj in options.injection_file:
        lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)]
        for bin_key in sorted(lloid_nodes.keys()):
            outputs = lloid_nodes[bin_key]
            diststats = lloid_diststats[bin_key]
            if outputs is None:
                continue
            inputs = [o[0] for o in outputs]
            parents = dagparts.flatten([o[1] for o in outputs])
            if bin_key in marg_nodes:
                parents.append(marg_nodes[bin_key])
            likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, url_for(bin_key), parents)

    return likelihood_nodes
def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, boundary_seg, instruments, with_zero_lag = False):
    """cluster, assign likelihoods to, and merge the trigger files

    For every (sim_tag, bin_key) entry of likelihood_nodes a chain of jobs
    is added: SNR clustering of the inputs, merging them into one XML file,
    clustering again, likelihood assignment, conversion to sqlite and a
    final clustering of the database.  The resulting nodes are then merged
    in chunks of num_chunks, first for the non-injection run and then for
    each injection file.

    Returns (injdbs, noninjdb, outnodes, dbs_to_delete): the final injection
    database names, the final non-injection database name, the last node of
    each merge chain, and the intermediate chunk databases created by the
    first pass over the chunks.
    """
    num_chunks = 100
    innodes = {}

    # after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run
    for (sim_tag, bin_key), (inputs, likelihood_url, parents) in likelihood_nodes.items():
        # inputs_to_db is not defined in this part of the file; it is used
        # here to derive the output database and XML names from the inputs
        db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache')
        xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz")
        # injection runs use their own SQL files and likelihood job
        snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file
        cluster_sql_file = options.cluster_sql_file if sim_tag is None else options.injection_sql_file
        likelihood_job = jobs['calcLikelihood'] if sim_tag is None else jobs['calcLikelihoodInj']

        # cluster sub banks
        cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
            opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
            input_files = {"":inputs}
            )

        # merge sub banks
        merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node],
            input_files = {"":inputs},
            output_files = {"output":xml}
            )

        # cluster and simplify sub banks
        cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
            opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
            input_files = {"":xml}
            )

        # assign likelihoods
        likelihood_node = dagparts.DAGNode(likelihood_job, dag,
            parent_nodes = [cluster_node],
            opts = {"tmp-space": dagparts.condor_scratch_space(), "force": ""},
            input_files = {"likelihood-url":likelihood_url, "": xml}
            )

        # convert the merged XML file to a sqlite database ...
        sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node],
            opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
            input_files = {"":xml},
            output_files = {"database":db},
            )
        # ... and cluster that database; this node's input_files[""] is db,
        # which is what the chunk loops below read back
        sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
            opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
            input_files = {"":db}
            )

        innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode)

    # make sure innodes has a None key, even if its value is an empty list
    # FIXME injection dag is broken
    innodes.setdefault(None, [])

    if options.vetoes is None:
        vetoes = []
    else:
        vetoes = [options.vetoes]

    chunk_nodes = []
    dbs_to_delete = []
    # Process the chirp mass bins in chunks to parallelize the merging process
    # NOTE(review): chunk_nodes is filled here but never used as a parent
    # further down in this function; the loops below regroup innodes again.
    # Confirm whether both passes are intended.
    for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
        try:
            dbs = [node.input_files[""] for node in nodes]
            parents = nodes

        except AttributeError:
            # analysis started at merger step but seeded by lloid files which
            # have already been merged into one file per background
            # bin, thus the analysis will begin at this point
            dbs = nodes
            parents = []

        dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
        noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)

        # Merge and cluster the final non injection database
        noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
        chunk_nodes.append(noninjsqlitenode)
        dbs_to_delete.append(noninjdb)

    # Merge the final non injection database
    outnodes = []
    injdbs = []
    if options.non_injection_db: #### injection-only run
        noninjdb = options.non_injection_db
    else:
        final_nodes = []
        # unlike the loop above there is no AttributeError fallback here, so
        # the entries of innodes[None] must be nodes with an input_files
        # attribute
        for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
            noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')

            # cluster the final non injection database
            noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
            final_nodes.append(noninjsqlitenode)

        # merge the per-chunk databases, the vetoes and the frame segments
        # file into the single ALL_LLOID database
        input_files = (vetoes + [options.frame_segments_file])
        input_cache_files = [node.input_files[""] for node in final_nodes]
        noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
        noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, input_cache_files, options.cluster_sql_file, input_files=input_files)

        if with_zero_lag:
            # the 'cp' job is given "<src> <dst>" as one argument string
            cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode],
                input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
                )
            outnodes.append(cpnode)
        else:
            outnodes.append(noninjsqlitenode)

    if options.injection_file:
        iterable_injections = options.injection_file
    else:
        iterable_injections = options.injections_for_merger

    for injections in iterable_injections:
        # extract only the nodes that were used for injections
        chunk_nodes = []

        for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
            try:
                dbs = [injnode.input_files[""] for injnode in injnodes]
                parents = injnodes
            except AttributeError:
                # entries are plain file names rather than nodes
                dbs = injnodes
                parents = []

            # Setup the final output names, etc.
            dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
            injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)

            # merge and cluster
            clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file)
            chunk_nodes.append(clusternode)
            dbs_to_delete.append(injdb)

        final_nodes = []
        for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)):
            # Setup the final output names, etc.
            injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite')

            # merge and cluster
            clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.injection_sql_file)
            final_nodes.append(clusternode)

        # Setup the final output names, etc.
        injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
        injdbs.append(injdb)
        injxml = injdb.replace('.sqlite','.xml.gz')

        xml_input = injxml

        # merge and cluster
        parent_nodes = final_nodes + ligolw_add_nodes
        input_files = (vetoes + [options.frame_segments_file, injections])
        input_cache_files = [node.input_files[""] for node in final_nodes]
        clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.injection_sql_file, input_files=input_files)

        # round trip through XML: extract the database to injxml, run the
        # injection finding job on that file, then rebuild injdb from it
        clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
            opts = {"tmp-space":dagparts.condor_scratch_space()},
            output_files = {"extract":injxml},
            input_files = {"database":injdb}
            )

        inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
            opts = {"time-window":0.9},
            input_files = {"":injxml}
            )

        sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
            opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
            output_files = {"database":injdb},
            input_files = {"":xml_input}
            )

        if with_zero_lag:
            cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
                input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
                )
            outnodes.append(cpnode)
        else:
            outnodes.append(sqlitenode)

    return injdbs, noninjdb, outnodes, dbs_to_delete
def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options, with_zero_lag = False):
    """marginalize the ranking statistic PDFs over all bins

    The PDFs are first marginalized in chunks of margnum files; unless
    options.marginalized_likelihood_file is set, the chunk outputs are then
    marginalized into one final file.  When with_zero_lag is set the same is
    done for the zero-lag PDFs using the 'marginalizeWithZerolag' job.

    Returns (final_margnodes, file names of all chunk outputs).
    """
    ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
    margjobs = [jobs['marginalize'], jobs['marginalizeWithZerolag']]
    margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
    # filesuffixs has one entry unless with_zero_lag is set; since zip()
    # stops at its shortest argument, this is what limits the loops below
    # to the non-zero-lag inputs
    if with_zero_lag:
        filesuffixs = ['', '_with_zerolag']
    else:
        filesuffixs = ['']

    margnum = 16
    all_margcache = []
    all_margnodes = []
    final_margnodes = []
    for nodes, job, margfile, filesuffix in zip(ranknodes, margjobs, margfiles, filesuffixs):
        try:
            margin = [node.output_files["output"] for node in nodes]
            parents = nodes
        except AttributeError: ### analysis started at merger step
            # nodes is then a list of file names rather than DAG nodes
            margin = nodes
            parents = []

        margnodes = []
        margcache = []

        # split up the marginalization into groups; the chunk size passed
        # to dagparts.groups() is margnum, i.e. 16
        for margchunk in dagparts.groups(margin, margnum):
            if nodes:
                # name the chunk output after the files it combines
                marg_ce = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margchunk]
                margcache.append(dagparts.group_T050017_filename_from_T050017_files(marg_ce, '.xml.gz', path = job.output_path))
                margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = parents,
                    opts = {"marginalize": "ranking-stat-pdf"},
                    output_files = {"output": margcache[-1]},
                    input_cache_files = {"likelihood-cache": margchunk},
                    input_cache_file_name = os.path.basename(margcache[-1]).replace('.xml.gz','.cache')
                ))

        all_margcache.append(margcache)
        all_margnodes.append(margnodes)

    if not options.marginalized_likelihood_file: ### not an injection-only run
        for nodes, job, margnodes, margcache, margfile, filesuffix in zip(ranknodes, margjobs, all_margnodes, all_margcache, margfiles, filesuffixs):
            # one final job per PDF kind, combining all chunk outputs
            final_margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = margnodes,
                opts = {"marginalize": "ranking-stat-pdf"},
                output_files = {"output": "marginalized_likelihood%s.xml.gz"%filesuffix},
                input_cache_files = {"likelihood-cache": margcache},
                input_cache_file_name = "marginalized_likelihood%s.cache"%filesuffix
            ))

    return final_margnodes, dagparts.flatten(all_margcache)
def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes, options, with_zero_lag = False):
    """compute FAPs and FARs

    One 'ComputeFarFromSnrChisqHistograms' job is added for the regular
    databases and, when with_zero_lag is set, a second one for the
    ALL_LLOID_WZL copies.  The background comes from
    options.marginalized_likelihood_file when that is set, otherwise from
    the output of the matching node in margnodes.

    Returns the node created for the regular (non zero-lag) databases.
    """
    precomputed = options.marginalized_likelihood_file
    margfiles = [precomputed, precomputed]
    filesuffixs = ['', '_with_zerolag'] if with_zero_lag else ['']

    if precomputed:
        assert not margnodes, "no marg nodes should be produced in an injection-only DAG"
        # placeholders so that the zip() below still iterates
        margnodes = [None, None]

    for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs):
        use_wzl = 'zerolag' in filesuffix

        if precomputed:
            parents = final_sqlite_nodes
            marginalized_likelihood_file = margfile
        else:
            parents = [margnode] + final_sqlite_nodes
            marginalized_likelihood_file = margnode.output_files["output"]

        if use_wzl:
            inj_inputs = [injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs]
            noninj_input = noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL')
        else:
            inj_inputs = injdbs
            noninj_input = noninjdb

        farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents,
            opts = {"tmp-space":dagparts.condor_scratch_space()},
            input_files = {
                "background-bins-file": marginalized_likelihood_file,
                "injection-db": inj_inputs,
                "non-injection-db": noninj_input
            }
        )

        if not use_wzl:
            outnode = farnode

    return outnode
def make_mc_vtplot_layer(dag, jobs, parent_nodes, add_parent_node, options, instrument_set, output_dir, injdbs = None):
    """add the VT plot jobs, one per injection set plus a combined one

    parent_nodes maps sim tags to lists of nodes providing an "output-file".
    When injdbs is given the 'makeMcVtplotCheck' job is used and fed the
    injection database(s); otherwise the 'makeMcVtplot' job is fed the
    injection file(s).

    Returns the list of nodes created.  Raises ValueError("internal error")
    for an injection file whose sim tag is not in parent_nodes.
    """
    vtplot_nodes = []
    inj_files = [entry.split(":")[-1] for entry in options.injection_file]

    for candidate in inj_files + ["COMBINED"]:
        sim_name = sim_tag_from_inj_file(candidate)
        single_set = sim_name in parent_nodes

        if single_set:
            parent = parent_nodes[sim_name]
            if injdbs:
                # first database whose name contains the sim tag
                injection_input = [injdb for injdb in injdbs if sim_name in injdb][0]
            else:
                injection_input = candidate
        elif candidate == "COMBINED":
            parent = sum(parent_nodes.values(), [])
            injection_input = injdbs if injdbs else inj_files
        else:
            raise ValueError("internal error")

        job = jobs['makeMcVtplot']
        lnlrcdf_files = [node.output_files["output-file"] for node in parent]
        cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(lnlrcdf_file)) for lnlrcdf_file in lnlrcdf_files]
        cache_name = dagparts.group_T050017_filename_from_T050017_files(cache_entries, '.xml.gz', path = job.output_path)

        input_files = {
            "ranking-stat-pdf": "post_marginalized_likelihood.xml.gz",
        }
        if injdbs:
            input_files["injection-database"] = injection_input
            job = jobs['makeMcVtplotCheck']
            input_files["check-vt"] = ""
            cache_name = dagparts.group_T050017_filename_from_T050017_files(cache_entries, '.xml.gz', path = job.output_path)
        else:
            input_files["injection-files"] = injection_input

        if single_set:
            cache_file_name = os.path.basename(cache_name).replace('.xml.gz','.cache')
        else:
            cache_file_name = "COMBINED.cache"

        vtplot_nodes.append(dagparts.DAGNode(
            job,
            dag,
            parent_nodes = parent + [add_parent_node],
            opts = {"instrument" : instrument_set},
            input_files = input_files,
            output_files = {"output-dir": output_dir},
            input_cache_files = {"lnlrcdf-cache": lnlrcdf_files},
            input_cache_file_name = cache_file_name
        ))

    return vtplot_nodes
def horizon_dist_layer(dag, jobs, marg_nodes, output_dir, instruments):
    """calculate horizon distance from marginalize diststats

    Adds a single 'horizon' job that depends on every node of marg_nodes
    and reads each node's "output" file.  Nothing is returned.
    """
    nodes = list(marg_nodes.values())
    marg_files = [node.output_files["output"] for node in nodes]
    dagparts.DAGNode(jobs['horizon'], dag,
        parent_nodes = nodes,
        input_files = {"" : marg_files},
        opts = {"outdir" : output_dir}
    )
def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs, output_dir):
    """create a summary page

    Adds one 'summaryPage' job, child of all of plotnodes.  The user tags
    passed to it are the two combined tags, then the tag of every injection
    database, then the precession variant of each of those tags.
    """
    inj_tags = [injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs]
    precession_tags = [tag.replace("ALL_LLOID", "PRECESSION_LLOID") for tag in inj_tags]
    output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] + inj_tags + precession_tags

    title = "gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1]))
    dagparts.DAGNode(jobs['summaryPage'], dag, parent_nodes = plotnodes,
        opts = {
            "title":title,
            "webserver-dir":options.web_dir,
            "glob-path":output_dir,
            "output-user-tag":output_user_tags
        }
    )
#
# environment utilities
#
def webserver_url():
    """!
    Map the fully qualified name of this host to the URL of the web server
    of its cluster.  Raises NotImplementedError for hosts that match none
    of the known clusters.
    """
    host = socket.getfqdn()
    #FIXME add more hosts as you need them
    # (host name fragments, url) pairs, tried in order
    known_clusters = (
        (("cit", "ligo.caltech.edu"), "https://ldas-jobs.ligo.caltech.edu"),
        ((".phys.uwm.edu", ".cgca.uwm.edu", ".nemo.uwm.edu"), "https://ldas-jobs.cgca.uwm.edu"),
        # FIXME: this next system does not have a web server, but not
        # having a web server is treated as a fatal error so we have to
        # make something up if we want to make progress
        ((".icrr.u-tokyo.ac.jp",), "https://ldas-jobs.icrr.u-tokyo.ac.jp"),
    )
    for fragments, url in known_clusters:
        if any(fragment in host for fragment in fragments):
            return url
    raise NotImplementedError("I don't know where the webserver is for this environment")
#
# DAG utilities
#
def load_analysis_output(options):
    """load the products of a previous analysis from their cache files

    Reads options.lloid_cache and options.dist_stats_cache and calls
    load_svd_dtdphi_map(options).

    Returns (bgbin_lloid_map, lloid_diststats, svd_dtdphi_map,
    instrument_set).  bgbin_lloid_map maps sim tag (None for non-injection
    entries) to a dictionary of bin index -> list of (path, []) tuples;
    lloid_diststats maps bin index to the list of dist stats file paths.

    As a side effect options.injection_file is rewritten in place, keeping
    only what follows the last ":" of each entry.

    Raises ValueError if a trigger cache entry's description contains no
    "_" at all.
    """
    # load triggers
    bgbin_lloid_map = defaultdict(dict)
    with open(options.lloid_cache) as lloid_cache:
        for ce in map(CacheEntry, lloid_cache):
            try:
                bgbin_idx, _, inj = ce.description.split('_', 2)
            except ValueError:
                # only two "_"-separated fields: a non-injection entry
                bgbin_idx, _ = ce.description.split('_', 1)
                inj = None
            # not in a "finally": if the fallback split fails as well,
            # nothing must be recorded from stale or unbound names
            bgbin_lloid_map[sim_tag_from_inj_file(inj)].setdefault(bgbin_idx, []).append((ce.path, []))

    # load dist stats
    lloid_diststats = {}
    with open(options.dist_stats_cache) as dist_stats_cache:
        for ce in map(CacheEntry, dist_stats_cache):
            if 'DIST_STATS' in ce.description and 'CREATE_PRIOR' not in ce.description:
                lloid_diststats.setdefault(ce.description.split("_")[0], []).append(ce.path)

    # load svd dtdphi map
    svd_dtdphi_map, instrument_set = load_svd_dtdphi_map(options)

    # modify injections option, as is done in 'adapt_inspiral_output'
    # FIXME: don't do this, find a cleaner way of handling this generally
    options.injection_file = [inj.split(':')[-1] for inj in options.injection_file]

    return bgbin_lloid_map, lloid_diststats, svd_dtdphi_map, instrument_set
def load_svd_dtdphi_map(options):
    """map every SVD bin to the dtdphi file of its bank cache

    The bank caches come from load_bank_cache(options).  For each
    instrument, the j-th SVD cache file is assigned options.dtdphi_file[j],
    and the bins are numbered consecutively across the cache files of that
    instrument, one bin per cache entry.

    Returns (svd_dtdphi_map, instrument_set): a dictionary from zero-padded
    bin number to dtdphi file, and the list of instruments.
    """
    svd_dtdphi_map = {}
    bank_cache = load_bank_cache(options)
    instrument_set = list(bank_cache.keys())
    for list_of_svd_caches in bank_cache.values():
        bin_offset = 0
        for j, svd_caches in enumerate(list_of_svd_caches):
            with open(svd_caches) as svd_cache_file:
                # every line is still parsed, so a malformed cache fails here
                num_bins = sum(1 for _ in map(CacheEntry, svd_cache_file))
            for i in range(num_bins):
                svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j]
            # counting the entries keeps the offset right for an empty
            # cache file, where a leftover loop index would be unbound or
            # stale
            bin_offset += num_bins
    return svd_dtdphi_map, instrument_set
def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options):
    """Calculate the appropriate ht-gate-threshold values according to the scale given

    With options.ht_gate_threshold_linear set to
    "mchirp_min:threshold_min-mchirp_max:threshold_max", one threshold per
    entry of bgbin_indices is returned, linearly interpolated at the second
    element of template_mchirp_dict[bgbin_index].  Otherwise, with
    options.ht_gate_threshold set, that value is repeated once per svd bank
    string.  With neither set the result is None.
    """
    linear_scale = options.ht_gate_threshold_linear
    if linear_scale is None:
        if options.ht_gate_threshold is None:
            return None
        # Use the ht-gate-threshold value given
        return [options.ht_gate_threshold]*len(svd_bank_strings)

    # A scale is given
    endpoints = []
    for endpoint in linear_scale.split("-"):
        endpoints.extend(float(field) for field in endpoint.split(":"))
    mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = endpoints

    gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min)

    thresholds = []
    for bgbin_index in bgbin_indices:
        # use max mchirp in a given svd bank to decide gate threshold
        bank_mchirp = template_mchirp_dict[bgbin_index][1]
        thresholds.append(gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min)
    return thresholds
def cache_to_db(cache, jobs, num_files_per_background_bin = None):
    """name the database that corresponds to the last entry of cache

    The result is the base name of the last cache entry's path, placed in
    the output path of the 'toSqlite' job.

    When num_files_per_background_bin is given, the leading index of the
    entry's description (the text before its first "_") is replaced in the
    file name by (index + 1) // num_files_per_background_bin - 1,
    zero-padded to four digits.  Only the first occurrence in the base name
    is replaced, so the output directory is never altered.

    NOTE(review): the renumbering used to read an undefined global
    "options" and to throw away the result of str.replace(), so the
    function could only raise NameError.  The count is now an optional
    argument; confirm the intended renumbering against the callers.
    """
    last_entry = cache[-1]
    hi_index = last_entry.description.split('_')[0]
    db_name = os.path.basename(last_entry.path)
    if num_files_per_background_bin is not None:
        bin_index = (int(hi_index) + 1) // num_files_per_background_bin - 1
        # str.replace() returns a new string, the result has to be kept
        db_name = db_name.replace(hi_index, '%04d' % bin_index, 1)
    return os.path.join(jobs['toSqlite'].output_path, db_name)
def get_rank_file(instruments, boundary_seg, n, basename, job=None):
    """name a ranking statistic file, or its cache file, for bin n

    The description field is "<n>_<basename>".  With a job the name ends in
    '.xml.gz' and lives in the job's output path; without one a '.cache'
    name is returned and no path is passed.
    """
    description = '_'.join([n, basename])
    if not job:
        return dagparts.T050017_filename(instruments, description, boundary_seg, '.cache')
    return dagparts.T050017_filename(instruments, description, boundary_seg, '.xml.gz', path = job.output_path)
#
# Utility functions
#
def parse_cache_str(instr):
    """!
    A way to decode a command line option that specifies different bank
    caches for different detectors, e.g.,

    >>> bankcache = parse_cache_str("H1=H1_split_bank.cache,L1=L1_split_bank.cache,V1=V1_split_bank.cache")
    >>> bankcache == {'V1': 'V1_split_bank.cache', 'H1': 'H1_split_bank.cache', 'L1': 'L1_split_bank.cache'}
    True

    Only the first "=" of an item separates the detector from the cache
    name, so the name itself may contain "=" and the detector prefix:

    >>> parse_cache_str("H1=/banks/H1=v2/bank.cache")
    {'H1': '/banks/H1=v2/bank.cache'}

    None gives an empty dictionary.  An item without any "=" is used as
    both key and value.
    """
    dictcache = {}
    if instr is None:
        return dictcache
    for c in instr.split(','):
        ifo, separator, cache = c.partition("=")
        # strip only the leading "ifo=", never later occurrences of it
        dictcache[ifo] = cache if separator else c
    return dictcache
def build_bank_groups(cachedict, numbanks = [2], maxjobs = None):
    """!
    Given a dictionary of bank cache files keyed by ifo, e.g. from
    parse_cache_str(), group the banks into suitably sized chunks for a
    single svd bank file according to numbanks.  Note, numbanks should be a
    list; it is handed to the group() function, which decides the chunk
    sizes.

    Returns a list with one dictionary per chunk, mapping each ifo to the
    tuple of bank file paths of that chunk.
    """
    outstrs = []
    ifos = sorted(cachedict)

    # read each cache file and close it again straight away
    paths_per_ifo = []
    for ifo in ifos:
        with open(cachedict[ifo], 'r') as cache_file:
            paths_per_ifo.append([CacheEntry(line).path for line in cache_file])

    # one tuple per bank, holding the matching file of every ifo
    files = list(zip(*paths_per_ifo))

    for n, bank_group in enumerate(group(files, numbanks)):
        # NOTE(review): "n > maxjobs" lets maxjobs + 1 groups through;
        # confirm whether that is intended
        if maxjobs is not None and n > maxjobs:
            break
        outstrs.append(dict(zip(ifos, zip(*bank_group))))

    return outstrs
def get_svd_bank_params_online(svd_bank_cache):
    """find the chirp mass range of every SVD bank file listed in a cache

    Returns a dictionary mapping the zero-padded position of each cache
    entry ("0000", "0001", ...) to (min_mchirp, max_mchirp), taken over all
    "gstlal_svd_bank_Bank" elements of that entry's file.  A file without
    such an element yields (inf, 0).
    """
    # read the whole cache first so that the file is closed before the
    # bank files are loaded
    with open(svd_bank_cache) as cache_file:
        cache_entries = [CacheEntry(line) for line in cache_file]

    template_mchirp_dict = {}
    for ii, ce in enumerate(cache_entries):
        min_mchirp, max_mchirp = float("inf"), 0
        xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
        for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
            snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
            mchirp_column = snglinspiraltable.getColumnByName("mchirp")
            min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
        template_mchirp_dict["%04d" % ii] = (min_mchirp, max_mchirp)
        xmldoc.unlink()
    return template_mchirp_dict
def get_svd_bank_params(svd_bank_cache, online = False):
    """collect the chirp mass ranges of the SVD banks listed in a cache

    The cache entries are processed in increasing order of the integer
    bin index that starts their description.  For each bin index only the
    first file is loaded, and its (min_mchirp, max_mchirp) over all
    "gstlal_svd_bank_Bank" elements is recorded.

    With online False the return value is
    (template_mchirp_dict, bgbin_file_map, max_time), where bgbin_file_map
    maps observatory to the list of its files and max_time is the longest
    template_duration found in the loaded files.  With online True only
    template_mchirp_dict is returned.
    """
    def bin_index(cache_entry):
        return int(cache_entry.description.split("_")[0])

    if not online:
        bgbin_file_map = {}
        max_time = 0
    template_mchirp_dict = {}

    # sorted() has no cmp argument and there is no cmp() builtin on
    # Python 3; a key function gives the same (stable) ordering
    with open(svd_bank_cache) as cache_file:
        cache_entries = sorted((CacheEntry(line) for line in cache_file), key = bin_index)

    for ce in cache_entries:
        bin_key = ce.description.split("_")[0]
        if not online:
            bgbin_file_map.setdefault(ce.observatory, []).append(ce.path)
        if bin_key in template_mchirp_dict:
            # this bin has already been read from another file
            continue

        min_mchirp, max_mchirp = float("inf"), 0
        xmldoc = ligolw_utils.load_url(ce.path, contenthandler = svd_bank.DefaultContentHandler)
        for root in (elem for elem in xmldoc.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute(u"Name") and elem.Name == "gstlal_svd_bank_Bank"):
            snglinspiraltable = lsctables.SnglInspiralTable.get_table(root)
            mchirp_column = snglinspiraltable.getColumnByName("mchirp")
            min_mchirp, max_mchirp = min(min_mchirp, min(mchirp_column)), max(max_mchirp, max(mchirp_column))
            if not online:
                max_time = max(max_time, max(snglinspiraltable.getColumnByName("template_duration")))
        template_mchirp_dict[bin_key] = (min_mchirp, max_mchirp)
        xmldoc.unlink()

    if not online:
        return template_mchirp_dict, bgbin_file_map, max_time
    return template_mchirp_dict
def sim_tag_from_inj_file(injections):
    """derive the sim tag from the name of an injection file

    The tag is the base name of the file with every '.xml' and '.gz'
    removed and every '-' turned into '_'.  None is passed through.
    """
    if injections is None:
        return None
    tag = os.path.basename(injections)
    for old, new in (('.xml', ''), ('.gz', ''), ('-', '_')):
        tag = tag.replace(old, new)
    return tag
def load_bank_cache(options):
    """decode options.bank_cache into a dictionary of cache files per ifo

    Each element of options.bank_cache is a comma-separated list of
    "ifo=cache" items.  The result maps every ifo to the list of its cache
    names, in the order the elements were given.

    Only the first "=" of an item separates the ifo from the cache name,
    so the name itself may contain "=".  An item without any "=" is used
    as both key and value.
    """
    bank_cache = {}
    for bank_cache_str in options.bank_cache:
        for c in bank_cache_str.split(','):
            ifo, separator, cache = c.partition("=")
            # strip only the leading "ifo=", never later occurrences of it
            bank_cache.setdefault(ifo, []).append(cache if separator else c)
    return bank_cache
def get_bank_params(options, verbose = False):
    """collect template durations and chirp mass ranges of the split banks

    Only the bank caches of the first instrument returned by
    load_bank_cache(options) are read.  Each entry of such a cache names a
    further cache file, whose entries are the bank files that get loaded.

    Returns (template_mchirp_dict, bank_cache, max_time).
    template_mchirp_dict maps each bank file path to [min, max] of its
    mchirp column after dropping options.overlap[n] // 2 templates from
    both ends, n being the position of the bank cache; max_time is the
    longest template_duration over all bank files.
    """
    bank_cache = load_bank_cache(options)
    max_time = 0
    template_mchirp_dict = {}
    for n, cache in enumerate(list(bank_cache.values())[0]):
        with open(cache) as cache_file:
            split_bank_caches = [CacheEntry(line).path for line in cache_file]

        for split_bank_cache in split_bank_caches:
            with open(split_bank_cache) as split_bank_file:
                bank_entries = [CacheEntry(line) for line in split_bank_file]

            for bank_entry in bank_entries:
                xmldoc = ligolw_utils.load_filename(bank_entry.path, verbose = verbose, contenthandler = LIGOLWContentHandler)
                snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc)
                max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration')))
                idx = options.overlap[n] // 2
                # [idx:-idx] would be empty for idx == 0, so leave the end
                # of the slice open in that case
                stop = -idx if idx else None
                mchirps = snglinspiraltable.getColumnByName('mchirp')[idx:stop]
                template_mchirp_dict[bank_entry.path] = [min(mchirps), max(mchirps)]
                xmldoc.unlink()
    return template_mchirp_dict, bank_cache, max_time
def subdir_path(dirlist):
    """Join ``dirlist`` with ``/`` and try to create that directory.

    Directory creation is best effort: if ``os.mkdir`` fails with an
    ``OSError`` (the directory already exists, a parent is missing,
    permission is denied, ...) the failure is ignored.

    Returns the joined path whether or not the directory could be created.
    """
    output_path = '/'.join(dirlist)
    try:
        os.mkdir(output_path)
    except OSError:
        # Deliberately ignored, see the docstring.  Only OSError is caught
        # so that KeyboardInterrupt and SystemExit are no longer swallowed.
        pass
    return output_path
def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
    """get a dictionary of all the disjoint 2+ detector combination segments

    For every combination of at least ``min_instruments`` instruments drawn
    from ``analyzable_instruments_set``, the times when those instruments
    are on and the remaining analyzable instruments are not are computed
    from ``allsegs``, clipped to ``boundary_seg``, protracted by the start
    padding and split up with ``dagparts.breakupsegs``.

    Returns a ``segments.segmentlistdict`` keyed by the ``frozenset`` of
    instruments in each combination; combinations that end up with no
    segments are left out.

    NOTE(review): ``allsegs`` is presumably a segmentlistdict keyed by
    instrument (its ``intersection``/``union`` methods are called with
    collections of instruments) -- confirm against the caller.
    """
    # 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
    start_pad = 512 + max_template_length
    # Chosen so that the overlap is only a ~5% hit in run time for long segments...
    segment_length = int(5 * start_pad)

    segsdict = segments.segmentlistdict()
    for n in range(min_instruments, 1 + len(analyzable_instruments_set)):
        for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n):
            key = frozenset(ifo_combos)
            other_instruments = analyzable_instruments_set - set(ifo_combos)
            # times when this combination is on, minus times when any other
            # analyzable instrument is on
            segs = allsegs.intersection(ifo_combos) - allsegs.union(other_instruments)
            segs &= segments.segmentlist([boundary_seg])
            segs = segs.protract(start_pad)
            segsdict[key] = dagparts.breakupsegs(segs, segment_length, start_pad)
            if not segsdict[key]:
                del segsdict[key]
    return segsdict
def create_svd_bank_strings(svd_nodes, instruments = None):
    """Build one ``IFO:bank,IFO:bank,...`` string per SVD bank index.

    ``svd_nodes`` maps each instrument to a sequence with one element per
    bank.  An element is either an object with an ``output_files``
    dictionary, in which case its ``"write-svd"`` entry is used, or
    something without that attribute (for example a plain file name), which
    is formatted as is.

    If ``instruments`` is not ``None``, only instruments contained in it are
    included.

    Returns a list of strings, one per bank index; an empty ``svd_nodes``
    yields an empty list.
    """
    if not svd_nodes:
        return []
    selected_ifos = [ifo for ifo in svd_nodes if instruments is None or ifo in instruments]
    # FIXME assume that the number of svd nodes is the same per ifo, a good assumption though
    num_banks = len(next(iter(svd_nodes.values())))
    outstrings = []
    for i in range(num_banks):
        entries = []
        for ifo in selected_ifos:
            node = svd_nodes[ifo][i]
            try:
                bank = node.output_files["write-svd"]
            except AttributeError:
                # not a job node: use the element itself
                bank = node
            entries.append("%s:%s" % (ifo, bank))
        outstrings.append(",".join(entries))
    return outstrings
def svd_bank_cache_maker(svd_bank_strings, injection = False):
    """Return the URLs of all bank files named in ``svd_bank_strings``.

    Each string is parsed with ``inspiral.parse_svdbank_string``, and every
    file name in the resulting dictionaries is turned into a cache entry
    with ``CacheEntry.from_T050017`` whose ``url`` is collected.

    ``injection`` is kept for backward compatibility only.  It used to
    select a directory name that was never used, so it has no effect on the
    result.

    Returns a list of URLs, ordered by input string and then by the order of
    the values in each parsed dictionary.
    """
    # parse everything first so that a malformed string is reported before
    # any cache entry is built
    parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings]
    return [
        CacheEntry.from_T050017(filename).url
        for svd_bank_parsed_dict in parsed_svd_bank_strings
        for filename in svd_bank_parsed_dict.values()
    ]
def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict):
    """Regroup the outputs of the inspiral jobs by background bin.

    ``inspiral_nodes`` is indexed as ``inspiral_nodes[(ifos, sim_tag)][seg]``
    and yields the job nodes for one instrument combination, one simulation
    tag (``None`` for the non-injection run) and one segment.  The bin of an
    output file is the part of its cache-entry description before the first
    ``_``.

    Returns a tuple ``(lloid_output, lloid_diststats)``:

    * ``lloid_output[sim_tag][bin]`` is a list of ``(file, parent_nodes)``
      tuples.  For the non-injection run the parents are just the producing
      node.  For an injection run they are the parents of the most recently
      stored non-injection output of the same bin plus the injection node,
      or only the injection node if that bin has no non-injection output.
    * ``lloid_diststats[bin]`` is a list of ranking statistic files.  It is
      seeded from ``options.dist_stats_cache`` when that is set and extended
      with the ``"ranking-stat-output-cache"`` outputs of the non-injection
      nodes.
    """
    # first get the previous output in a usable form
    lloid_output = {}
    for inj in options.injection_file + [None]:
        lloid_output[sim_tag_from_inj_file(inj)] = {}

    lloid_diststats = {}
    if options.dist_stats_cache:
        with open(options.dist_stats_cache) as dist_stats_cache_file:
            for ce in map(CacheEntry, dist_stats_cache_file):
                lloid_diststats[ce.description.split("_")[0]] = [ce.path]

    for ifos in segsdict:
        for seg in segsdict[ifos]:
            # iterate over the mass space chunks for each segment
            for node in inspiral_nodes[(ifos, None)][seg]:
                # NOTE(review): a missing non-injection node ends the scan
                # of this segment (break) whereas a missing injection node
                # below is merely skipped (continue) -- confirm that the
                # asymmetry is intended.
                if node is None:
                    break
                for f in node.output_files["output-cache"]:
                    # Store the output files and the node for use as a parent dependency
                    bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
                    lloid_output[None].setdefault(bgbin_index, []).append((f, [node]))
                for f in node.output_files["ranking-stat-output-cache"]:
                    bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
                    lloid_diststats.setdefault(bgbin_index, []).append(f)
            for inj in options.injection_file:
                sim_tag = sim_tag_from_inj_file(inj)
                for injnode in inspiral_nodes[(ifos, sim_tag)][seg]:
                    if injnode is None:
                        continue
                    for f in injnode.output_files["output-cache"]:
                        # Store the output files and the node and injnode for use as a parent dependencies
                        bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0]
                        try:
                            parents = lloid_output[None][bgbin_index][-1][1] + [injnode]
                        except KeyError:
                            # no non-injection output for this bin
                            parents = [injnode]
                        lloid_output[sim_tag].setdefault(bgbin_index, []).append((f, parents))
    return lloid_output, lloid_diststats
def set_up_scripts(options):
    """Write the helper shell scripts into the current working directory.

    ``store_raw.sh`` is always written.  ``gzip_test.sh`` is written only
    when ``options.gzip_test`` is true.  Both are made readable and
    executable by everyone and writable by the owner.
    """
    # rwxr-xr-x
    script_mode = (
        stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
        | stat.S_IRGRP | stat.S_IXGRP
        | stat.S_IROTH | stat.S_IXOTH
    )

    def write_script(name, text):
        # write the script, then flag it executable
        with open(name, "w") as script:
            script.write(text)
        os.chmod(name, script_mode)

    # Make an xml integrity checker
    if options.gzip_test:
        write_script("gzip_test.sh", "#!/bin/bash\nsleep 60\ngzip --test $@")

    # A pre script to backup data before feeding to lossy programs
    # (e.g. clustering routines)
    write_script("store_raw.sh", """#!/bin/bash
for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""")
def load_reference_psd(options):
    """Read the reference PSD and write a one-line cache file pointing at it.

    The document at ``options.reference_psd`` is loaded (``options.verbose``
    is forwarded to the loader) and parsed with
    ``lal.series.read_psd_xmldoc``.  As a side effect ``reference_psd.cache``
    is written in the current working directory; it holds a single cache
    entry for the absolute path of the PSD file.

    Returns whatever ``lal.series.read_psd_xmldoc`` returns.
    """
    psd_xmldoc = ligolw_utils.load_filename(
        options.reference_psd,
        verbose = options.verbose,
        contenthandler = lal.series.PSDContentHandler
    )
    ref_psd = lal.series.read_psd_xmldoc(psd_xmldoc)

    # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache
    psd_url = "file://localhost%s" % os.path.abspath(options.reference_psd)
    with open('reference_psd.cache', "w") as output_cache_file:
        cache_entry = CacheEntry.from_T050017(psd_url)
        output_cache_file.write("%s\n" % cache_entry)

    return ref_psd
if __name__ == "__main__":
    # Executed as a script rather than imported: run this module's doctests.
    from doctest import testmod
    testmod()