Source code for dags.layers.inspiral

# Copyright (C) 2020  Patrick Godwin (patrick.godwin@ligo.org)
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


from collections.abc import Mapping
import itertools
import os
import math
import sys

import numpy

from lal.utils import CacheEntry
from ligo.segments import segment

from gstlal import plugins
from gstlal.datafind import DataType, DataCache
from gstlal.dags import Argument, Option
from gstlal.dags import util as dagutil
from gstlal.dags.layers import Layer, Node


DEFAULT_SPLIT_INJECTION_TIME = 20000
DEFAULT_MAX_FILES = 500

# 10 bins per job corresponds to 20-30 min execution time
PLOT_BIN_BACKGROUND_MAX_FILES = 10
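# Illustrative note (not part of the original module): these constants feed the
# DataCache.chunked(...) calls below, so the number of jobs a layer creates is
# roughly ceil(num_files / chunk_size). For example, with 10 files per
# gstlal_inspiral_plot_background job:
#
#     import math
#     num_jobs = math.ceil(num_svd_bins / PLOT_BIN_BACKGROUND_MAX_FILES)
#
# where num_svd_bins is a hypothetical count of SVD bins in the analysis.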


def split_bank_layer(config, dag, psd_cache, bank_cache):
    layer = Layer(
        "gstlal_inspiral_bank_splitter",
        requirements={"request_cpus": 1, "request_memory": 4000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    split_bank_cache = DataCache.generate(DataType.SVD_BANK, config.all_ifos, config.span, svd_bins=config.svd.bins)

    arguments = [
        Option("f-low", config.svd.f_low),
        Option("f-final", config.svd.max_f_final),
        Option("approximant", config.svd.approximant),
        Option("overlap", config.svd.overlap),
        Option("instrument", ifo_to_string(config.all_ifos)),
        Option("n", config.svd.num_split_templates),
        Option("num-banks", config.svd.num_banks),
        Option("sort-by", config.svd.sort_by),
    ]
    if config.svd.num_mu_bins:
        arguments.append(Option("group-by-mu", config.svd.num_mu_bins))
    else:
        arguments.append(Option("group-by-chi", config.svd.num_chi_bins))

    layer += Node(
        arguments = arguments,
        inputs = [
            Argument("template-bank", bank_cache.files, track=False),
            Argument("psd-xml", psd_cache.files, track=False),
        ],
        outputs = [
            Option("output-path", str(split_bank_cache.name).lower()),
            Option("output-stats-file", DataType.SVD_MANIFEST.filename(config.ifos)),
        ],
    )

    dag.attach(layer)
    return split_bank_cache

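# Illustrative usage sketch (not part of the original module): each *_layer
# function takes the analysis config and a DAG, attaches one or more Layers to
# the DAG, and returns a DataCache describing the files the layer produces, so
# workflows chain them by passing caches forward. The names `config`, `dag`,
# `psd_cache`, `median_psd_cache`, and `bank_cache` below are assumed to be
# provided by the calling workflow:
#
#     split_bank_cache = split_bank_layer(config, dag, psd_cache, bank_cache)
#     svd_bank_cache = svd_bank_layer(config, dag, median_psd_cache, split_bank_cache)
#
# with the DAG written out by the calling workflow once all layers are attached.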
def svd_bank_layer(config, dag, median_psd_cache, split_bank_cache=None):
    layer = Layer(
        "gstlal_inspiral_svd_bank",
        requirements={"request_cpus": 1, "request_memory": 4000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    svd_cache = DataCache.generate(DataType.SVD_BANK, config.ifos, config.span, svd_bins=config.svd.bins, root="filter")

    split_banks = split_bank_cache.groupby("bin")
    for (ifo, svd_bin), svd_banks in svd_cache.groupby("ifo", "bin").items():
        # grab sub-bank specific configuration if available
        if "bank_name" in config.svd.stats.bins[svd_bin]:
            bank_name = config.svd.stats.bins[svd_bin]["bank_name"]
            svd_config = config.svd.sub_banks[bank_name]
        else:
            svd_config = config.svd

        arguments = [
            Option("instrument-override", ifo),
            Option("flow", svd_config.f_low),
            Option("samples-min", svd_config.samples_min),
            Option("samples-max-64", svd_config.samples_max_64),
            Option("samples-max-256", svd_config.samples_max_256),
            Option("samples-max", svd_config.samples_max),
            Option("svd-tolerance", svd_config.tolerance),
            Option("autocorrelation-length", config.svd.stats.bins[svd_bin]["ac_length"]),
        ]
        if "max_duration" in svd_config:
            arguments.append(Option("max-duration", svd_config.max_duration))
        if "sample_rate" in svd_config:
            arguments.append(Option("sample-rate", svd_config.sample_rate))

        layer += Node(
            arguments = arguments,
            inputs = [
                Option("reference-psd", median_psd_cache.files),
                Argument("split-banks", sorted(split_banks[svd_bin].files)),
            ],
            outputs = Option("write-svd", svd_banks.files),
        )

    dag.attach(layer)
    return svd_cache

def checkerboard_layer(config, dag, ref_psd_cache, svd_bank_cache):
    layer = Layer(
        "gstlal_svd_bank_checkerboard",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # set up arguments
    arguments = [Option("in-place"), Option("reference-psd", ref_psd_cache.files)]
    if config.svd.checkerboard == "even":
        arguments.append(Option("even"))

    chunk_size = 20
    for svd_banks in svd_bank_cache.chunked(chunk_size):
        layer += Node(
            arguments = arguments,
            inputs = Option("svd-files", svd_banks.files),
            outputs = Argument("checkered-svd-files", svd_banks.files, suppress=True),
        )

    dag.attach(layer)
    return svd_bank_cache

def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
    layer = Layer(
        "gstlal_inspiral",
        requirements={"request_cpus": 2, "request_memory": 4000, "request_disk": "5GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        dynamic_memory=True,
    )

    dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.ifo_combos, config.time_bins, svd_bins=config.svd.bins, root="filter")
    trigger_cache = DataCache.generate(DataType.TRIGGERS, config.ifo_combos, config.time_bins, svd_bins=config.svd.bins, root="filter")

    common_opts = [
        Option("track-psd"),
        Option("data-source", "frames"),
        Option("control-peak-time", 0),
        Option("psd-fft-length", config.psd.fft_length),
        Option("frame-segments-name", config.source.frame_segments_name),
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("coincidence-threshold", config.filter.coincidence_threshold),
        Option("fir-stride", config.filter.fir_stride),
        Option("min-instruments", config.min_ifos),
    ]

    # disable service discovery if using singularity
    if config.condor.singularity_image:
        common_opts.append(Option("disable-service-discovery"))

    # checkpoint by grouping SVD bins together + enable local
    # frame caching if there is more than one SVD bin per group
    if config.condor.transfer_files:
        max_concurrency = 10
    else:
        max_concurrency = 20
    num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
    if num_per_group > 1:
        common_opts.append(Option("local-frame-caching"))

    ref_psds = ref_psd_cache.groupby("ifo", "time")
    svd_banks = svd_bank_cache.groupby("ifo", "bin")
    dist_stats = dist_stat_cache.groupby("ifo", "time", "bin")
    for (ifo_combo, span), triggers in trigger_cache.groupby("ifo", "time").items():
        ifos = config.to_ifo_list(ifo_combo)
        start, end = span

        filter_opts = [
            Option("gps-start-time", int(start)),
            Option("gps-end-time", int(end)),
            Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
        ]
        inputs = [
            Option("frame-segments-file", config.source.frame_segments_file),
            Option("veto-segments-file", config.filter.veto_segments_file),
            Option("reference-psd", ref_psds[(ifo_combo, span)].files),
            Option("time-slide-file", config.filter.time_slide_file),
        ]
        if config.source.frame_cache:
            inputs.append(Option("frame-cache", config.source.frame_cache, track=False))
        else:
            filter_opts.extend([
                Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
                Option("data-find-server", config.source.data_find_server),
            ])
        if config.source.idq_channel_name:
            filter_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_channel_name)))
        filter_opts.extend(common_opts)
        if config.source.idq_channel_name and config.filter.idq_gate_threshold:
            filter_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
        if config.source.idq_channel_name and config.source.idq_state_channel_name:
            filter_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_state_channel_name)))

        for trigger_group in triggers.chunked(num_per_group):
            svd_bins = trigger_group.groupby("bin").keys()
            thresholds = [config.svd.stats.bins[svd_bin]["ht_gate_threshold"] for svd_bin in svd_bins]
            these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]

            svd_bank_files = dagutil.flatten(
                [svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
            )
            dist_stat_files = dagutil.flatten(
                [dist_stats[(ifo_combo, span, svd_bin)].files for svd_bin in svd_bins]
            )

            layer += Node(
                arguments = these_opts,
                inputs = [
                    Option("svd-bank", svd_bank_files),
                    *inputs,
                ],
                outputs = [
                    Option("output", trigger_group.files),
                    Option("ranking-stat-output", dist_stat_files),
                ],
            )

    dag.attach(layer)
    return trigger_cache, dist_stat_cache

def filter_injections_layer(config, dag, ref_psd_cache, svd_bank_cache):
    layer = Layer(
        "gstlal_inspiral",
        name="gstlal_inspiral_inj",
        requirements={"request_cpus": 2, "request_memory": 5000, "request_disk": "5GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        dynamic_memory=True,
    )

    trigger_cache = DataCache(DataType.TRIGGERS)
    for inj_name, inj_args in config.filter.injections.items():
        min_mchirp, max_mchirp = map(float, inj_args["range"].split(":"))
        svd_bins = mchirp_range_to_bins(min_mchirp, max_mchirp, config.svd.stats)
        trigger_cache += DataCache.generate(
            DataType.TRIGGERS,
            config.ifo_combos,
            config.time_bins,
            svd_bins=svd_bins,
            subtype=inj_name,
            root="filter",
        )

    common_opts = [
        Option("track-psd"),
        Option("data-source", "frames"),
        Option("control-peak-time", 0),
        Option("psd-fft-length", config.psd.fft_length),
        Option("frame-segments-name", config.source.frame_segments_name),
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("coincidence-threshold", config.filter.coincidence_threshold),
        Option("fir-stride", config.filter.fir_stride),
        Option("min-instruments", config.min_ifos),
        Option("injections"),
    ]

    # disable service discovery if using singularity
    if config.condor.singularity_image:
        common_opts.append(Option("disable-service-discovery"))

    # checkpoint by grouping SVD bins together + enable local
    # frame caching if there is more than one SVD bin per group
    if config.condor.transfer_files:
        max_concurrency = 10
    else:
        max_concurrency = 20
    num_per_group = min(1 + len(config.svd.bins) // 20, max_concurrency)
    if num_per_group > 1:
        common_opts.append(Option("local-frame-caching"))

    ref_psds = ref_psd_cache.groupby("ifo", "time")
    svd_banks = svd_bank_cache.groupby("ifo", "bin")
    for (ifo_combo, span, inj_type), triggers in trigger_cache.groupby("ifo", "time", "subtype").items():
        ifos = config.to_ifo_list(ifo_combo)
        start, end = span
        injection_file = config.filter.injections[inj_type.lower()]["file"]

        filter_opts = [
            Option("gps-start-time", int(start)),
            Option("gps-end-time", int(end)),
            Option("channel-name", dagutil.format_ifo_args(ifos, config.source.channel_name)),
        ]
        inputs = [
            Option("frame-segments-file", config.source.frame_segments_file),
            Option("veto-segments-file", config.filter.veto_segments_file),
            Option("reference-psd", ref_psds[(ifo_combo, span)].files),
            Option("time-slide-file", config.filter.injection_time_slide_file),
            Option("injection-file", injection_file),
        ]
        if config.source.frame_cache:
            inputs.append(Option("frame-cache", config.source.frame_cache, track=False))
        else:
            filter_opts.extend([
                Option("frame-type", dagutil.format_ifo_args(ifos, config.source.frame_type)),
                Option("data-find-server", config.source.data_find_server),
            ])
        if config.source.idq_channel_name:
            filter_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_channel_name)))
        if config.source.idq_channel_name and config.filter.idq_gate_threshold:
            filter_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
        if config.source.idq_channel_name and config.source.idq_state_channel_name:
            filter_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(ifos, config.source.idq_state_channel_name)))
        filter_opts.extend(common_opts)

        for trigger_group in triggers.chunked(num_per_group):
            svd_bins = trigger_group.groupby("bin").keys()
            thresholds = [config.svd.stats.bins[svd_bin]["ht_gate_threshold"] for svd_bin in svd_bins]
            these_opts = [Option("ht-gate-threshold", thresholds), *filter_opts]

            svd_bank_files = dagutil.flatten(
                [svd_banks[(ifo, svd_bin)].files for ifo in ifos for svd_bin in svd_bins]
            )

            layer += Node(
                arguments = these_opts,
                inputs = [
                    Option("svd-bank", svd_bank_files),
                    *inputs,
                ],
                outputs = Option("output", trigger_group.files),
            )

    dag.attach(layer)
    return trigger_cache

def aggregate_layer(config, dag, trigger_cache, dist_stat_cache=None):
    trg_layer = Layer(
        "ligolw_run_sqlite",
        name="cluster_triggers_by_snr",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # FIXME: find better way of discovering SQL file
    share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
    snr_cluster_sql_file = os.path.join(share_path, "snr_simplify_and_cluster.sql")
    inj_snr_cluster_sql_file = os.path.join(share_path, "inj_snr_simplify_and_cluster.sql")

    # cluster triggers by SNR
    for (svd_bin, inj_type), triggers in trigger_cache.groupby("bin", "subtype").items():
        trg_layer += Node(
            arguments = [
                Option("sql-file", inj_snr_cluster_sql_file if inj_type else snr_cluster_sql_file),
                Option("tmp-space", dagutil.condor_scratch_space()),
            ],
            inputs = Argument("triggers", triggers.files),
            outputs = Argument("clustered-triggers", triggers.files, suppress=True),
        )

    dag.attach(trg_layer)

    # if no dist stats files to marginalize, only return triggers
    if not dist_stat_cache:
        return trigger_cache

    # marginalize dist stats across time
    dist_layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="marginalize_dist_stats_across_time_filter",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    agg_dist_stat_cache = DataCache.generate(DataType.DIST_STATS, config.all_ifos, config.span, svd_bins=config.svd.bins, root="filter")

    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, agg_dist_stats in agg_dist_stat_cache.groupby("bin").items():
        dist_layer += Node(
            arguments = Option("marginalize", "ranking-stat"),
            inputs = Argument("dist-stats", dist_stats[svd_bin].files),
            outputs = Option("output", agg_dist_stats.files),
        )

    dag.attach(dist_layer)
    return trigger_cache, agg_dist_stat_cache

def create_prior_layer(config, dag, svd_bank_cache, median_psd_cache, dist_stat_cache=None):
    layer = Layer(
        "gstlal_inspiral_create_prior_diststats",
        requirements={"request_cpus": 2, "request_memory": 4000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    if "df" in config.prior:
        prior_df = config.prior.df
        assert prior_df in ["analytic", "bandwidth"]
    else:
        raise ValueError("config.prior.df is required, but not found in config. Choose from [bandwidth, analytic]")

    if dist_stat_cache:
        prior_cache = DataCache.generate(DataType.PRIOR_DIST_STATS, config.all_ifos, config.span, svd_bins=config.svd.bins, root="rank")
    else:
        prior_cache = DataCache.generate(DataType.DIST_STATS, config.all_ifos, config.span, svd_bins=config.svd.bins)

    svd_banks = svd_bank_cache.groupby("bin")
    for svd_bin, prior in prior_cache.groupby("bin").items():
        inputs = [
            Option("svd-file", svd_banks[svd_bin].files),
            *add_ranking_stat_file_options(config, svd_bin),
        ]
        if prior_df == "bandwidth":
            inputs += [Option("psd-xml", median_psd_cache.files)]

        layer += Node(
            arguments = [
                Option("df", prior_df),
                Option("background-prior", 1),
                Option("instrument", config.ifos),
                Option("min-instruments", config.min_ifos),
                Option("coincidence-threshold", config.filter.coincidence_threshold),
            ],
            inputs = inputs,
            outputs = Option("write-likelihood", prior.files),
        )

    dag.attach(layer)
    return prior_cache

def marginalize_layer(config, dag, prior_cache, dist_stat_cache):
    layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="marginalize_dist_stats_across_time_rank",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    marg_dist_stat_cache = DataCache.generate(DataType.MARG_DIST_STATS, config.all_ifos, config.span, svd_bins=config.svd.bins, root="rank")

    prior = prior_cache.groupby("bin")
    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, marg_dist_stats in marg_dist_stat_cache.groupby("bin").items():
        layer += Node(
            arguments = Option("marginalize", "ranking-stat"),
            inputs = [
                Argument("dist-stats", dist_stats[svd_bin].files + prior[svd_bin].files),
                *add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
            ],
            outputs = Option("output", marg_dist_stats.files),
        )

    dag.attach(layer)
    return marg_dist_stat_cache

def calc_pdf_layer(config, dag, dist_stat_cache):
    # FIXME: expose this in configuration
    num_cores = 4

    layer = Layer(
        "gstlal_inspiral_calc_rank_pdfs",
        requirements={"request_cpus": num_cores, "request_memory": 3000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.all_ifos, config.span, svd_bins=config.svd.bins, root="rank")

    dist_stats = dist_stat_cache.groupby("bin")
    for svd_bin, pdfs in pdf_cache.groupby("bin").items():
        layer += Node(
            arguments = [
                Option("ranking-stat-samples", config.rank.ranking_stat_samples),
                Option("num-cores", num_cores),
            ],
            inputs = [
                Argument("dist-stats", dist_stats[svd_bin].files),
                *add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
            ],
            outputs = Option("output", pdfs.files),
        )

    dag.attach(layer)
    return pdf_cache

def marginalize_pdf_layer(config, dag, pdf_cache):
    round1_layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="gstlal_inspiral_marginalize_pdfs_round_one",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    round2_layer = Layer(
        "gstlal_inspiral_marginalize_likelihood",
        name="gstlal_inspiral_marginalize_pdfs_round_two",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    marg_pdf_cache = DataCache.generate(DataType.DIST_STAT_PDFS, config.all_ifos, config.span, root="rank")

    # if number of bins is large, combine in two stages instead
    if len(config.svd.bins) > DEFAULT_MAX_FILES:
        partial_pdf_files = []
        for pdf_subset in pdf_cache.chunked(DEFAULT_MAX_FILES):
            # determine bin range and partial file name
            svd_bins = list(pdf_subset.groupby("bin").keys())
            min_bin, max_bin = min(svd_bins), max(svd_bins)
            partial_pdf_filename = pdf_subset.name.filename(
                config.all_ifos,
                config.span,
                svd_bin=f"{min_bin}_{max_bin}",
            )
            partial_pdf_path = os.path.join(
                pdf_subset.name.directory(root="rank", start=config.span[0]),
                partial_pdf_filename,
            )
            partial_pdf_files.append(partial_pdf_path)

            # combine across subset of bins (stage 1)
            round1_layer += Node(
                arguments = Option("marginalize", "ranking-stat-pdf"),
                inputs = Argument("dist-stat-pdfs", pdf_subset.files),
                outputs = Option("output", partial_pdf_path),
            )

        # combine across all bins (stage 2)
        round2_layer += Node(
            arguments = Option("marginalize", "ranking-stat-pdf"),
            inputs = Argument("dist-stat-pdfs", partial_pdf_files),
            outputs = Option("output", marg_pdf_cache.files),
        )
    else:
        round1_layer += Node(
            arguments = Option("marginalize", "ranking-stat-pdf"),
            inputs = Argument("dist-stat-pdfs", pdf_cache.files),
            outputs = Option("output", marg_pdf_cache.files),
        )

    dag.attach(round1_layer)
    if round2_layer.nodes:
        dag.attach(round2_layer)

    return marg_pdf_cache

def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
    layer = Layer(
        "gstlal_inspiral_calc_likelihood",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # assign likelihood to triggers
    calc_trigger_cache = trigger_cache.copy(root="rank")
    calc_triggers = calc_trigger_cache.groupby("bin", "subtype", "dirname")
    dist_stats = dist_stat_cache.groupby("bin")
    for (svd_bin, inj_type, dirname), triggers in trigger_cache.groupby("bin", "subtype", "dirname").items():
        # find path relative to current directory
        # where assigned triggers will reside
        split_dirname = dirname.split(os.sep)
        dir_idx = split_dirname.index("filter")
        calc_dirname = os.path.join(*split_dirname[dir_idx:]).replace("filter", "rank")

        arguments = [
            Option("force"),
            Option("copy"),
            Option("tmp-space", dagutil.condor_scratch_space()),
        ]

        # if file transfer not enabled, need to specify directory to
        # copy triggers into as remaps aren't relevant here
        if not config.condor.transfer_files:
            arguments.append(Option("copy-dir", calc_dirname))

        layer += Node(
            arguments = arguments,
            inputs = [
                Argument("triggers", triggers.files),
                Option("likelihood-url", dist_stats[svd_bin].files),
                *add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
            ],
            outputs = Argument(
                "calc-triggers",
                calc_triggers[(svd_bin, inj_type, calc_dirname)].files,
                suppress_with_remap=True,
            ),
        )

    dag.attach(layer)
    return calc_trigger_cache

def cluster_layer(config, dag, trigger_cache, injection_cache):
    # cluster triggers by likelihood
    combine_round1_layer = Layer(
        "ligolw_add",
        name="combine_triggers_across_bins_round_one",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    combine_round2_layer = Layer(
        "ligolw_add",
        name="combine_triggers_across_bins_round_two",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    cluster_round1_layer = Layer(
        "ligolw_run_sqlite",
        name="cluster_triggers_by_likelihood_round_one",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    cluster_round2_layer = Layer(
        "ligolw_run_sqlite",
        name="cluster_triggers_by_likelihood_round_two",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    sqlite_layer = Layer(
        "ligolw_sqlite",
        name="convert_triggers_to_sqlite",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # set up data caches
    inj_types = list(trigger_cache.groupby("subtype").keys())
    combined_trigger_cache = DataCache.generate(DataType.TRIGGERS, config.ifo_combos, config.time_bins, subtype=inj_types, root="rank")
    trigger_db_cache = DataCache.generate(DataType.TRIGGERS, config.all_ifos, config.time_boundaries, subtype=inj_types, extension="sqlite", root="rank")

    # FIXME: find better way of discovering SQL file
    share_path = os.path.split(dagutil.which("gstlal_inspiral"))[0].replace("bin", "share/gstlal")
    cluster_sql_file = os.path.join(share_path, "simplify_and_cluster.sql")
    inj_cluster_sql_file = os.path.join(share_path, "inj_simplify_and_cluster.sql")

    # combine/cluster triggers across SVD bins
    # if triggers are from an injection job, also add in the injections
    combined_triggers_across_bins = combined_trigger_cache.groupby("time", "subtype")
    for (span, inj_type), triggers in trigger_cache.groupby("time", "subtype").items():
        combined_triggers = combined_triggers_across_bins[(span, inj_type)]

        # if number of bins is large, combine in two stages instead
        if len(config.svd.bins) > DEFAULT_MAX_FILES:
            partial_trigger_files = []
            for trigger_subset in triggers.chunked(DEFAULT_MAX_FILES):
                # determine bin range and partial file name
                svd_bins = list(trigger_subset.groupby("bin").keys())
                min_bin, max_bin = min(svd_bins), max(svd_bins)
                partial_trigger_filename = trigger_subset.name.filename(
                    config.all_ifos,
                    span,
                    svd_bin=f"{min_bin}_{max_bin}",
                    subtype=inj_type,
                )
                partial_trigger_path = os.path.join(
                    trigger_subset.name.directory(root="rank", start=span[0]),
                    partial_trigger_filename,
                )
                partial_trigger_files.append(partial_trigger_path)

                # combine across subset of bins (stage 1)
                combine_round1_layer += Node(
                    inputs = Argument("inputs", trigger_subset.files),
                    outputs = Option("output", partial_trigger_path),
                )

            # combine across all bins (stage 2)
            combine_round2_layer += Node(
                inputs = Argument("inputs", partial_trigger_files),
                outputs = Option("output", combined_triggers.files),
            )
        else:
            combine_round1_layer += Node(
                inputs = Argument("inputs", triggers.files),
                outputs = Option("output", combined_triggers.files),
            )

        # cluster by likelihood
        cluster_round1_layer += Node(
            arguments = [
                Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
                Option("tmp-space", dagutil.condor_scratch_space()),
            ],
            inputs = Argument("triggers", combined_triggers.files),
            outputs = Argument("calc-triggers", combined_triggers.files, suppress=True),
        )

    dag.attach(combine_round1_layer)
    if combine_round2_layer.nodes:
        dag.attach(combine_round2_layer)
    dag.attach(cluster_round1_layer)

    # combine/cluster triggers across time
    injections = injection_cache.groupby("subtype")
    combined_triggers_by_time = combined_trigger_cache.groupby("subtype")
    for inj_type, trigger_dbs_by_time in trigger_db_cache.groupby("subtype").items():
        for span, trigger_dbs in trigger_dbs_by_time.groupby_bins("time", config.time_boundaries).items():
            combined_triggers = combined_triggers_by_time[inj_type].groupby_bins("time", config.time_boundaries)

            # add input files for sqlite jobs
            xml_files = []
            if inj_type:
                xml_files.extend(injections[inj_type].files)
            xml_files.extend(combined_triggers[span].files)
            xml_files.append(config.source.frame_segments_file)

            # convert triggers to sqlite
            sqlite_layer += Node(
                arguments = [
                    Option("replace"),
                    Option("tmp-space", dagutil.condor_scratch_space()),
                ],
                inputs = Argument("xml-files", xml_files),
                outputs = Option("database", trigger_dbs.files),
            )

            # cluster by likelihood
            cluster_round2_layer += Node(
                arguments = [
                    Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
                    Option("tmp-space", dagutil.condor_scratch_space()),
                ],
                inputs = Argument("triggers", trigger_dbs.files),
                outputs = Argument("calc-triggers", trigger_dbs.files, suppress=True),
            )

    dag.attach(sqlite_layer)
    dag.attach(cluster_round2_layer)

    return trigger_db_cache

def find_injections_layer(config, dag, trigger_db_cache):
    inj_sqlite_to_xml_layer = Layer(
        "ligolw_sqlite",
        name="inj_sqlite_to_xml",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    injfind_layer = Layer(
        "lalinspiral_injfind",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    inj_xml_to_sqlite_layer = Layer(
        "ligolw_sqlite",
        name="inj_xml_to_sqlite",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # set up data caches
    grouped_trigger_dbs = trigger_db_cache.groupby("subtype")
    if "" in grouped_trigger_dbs:
        grouped_trigger_dbs.pop("")
    inj_trigger_cache = DataCache.generate(DataType.TRIGGERS, config.all_ifos, config.time_boundaries, subtype=grouped_trigger_dbs.keys(), root="rank")

    # generate layers
    for inj_type, triggers_by_time in inj_trigger_cache.groupby("subtype").items():
        for span, triggers in triggers_by_time.groupby_bins("time", config.time_boundaries).items():
            trigger_dbs = grouped_trigger_dbs[inj_type].groupby_bins("time", config.time_boundaries)

            # convert triggers to XML
            inj_sqlite_to_xml_layer += Node(
                arguments = [
                    Option("replace"),
                    Option("tmp-space", dagutil.condor_scratch_space()),
                ],
                inputs = Option("database", trigger_dbs[span].files),
                outputs = Option("extract", triggers.files),
            )

            # find injections
            injfind_layer += Node(
                arguments = Option("time-window", 0.9),
                inputs = Argument("triggers", triggers.files),
                outputs = Argument("inj-triggers", triggers.files, suppress=True),
            )

            # convert triggers back to sqlite
            inj_xml_to_sqlite_layer += Node(
                arguments = [
                    Option("replace"),
                    Option("tmp-space", dagutil.condor_scratch_space()),
                ],
                inputs = Argument("triggers", triggers.files),
                outputs = Option("database", trigger_dbs[span].files),
            )

    dag.attach(inj_sqlite_to_xml_layer)
    dag.attach(injfind_layer)
    dag.attach(inj_xml_to_sqlite_layer)

    return trigger_db_cache, inj_trigger_cache

def compute_far_layer(config, dag, trigger_cache, pdf_cache):
    collect_zerolag_layer = Layer(
        "gstlal_inspiral_diststatpdf_collect_zerolag",
        name="collect_zerolag",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    assign_far_layer = Layer(
        "gstlal_compute_far_from_snr_chisq_histograms",
        name="compute_far",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    post_pdf_cache = DataCache.generate(DataType.POST_DIST_STAT_PDFS, config.all_ifos, config.span, root="rank")

    # split trigger cache into injections and non-injections
    grouped_triggers = trigger_cache.groupby("subtype")
    noninj_trigger_cache = grouped_triggers.pop("")
    inj_trigger_cache = DataCache(
        trigger_cache.name,
        list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
    )

    for span, noninj_triggers in noninj_trigger_cache.groupby("time").items():
        collect_zerolag_layer += Node(
            inputs = [
                Option("background-bins-file", pdf_cache.files),
                Option("non-injection-db", noninj_triggers.files),
            ],
            outputs = [Option("output-background-bins-file", post_pdf_cache.files)],
        )

        inputs = [
            Option("databases", noninj_triggers.files),
            Option("background-bins-file", post_pdf_cache.files),
        ]
        assign_far_layer += Node(
            arguments = Option("tmp-space", dagutil.condor_scratch_space()),
            inputs = inputs,
            outputs = [
                Argument("databases", noninj_triggers.files, suppress=True),
            ],
        )

    for span, inj_triggers in inj_trigger_cache.groupby("time").items():
        inputs = [
            Option("databases", inj_triggers.files),
            Option("background-bins-file", post_pdf_cache.files),
        ]
        assign_far_layer += Node(
            arguments = Option("tmp-space", dagutil.condor_scratch_space()),
            inputs = inputs,
            outputs = [
                Argument("databases", inj_triggers.files, suppress=True),
            ],
        )

    dag.attach(collect_zerolag_layer)
    dag.attach(assign_far_layer)

    return trigger_cache, post_pdf_cache

def split_injections_layer(config, dag):
    layer = Layer(
        "gstlal_injsplitter",
        name="split_injections",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # split up injections across time, tuned for job runtime
    num_splits = time_to_num_split_injections(config.span)

    # split injections up
    injection_cache = DataCache(DataType.SPLIT_INJECTIONS)
    for inj_type, inj_params in config.filter.injections.items():
        inj_tag = inj_type.upper()
        inj_file = inj_params["file"]

        # infer metadata from injection filename
        try:
            entry = CacheEntry.from_T050017(inj_file)
        except ValueError:
            ifos = "H1K1L1V1"
            span = segment(0, 0)
        else:
            ifos = entry.observatory
            span = entry.segment

        # create split injection jobs
        split_injections = DataCache.generate(
            DataType.SPLIT_INJECTIONS,
            ifos,
            span,
            svd_bins=[f"{i:04d}" for i in range(num_splits)],
            subtype=inj_tag,
            root="filter",
        )
        injection_cache += split_injections

        out_path = DataType.SPLIT_INJECTIONS.directory(root="filter", start=span[0])
        layer += Node(
            arguments = [
                Option("nsplit", num_splits),
                Option("usertag", inj_tag),
            ],
            inputs = Argument("injection-file", inj_file),
            outputs = [
                Option("output-path", out_path, remap=False),
                Argument("split-injections", split_injections.files, suppress=True),
            ],
        )

    dag.attach(layer)
    return injection_cache

def match_injections_layer(config, dag, injection_cache):
    layer = Layer(
        "gstlal_inspiral_injection_template_match",
        name="match_injections",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    # split up injections across time, tuned for job runtime
    num_splits = time_to_num_split_injections(config.span)

    inj_types = list(config.filter.injections.keys())
    match_inj_cache = DataCache.generate(
        DataType.MATCHED_INJECTIONS,
        config.all_ifos,
        config.span,
        svd_bins=[f"{i:04d}" for i in range(num_splits)],
        subtype=[inj_type.upper() for inj_type in inj_types],
        root="filter",
    )

    if isinstance(config.data.template_bank, Mapping):
        template_banks = list(config.data.template_bank.values())
    else:
        template_banks = config.data.template_bank

    # match injections to templates
    matched_injections = match_inj_cache.groupby("subtype", "bin")
    for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
        layer += Node(
            inputs = [
                Option("injection-file", injections.files),
                Option("template-bank", template_banks),
            ],
            outputs = Option("output", matched_injections[(inj_type, split_bin)].files),
        )

    dag.attach(layer)
    return match_inj_cache

def calc_expected_snr_layer(config, dag, psd_cache, injection_cache):
    layer = Layer(
        "gstlal_inspiral_injection_snr",
        name="calc_expected_snr",
        requirements={"request_cpus": 2, "request_memory": 2000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    arguments = []
    if hasattr(config, "injections") and config.injections.expected_snr:
        if "f_low" in config.injections.expected_snr:
            arguments.append(Option("flow", config.injections.expected_snr.f_low))
        if "f_high" in config.injections.expected_snr:
            arguments.append(Option("fmax", config.injections.expected_snr.f_high))

    # calculate expected snr for injections
    for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
        layer += Node(
            arguments = arguments,
            inputs = [
                Option("injection-file", injections.files),
                Option("reference-psd", psd_cache.files),
            ],
            outputs = Argument("calc-injection-file", injections.files, suppress=True),
        )

    dag.attach(layer)
    return injection_cache

def measure_lnlr_cdf_layer(config, dag, dist_stats_cache, injection_cache):
    layer = Layer(
        "gstlal_inspiral_lnlrcdf_signal",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    lnlr_cdf_cache = DataCache(DataType.LNLR_SIGNAL_CDF)
    for (inj_type, split_bin), injections in injection_cache.groupby("subtype", "bin").items():
        for chunk_bin, dist_stats in enumerate(dist_stats_cache.chunked(20)):
            lnlr_cdfs = DataCache.generate(
                DataType.LNLR_SIGNAL_CDF,
                config.all_ifos,
                config.span,
                svd_bins=f"{split_bin}_{chunk_bin:04d}",
                subtype=inj_type,
                root="rank",
            )
            layer += Node(
                inputs = [
                    Option("injection-template-match-file", injections.files),
                    Option("likelihood-url", dist_stats.files),
                    *add_ranking_stat_file_options(config, transfer_only=True),
                ],
                outputs = Option("output-file", lnlr_cdfs.files),
            )
            lnlr_cdf_cache += lnlr_cdfs

    dag.attach(layer)
    return lnlr_cdf_cache

def plot_analytic_vt_layer(config, dag, trigger_cache, pdf_cache, lnlr_cdf_cache):
    trg_layer = Layer(
        "gstlal_inspiral_make_mc_vtplot",
        name="plot_mc_vt_triggers",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )
    inj_layer = Layer(
        "gstlal_inspiral_make_mc_vtplot",
        name="plot_mc_vt_injections",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    triggers = trigger_cache.groupby("subtype")
    triggers.pop("")

    # make plots
    for inj_type, lnlr_cdfs in lnlr_cdf_cache.groupby("subtype").items():
        injection_file = config.filter.injections[inj_type.lower()]["file"]
        trg_layer += Node(
            arguments = [
                Option("check-vt"),
                Option("instrument", config.all_ifos),
            ],
            inputs = [
                Argument("lnlr-cdfs", lnlr_cdfs.files),
                Option("ranking-stat-pdf", pdf_cache.files),
                Option("injection-database", triggers[inj_type].files),
            ],
            outputs = Option("output-dir", "plots"),
        )
        inj_layer += Node(
            arguments = Option("instrument", config.all_ifos),
            inputs = [
                Argument("lnlr-cdfs", lnlr_cdfs.files),
                Option("ranking-stat-pdf", pdf_cache.files),
                Option("injection-files", os.path.join(config.data.analysis_dir, injection_file)),
            ],
            outputs = Option("output-dir", "plots"),
        )

    dag.attach(trg_layer)
    dag.attach(inj_layer)

def plot_horizon_distance_layer(config, dag, marg_dist_stat_caches):
    layer = Layer(
        "gstlal_inspiral_plot_rankingstats_horizon",
        name="plot_horizon_distance",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
    )

    layer += Node(
        inputs = [
            Argument("rankingstats", marg_dist_stat_caches.files),
            Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
        ],
        outputs = Option("outdir", "plots"),
    )

    dag.attach(layer)

def plot_summary_layer(config, dag, trigger_cache, post_pdf_cache):
    requirements = {"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit}

    # common plot options
    common_plot_args = [
        Option("segments-name", config.source.frame_segments_name),
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("shrink-data-segments", 32.0),
        Option("extend-veto-segments", 8.),
    ]

    # split trigger cache into injections and non-injections
    grouped_triggers = trigger_cache.groupby("subtype")
    noninj_trigger_cache = grouped_triggers.pop("")
    inj_trigger_cache = DataCache(
        trigger_cache.name,
        list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
    )

    # plot summary job
    layer = Layer(
        "gstlal_inspiral_plotsummary",
        name="summary_plots",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )
    layer += Node(
        arguments = [
            Option("user-tag", "ALL_COMBINED"),
            Option("remove-precession"),
            *common_plot_args,
        ],
        inputs = [
            Argument("input-files", trigger_cache.files),
            Option("likelihood-file", post_pdf_cache.files),
        ],
        outputs = Option("output-dir", "plots"),
    )
    dag.attach(layer)

    # precession plot summary job
    layer = Layer(
        "gstlal_inspiral_plotsummary",
        name="summary_plots_precession",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )
    layer += Node(
        arguments = [
            Option("user-tag", "PRECESSION_COMBINED"),
            Option("isolate-precession"),
            Option("plot-group", 1),
            *common_plot_args,
        ],
        inputs = [
            Argument("input-files", trigger_cache.files),
            Option("likelihood-file", post_pdf_cache.files),
        ],
        outputs = Option("output-dir", "plots"),
    )
    dag.attach(layer)

    # single injection plot summary jobs
    layer = Layer(
        "gstlal_inspiral_plotsummary",
        name="sngl_injection_summary_plots",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )
    for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
        layer += Node(
            arguments = [
                Option("user-tag", f"{inj_type}_INJECTION"),
                Option("remove-precession"),
                Option("plot-group", 1),
                *common_plot_args,
            ],
            inputs = [
                Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
                Option("likelihood-file", post_pdf_cache.files),
            ],
            outputs = Option("output-dir", "plots"),
        )
    dag.attach(layer)

    # single injection precession plot summary jobs
    layer = Layer(
        "gstlal_inspiral_plotsummary",
        name="sngl_injection_precession_summary_plots",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )
    for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
        layer += Node(
            arguments = [
                Option("user-tag", f"{inj_type}_INJECTION_PRECESSION"),
                Option("isolate-precession"),
                Option("plot-group", 1),
                *common_plot_args,
            ],
            inputs = [
                Argument("input-files", noninj_trigger_cache.files + inj_triggers.files),
                Option("likelihood-file", post_pdf_cache.files),
            ],
            outputs = Option("output-dir", "plots"),
        )
    dag.attach(layer)

def plot_sensitivity_layer(config, dag, trigger_cache):
    requirements = {"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit}
    layer = Layer(
        "gstlal_inspiral_plot_sensitivity",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )

    # common options
    common_args = [
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("veto-segments-name", "vetoes"),
        Option("bin-by-source-type"),
        Option("dist-bins", 200),
        Option("data-segments-name", "datasegments"),
    ]

    # split trigger cache into injections and non-injections
    grouped_triggers = trigger_cache.groupby("subtype")
    noninj_trigger_cache = grouped_triggers.pop("")
    inj_trigger_cache = DataCache(
        trigger_cache.name,
        list(itertools.chain(*[datacache.cache for datacache in grouped_triggers.values()])),
    )

    layer += Node(
        arguments = [
            Option("user-tag", "ALL_COMBINED"),
            *common_args,
        ],
        inputs = [
            Option("zero-lag-database", noninj_trigger_cache.files),
            Argument("injection-database", inj_trigger_cache.files),
        ],
        outputs = Option("output-dir", "plots"),
    )

    for inj_type, inj_triggers in inj_trigger_cache.groupby("subtype").items():
        layer += Node(
            arguments = [
                Option("user-tag", f"{inj_type}_INJECTIONS"),
                *common_args,
            ],
            inputs = [
                Option("zero-lag-database", noninj_trigger_cache.files),
                Argument("injection-database", inj_triggers.files),
            ],
            outputs = Option("output-dir", "plots"),
        )

    dag.attach(layer)

def plot_background_layer(config, dag, trigger_cache, post_pdf_cache):
    requirements = {"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit}
    layer = Layer(
        "gstlal_inspiral_plot_background",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )

    non_inj_triggers = trigger_cache.groupby("subtype")[""]
    layer += Node(
        arguments = [
            Option("user-tag", "ALL_COMBINED"),
        ],
        inputs = [
            Option("database", non_inj_triggers.files),
            Argument("post-marg-file", post_pdf_cache.files),
        ],
        outputs = Option("output-dir", "plots"),
    )

    dag.attach(layer)

def plot_bin_background_layer(config, dag, marg_dist_stat_cache):
    requirements = {"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit}
    layer = Layer(
        "gstlal_inspiral_plot_background",
        name="gstlal_inspiral_plot_bin_background",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
    )

    for marg_dist_stats in marg_dist_stat_cache.chunked(PLOT_BIN_BACKGROUND_MAX_FILES):
        layer += Node(
            inputs = [
                Argument("marg-files", marg_dist_stats.files),
                *add_ranking_stat_file_options(config, transfer_only=True),
            ],
            outputs = Option("output-dir", "plots"),
        )

    dag.attach(layer)

def filter_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
    layer = Layer(
        "gstlal_inspiral",
        requirements={"request_cpus": 1, "request_memory": 5000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    # set up datasource options
    if config.source.data_source == "framexmit":
        datasource_opts = [
            Option("data-source", "framexmit"),
            Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
            Option("framexmit-iface", config.source.framexmit_iface),
        ]
    elif config.source.data_source == "lvshm":
        datasource_opts = [
            Option("data-source", "lvshm"),
            Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
            Option("shared-memory-block-size", config.source.shared_memory_block_size),
            Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
        ]
    elif config.source.data_source == "devshm":
        datasource_opts = [
            Option("data-source", "devshm"),
            Option("shared-memory-dir", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_dir)),
            Option("shared-memory-block-size", config.source.shared_memory_block_size),
            Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
        ]
    else:
        raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

    # set up common options
    common_opts = [
        Option("track-psd"),
        Option("control-peak-time", 0),
        Option("psd-fft-length", config.psd.fft_length),
        Option("channel-name", dagutil.format_ifo_args(config.ifos, config.source.channel_name)),
        Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
        Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
        Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
        Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
        Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
        Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("coincidence-threshold", config.filter.coincidence_threshold),
        Option("fir-stride", config.filter.fir_stride),
        Option("min-instruments", config.min_ifos),
        Option("analysis-tag", config.tag),
        Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
        Option("gracedb-group", config.upload.gracedb_group),
        Option("gracedb-pipeline", config.upload.gracedb_pipeline),
        Option("gracedb-search", config.upload.gracedb_search),
        Option("gracedb-label", config.upload.gracedb_label),
        Option("gracedb-service-url", config.upload.gracedb_service_url),
        Option("far-trials-factor", config.upload.far_trials_factor),
        Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
    ]

    if config.services.kafka_server:
        common_opts.append(Option("output-kafka-server", config.services.kafka_server))
    if config.upload.before_merger:
        common_opts.append(Option("upload-time-before-merger"))
    if config.upload.delay_uploads:
        common_opts.append(Option("delay-uploads"))
    if config.filter.cap_singles:
        common_opts.append(Option("cap-singles"))

    # set up activation counts if provided
    if config.filter.activation_counts_file:
        common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))

    # compress ranking stat if requested
    if config.filter.compress_ranking_stat:
        common_opts.extend([
            Option("compress-ranking-stat"),
            Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
        ])

    # disable service discovery if using singularity
    if config.condor.singularity_image:
        common_opts.append(Option("disable-service-discovery"))

    if config.filter.verbose:
        common_opts.extend([Option("verbose")])

    # if using idq, add idq channel names for each ifo
    if config.source.idq_channel_name:
        common_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(config.source.idq_channel_name.keys(), config.source.idq_channel_name)))
    if config.source.idq_channel_name and config.filter.idq_gate_threshold:
        common_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
    if config.source.idq_channel_name and config.source.idq_state_channel_name:
        common_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(config.source.idq_state_channel_name.keys(), config.source.idq_state_channel_name)))

    dist_stats = dist_stat_cache.groupby("bin")
    zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
    for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
        job_tag = f"{int(svd_bin):04d}_noninj"
        filter_opts = [
            Option("job-tag", job_tag),
            Option("ht-gate-threshold", config.svd.stats.bins[svd_bin]["ht_gate_threshold"]),
        ]
        filter_opts.extend(common_opts)
        filter_opts.extend(datasource_opts)

        layer += Node(
            arguments = filter_opts,
            inputs = [
                Option("svd-bank", svd_banks.files),
                Option("reference-psd", config.data.reference_psd),
                Option("time-slide-file", config.filter.time_slide_file),
                Option("ranking-stat-input", dist_stats[svd_bin].files),
                Option("ranking-stat-pdf", marg_pdf_cache.files),
            ],
            outputs = [
                Option("output", "/dev/null"),
                Option("ranking-stat-output", dist_stats[svd_bin].files),
                Option("zerolag-rankingstat-pdf", zerolag_pdfs[svd_bin].files),
            ],
        )

    dag.attach(layer)

def filter_injections_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pdf_cache, marg_pdf_cache):
    layer = Layer(
        "gstlal_inspiral",
        name="gstlal_inspiral_inj",
        requirements={"request_cpus": 1, "request_memory": 5000, "request_disk": "2GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    if config.source.data_source == "framexmit":
        datasource_opts = [
            Option("data-source", "framexmit"),
            Option("framexmit-addr", dagutil.format_ifo_args(config.ifos, config.source.framexmit_addr)),
            Option("framexmit-iface", config.source.framexmit_iface),
        ]
    elif config.source.data_source == "lvshm":
        datasource_opts = [
            Option("data-source", "lvshm"),
            Option("shared-memory-partition", dagutil.format_ifo_args(config.ifos, config.source.shared_memory_partition)),
            Option("shared-memory-block-size", config.source.shared_memory_block_size),
            Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
        ]
    elif config.source.data_source == "devshm":
        datasource_opts = [
            Option("data-source", "devshm"),
            Option("shared-memory-dir", dagutil.format_ifo_args(config.ifos, config.source.inj_shared_memory_dir)),
            Option("shared-memory-block-size", config.source.shared_memory_block_size),
            Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
        ]
    else:
        raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

    channels = (
        dagutil.format_ifo_args(config.ifos, config.source.inj_channel_name)
        if config.source.inj_channel_name
        else dagutil.format_ifo_args(config.ifos, config.source.channel_name)
    )

    common_opts = [
        Option("track-psd"),
        Option("control-peak-time", 0),
        Option("psd-fft-length", config.psd.fft_length),
        Option("channel-name", channels),
        Option("state-channel-name", dagutil.format_ifo_args(config.ifos, config.source.state_channel_name)),
        Option("dq-channel-name", dagutil.format_ifo_args(config.ifos, config.source.dq_channel_name)),
        Option("state-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_on_bits)),
        Option("state-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.state_vector_off_bits)),
        Option("dq-vector-on-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_on_bits)),
        Option("dq-vector-off-bits", dagutil.format_ifo_args(config.ifos, config.source.dq_vector_off_bits)),
        Option("tmp-space", dagutil.condor_scratch_space()),
        Option("coincidence-threshold", config.filter.coincidence_threshold),
        Option("fir-stride", config.filter.fir_stride),
        Option("min-instruments", config.min_ifos),
        Option("analysis-tag", config.tag),
        Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
        Option("gracedb-group", config.upload.gracedb_group),
        Option("gracedb-pipeline", config.upload.gracedb_pipeline),
        Option("gracedb-search", config.upload.gracedb_search),
        Option("gracedb-label", config.upload.gracedb_label),
        Option("gracedb-service-url", config.upload.gracedb_service_url),
        Option("far-trials-factor", config.upload.far_trials_factor),
        Option("likelihood-snapshot-interval", config.filter.likelihood_snapshot_interval),
        Option("injections"),
    ]

    if config.services.kafka_server:
        common_opts.append(Option("output-kafka-server", config.services.kafka_server))
    if config.upload.before_merger:
        common_opts.append(Option("upload-time-before-merger"))
    if config.upload.delay_uploads:
        common_opts.append(Option("delay-uploads"))
    if config.filter.cap_singles:
        common_opts.append(Option("cap-singles"))
    if config.filter.activation_counts_file:
        common_opts.append(Option("activation-counts-file", config.filter.activation_counts_file))
    if config.filter.compress_ranking_stat:
        common_opts.extend([
            Option("compress-ranking-stat"),
            Option("compress-ranking-stat-threshold", config.filter.compress_ranking_stat_threshold),
        ])
    if config.condor.singularity_image:
        common_opts.append(Option("disable-service-discovery"))
    if config.filter.verbose:
        common_opts.extend([Option("verbose")])

    # if using idq, add channel names for each ifo
    if config.source.idq_channel_name:
        common_opts.append(Option("idq-channel-name", dagutil.format_ifo_args(config.source.idq_channel_name.keys(), config.source.idq_channel_name)))
    if config.source.idq_channel_name and config.filter.idq_gate_threshold:
        common_opts.append(Option("idq-gate-threshold", config.filter.idq_gate_threshold))
    if config.source.idq_channel_name and config.source.idq_state_channel_name:
        common_opts.append(Option("idq-state-channel-name", dagutil.format_ifo_args(config.source.idq_state_channel_name.keys(), config.source.idq_state_channel_name)))

    dist_stats = dist_stat_cache.groupby("bin")
    zerolag_pdfs = zerolag_pdf_cache.groupby("bin")
    for svd_bin, svd_banks in svd_bank_cache.groupby("bin").items():
        filter_opts = [
            Option("ht-gate-threshold", config.svd.stats.bins[svd_bin]["ht_gate_threshold"]),
        ]
        filter_opts.extend(common_opts)
        filter_opts.extend(datasource_opts)

        for inj_idx, inj_name in enumerate(config.filter.injections.keys(), start=1):
            if config.filter.injections[inj_name].file:
                injection_file = config.filter.injections[inj_name].file
            else:
                injection_file = None

            inj_job_tag = f"{int(svd_bin):04d}_inj_{inj_name}"
            injection_opts = [
                Option("job-tag", inj_job_tag),
                *filter_opts,
            ]
            injection_inputs = [
                Option("svd-bank", svd_banks.files),
                Option("reference-psd", config.data.reference_psd),
                Option("time-slide-file", config.filter.injection_time_slide_file),
                Option("ranking-stat-input", dist_stats[svd_bin].files),
                Option("ranking-stat-pdf", marg_pdf_cache.files),
            ]
            if injection_file:
                injection_inputs.extend([Option("injection-file", injection_file)])

            layer += Node(
                arguments = injection_opts,
                inputs = injection_inputs,
                outputs = Option("output", "/dev/null"),
            )

    dag.attach(layer)

def marginalize_online_layer(config, dag, marg_pdf_cache):
    layer = Layer(
        "gstlal_inspiral_marginalize_likelihoods_online",
        requirements={"request_cpus": 2, "request_memory": 4000, "request_disk": "5GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    registries = list(f"{int(svd_bin):04d}_noninj_registry.txt" for svd_bin in config.svd.bins)
    arguments = [
        Option("registry", registries),
        Option("output", list(marg_pdf_cache.files)),
        Option("output-kafka-server", config.services.kafka_server),
        Option("tag", config.tag),
        Option("verbose"),
    ]

    layer += Node(arguments = arguments)
    dag.attach(layer)

def upload_events_layer(config, dag):
    layer = Layer(
        "gstlal_ll_inspiral_event_uploader",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    input_topics = ["events", "inj_events"] if config.upload.enable_injection_uploads else ["events"]
    for input_topic in input_topics:
        layer += Node(
            arguments = [
                Option("kafka-server", config.services.kafka_server),
                Option("gracedb-group", config.upload.gracedb_group),
                Option("gracedb-pipeline", config.upload.gracedb_pipeline),
                Option("gracedb-search", config.upload.gracedb_search),
                Option("gracedb-service-url", config.upload.gracedb_service_url),
                Option("far-threshold", config.upload.aggregator_far_threshold),
                Option("far-trials-factor", config.upload.aggregator_far_trials_factor),
                Option("upload-cadence-type", config.upload.aggregator_cadence_type),
                Option("upload-cadence-factor", config.upload.aggregator_cadence_factor),
                Option("num-jobs", len(config.svd.bins)),
                Option("tag", config.tag),
                Option("input-topic", input_topic),
                Option("rootdir", "event_uploader"),
                Option("verbose"),
                Option("scald-config", config.metrics.scald_config),
            ],
        )

    dag.attach(layer)

def upload_pastro_layer(config, dag, marg_pdf_cache):
    layer = Layer(
        "gstlal_ll_inspiral_pastro_uploader",
        requirements={"request_cpus": 1, "request_memory": 2000, "request_disk": "1GB", **config.condor.submit},
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    input_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"]
    for input_topic in input_topics:
        for model, options in config.pastro.items():
            arguments = [
                Option("kafka-server", config.services.kafka_server),
                Option("gracedb-service-url", config.upload.gracedb_service_url),
                Option("tag", config.tag),
                Option("input-topic", input_topic),
                Option("model-name", model),
                Option("pastro-filename", options.upload_file),
                Option("pastro-model-file", options.mass_model),
                Option("rank-stat", marg_pdf_cache.files),
                Option("verbose"),
            ]
            if options.update_model_cadence:
                arguments.append(Option("update-model-cadence", options.update_model_cadence))

            layer += Node(arguments = arguments)

    dag.attach(layer)

[docs]def plot_events_layer(config, dag):
    layer = Layer(
        "gstlal_ll_inspiral_event_plotter",
        requirements={
            "request_cpus": 1,
            "request_memory": 2000,
            "request_disk": "1GB",
            **config.condor.submit
        },
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    upload_topics = ["uploads", "inj_uploads"] if config.upload.enable_injection_uploads else ["uploads"]
    ranking_stat_topics = ["ranking_stat", "inj_ranking_stat"] if config.upload.enable_injection_uploads else ["ranking_stat"]
    to_upload = ("RANKING_DATA", "RANKING_PLOTS", "SNR_PLOTS", "PSD_PLOTS", "DTDPHI_PLOTS")

    for upload_topic, ranking_stat_topic in zip(upload_topics, ranking_stat_topics):
        for upload in to_upload:
            layer += Node(
                arguments = [
                    Option("kafka-server", config.services.kafka_server),
                    Option("upload-topic", upload_topic),
                    Option("ranking-stat-topic", ranking_stat_topic),
                    Option("gracedb-group", config.upload.gracedb_group),
                    Option("gracedb-pipeline", config.upload.gracedb_pipeline),
                    Option("gracedb-search", config.upload.gracedb_search),
                    Option("gracedb-service-url", config.upload.gracedb_service_url),
                    Option("tag", config.tag),
                    Option("plot", upload),
                    Option("verbose"),
                ],
            )

    dag.attach(layer)
[docs]def count_events_layer(config, dag, dist_stat_cache):
    layer = Layer(
        "gstlal_ll_inspiral_trigger_counter",
        requirements={
            "request_cpus": 1,
            "request_memory": 2000,
            "request_disk": "1GB",
            **config.condor.submit
        },
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    zerolag_pdf = DataCache.generate(DataType.ZEROLAG_DIST_STAT_PDFS, config.all_ifos)

    layer += Node(
        arguments = [
            Option("kafka-server", config.services.kafka_server),
            Option("gracedb-pipeline", config.upload.gracedb_pipeline),
            Option("gracedb-search", config.upload.gracedb_search),
            Option("output-period", 300),
            Option("topic", "coinc"),
            Option("tag", config.tag),
            Option("bootstrap", dist_stat_cache.files[0]),
        ],
        outputs = Option("output", zerolag_pdf.files),
    )

    dag.attach(layer)
[docs]def collect_metrics_layer(config, dag):
    requirements = {
        "request_cpus": 1,
        "request_memory": 2000,
        "request_disk": "1GB",
        **config.condor.submit
    }
    event_layer = Layer(
        "scald",
        name="scald_event_collector",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )
    metric_leader_layer = Layer(
        "scald",
        name="scald_metric_collector_leader",
        requirements=requirements,
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    # set up common options
    common_opts = [
        Argument("command", "aggregate"),
        Option("config", config.metrics.scald_config),
        Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
    ]

    # set topic_prefix to distinguish inj and noninj topics
    topic_prefix = ['', 'inj_'] if config.filter.injections else ['']

    # define metrics used for aggregation jobs
    snr_metrics = [f"{prefix}{ifo}_snr_history" for ifo in config.ifos for prefix in topic_prefix]

    network_metrics = []
    for prefix, topic in itertools.product(topic_prefix, ('likelihood_history', 'snr_history', 'latency_history', 'far_history')):
        network_metrics.append(f"{prefix}{topic}")

    heartbeat_metrics = []
    for prefix, topic in itertools.product(topic_prefix, ('uptime', 'event_uploader_heartbeat', 'event_plotter_heartbeat', 'pastro_uploader_heartbeat')):
        heartbeat_metrics.append(f"{prefix}{topic}")
    heartbeat_metrics.append('marginalize_likelihoods_online_heartbeat')
    heartbeat_metrics.append('trigger_counter_heartbeat')

    state_metrics = [f"{prefix}{ifo}_strain_dropped" for ifo in config.ifos for prefix in topic_prefix]
    usage_metrics = [f"{prefix}ram_history" for prefix in topic_prefix]
    latency_metrics = [f"{prefix}{ifo}_{stage}_latency" for ifo in config.ifos for stage in ("datasource", "whitening", "snrSlice") for prefix in topic_prefix]
    latency_metrics.extend([f"{prefix}all_itacac_latency" for prefix in topic_prefix])

    agg_metrics = list(itertools.chain(snr_metrics, network_metrics, usage_metrics, state_metrics, latency_metrics, heartbeat_metrics))

    gates = [f"{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
    seg_metrics = [f"{prefix}{ifo}_{gate}" for ifo in config.ifos for gate in gates for prefix in topic_prefix]

    # set up partitioning:
    # rough estimate of the number of topics each scald job can consume.
    # assume 1000 Hz as the max msg rate that scald can keep up with, and
    # assume each inspiral job sends metrics at 1 Hz. Then topics per
    # job is msg rate / number of jobs.
    max_msg_per_sec = 1000
    num_jobs = len(config.svd.bins)
    topics_per_job = max(math.floor(max_msg_per_sec / num_jobs), 1)

    # timeseries metrics
    agg_metrics = dagutil.groups(agg_metrics, topics_per_job)
    seg_metrics = dagutil.groups(seg_metrics, topics_per_job)
    for metrics in itertools.chain(agg_metrics, seg_metrics):
        # add jobs to consume each metric
        arguments = list(common_opts)

        topics = []
        schemas = []
        for metric in metrics:
            if "latency_history" in metric:
                # for latency history we want to aggregate by
                # max and median, so we need two schemas
                for aggfunc in ('max', 'median'):
                    topics.append(f'gstlal.{config.tag}.{metric}')
                    schemas.append(f'{metric}_{aggfunc}')
            else:
                topics.append(f'gstlal.{config.tag}.{metric}')
                schemas.append(metric)

        arguments.extend([
            Option("data-type", "timeseries"),
            Option("topic", topics),
            Option("schema", schemas),
        ])
        arguments.append(Option("across-jobs"))
        metric_leader_layer += Node(arguments=arguments)
    dag.attach(metric_leader_layer)

    # event metrics
    schemas = ["coinc", "inj_coinc"] if config.filter.injections else ["coinc"]
    for schema in schemas:
        event_arguments = list(common_opts)
        event_arguments.extend([
            Option("data-type", "triggers"),
            Option("topic", f"gstlal.{config.tag}.{schema}"),
            Option("schema", schema),
            Option("uri", f"kafka://{config.tag}-collect@{config.services.kafka_server}"),
        ])
        event_layer += Node(arguments=event_arguments)
    dag.attach(event_layer)
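# Worked example of the topics_per_job partitioning in collect_metrics_layer
# above (the numbers are illustrative, not configuration defaults): with 250
# SVD bins, each publishing metrics at roughly 1 Hz, a scald consumer budgeted
# for ~1000 msgs/s can follow
#
#     topics_per_job = max(math.floor(1000 / 250), 1) = 4
#
# topics, so dagutil.groups() splits the metric list into chunks of 4 and one
# collector node is added per chunk.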
[docs]def track_noise_layer(config, dag):
    layer = Layer(
        "gstlal_ll_dq",
        requirements={
            "request_cpus": 1,
            "request_memory": 2000,
            "request_disk": "1GB",
            **config.condor.submit
        },
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )
    inj_layer = Layer(
        "gstlal_ll_dq",
        name = "gstlal_ll_dq_inj",
        requirements={
            "request_cpus": 1,
            "request_memory": 2000,
            "request_disk": "1GB",
            **config.condor.submit
        },
        transfer_files=config.condor.transfer_files,
        retries=1000,
    )

    for ifo in config.ifos:
        channel_names = [config.source.channel_name[ifo]]
        if config.filter.injections and ifo in config.source.inj_channel_name.keys():
            channel_names.append(config.source.inj_channel_name[ifo])

        for channel in channel_names:
            # set up datasource options
            if config.source.data_source == "framexmit":
                arguments = [
                    Option("data-source", "framexmit"),
                    Option("framexmit-addr", f"{ifo}={config.source.framexmit_addr[ifo]}"),
                    Option("framexmit-iface", config.source.framexmit_iface),
                ]
            elif config.source.data_source == "lvshm":
                arguments = [
                    Option("data-source", "lvshm"),
                    Option("shared-memory-partition", f"{ifo}={config.source.shared_memory_partition[ifo]}"),
                    Option("shared-memory-block-size", config.source.shared_memory_block_size),
                    Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
                ]
            elif config.source.data_source == "devshm":
                if config.filter.injections and channel in config.source.inj_channel_name.values():
                    memory_location = config.source.inj_shared_memory_dir[ifo]
                else:
                    memory_location = config.source.shared_memory_dir[ifo]
                arguments = [
                    Option("data-source", "devshm"),
                    Option("shared-memory-dir", f"{ifo}={memory_location}"),
                    Option("shared-memory-block-size", config.source.shared_memory_block_size),
                    Option("shared-memory-assumed-duration", config.source.shared_memory_assumed_duration),
                ]
            else:
                raise ValueError(f"data source = {config.source.data_source} not valid for online jobs")

            arguments.extend([
                Option("analysis-tag", config.tag),
                Option("psd-fft-length", config.psd.fft_length),
                Option("channel-name", f"{ifo}={channel}"),
                Option("state-channel-name", f"{ifo}={config.source.state_channel_name[ifo]}"),
                Option("dq-channel-name", f"{ifo}={config.source.dq_channel_name[ifo]}"),
                Option("state-vector-on-bits", f"{ifo}={config.source.state_vector_on_bits[ifo]}"),
                Option("state-vector-off-bits", f"{ifo}={config.source.state_vector_off_bits[ifo]}"),
                Option("dq-vector-on-bits", f"{ifo}={config.source.dq_vector_on_bits[ifo]}"),
                Option("dq-vector-off-bits", f"{ifo}={config.source.dq_vector_off_bits[ifo]}"),
                Option("reference-psd", config.data.reference_psd),
                Option("scald-config", config.metrics.scald_config),
            ])
            if config.services.kafka_server:
                arguments.append(Option("output-kafka-server", config.services.kafka_server))

            if config.filter.injections and channel in config.source.inj_channel_name.values():
                arguments.append(Option("injection-channel"))
                inj_layer += Node(arguments = arguments)
            else:
                layer += Node(arguments = arguments)

    dag.attach(layer)
    if config.filter.injections:
        dag.attach(inj_layer)
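# Illustrative sketch of the source section that the datasource branch in
# track_noise_layer reads when data_source is "devshm". The key names mirror
# the attribute accesses above; the values are placeholders, not defaults, and
# the YAML-style layout is only assumed here for illustration:
#
#   source:
#     data_source: devshm
#     shared_memory_dir:
#       H1: /dev/shm/kafka/H1_hoft
#       L1: /dev/shm/kafka/L1_hoft
#     shared_memory_block_size: 4096
#     shared_memory_assumed_duration: 1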
[docs]def add_ranking_stat_file_options(config, svd_bin=None, transfer_only=False):
    """
    Return a list of options for the files used by terms in the
    ranking statistic, including:

      * mass model
      * dtdphi
      * iDQ timeseries

    If transfer_only is True, do not add these options to program
    command lines; instead, only use the files as inputs for Condor
    file transfer. This is required for jobs that need these files,
    since their paths are tracked through the ranking stat, so Condor
    needs to be aware of them when not relying on a shared file system.
    """
    if transfer_only:
        kwargs = {"track": False, "suppress": True}
    else:
        kwargs = {}

    if svd_bin is None:
        inputs = [
            Option("mass-model-file%d" % i, mass_model_file, **kwargs)
            for i, mass_model_file in enumerate(set(stats_bin["mass_model_file"] for stats_bin in config.svd.stats.bins.values()))
        ]
        if config.prior.idq_timeseries:
            inputs.extend(
                Option("idq-file%d" % i, idq_file, **kwargs)
                for i, idq_file in enumerate(set(stats_bin["idq_file"] for stats_bin in config.svd.stats.bins.values()))
            )
        inputs.extend(
            Option("dtdphi-file%d" % i, dtdphi_file, **kwargs)
            for i, dtdphi_file in enumerate(set(stats_bin["dtdphi_file"] for stats_bin in config.svd.stats.bins.values()))
        )
    else:
        stats_bin = config.svd.stats.bins[svd_bin]
        inputs = [Option("mass-model-file", stats_bin["mass_model_file"], **kwargs)]
        if config.prior.idq_timeseries:
            inputs.append(Option("idq-file", stats_bin["idq_file"], **kwargs))
        inputs.append(Option("dtdphi-file", stats_bin["dtdphi_file"], **kwargs))

    return inputs
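# Example usage (illustrative): a per-bin node that reads the ranking stat can
# simply extend its inputs with the returned options, e.g.
#
#     inputs = [Option("ranking-stat-input", dist_stats[svd_bin].files)]
#     inputs.extend(add_ranking_stat_file_options(config, svd_bin=svd_bin, transfer_only=True))
#
# With transfer_only=True the extra Options are not passed on the command
# line; they only make Condor aware of the mass model / dtdphi / iDQ files so
# they are transferred when there is no shared filesystem.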
[docs]def mchirp_range_to_bins(min_mchirp, max_mchirp, svd_metadata):
    """
    Given a range of chirp masses and the SVD metadata, determine
    and return the SVD bins that overlap.
    """
    svd_bins = []
    mchirp_range = segment(min_mchirp, max_mchirp)
    for svd_bin, bin_metadata in svd_metadata["bins"].items():
        bin_range = segment(bin_metadata["min_mchirp"], bin_metadata["max_mchirp"])
        if mchirp_range.intersects(bin_range):
            svd_bins.append(svd_bin)
    return svd_bins
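# Example (the metadata below is a toy stand-in for the real SVD metadata,
# which provides the same "bins" / "min_mchirp" / "max_mchirp" keys):
#
#     >>> svd_metadata = {"bins": {
#     ...     "0000": {"min_mchirp": 0.8, "max_mchirp": 1.2},
#     ...     "0001": {"min_mchirp": 1.2, "max_mchirp": 1.8},
#     ... }}
#     >>> mchirp_range_to_bins(1.0, 1.5, svd_metadata)
#     ['0000', '0001']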
[docs]def ifo_to_string(ifos):
    """Given a list of ifos, convert it to a single sorted string.
    """
    return "".join(sorted(ifos))
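# Example: ifo_to_string(["L1", "H1", "V1"]) returns "H1L1V1".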
[docs]def time_to_num_split_injections(span, time_per_split=DEFAULT_SPLIT_INJECTION_TIME):
    """Determine how many injection splits to make, given the analysis start/end times.
    """
    return max(int(abs(span)) // time_per_split, 1)
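# Example: for a one-week analysis span (604800 s) and the default of
# 20000 s per split, this gives max(604800 // 20000, 1) = 30 injection splits.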
[docs]@plugins.register
def layers():
    return {
        "split_bank": split_bank_layer,
        "svd_bank": svd_bank_layer,
        "checkerboard": checkerboard_layer,
        "filter": filter_layer,
        "filter_injections": filter_injections_layer,
        "aggregate": aggregate_layer,
        "create_prior": create_prior_layer,
        "calc_pdf": calc_pdf_layer,
        "marginalize": marginalize_layer,
        "marginalize_pdf": marginalize_pdf_layer,
        "calc_likelihood": calc_likelihood_layer,
        "cluster": cluster_layer,
        "compute_far": compute_far_layer,
        "split_injections": split_injections_layer,
        "find_injections": find_injections_layer,
        "match_injections": match_injections_layer,
        "calc_expected_snr": calc_expected_snr_layer,
        "measure_lnlr_cdf": measure_lnlr_cdf_layer,
        "plot_analytic_vt": plot_analytic_vt_layer,
        "plot_horizon_distance": plot_horizon_distance_layer,
        "plot_summary": plot_summary_layer,
        "plot_background": plot_background_layer,
        "plot_bin_background": plot_bin_background_layer,
        "plot_sensitivity": plot_sensitivity_layer,
        "filter_online": filter_online_layer,
        "filter_injections_online": filter_injections_online_layer,
        "marginalize_online": marginalize_online_layer,
        "upload_events": upload_events_layer,
        "upload_pastro": upload_pastro_layer,
        "plot_events": plot_events_layer,
        "count_events": count_events_layer,
        "collect_metrics": collect_metrics_layer,
        "track_noise": track_noise_layer,
    }