# Copyright (C) 2020 Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from collections.abc import Mapping
import itertools
import os
import math
import sys
import numpy
from lal.utils import CacheEntry
from ligo.segments import segment
from gstlal import plugins
from gstlal.datafind import DataType, DataCache
from gstlal.dags import Argument, Option
from gstlal.dags import util as dagutil
from gstlal.dags.layers import Layer, Node
DEFAULT_SPLIT_INJECTION_TIME = 20000
DEFAULT_MAX_FILES = 500
# 10 bins per job corresponds to 20-30 min execution time
PLOT_BIN_BACKGROUND_MAX_FILES = 10

def split_bank_layer(config, dag, psd_cache, bank_cache):
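"""Set up the layer that splits the input template bank into sub-banks.

Runs gstlal_inspiral_bank_splitter over the template bank and reference PSD,
grouping templates by mu or chi bins as configured, and writes the split
banks along with the SVD manifest. Returns a DataCache of the split banks.
"""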
layer = Layer(
"gstlal_inspiral_bank_splitter",
requirements={
"request_cpus": 1,
"request_memory": 4000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
split_bank_cache = DataCache.generate(DataType.SVD_BANK, config.all_ifos, config.span, svd_bins=config.svd.bins)
arguments = [
Option("f-low", config.svd.f_low),
Option("f-final", config.svd.max_f_final),
Option("approximant", config.svd.approximant),
Option("overlap", config.svd.overlap),
Option("instrument", ifo_to_string(config.all_ifos)),
Option("n", config.svd.num_split_templates),
Option("num-banks", config.svd.num_banks),
Option("sort-by", config.svd.sort_by),
]
if config.svd.num_mu_bins:
arguments.append(Option("group-by-mu", config.svd.num_mu_bins))
else:
arguments.append(Option("group-by-chi", config.svd.num_chi_bins))
layer += Node(
arguments = arguments,
inputs = [
Argument("template-bank", bank_cache.files, track=False),
Argument("psd-xml", psd_cache.files, track=False),
],
outputs = [
Option("output-path", str(split_bank_cache.name).lower()),
Option("output-stats-file", DataType.SVD_MANIFEST.filename(config.ifos)),
],
)
dag.attach(layer)
return split_bank_cache

def svd_bank_layer(config, dag, median_psd_cache, split_bank_cache=None):
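"""Set up the layer that generates SVD-decomposed template banks.

Runs gstlal_inspiral_svd_bank per (instrument, SVD bin) pair, applying any
sub-bank specific configuration, and returns a DataCache of SVD banks.
"""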
layer = Layer(
"gstlal_inspiral_svd_bank",
requirements={
"request_cpus": 1,
"request_memory": 4000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
svd_cache = DataCache.generate(
DataType.SVD_BANK,
config.ifos,
config.span,
svd_bins=config.svd.bins,
root="filter",
)
split_banks = split_bank_cache.groupby("bin")
for (ifo, svd_bin), svd_banks in svd_cache.groupby("ifo", "bin").items():
# grab sub-bank specific configuration if available
if "bank_name" in config.svd.stats.bins[svd_bin]:
bank_name = config.svd.stats.bins[svd_bin]["bank_name"]
svd_config = config.svd.sub_banks[bank_name]
else:
svd_config = config.svd
arguments = [
Option("instrument-override", ifo),
Option("flow", svd_config.f_low),
Option("samples-min", svd_config.samples_min),
Option("samples-max-64", svd_config.samples_max_64),
Option("samples-max-256", svd_config.samples_max_256),
Option("samples-max", svd_config.samples_max),
Option("svd-tolerance", svd_config.tolerance),
Option("autocorrelation-length", config.svd.stats.bins[svd_bin]["ac_length"]),
]
if "max_duration" in svd_config:
arguments.append(Option("max-duration", svd_config.max_duration))
if "sample_rate" in svd_config:
arguments.append(Option("sample-rate", svd_config.sample_rate))
layer += Node(
arguments = arguments,
inputs = [
Option("reference-psd", median_psd_cache.files),
Argument("split-banks", sorted(split_banks[svd_bin].files)),
],
outputs = Option("write-svd", svd_banks.files)
)
dag.attach(layer)
return svd_cache

def checkerboard_layer(config, dag, ref_psd_cache, svd_bank_cache):
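"""Set up the layer that checkerboards the SVD banks.

Runs gstlal_svd_bank_checkerboard in place over chunks of SVD bank files,
optionally selecting the even pattern, and returns the same SVD bank cache.
"""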
layer = Layer(
"gstlal_svd_bank_checkerboard",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# set up arguments
arguments = [Option("in-place"), Option("reference-psd", ref_psd_cache.files)]
if config.svd.checkerboard == "even":
arguments.append(Option("even"))
chunk_size = 20
for svd_banks in svd_bank_cache.chunked(chunk_size):
layer += Node(
arguments = arguments,
inputs = Option("svd-files", svd_banks.files),
outputs = Argument("checkered-svd-files", svd_banks.files, suppress=True),
)
dag.attach(layer)
return svd_bank_cache

def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
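"""Set up the layer that filters data to produce triggers.

Runs gstlal_inspiral over each (instrument combination, time bin), grouping
SVD bins per job for checkpointing, and returns DataCaches of triggers and
ranking statistic (dist stats) files.
"""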
layer = Layer(
"gstlal_inspiral",
requirements={
"request_cpus": 2,
"request_memory": 4000,
"request_disk": "5GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
dynamic_memory=True,
)
dist_stat_cache = DataCache.generate(
DataType.DIST_STATS,
config.ifo_combos,
config.time_bins,
svd_bins=config.svd.bins,
root="filter",
)
trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
svd_bins=config.svd.bins,
root="filter",
)
common_opts = [
Option("track-psd"),
Option("data-source", "frames"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("frame-segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
]
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# checkpoint by grouping SVD bins together + enable local
# frame caching if there is more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.time_slide_file),
]
if config.source.frame_cache:
inputs.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
if config.source.idq_channel_name:
filter_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_channel_name)))
filter_opts.extend(common_opts)
if config.source.idq_channel_name and config.filter.idq_gate_threshold:
filter_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
if config.source.idq_channel_name and config.source.idq_state_channel_name:
filter_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_state_channel_name)))
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [config.svd.stats.bins[svd_bin]["ht_gate_threshold"] for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
dist_stat_files = dagutil.flatten(
[dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("svd-bank", svd_bank_files),
*inputs,
],
outputs = [
Option("output", trigger_group.files),
Option("ranking-stat-output", dist_stat_files),
],
)
dag.attach(layer)
return trigger_cache, dist_stat_cache

def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
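"""Set up the layer that filters injection data to produce injection triggers.

Runs gstlal_inspiral with an injection file per injection set, restricted to
the SVD bins spanning the injection chirp mass range, and returns a DataCache
of injection triggers.
"""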
layer = Layer(
"gstlal_inspiral",
name="gstlal_inspiral_inj",
requirements={
"request_cpus": 2,
"request_memory": 5000,
"request_disk": "5GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
dynamic_memory=True,
)
trigger_cache = DataCache(DataType.TRIGGERS)
for inj_name, inj_args in config.filter.injections.items():
min_mchirp, max_mchirp = map(float, inj_args["range"].split(":"))
svd_bins = mchirp_range_to_bins(min_mchirp, max_mchirp, config.svd.stats)
trigger_cache += DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
svd_bins=svd_bins,
subtype=inj_name,
root="filter",
)
common_opts = [
Option("track-psd"),
Option("data-source", "frames"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("frame-segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
Option("injections"),
]
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
# checkpoint by grouping SVD bins together + enable local
# frame caching if there is more than one SVD bin per group
if config.condor.transfer_files:
max_concurrency = 10
else:
max_concurrency = 20
num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
if num_per_group > 1:
common_opts.append(Option("local-frame-caching"))
ref_psds = ref_psd_cache.groupby("ifo", "time")
svd_banks = svd_bank_cache.groupby("ifo", "bin")
for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
ifos = config.to_ifo_list(ifo_combo)
start, end = span
injection_file = config.filter.injections[inj_type.lower()]["file"]
filter_opts = [
Option("gps-start-time", int(start)),
Option("gps-end-time", int(end)),
Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
]
inputs = [
Option("frame-segments-file", config.source.frame_segments_file),
Option("veto-segments-file", config.filter.veto_segments_file),
Option("reference-psd", ref_psds[(ifo_combo, span)].files),
Option("time-slide-file", config.filter.injection_time_slide_file),
Option("injection-file", injection_file),
]
if config.source.frame_cache:
inputs.append(Option("frame-cache", config.source.frame_cache, track=False))
else:
filter_opts.extend([
Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
Option("data-find-server", config.source.data_find_server),
])
if config.source.idq_channel_name:
filter_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_channel_name)))
if config.source.idq_channel_name and config.filter.idq_gate_threshold:
filter_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
if config.source.idq_channel_name and config.source.idq_state_channel_name:
filter_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_state_channel_name)))
filter_opts.extend(common_opts)
for trigger_group in triggers.chunked(num_per_group):
svd_bins = trigger_group.groupby("bin").keys()
thresholds = [config.svd.stats.bins[svd_bin]["ht_gate_threshold"] for svd_bin in svd_bins]
these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]
svd_bank_files = dagutil.flatten(
[svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
)
layer += Node(
arguments = these_opts,
inputs = [
Option("svd-bank", svd_bank_files),
*inputs,
],
outputs = Option("output", trigger_group.files),
)
dag.attach(layer)
return trigger_cache

def aggregate_layer(config, dag, trigger_cache, dist_stat_cache=None):
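"""Set up layers that cluster triggers by SNR and marginalize dist stats over time.

Returns the trigger cache, plus the time-marginalized dist stat cache when
dist stats are provided.
"""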
trg_layer = Layer(
"ligolw_run_sqlite",
name="cluster_triggers_by_snr",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# FIXME: find better way of discovering SQL file
share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
snr_cluster_sql_file = os.path.join(share_path, "snr_simplify_and_cluster.sql")
inj_snr_cluster_sql_file = os.path.join(share_path, "inj_snr_simplify_and_cluster.sql")
# cluster triggers by SNR
for (svd_bin, inj_type), triggers in trigger_cache.groupby("bin", "subtype").items():
trg_layer += Node(
arguments = [
Option("sql-file", inj_snr_cluster_sql_file if inj_type else snr_cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", triggers.files),
outputs = Argument("clustered-triggers", triggers.files, suppress=True),
)
dag.attach(trg_layer)
# if no dist stats files to marginalize, only return triggers
if not dist_stat_cache:
return trigger_cache
# marginalize dist stats across time
dist_layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="marginalize_dist_stats_across_time_filter",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
agg_dist_stat_cache = DataCache.generate(
DataType.DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="filter",
)
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, agg_dist_stats in agg_dist_stat_cache.groupby("bin").items():
dist_layer += Node(
arguments = Option("marginalize", "ranking-stat"),
inputs = Argument("dist-stats", dist_stats[svd_bin].files),
outputs = Option("output", agg_dist_stats.files)
)
dag.attach(dist_layer)
return trigger_cache, agg_dist_stat_cache

def create_prior_layer(config, dag, svd_bank_cache, median_psd_cache, dist_stat_cache=None):
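"""Set up the layer that creates prior distribution statistics per SVD bin.

Runs gstlal_inspiral_create_prior_diststats, using the median PSD when the
bandwidth prior is requested, and returns a DataCache of prior dist stats.
"""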
layer = Layer(
"gstlal_inspiral_create_prior_diststats",
requirements={
"request_cpus": 2,
"request_memory": 4000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
if "df" in config.prior:
prior_df = config.prior.df
assert prior_df in ["analytic", "bandwidth"]
else:
raise ValueError("config.prior.df is required, but not found in config. Choose from [bandwidth, analytic]")
if dist_stat_cache:
prior_cache = DataCache.generate(
DataType.PRIOR_DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
else:
prior_cache = DataCache.generate(
DataType.DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
)
svd_banks = svd_bank_cache.groupby("bin")
for svd_bin, prior in prior_cache.groupby("bin").items():
inputs = [
Option("svd-file", svd_banks[svd_bin].files),
*add_ranking_stat_file_options(config, svd_bin),
]
if prior_df == "bandwidth":
inputs += [Option("psd-xml", median_psd_cache.files)]
layer += Node(
arguments = [
Option("df", prior_df),
Option("background-prior", 1),
Option("instrument", config.ifos),
Option("min-instruments", config.min_ifos),
Option("coincidence-threshold", config.filter.coincidence_threshold),
],
inputs = inputs,
outputs = Option("write-likelihood", prior.files),
)
dag.attach(layer)
return prior_cache

def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
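"""Set up the layer that marginalizes dist stats together with their priors per SVD bin.

Returns a DataCache of marginalized dist stats.
"""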
layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="marginalize_dist_stats_across_time_rank",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
marg_dist_stat_cache = DataCache.generate(
DataType.MARG_DIST_STATS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
prior = prior_cache.groupby("bin")
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, marg_dist_stats in marg_dist_stat_cache.groupby("bin").items():
layer += Node(
arguments = Option("marginalize", "ranking-stat"),
inputs = [
Argument("dist-stats", dist_stats[svd_bin].files + prior[svd_bin].files),
*add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
],
outputs = Option("output", marg_dist_stats.files)
)
dag.attach(layer)
return marg_dist_stat_cache

def calc_pdf_layer(config, dag, dist_stat_cache):
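"""Set up the layer that calculates ranking statistic PDFs per SVD bin.

Runs gstlal_inspiral_calc_rank_pdfs and returns a DataCache of dist stat PDFs.
"""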
# FIXME: expose this in configuration
num_cores = 4
layer = Layer(
"gstlal_inspiral_calc_rank_pdfs",
requirements={
"request_cpus": num_cores,
"request_memory": 3000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
pdf_cache = DataCache.generate(
DataType.DIST_STAT_PDFS,
config.all_ifos,
config.span,
svd_bins=config.svd.bins,
root="rank",
)
dist_stats = dist_stat_cache.groupby("bin")
for svd_bin, pdfs in pdf_cache.groupby("bin").items():
layer += Node(
arguments = [
Option("ranking-stat-samples", config.rank.ranking_stat_samples),
Option("num-cores", num_cores),
],
inputs = [
Argument("dist-stats", dist_stats[svd_bin].files),
*add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
],
outputs = Option("output", pdfs.files)
)
dag.attach(layer)
return pdf_cache

def marginalize_pdf_layer(config, dag, pdf_cache):
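"""Set up the layer(s) that marginalize ranking statistic PDFs across SVD bins.

Combines PDFs in a single round, or in two rounds when the number of bins
exceeds DEFAULT_MAX_FILES, and returns a DataCache with the marginalized PDF.
"""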
round1_layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="gstlal_inspiral_marginalize_pdfs_round_one",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
round2_layer = Layer(
"gstlal_inspiral_marginalize_likelihood",
name="gstlal_inspiral_marginalize_pdfs_round_two",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
marg_pdf_cache = DataCache.generate(
DataType.DIST_STAT_PDFS,
config.all_ifos,
config.span,
root="rank",
)
# if number of bins is large, combine in two stages instead
if len(config.svd.bins) > DEFAULT_MAX_FILES:
partial_pdf_files = []
for pdf_subset in pdf_cache.chunked(DEFAULT_MAX_FILES):
# determine the bin range and construct the partial file name
svd_bins = list(pdf_subset.groupby("bin").keys())
min_bin, max_bin = min(svd_bins), max(svd_bins)
partial_pdf_filename = pdf_subset.name.filename(
config.all_ifos,
config.span,
svd_bin=f"{min_bin}_{max_bin}",
)
partial_pdf_path = os.path.join(
pdf_subset.name.directory(root="rank", start=config.span[0]),
partial_pdf_filename,
)
partial_pdf_files.append(partial_pdf_path)
# combine across subset of bins (stage 1)
round1_layer += Node(
arguments = Option("marginalize", "ranking-stat-pdf"),
inputs = Argument("dist-stat-pdfs", pdf_subset.files),
outputs = Option("output", partial_pdf_path)
)
# combine across all bins (stage 2)
round2_layer += Node(
arguments = Option("marginalize", "ranking-stat-pdf"),
inputs = Argument("dist-stat-pdfs", partial_pdf_files),
outputs = Option("output", marg_pdf_cache.files)
)
else:
round1_layer += Node(
arguments = Option("marginalize", "ranking-stat-pdf"),
inputs = Argument("dist-stat-pdfs", pdf_cache.files),
outputs = Option("output", marg_pdf_cache.files)
)
dag.attach(round1_layer)
if round2_layer.nodes:
dag.attach(round2_layer)
return marg_pdf_cache

def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
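"""Set up the layer that assigns likelihood ratios to triggers.

Runs gstlal_inspiral_calc_likelihood per (SVD bin, injection type, directory)
group, copying triggers from the filter directory tree into the rank tree,
and returns a DataCache of the likelihood-assigned triggers.
"""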
layer = Layer(
"gstlal_inspiral_calc_likelihood",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# assign likelihood to triggers
calc_trigger_cache = trigger_cache.copy(root="rank")
calc_triggers = calc_trigger_cache.groupby("bin", "subtype", "dirname")
dist_stats = dist_stat_cache.groupby("bin")
for (svd_bin, inj_type, dirname), triggers in trigger_cache.groupby("bin", "subtype", "dirname").items():
# find path relative to current directory
# where assigned triggers will reside
split_dirname = dirname.split(os.sep)
dir_idx = split_dirname.index("filter")
calc_dirname = os.path.join(*split_dirname[dir_idx:]).replace("filter", "rank")
arguments = [
Option("force"),
Option("copy"),
Option("tmp-space", dagutil.condor_scratch_space()),
]
# if file transfer is not enabled, specify the directory to copy
# triggers into, since remaps aren't relevant here
if not config.condor.transfer_files:
arguments.append(Option("copy-dir", calc_dirname))
layer += Node(
arguments = arguments,
inputs = [
Argument("triggers", triggers.files),
Option("likelihood-url", dist_stats[svd_bin].files),
*add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
],
outputs = Argument(
"calc-triggers",
calc_triggers[(svd_bin, inj_type, calc_dirname)].files,
suppress_with_remap=True
),
)
dag.attach(layer)
return calc_trigger_cache

def cluster_layer(config, dag, trigger_cache, injection_cache):
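"""Set up layers that combine triggers across SVD bins and cluster them by likelihood.

Triggers are combined with ligolw_add (in one or two rounds depending on the
number of bins), converted to SQLite databases across time boundaries, and
clustered with ligolw_run_sqlite. Returns a DataCache of trigger databases.
"""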
# cluster triggers by likelihood
combine_round1_layer = Layer(
"ligolw_add",
name="combine_triggers_across_bins_round_one",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
combine_round2_layer = Layer(
"ligolw_add",
name="combine_triggers_across_bins_round_two",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
cluster_round1_layer = Layer(
"ligolw_run_sqlite",
name="cluster_triggers_by_likelihood_round_one",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
cluster_round2_layer = Layer(
"ligolw_run_sqlite",
name="cluster_triggers_by_likelihood_round_two",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
sqlite_layer = Layer(
"ligolw_sqlite",
name="convert_triggers_to_sqlite",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# set up data caches
inj_types = list(trigger_cache.groupby("subtype").keys())
combined_trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.ifo_combos,
config.time_bins,
subtype=inj_types,
root="rank",
)
trigger_db_cache = DataCache.generate(
DataType.TRIGGERS,
config.all_ifos,
config.time_boundaries,
subtype=inj_types,
extension="sqlite",
root="rank"
)
# FIXME: find better way of discovering SQL file
share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")
inj_cluster_sql_file = os.path.join(share_path, "inj_simplify_and_cluster.sql")
# combine/cluster triggers across SVD bins
# if triggers are from an injection job, also add in the injections
combined_triggers_across_bins = combined_trigger_cache.groupby("time", "subtype")
for (span, inj_type), triggers in trigger_cache.groupby("time", "subtype").items():
combined_triggers = combined_triggers_across_bins[(span, inj_type)]
# if number of bins is large, combine in two stages instead
if len(config.svd.bins) > DEFAULT_MAX_FILES:
partial_trigger_files = []
for trigger_subset in triggers.chunked(DEFAULT_MAX_FILES):
# determine the bin range and construct the partial file name
svd_bins = list(trigger_subset.groupby("bin").keys())
min_bin, max_bin = min(svd_bins), max(svd_bins)
partial_trigger_filename = trigger_subset.name.filename(
config.all_ifos,
span,
svd_bin=f"{min_bin}_{max_bin}",
subtype=inj_type,
)
partial_trigger_path = os.path.join(
trigger_subset.name.directory(root="rank", start=span[0]),
partial_trigger_filename,
)
partial_trigger_files.append(partial_trigger_path)
# combine across subset of bins (stage 1)
combine_round1_layer += Node(
inputs = Argument("inputs", trigger_subset.files),
outputs = Option("output", partial_trigger_path),
)
# combine across all bins (stage 2)
combine_round2_layer += Node(
inputs = Argument("inputs", partial_trigger_files),
outputs = Option("output", combined_triggers.files),
)
else:
combine_round1_layer += Node(
inputs = Argument("inputs", triggers.files),
outputs = Option("output", combined_triggers.files),
)
# cluster by likelihood
cluster_round1_layer += Node(
arguments = [
Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", combined_triggers.files),
outputs = Argument("calc-triggers", combined_triggers.files, suppress=True),
)
dag.attach(combine_round1_layer)
if combine_round2_layer.nodes:
dag.attach(combine_round2_layer)
dag.attach(cluster_round1_layer)
# combine/cluster triggers across time
injections = injection_cache.groupby("subtype")
combined_triggers_by_time = combined_trigger_cache.groupby("subtype")
for inj_type, trigger_dbs_by_time in trigger_db_cache.groupby("subtype").items():
for span, trigger_dbs in trigger_dbs_by_time.groupby_bins("time", config.time_boundaries).items():
combined_triggers = combined_triggers_by_time[inj_type].groupby_bins("time", config.time_boundaries)
# add input files for sqlite jobs
xml_files = []
if inj_type:
xml_files.extend(injections[inj_type].files)
xml_files.extend(combined_triggers[span].files)
xml_files.append(config.source.frame_segments_file)
# convert triggers to sqlite
sqlite_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("xml-files", xml_files),
outputs = Option("database", trigger_dbs.files),
)
# cluster by likelihood
cluster_round2_layer += Node(
arguments = [
Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", trigger_dbs.files),
outputs = Argument("calc-triggers", trigger_dbs.files, suppress=True),
)
dag.attach(sqlite_layer)
dag.attach(cluster_round2_layer)
return trigger_db_cache

def find_injections_layer(config, dag, trigger_db_cache):
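"""Set up layers that match recovered triggers to injections.

Converts injection trigger databases to XML, runs lalinspiral_injfind, and
converts the results back to SQLite. Returns the trigger database cache and
a cache of the injection trigger XML files.
"""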
inj_sqlite_to_xml_layer = Layer(
"ligolw_sqlite",
name="inj_sqlite_to_xml",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
injfind_layer = Layer(
"lalinspiral_injfind",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
inj_xml_to_sqlite_layer = Layer(
"ligolw_sqlite",
name="inj_xml_to_sqlite",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# set up data caches
grouped_trigger_dbs = trigger_db_cache.groupby("subtype")
if "" in grouped_trigger_dbs:
grouped_trigger_dbs.pop("")
inj_trigger_cache = DataCache.generate(
DataType.TRIGGERS,
config.all_ifos,
config.time_boundaries,
subtype=grouped_trigger_dbs.keys(),
root="rank",
)
# generate layers
for inj_type, triggers_by_time in inj_trigger_cache.groupby("subtype").items():
for span, triggers in triggers_by_time.groupby_bins("time", config.time_boundaries).items():
trigger_dbs = grouped_trigger_dbs[inj_type].groupby_bins("time", config.time_boundaries)
# convert triggers to XML
inj_sqlite_to_xml_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Option("database", trigger_dbs[span].files),
outputs = Option("extract", triggers.files),
)
# find injections
injfind_layer += Node(
arguments = Option("time-window", 0.9),
inputs = Argument("triggers", triggers.files),
outputs = Argument("inj-triggers", triggers.files, suppress=True),
)
# convert triggers back to sqlite
inj_xml_to_sqlite_layer += Node(
arguments = [
Option("replace"),
Option("tmp-space", dagutil.condor_scratch_space()),
],
inputs = Argument("triggers", triggers.files),
outputs = Option("database", trigger_dbs[span].files),
)
dag.attach(inj_sqlite_to_xml_layer)
dag.attach(injfind_layer)
dag.attach(inj_xml_to_sqlite_layer)
return trigger_db_cache, inj_trigger_cache

def compute_far_layer(config, dag, trigger_cache, pdf_cache):
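"""Set up layers that collect zerolag counts and assign false alarm rates.

Collects zerolag counts into the post dist stat PDFs and runs
gstlal_compute_far_from_snr_chisq_histograms over both the non-injection and
injection databases. Returns the trigger cache and post PDF cache.
"""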
collect_zerolag_layer = Layer(
"gstlal_inspiral_diststatpdf_collect_zerolag",
name="collect_zerolag",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
assign_far_layer = Layer(
"gstlal_compute_far_from_snr_chisq_histograms",
name="compute_far",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
post_pdf_cache = DataCache.generate(
DataType.POST_DIST_STAT_PDFS,
config.all_ifos,
config.span,
root="rank",
)
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
for span, noninj_triggers in noninj_trigger_cache.groupby("time").items():
collect_zerolag_layer += Node(
inputs = [
Option("background-bins-file", pdf_cache.files),
Option("non-injection-db", noninj_triggers.files),
],
outputs = [Option("output-background-bins-file", post_pdf_cache.files)]
)
inputs = [
Option("databases", noninj_triggers.files),
Option("background-bins-file", post_pdf_cache.files),
]
assign_far_layer += Node(
arguments = Option("tmp-space", dagutil.condor_scratch_space()),
inputs = inputs,
outputs = [
Argument("databases", noninj_triggers.files, suppress=True),
]
)
for span, inj_triggers in inj_trigger_cache.groupby("time").items():
inputs = [
Option("databases", inj_triggers.files),
Option("background-bins-file", post_pdf_cache.files),
]
assign_far_layer += Node(
arguments = Option("tmp-space", dagutil.condor_scratch_space()),
inputs = inputs,
outputs = [
Argument("databases", inj_triggers.files, suppress=True),
]
)
dag.attach(collect_zerolag_layer)
dag.attach(assign_far_layer)
return trigger_cache, post_pdf_cache

def split_injections_layer(config, dag):
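"""Set up the layer that splits injection sets into time-based chunks.

Runs gstlal_injsplitter per injection set, inferring instruments and span
from T050017-style injection filenames when possible, and returns a
DataCache of split injections.
"""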
layer = Layer(
"gstlal_injsplitter",
name="split_injections",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# split up injections across time, tuned for job runtime
num_splits = time_to_num_split_injections(config.span)
# split injections up
injection_cache = DataCache(DataType.SPLIT_INJECTIONS)
for inj_type, inj_params in config.filter.injections.items():
inj_tag = inj_type.upper()
inj_file = inj_params["file"]
# infer metadata from injection filename
try:
entry = CacheEntry.from_T050017(inj_file)
except ValueError:
ifos = "H1K1L1V1"
span = segment(0, 0)
else:
ifos = entry.observatory
span = entry.segment
# create split injection jobs
split_injections = DataCache.generate(
DataType.SPLIT_INJECTIONS,
ifos,
span,
svd_bins=[f"{i:04d}" for i in range(num_splits)],
subtype=inj_tag,
root="filter",
)
injection_cache += split_injections
out_path = DataType.SPLIT_INJECTIONS.directory(root="filter", start=span[0])
layer += Node(
arguments = [
Option("nsplit", num_splits),
Option("usertag", inj_tag),
],
inputs = Argument("injection-file", inj_file),
outputs = [
Option("output-path", out_path, remap=False),
Argument("split-injections", split_injections.files, suppress=True),
],
)
dag.attach(layer)
return injection_cache

def match_injections_layer(config, dag, injection_cache):
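"""Set up the layer that matches split injections to their best-matching templates.

Runs gstlal_inspiral_injection_template_match against the template bank(s)
and returns a DataCache of matched injection files.
"""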
layer = Layer(
"gstlal_inspiral_injection_template_match",
name="match_injections",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
# split up injections across time, tuned for job runtime
num_splits = time_to_num_split_injections(config.span)
inj_types = list(config.filter.injections.keys())
match_inj_cache = DataCache.generate(
DataType.MATCHED_INJECTIONS,
config.all_ifos,
config.span,
svd_bins=[f"{i:04d}" for i in range(num_splits)],
subtype=[inj_type.upper() for inj_type in inj_types],
root="filter",
)
if isinstance(config.data.template_bank, Mapping):
template_banks = list(config.data.template_bank.values())
else:
template_banks = config.data.template_bank
# match injections to templates
matched_injections = match_inj_cache.groupby("subtype", "bin")
for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
layer += Node(
inputs = [
Option("injection-file", injections.files),
Option("template-bank", template_banks),
],
outputs = Option("output", matched_injections[(inj_type, split_bin)].files),
)
dag.attach(layer)
return match_inj_cache

def calc_expected_snr_layer(config, dag, psd_cache, injection_cache):
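"""Set up the layer that calculates expected SNRs for split injections.

Runs gstlal_inspiral_injection_snr against the reference PSD, updating the
injection files in place, and returns the injection cache.
"""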
layer = Layer(
"gstlal_inspiral_injection_snr",
name="calc_expected_snr",
requirements={
"request_cpus": 2,
"request_memory": 2000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
arguments = []
if hasattr(config, "injections") and config.injections.expected_snr:
if "f_low" in config.injections.expected_snr:
arguments.append(Option("flow", config.injections.expected_snr.f_low))
if "f_high" in config.injections.expected_snr:
arguments.append(Option("fmax", config.injections.expected_snr.f_high))
# calculate expected snr for injections
for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
layer += Node(
arguments = arguments,
inputs = [
Option("injection-file", injections.files),
Option("reference-psd", psd_cache.files),
],
outputs = Argument("calc-injection-file", injections.files, suppress=True),
)
dag.attach(layer)
return injection_cache

def measure_lnlr_cdf_layer(config, dag, dist_stats_cache, injection_cache):
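"""Set up the layer that measures the signal LnLR CDF for each injection set.

Runs gstlal_inspiral_lnlrcdf_signal over chunks of dist stat files per
injection split and returns a DataCache of LnLR signal CDF files.
"""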
layer = Layer(
"gstlal_inspiral_lnlrcdf_signal",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
lnlr_cdf_cache = DataCache(DataType.LNLR_SIGNAL_CDF)
for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
for chunk_bin, dist_stats in enumerate(dist_stats_cache.chunked(20)):
lnlr_cdfs = DataCache.generate(
DataType.LNLR_SIGNAL_CDF,
config.all_ifos,
config.span,
svd_bins=f"{split_bin}_{chunk_bin:04d}",
subtype=inj_type,
root="rank",
)
layer += Node(
inputs = [
Option("injection-template-match-file", injections.files),
Option("likelihood-url", dist_stats.files),
*add_ranking_stat_file_options(config, transfer_only=True),
],
outputs = Option("output-file", lnlr_cdfs.files),
)
lnlr_cdf_cache += lnlr_cdfs
dag.attach(layer)
return lnlr_cdf_cache

def plot_analytic_vt_layer(config, dag, trigger_cache, pdf_cache, lnlr_cdf_cache):
trg_layer = Layer(
"gstlal_inspiral_make_mc_vtplot",
name="plot_mc_vt_triggers",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
inj_layer = Layer(
"gstlal_inspiral_make_mc_vtplot",
name="plot_mc_vt_injections",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
triggers = trigger_cache.groupby("subtype")
triggers.pop("")
# make plots
for inj_type, lnlr_cdfs in lnlr_cdf_cache.groupby("subtype").items():
injection_file = config.filter.injections[inj_type.lower()]["file"]
trg_layer += Node(
arguments = [
Option("check-vt"),
Option("instrument", config.all_ifos),
],
inputs = [
Argument("lnlr-cdfs", lnlr_cdfs.files),
Option("ranking-stat-pdf", pdf_cache.files),
Option("injection-database", triggers[inj_type].files),
],
outputs = Option("output-dir", "plots"),
)
inj_layer += Node(
arguments = Option("instrument", config.all_ifos),
inputs = [
Argument("lnlr-cdfs", lnlr_cdfs.files),
Option("ranking-stat-pdf", pdf_cache.files),
Option("injection-files", os.path.join(config.data.analysis_dir, injection_file)),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(trg_layer)
dag.attach(inj_layer)

def plot_horizon_distance_layer(config, dag, marg_dist_stat_caches):
layer = Layer(
"gstlal_inspiral_plot_rankingstats_horizon",
name="plot_horizon_distance",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
)
layer += Node(
inputs = [
Argument("rankingstats", marg_dist_stat_caches.files),
Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
],
outputs = Option("outdir", "plots"),
)
dag.attach(layer)

def plot_summary_layer(config, dag, trigger_cache, post_pdf_cache):
requirements = {
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
}
# common plot options
common_plot_args = [
Option("segments-name", config.source.frame_segments_name),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("shrink-data-segments", 32.0),
Option("extend-veto-segments", 8.),
]
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
# plot summary job
layer = Layer(
"gstlal_inspiral_plotsummary",
name="summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
Option("remove-precession"),
*common_plot_args,
],
inputs = [
Argument("input-files", trigger_cache.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# precession plot summary job
layer = Layer(
"gstlal_inspiral_plotsummary",
name="summary_plots_precession",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
layer += Node(
arguments = [
Option("user-tag", "PRECESSION_COMBINED"),
Option("isolate-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", trigger_cache.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# single injection plot summary jobs
layer = Layer(
"gstlal_inspiral_plotsummary",
name="sngl_injection_summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTION"),
Option("remove-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)
# single injection precession plot summary jobs
layer = Layer(
"gstlal_inspiral_plotsummary",
name="sngl_injection_precession_summary_plots",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTION_PRECESSION"),
Option("isolate-precession"),
Option("plot-group", 1),
*common_plot_args,
],
inputs = [
Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
Option("likelihood-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)

def plot_sensitivity_layer(config, dag, trigger_cache):
requirements = {
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
}
layer = Layer(
"gstlal_inspiral_plot_sensitivity",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
# common options
common_args = [
Option("tmp-space", dagutil.condor_scratch_space()),
Option("veto-segments-name", "vetoes"),
Option("bin-by-source-type"),
Option("dist-bins", 200),
Option("data-segments-name", "datasegments"),
]
# split trigger cache into injections and non-injections
grouped_triggers = trigger_cache.groupby("subtype")
noninj_trigger_cache = grouped_triggers.pop("")
inj_trigger_cache = DataCache(
trigger_cache.name,
list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
)
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
*common_args,
],
inputs = [
Option("zero-lag-database", noninj_trigger_cache.files),
Argument("injection-database", inj_trigger_cache.files),
],
outputs = Option("output-dir", "plots"),
)
for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
layer += Node(
arguments = [
Option("user-tag", f"{inj_type}_INJECTIONS"),
*common_args,
],
inputs = [
Option("zero-lag-database", noninj_trigger_cache.files),
Argument("injection-database", inj_triggers.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)

def plot_background_layer(config, dag, trigger_cache, post_pdf_cache):
requirements = {
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
}
layer = Layer(
"gstlal_inspiral_plot_background",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
non_inj_triggers = trigger_cache.groupby("subtype")[""]
layer += Node(
arguments = [
Option("user-tag", "ALL_COMBINED"),
],
inputs = [
Option("database", non_inj_triggers.files),
Argument("post-marg-file", post_pdf_cache.files),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)

def plot_bin_background_layer(config, dag, marg_dist_stat_cache):
requirements = {
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
}
layer = Layer(
"gstlal_inspiral_plot_background",
name="gstlal_inspiral_plot_bin_background",
requirements=requirements,
transfer_files=config.condor.transfer_files,
)
for marg_dist_stats in marg_dist_stat_cache.chunked(PLOT_BIN_BACKGROUND_MAX_FILES):
layer += Node(
inputs = [
Argument("marg-files", marg_dist_stats.files),
*add_ranking_stat_file_options(config, transfer_only=True),
],
outputs = Option("output-dir", "plots"),
)
dag.attach(layer)

def filter_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
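"""Set up the online layer that filters live data to produce triggers.

Runs one gstlal_inspiral job per SVD bin against a live data source
(framexmit, lvshm, or devshm), uploading candidates to GraceDB and writing
per-bin ranking statistics and zerolag PDFs.
"""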
layer = Layer(
"gstlal_inspiral",
requirements={
"request_cpus": 1,
"request_memory": 5000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
# set up datasource options
if config.source.data_source == "framexmit":
datasource_opts = [
Option("data-source", "framexmit"),
Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
datasource_opts = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
elif config.source.data_source == "devshm":
datasource_opts = [
Option("data-source", "devshm"),
Option("shared-memory-dir", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_dir)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
# set up common options
common_opts = [
Option("track-psd"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
Option("analysis-tag", config.tag),
Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-label", config.upload.gracedb_label),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-trials-factor", config.upload.far_trials_factor),
Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
]
if config.services.kafka_server:
common_opts.append(Option("output-kafka-server", config.services.kafka_server)),
if config.upload.before_merger:
common_opts.append(Option("upload-time-before-merger"))
if config.upload.delay_uploads:
common_opts.append(Option("delay-uploads"))
if config.filter.cap_singles:
common_opts.append(Option("cap-singles"))
# set up activation counts if provided
if config.filter.activation_counts_file:
common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))
# compress ranking stat if requested
if config.filter.compress_ranking_stat:
common_opts.extend([
Option("compress-ranking-stat"),
Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
])
# disable service discovery if using singularity
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
if config.filter.verbose:
common_opts.extend([
Option("verbose")
])
# if using idq, add idq channel names for each ifo
if config.source.idq_channel_name:
common_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(config.source.idq_channel_name.keys(), config.source.idq_channel_name)))
if config.source.idq_channel_name and config.filter.idq_gate_threshold:
common_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
if config.source.idq_channel_name and config.source.idq_state_channel_name:
common_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(config.source.idq_state_channel_name.keys(), config.source.idq_state_channel_name)))
dist_stats = dist_stat_cache.groupby("bin")
zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
job_tag = f"{int(svd_bin):04d}_noninj"
filter_opts = [
Option("job-tag", job_tag),
Option("ht-gate-threshold", config.svd.stats.bins[svd_bin]["ht_gate_threshold"]),
]
filter_opts.extend(common_opts)
filter_opts.extend(datasource_opts)
layer += Node(
arguments = filter_opts,
inputs = [
Option("svd-bank", svd_banks.files),
Option("reference-psd", config.data.reference_psd),
Option("time-slide-file", config.filter.time_slide_file),
Option("ranking-stat-input", dist_stats[svd_bin].files),
Option("ranking-stat-pdf", marg_pdf_cache.files),
],
outputs = [
Option("output", "/dev/null"),
Option("ranking-stat-output", dist_stats[svd_bin].files),
Option("zerolag-rankingstat-pdf", zerolag_pdfs[svd_bin].files),
],
)
dag.attach(layer)

def filter_injections_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
layer = Layer(
"gstlal_inspiral",
name = "gstlal_inspiral_inj",
requirements={
"request_cpus": 1,
"request_memory": 5000,
"request_disk": "2GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
if config.source.data_source == "framexmit":
datasource_opts = [
Option("data-source", "framexmit"),
Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
datasource_opts = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
elif config.source.data_source == "devshm":
datasource_opts = [
Option("data-source", "devshm"),
Option("shared-memory-dir", dagutil.format_ifo_args(config.ifos, config.source.inj_shared_memory_dir)),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
channels = dagutil.format_ifo_args(config.ifos, config.source.inj_channel_name) if config.source.inj_channel_name else dagutil.format_ifo_args(config.ifos, config.source.channel_name)
common_opts = [
Option("track-psd"),
Option("control-peak-time", 0),
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", channels),
Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
Option("tmp-space", dagutil.condor_scratch_space()),
Option("coincidence-threshold", config.filter.coincidence_threshold),
Option("fir-stride", config.filter.fir_stride),
Option("min-instruments", config.min_ifos),
Option("analysis-tag", config.tag),
Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-label", config.upload.gracedb_label),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-trials-factor", config.upload.far_trials_factor),
Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
Option("injections")
]
if config.services.kafka_server:
common_opts.append(Option("output-kafka-server", config.services.kafka_server))
if config.upload.before_merger:
common_opts.append(Option("upload-time-before-merger"))
if config.upload.delay_uploads:
common_opts.append(Option("delay-uploads"))
if config.filter.cap_singles:
common_opts.append(Option("cap-singles"))
if config.filter.activation_counts_file:
common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))
if config.filter.compress_ranking_stat:
common_opts.extend([
Option("compress-ranking-stat"),
Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
])
if config.condor.singularity_image:
common_opts.append(Option("disable-service-discovery"))
if config.filter.verbose:
common_opts.extend([
Option("verbose")
])
# if using idq, add idq channel names for each ifo
if config.source.idq_channel_name:
common_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(config.source.idq_channel_name.keys(), config.source.idq_channel_name)))
if config.source.idq_channel_name and config.filter.idq_gate_threshold:
common_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
if config.source.idq_channel_name and config.source.idq_state_channel_name:
common_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(config.source.idq_state_channel_name.keys(), config.source.idq_state_channel_name)))
dist_stats = dist_stat_cache.groupby("bin")
zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
filter_opts = [
Option("ht-gate-threshold", config.svd.stats.bins[svd_bin]["ht_gate_threshold"]),
]
filter_opts.extend(common_opts)
filter_opts.extend(datasource_opts)
for inj_idx, inj_name in enumerate(config.filter.injections.keys(), start=1):
if config.filter.injections[inj_name].file:
injection_file = config.filter.injections[inj_name].file
else:
injection_file = None
inj_job_tag = f"{int(svd_bin):04d}_inj_{inj_name}"
injection_opts = [
Option("job-tag", inj_job_tag),
*filter_opts,
]
injection_inputs = [
Option("svd-bank", svd_banks.files),
Option("reference-psd", config.data.reference_psd),
Option("time-slide-file", config.filter.injection_time_slide_file),
Option("ranking-stat-input", dist_stats[svd_bin].files),
Option("ranking-stat-pdf", marg_pdf_cache.files),
]
if injection_file: injection_inputs.append(Option("injection-file", injection_file))
layer += Node(
arguments = injection_opts,
inputs = injection_inputs,
outputs = Option("output", "/dev/null"),
)
dag.attach(layer)

def marginalize_online_layer(config, dag, marg_pdf_cache):
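"""Set up the online layer that continuously marginalizes ranking statistic PDFs.

Runs gstlal_inspiral_marginalize_likelihoods_online against the per-bin job
registries and writes the marginalized PDF.
"""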
layer = Layer(
"gstlal_inspiral_marginalize_likelihoods_online",
requirements={
"request_cpus": 2,
"request_memory": 4000,
"request_disk": "5GB",
**config.condor.submit},
transfer_files=config.condor.transfer_files,
retries=1000,
)
registries = list(f"{int(svd_bin):04d}_noninj_registry.txt" for svd_bin in config.svd.bins)
arguments = [
Option("registry", registries),
Option("output", list(marg_pdf_cache.files)),
Option("output-kafka-server", config.services.kafka_server),
Option("tag", config.tag),
Option("verbose"),
]
layer += Node(arguments = arguments)
dag.attach(layer)

def upload_events_layer(config, dag):
layer = Layer(
"gstlal_ll_inspiral_event_uploader",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
input_topics = ["events", "inj_events"] if config.upload.enable_injection_uploads else ["events"]
for input_topic in input_topics:
layer += Node(
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("far-threshold", config.upload.aggregator_far_threshold),
Option("far-trials-factor", config.upload.aggregator_far_trials_factor),
Option("upload-cadence-type", config.upload.aggregator_cadence_type),
Option("upload-cadence-factor", config.upload.aggregator_cadence_factor),
Option("num-jobs", len(config.svd.bins)),
Option("tag", config.tag),
Option("input-topic", input_topic),
Option("rootdir", "event_uploader"),
Option("verbose"),
Option("scald-config", config.metrics.scald_config),
],
)
dag.attach(layer)

def upload_pastro_layer(config, dag, marg_pdf_cache):
layer = Layer(
"gstlal_ll_inspiral_pastro_uploader",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
input_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"]
for input_topic in input_topics:
for model, options in config.pastro.items():
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("tag", config.tag),
Option("input-topic", input_topic),
Option("model-name", model),
Option("pastro-filename", options.upload_file),
Option("pastro-model-file", options.mass_model),
Option("rank-stat", marg_pdf_cache.files),
Option("verbose"),
]
if options.update_model_cadence:
arguments.append(Option("update-model-cadence", options.update_model_cadence))
layer += Node(
arguments = arguments
)
dag.attach(layer)
def plot_events_layer(config, dag):
layer = Layer(
"gstlal_ll_inspiral_event_plotter",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
upload_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"]
ranking_stat_topics = ["ranking_stat", "inj_ranking_stat"] if config.upload.enable_injection_uploads else ["ranking_stat"]
to_upload = ("RANKING_DATA", "RANKING_PLOTS", "SNR_PLOTS", "PSD_PLOTS", "DTDPHI_PLOTS")
for upload_topic, ranking_stat_topic in zip(upload_topics, ranking_stat_topics):
for upload in to_upload:
layer += Node(
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("upload-topic", upload_topic),
Option("ranking-stat-topic", ranking_stat_topic),
Option("gracedb-group", config.upload.gracedb_group),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("gracedb-service-url", config.upload.gracedb_service_url),
Option("tag", config.tag),
Option("plot", upload),
Option("verbose"),
],
)
dag.attach(layer)
def count_events_layer(config, dag, dist_stat_cache):
layer = Layer(
"gstlal_ll_inspiral_trigger_counter",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
zerolag_pdf = DataCache.generate(DataType.ZEROLAG_DIST_STAT_PDFS, config.all_ifos)
layer += Node(
arguments = [
Option("kafka-server", config.services.kafka_server),
Option("gracedb-pipeline", config.upload.gracedb_pipeline),
Option("gracedb-search", config.upload.gracedb_search),
Option("output-period", 300),
Option("topic", "coinc"),
Option("tag", config.tag),
Option("bootstrap", dist_stat_cache.files[0]),
],
outputs = Option("output", zerolag_pdf.files),
)
dag.attach(layer)
def collect_metrics_layer(config, dag):
requirements = {
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
}
event_layer = Layer(
"scald",
name="scald_event_collector",
requirements=requirements,
transfer_files=config.condor.transfer_files,
retries=1000,
)
metric_leader_layer = Layer(
"scald",
name="scald_metric_collector_leader",
requirements=requirements,
transfer_files=config.condor.transfer_files,
retries=1000,
)
# set up common options
common_opts = [
Argument("command", "aggregate"),
Option("config", config.metrics.scald_config),
Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
]
# set topic_prefix to distinguish inj and noninj topics
topic_prefix = ['', 'inj_'] if config.filter.injections else ['']
# define metrics used for aggregation jobs
snr_metrics = [f"{prefix}{ifo}_snr_history" for ifo in config.ifos for prefix in topic_prefix]
network_metrics = []
for prefix, topic in itertools.product(topic_prefix, ('likelihood_history', 'snr_history', 'latency_history', 'far_history')):
network_metrics.append(f"{prefix}{topic}")
heartbeat_metrics = []
for prefix, topic in itertools.product(topic_prefix, ('uptime', 'event_uploader_heartbeat', 'event_plotter_heartbeat', 'pastro_uploader_heartbeat')):
heartbeat_metrics.append(f"{prefix}{topic}")
heartbeat_metrics.append('marginalize_likelihoods_online_heartbeat')
heartbeat_metrics.append('trigger_counter_heartbeat')
state_metrics = [f"{prefix}{ifo}_strain_dropped" for ifo in config.ifos for prefix in topic_prefix]
usage_metrics = [f"{prefix}ram_history" for prefix in topic_prefix]
latency_metrics = [f"{prefix}{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice") for prefix in topic_prefix]
latency_metrics.extend([f"{prefix}all_itacac_latency" for prefix in topic_prefix])
agg_metrics = list(itertools.chain(snr_metrics, network_metrics, usage_metrics, state_metrics, latency_metrics, heartbeat_metrics))
gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
seg_metrics = [f"{prefix}{ifo}_{gate}" for ifo in config.ifos for gate in gates for prefix in topic_prefix]
# set up partitioning:
# rough estimate of the number of topics each scald job can consume.
# assume 1000 Hz is the maximum message rate scald can keep up with and
# that each inspiral job sends metrics at 1 Hz; the number of topics
# per job is then the max message rate divided by the number of jobs.
max_msg_per_sec = 1000
num_jobs = len(config.svd.bins)
topics_per_job = max(math.floor(max_msg_per_sec / num_jobs), 1)
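# Worked example (numbers are illustrative, not from any real config): with
# 500 SVD bins, topics_per_job = max(floor(1000 / 500), 1) = 2, so each
# scald aggregation job below consumes at most two metric topics; with more
# than 1000 bins the floor would be 0 and the max() clamps it to 1.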
# timeseries metrics
agg_metrics = dagutil.groups(agg_metrics, topics_per_job)
seg_metrics = dagutil.groups(seg_metrics, topics_per_job)
for metrics in itertools.chain(agg_metrics, seg_metrics):
# add jobs to consume each metric
arguments = list(common_opts)
topics = []
schemas = []
for metric in metrics:
if "latency_history" in metric:
# for latency history we want to
# aggregate by max and median so
# we need two schemas
for aggfunc in ('max', 'median'):
topics.append(f'gstlal.{config.tag}.{metric}')
schemas.append(f'{metric}_{aggfunc}')
else:
topics.append(f'gstlal.{config.tag}.{metric}')
schemas.append(metric)
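# For instance, a metric named "latency_history" contributes the same topic
# twice with schemas "latency_history_max" and "latency_history_median",
# while a metric like "H1_snr_history" contributes a single topic/schema pair.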
arguments.extend([
Option("data-type", "timeseries"),
Option("topic", topics),
Option("schema", schemas),
])
arguments.append(Option("across-jobs"))
metric_leader_layer += Node(arguments=arguments)
dag.attach(metric_leader_layer)
# event metrics
schemas = ["coinc", "inj_coinc"] if config.filter.injections else ["coinc"]
for schema in schemas:
event_arguments = list(common_opts)
event_arguments.extend([
Option("data-type", "triggers"),
Option("topic", f"gstlal.{config.tag}.{schema}"),
Option("schema", schema),
Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
])
event_layer += Node(arguments=event_arguments)
dag.attach(event_layer)
def track_noise_layer(config, dag):
layer = Layer(
"gstlal_ll_dq",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
inj_layer = Layer(
"gstlal_ll_dq",
name = "gstlal_ll_dq_inj",
requirements={
"request_cpus": 1,
"request_memory": 2000,
"request_disk": "1GB",
**config.condor.submit
},
transfer_files=config.condor.transfer_files,
retries=1000,
)
for ifo in config.ifos:
channel_names = [config.source.channel_name[ifo]]
if config.filter.injections and ifo in config.source.inj_channel_name.keys():
channel_names.append(config.source.inj_channel_name[ifo])
for channel in channel_names:
# set up datasource options
if config.source.data_source == "framexmit":
arguments = [
Option("data-source", "framexmit"),
Option("framexmit-addr", f"{ifo}={config.source.framexmit_addr[ifo]}"),
Option("framexmit-iface", config.source.framexmit_iface),
]
elif config.source.data_source == "lvshm":
arguments = [
Option("data-source", "lvshm"),
Option("shared-memory-partition", f"{ifo}={config.source.shared_memory_partition[ifo]}"),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
elif config.source.data_source == "devshm":
if config.filter.injections and channel in config.source.inj_channel_name.values():
memory_location = config.source.inj_shared_memory_dir[ifo]
else:
memory_location = config.source.shared_memory_dir[ifo]
arguments = [
Option("data-source", "devshm"),
Option("shared-memory-dir", f"{ifo}={memory_location}"),
Option("shared-memory-block-size", config.source.shared_memory_block_size),
Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
]
else:
raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")
arguments.extend([
Option("analysis-tag", config.tag),
Option("psd-fft-length", config.psd.fft_length),
Option("channel-name", f"{ifo}={channel}"),
Option("state-channel-name", f"{ifo}={config.source.state_channel_name[ifo]}"),
Option("dq-channel-name", f"{ifo}={config.source.dq_channel_name[ifo]}"),
Option("state-vector-on-bits", f"{ifo}={config.source.state_vector_on_bits[ifo]}"),
Option("state-vector-off-bits", f"{ifo}={config.source.state_vector_off_bits[ifo]}"),
Option("dq-vector-on-bits", f"{ifo}={config.source.dq_vector_on_bits[ifo]}"),
Option("dq-vector-off-bits", f"{ifo}={config.source.dq_vector_off_bits[ifo]}"),
Option("reference-psd", config.data.reference_psd),
Option("scald-config", config.metrics.scald_config)
])
if config.services.kafka_server:
arguments.append(Option("output-kafka-server", config.services.kafka_server))
if config.filter.injections and channel in config.source.inj_channel_name.values():
arguments.append(Option("injection-channel"))
inj_layer += Node(arguments = arguments)
else:
layer += Node(arguments = arguments)
dag.attach(layer)
if config.filter.injections:
dag.attach(inj_layer)
def add_ranking_stat_file_options(config, svd_bin=None, transfer_only=False):
"""
Return a list of options for the files used as terms in the
ranking statistic, including:
* the mass model
* dtdphi
* iDQ timeseries
If transfer_only is True, do not add these options to programs;
instead, only declare the files as inputs for Condor file
transfer. This is needed for jobs that read these files, since
their paths are tracked through the ranking stat and Condor
must be made aware of them when not relying on a shared
file system.
"""
if transfer_only:
kwargs = {"track": False, "suppress": True}
else:
kwargs = {}
if svd_bin is None:
inputs = [Option("mass-model-file%d" % i, mass_model_file, **kwargs) for i, mass_model_file in enumerate(set(stats_bin["mass_model_file"] for stats_bin in config.svd.stats.bins.values()))]
if config.prior.idq_timeseries:
inputs.extend(Option("idq-file%d" % i, idq_file, **kwargs) for i, idq_file in enumerate(set(stats_bin["idq_file"] for stats_bin in config.svd.stats.bins.values())))
inputs.extend(Option("dtdphi-file%d" % i, dtdphi_file, **kwargs) for i, dtdphi_file in enumerate(set(stats_bin["dtdphi_file"] for stats_bin in config.svd.stats.bins.values())))
else:
stats_bin = config.svd.stats.bins[svd_bin]
inputs = [Option("mass-model-file", stats_bin["mass_model_file"], **kwargs)]
if config.prior.idq_timeseries:
inputs.extend([Option("idq-file", stats_bin["idq_file"], **kwargs)])
inputs.extend([Option("dtdphi-file", stats_bin["dtdphi_file"], **kwargs)])
return inputs
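# Minimal usage sketch (hypothetical svd_bin value, shown only to illustrate
# the two modes): a per-bin call such as
#   add_ranking_stat_file_options(config, svd_bin="0000")
# returns Options for the bin's mass model file plus, as configured, its
# dtdphi and iDQ timeseries files, to be passed as program arguments, whereas
#   add_ranking_stat_file_options(config, svd_bin="0000", transfer_only=True)
# returns the same files with track=False, suppress=True so they are only
# declared to Condor for file transfer and are not added as arguments.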
def mchirp_range_to_bins(min_mchirp, max_mchirp, svd_metadata):
"""
Given a chirp mass range and the SVD metadata, determine and
return the SVD bins whose chirp mass ranges overlap it.
"""
svd_bins = []
mchirp_range = segment(min_mchirp, max_mchirp)
for svd_bin, bin_metadata in svd_metadata["bins"].items():
bin_range = segment(bin_metadata["min_mchirp"], bin_metadata["max_mchirp"])
if mchirp_range.intersects(bin_range):
svd_bins.append(svd_bin)
return svd_bins
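# Example (hypothetical metadata, for illustration only):
#   svd_metadata = {"bins": {"0000": {"min_mchirp": 0.8, "max_mchirp": 1.7},
#                            "0001": {"min_mchirp": 1.7, "max_mchirp": 4.0}}}
#   mchirp_range_to_bins(1.0, 1.5, svd_metadata)  # -> ["0000"]
# only the first bin's chirp mass segment intersects the requested range.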
def ifo_to_string(ifos):
"""Given a list of ifos, return them concatenated into a single
sorted string.
"""
return "".join(sorted(ifos))
def time_to_num_split_injections(span, time_per_split=DEFAULT_SPLIT_INJECTION_TIME):
"""Determine the number of injection splits for the given analysis span.
"""
return max(int(abs(span)) // time_per_split, 1)
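# Example: assuming span is a ligo.segments segment so that abs(span) is its
# duration, a 100000 s analysis span with the default 20000 s per split gives
# 100000 // 20000 = 5 splits; spans shorter than one split still return 1.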
@plugins.register
def layers():
return {
"split_bank": split_bank_layer,
"svd_bank": svd_bank_layer,
"checkerboard": checkerboard_layer,
"filter": filter_layer,
"filter_injections": filter_injections_layer,
"aggregate": aggregate_layer,
"create_prior": create_prior_layer,
"calc_pdf": calc_pdf_layer,
"marginalize": marginalize_layer,
"marginalize_pdf": marginalize_pdf_layer,
"calc_likelihood": calc_likelihood_layer,
"cluster": cluster_layer,
"compute_far": compute_far_layer,
"split_injections": split_injections_layer,
"find_injections": find_injections_layer,
"match_injections": match_injections_layer,
"calc_expected_snr": calc_expected_snr_layer,
"measure_lnlr_cdf": measure_lnlr_cdf_layer,
"plot_analytic_vt": plot_analytic_vt_layer,
"plot_horizon_distance": plot_horizon_distance_layer,
"plot_summary": plot_summary_layer,
"plot_background": plot_background_layer,
"plot_bin_background": plot_bin_background_layer,
"plot_sensitivity": plot_sensitivity_layer,
"filter_online": filter_online_layer,
"filter_injections_online": filter_injections_online_layer,
"marginalize_online": marginalize_online_layer,
"upload_events": upload_events_layer,
"upload_pastro": upload_pastro_layer,
"plot_events": plot_events_layer,
"count_events": count_events_layer,
"collect_metrics": collect_metrics_layer,
"track_noise": track_noise_layer,
}