Source code for lloidhandler

# Copyright (C) 2009--2013  Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

##
# @file
#
# gstlal_inspiral's GStreamer pipeline handler.
#
#
# Review Status
#
# migrated from lloidparts.py 2018-06-15
#
# | Names                                 | Hash                                     | Date       | Diff to Head of Master      |
# | ------------------------------------- | ---------------------------------------- | ---------- | --------------------------- |
# | Sathya, Duncan Me, Jolien, Kipp, Chad | 2f5f73f15a1903dc7cc4383ef30a4187091797d1 | 2014-05-02 | <a href="@gstlal_inspiral_cgit_diff/python/lloidparts.py?id=HEAD&id2=2f5f73f15a1903dc7cc4383ef30a4187091797d1">lloidparts.py</a> |
#
#

##
# @package lloidparts
#
# gstlal_inspiral's GStreamer pipeline handler.
#


#
# =============================================================================
#
#                                   Preamble
#
# =============================================================================
#


from collections import defaultdict, deque
try:
	from fpconst import NaN
	from fpconst import PosInf
except ImportError:
	# fpconst is not part of the standard library and might not be
	# available
	NaN = float("nan")
	PosInf = float("+inf")
import itertools
import math
import numpy
import os
import resource
from scipy.interpolate import interp1d
import io
import sys
import threading
import time
import shutil
import json


import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)


from ligo.lw import ligolw
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from ligo.lw.utils import segments as ligolw_segments
import lal
import lal.series
from lal import LIGOTimeGPS
from lal import rate
from lal.utils import CacheEntry
from ligo import segments


try:
	import bottle
	if tuple(map(int, bottle.__version__.split("."))) < (0, 13):
		# FIXME:  see
		# https://git.ligo.org/lscsoft/gstlal/-/merge_requests/146
		# if the required patch is added to the distro-supplied
		# bottle before a 0.13 is released, update the version
		# check to the correct version
		raise ImportError
except ImportError:
	# FIXME:  remove after system-wide install can be relied on
	from gstlal import bottle
from gstlal import far
from gstlal import inspiral
from gstlal import p_astro_gstlal
from gstlal import pipeio
from gstlal import streamthinca


#
# =============================================================================
#
#                                     Misc
#
# =============================================================================
#


def subdir_from_T050017_filename(fname):
	path = str(CacheEntry.from_T050017(fname).segment[0])[:5]
	try:
		os.mkdir(path)
	except OSError:
		pass
	return path
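
# For illustration (hypothetical filename, not taken from the source):
#
#	subdir_from_T050017_filename("H1L1-TEST_LLOID-1187000000-4096.xml.gz")
#
# parses the T050017-style name, takes the first five digits of the GPS start
# time, creates the directory "11870" if it does not already exist, and
# returns that path.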


#
# =============================================================================
#
#                                Web Eye Candy
#
# =============================================================================
#


class EyeCandy(object):
	def __init__(self, instruments, kafka_server, analysis_tag, job_tag, stream, segmentstracker, latencytracker, f_final, node_id = None):
		self.kafka_server = kafka_server
		self.analysis = analysis_tag
		if job_tag:
			# online jobs have job tag of the form
			# {SVDBIN}_inj_{TAG} or {SVDBIN}_noninj
			self.tag = [job_tag.split('_')[0], '_'.join(job_tag.split('_')[1:]), str(f_final)]
			if 'noninj' in self.tag[1]:
				self.topic_prefix = ''
			else:
				self.topic_prefix = 'inj_'
		else:
			# offline jobs don't use job tags
			self.tag = []
			self.topic_prefix = ''
		self.node_id = node_id
		self.gate_history = segmentstracker.gate_history
		if latencytracker:
			self.pipeline_latency_history = latencytracker.pipeline_history
		else:
			self.pipeline_latency_history = None
		self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),)))
		# NOTE most of this data is collected at 1Hz, thus a 300
		# element deque should hold about 5 minutes of history.
		# Keeping the deque short is desirable for efficiency in
		# downloads, but it could mean that data is lost (though the
		# snapshot every ~4 hours will contain the information in
		# general)
		self.latency_history = deque(maxlen = 300)
		self.snr_history = deque(maxlen = 300)
		self.likelihood_history = deque(maxlen = 300)
		self.far_history = deque(maxlen = 300)
		self.ram_history = deque(maxlen = 2)
		self.ifo_snr_history = dict((instrument, deque(maxlen = 300)) for instrument in instruments)
		self.strain = {}
		for instrument in instruments:
			name = "%s_strain_audiorate" % instrument
			elem = stream.get_element_by_name(name)
			if elem is not None:
				self.strain[instrument] = elem
		self.time_since_last_state = None

		#
		# setup bottle routes
		#

		bottle.route("/latency_histogram.txt")(self.web_get_latency_histogram)
		bottle.route("/latency_history.txt")(self.web_get_latency_history)
		bottle.route("/snr_history.txt")(self.web_get_snr_history)
		if "H1" in instruments:
			bottle.route("/H1_snr_history.txt")(self.web_get_H1_snr_history)
		if "L1" in instruments:
			bottle.route("/L1_snr_history.txt")(self.web_get_L1_snr_history)
		if "V1" in instruments:
			bottle.route("/V1_snr_history.txt")(self.web_get_V1_snr_history)
		bottle.route("/likelihood_history.txt")(self.web_get_likelihood_history)
		bottle.route("/far_history.txt")(self.web_get_far_history)
		bottle.route("/ram_history.txt")(self.web_get_ram_history)

		#
		# Setup kafka client
		#

		if self.kafka_server is not None:
			from ligo.scald.io import kafka
			snr_routes = [f"{self.topic_prefix}{ifo}_snr_history" for ifo in instruments]
			network_routes = [f"{self.topic_prefix}likelihood_history", f"{self.topic_prefix}snr_history", f"{self.topic_prefix}latency_history"]
			state_routes = [f"{self.topic_prefix}{ifo}_strain_dropped" for ifo in instruments]
			usage_routes = [f"{self.topic_prefix}ram_history"]
			gates = [f"{self.topic_prefix}{gate}segments" for gate in ("statevector", "dqvector", "whiteht")]
			seg_routes = [f"{self.topic_prefix}{ifo}_{gate}" for ifo in instruments for gate in gates]
			instrument_latency_routes = [f"{self.topic_prefix}{ifo}_{stage}_latency" for ifo in instruments for stage in ["datasource", "whitening", "snrSlice"]]
			pipeline_latency_routes = [f"{self.topic_prefix}_all_{stage}_latency" for stage in ["itacac"]]
			heartbeat_routes = [f"gstlal.{self.analysis}.{self.topic_prefix}events", f"{self.topic_prefix}uptime"]

			topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, instrument_latency_routes, pipeline_latency_routes, heartbeat_routes))

			self.client = kafka.Client("kafka://{}".format(self.kafka_server))
			self.client.subscribe([f"gstlal.{self.analysis}.{topic}" for topic in topics])
			self.heartbeat_time = 0.0
			self.startup_time = inspiral.now()
		else:
			self.client = None

		# FIXME, it is silly to store kafka data like this since we
		# have all the other data structures, but since we are also
		# maintaining the bottle route methods, we should keep this a
		# bit separate for now to not disrupt too much.
		self.kafka_data = defaultdict(lambda: {'time': [], 'data': []})
		self.kafka_data[f"{self.topic_prefix}coinc"] = []

	def update(self, events, last_coincs):
		self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.))	# GB
		if events:
			maxevents = {}
			for event in events:
				if (event.ifo not in maxevents) or (event.snr > maxevents[event.ifo].snr):
					maxevents[event.ifo] = event
			for ifo, event in maxevents.items():
				t, snr = float(event.end), event.snr
				self.ifo_snr_history[ifo].append((t, snr))
				if self.client is not None:
					self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["time"].append(t)
					self.kafka_data[f"{self.topic_prefix}{ifo}_snr_history"]["data"].append(snr)
		if last_coincs:
			coinc_inspiral_index = last_coincs.coinc_inspiral_index
			coinc_event_index = last_coincs.coinc_event_index
			sngl_inspiral_index = last_coincs.sngl_inspiral_index
			coinc_dict_list = []
			for coinc_event_id in coinc_event_index:
				coinc_dict = {}
				for attr in ("combined_far", "snr", "false_alarm_rate"):
					try:
						coinc_dict[attr] = float(getattr(coinc_inspiral_index[coinc_event_id], attr))
					except TypeError as e:
						pass	#print >>sys.stderr, e, attr, getattr(coinc_inspiral_index[coinc_event_id], attr)
				coinc_dict["end"] = float(coinc_inspiral_index[coinc_event_id].end)
				for attr in ("likelihood",):
					try:
						coinc_dict[attr] = float(getattr(coinc_event_index[coinc_event_id], attr))
					except TypeError as e:
						pass	#print >>sys.stderr, e, attr, getattr(coinc_event_index[coinc_event_id], attr)
				for sngl_row in sngl_inspiral_index[coinc_event_id]:
					for attr in ("snr", "chisq", "mass1", "mass2", "spin1z", "spin2z", "coa_phase"):
						coinc_dict["%s_%s" % (sngl_row.ifo, attr)] = float(getattr(sngl_row, attr))
					coinc_dict["%s_end" % sngl_row.ifo] = float(sngl_row.end)
				coinc_dict_list.append(coinc_dict)
			self.kafka_data[f"{self.topic_prefix}coinc"].extend(coinc_dict_list)
			for coinc_inspiral in coinc_inspiral_index.values():
				# latency in .minimum_duration
				# FIXME:  update when a proper column is available
				self.latency_histogram[coinc_inspiral.minimum_duration,] += 1
			# latency in .minimum_duration
			# FIXME:  update when a proper column is available
			max_latency, max_latency_t = max((coinc_inspiral.minimum_duration, float(coinc_inspiral.end)) for coinc_inspiral in coinc_inspiral_index.values())
			self.latency_history.append((max_latency_t, max_latency))
			max_snr, max_snr_t = max((coinc_inspiral.snr, float(coinc_inspiral.end)) for coinc_inspiral in coinc_inspiral_index.values())
			self.snr_history.append((max_snr_t, max_snr))
			max_likelihood, max_likelihood_t, max_likelihood_far = max((coinc_event_index[coinc_event_id].likelihood, float(coinc_inspiral.end), coinc_inspiral.combined_far) for coinc_event_id, coinc_inspiral in coinc_inspiral_index.items())
			if max_likelihood is not None:
				self.likelihood_history.append((max_likelihood_t, max_likelihood))
			if max_likelihood_far is not None:
				self.far_history.append((max_likelihood_t, max_likelihood_far))
			if self.client is not None:
				for ii, column in enumerate(["time", "data"]):
					self.kafka_data[f"{self.topic_prefix}latency_history"][column].append(float(self.latency_history[-1][ii]))
					self.kafka_data[f"{self.topic_prefix}snr_history"][column].append(float(self.snr_history[-1][ii]))
				if max_likelihood is not None:
					self.kafka_data[f"{self.topic_prefix}likelihood_history"]["time"].append(float(max_likelihood_t))
					self.kafka_data[f"{self.topic_prefix}likelihood_history"]["data"].append(float(max_likelihood))
				if max_likelihood_far is not None:
					self.kafka_data[f"{self.topic_prefix}far_history"]["time"].append(float(max_likelihood_t))
					self.kafka_data[f"{self.topic_prefix}far_history"]["data"].append(float(max_likelihood_far))

		t = inspiral.now()
		if self.time_since_last_state is None:
			self.time_since_last_state = t
		# send state/segment information to kafka every second
		if self.client is not None and (t - self.time_since_last_state) >= 1:
			self.time_since_last_state = t
			for ii, column in enumerate(["time", "data"]):
				self.kafka_data[f"{self.topic_prefix}ram_history"][column].append(float(self.ram_history[-1][ii]))
			# collect gate segments
			for gate in self.gate_history.keys():
				for instrument, seg_history in self.gate_history[gate].items():
					if not seg_history:
						continue
					# get on/off points, add point at +inf
					gate_interp_times, gate_interp_onoff = zip(*seg_history)
					gate_interp_times = list(gate_interp_times)
					gate_interp_times.append(2000000000)
					gate_interp_onoff = list(gate_interp_onoff)
					gate_interp_onoff.append(gate_interp_onoff[-1])
					# regularly sample from on/off points
					gate_times = numpy.arange(int(self.time_since_last_state), int(t + 1), 0.25)
					gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind = 'zero')(gate_times)
					self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["time"].extend([t for t in gate_times if t >= self.time_since_last_state])
					self.kafka_data[f"{self.topic_prefix}{instrument}_{gate}"]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state])
			# collect any new latencies
			if self.pipeline_latency_history:
				for stage in self.pipeline_latency_history.keys():
					for instrument in self.pipeline_latency_history[stage].keys():
						while (self.pipeline_latency_history[stage][instrument]):
							data = self.pipeline_latency_history[stage][instrument].pop()
							self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["time"].append(data[0])
							self.kafka_data[f"{self.topic_prefix}{instrument}_{stage}_latency"]["data"].append(data[1])
			# collect strain dropped samples
			for instrument, elem in self.strain.items():
				# I know the name is strain_drop even though it
				# comes from the "add" property.  that is
				# because audiorate has to "add" samples when
				# data is dropped.
				# FIXME don't hard code the rate
				self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["time"].append(float(t))
				self.kafka_data[f"{self.topic_prefix}{instrument}_strain_dropped"]["data"].append(elem.get_property("add") / 16384.)
			# collect uptime of this job
			self.kafka_data[f"{self.topic_prefix}uptime"]["time"].append(float(t))
			self.kafka_data[f"{self.topic_prefix}uptime"]["data"].append(float(inspiral.now() - self.startup_time))

			# Send all of the kafka messages and clear the data
			#self.producer.send(self.tag, self.kafka_data)
			for route in self.kafka_data.keys():
				self.client.write(f"gstlal.{self.analysis}.{route}", self.kafka_data[route], tags = self.tag)

			# send heartbeat messages
			if t - self.heartbeat_time > 10*60:
				self.heartbeat_time = int(t)
				self.client.write(f"gstlal.{self.analysis}.{self.topic_prefix}events", {'time': self.heartbeat_time}, tags = 'heartbeat')

			# This line forces the send but is blocking!!  not the
			# best idea for production running since we value
			# latency over getting metric data out
			#self.producer.flush()
			for route in self.kafka_data.keys():
				self.kafka_data[route] = {'time': [], 'data': []}
			self.kafka_data[f"{self.topic_prefix}coinc"] = []

	def web_get_latency_histogram(self):
		with self.lock:
			for latency, number in zip(self.latency_histogram.centres()[0][1:-1], self.latency_histogram.array[1:-1]):
				yield "%e %e\n" % (latency, number)

	def web_get_latency_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, latency in self.latency_history:
				yield "%f %e\n" % (time, latency)

	def web_get_snr_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, snr in self.snr_history:
				yield "%f %e\n" % (time, snr)

	def web_get_H1_snr_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, snr in self.ifo_snr_history["H1"]:
				yield "%f %e\n" % (time, snr)

	def web_get_L1_snr_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, snr in self.ifo_snr_history["L1"]:
				yield "%f %e\n" % (time, snr)

	def web_get_V1_snr_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, snr in self.ifo_snr_history["V1"]:
				yield "%f %e\n" % (time, snr)

	def web_get_likelihood_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, like in self.likelihood_history:
				yield "%f %e\n" % (time, like)

	def web_get_far_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, far in self.far_history:
				yield "%f %e\n" % (time, far)

	def web_get_ram_history(self):
		with self.lock:
			# first one in the list is sacrificed for a time stamp
			for time, ram in self.ram_history:
				yield "%f %e\n" % (time, ram)


#
# =============================================================================
#
#                             Segmentlist Tracker
#
# =============================================================================
#


class SegmentsTracker(object):
	def __init__(self, stream, instruments, segment_history_duration = LIGOTimeGPS(2592000), verbose = False):
		self.lock = threading.Lock()
		self.verbose = verbose

		# setup segment list collection from gates
		#
		# FIXME:  knowledge of what gates are present in what
		# configurations, what they are called, etc., somehow needs
		# to live with the code that constructs the gates
		#
		# FIXME:  in the offline pipeline, state vector segments
		# don't get recorded.  however, except for the h(t) gate
		# segments these are all inputs to the pipeline so it
		# probably doesn't matter.  nevertheless, they maybe should
		# go into the event database for completeness of that
		# record, or maybe not because it could result in a lot of
		# duplication of on-disk data.  who knows.  think about it.
		gate_suffix = {
			# FIXME uncomment the framesegments line once the
			# online analysis has a frame segments gate
			#"framesegments": "frame_segments_gate",
			"statevectorsegments": "state_vector_gate",
			"dqvectorsegments": "dq_vector_gate",
			"whitehtsegments": "ht_gate",
			"idqsegments": "idq_gate"
		}

		# dictionary mapping segtype to segmentlist dictionary
		# mapping instrument to segment list
		self.seglistdicts = dict((segtype, segments.segmentlistdict((instrument, segments.segmentlist()) for instrument in instruments)) for segtype in gate_suffix)

		# create a copy to keep track of recent segment history
		self.recent_segment_histories = self.seglistdicts.copy()
		self.segment_history_duration = segment_history_duration

		# recent gate history encoded in on/off bits
		self.gate_history = {segtype: {instrument: deque(maxlen = 20) for instrument in instruments} for segtype in gate_suffix}

		# iterate over segment types and instruments, look for the
		# gate element that should provide those segments, and
		# connect handlers to collect the segments
		if verbose:
			print("connecting segment handlers to gates ...", file = sys.stderr)
		for segtype, seglistdict in self.seglistdicts.items():
			for instrument in seglistdict:
				try:
					name = "%s_%s" % (instrument, gate_suffix[segtype])
				except KeyError:
					# this segtype doesn't come from
					# gate elements
					continue
				elem = stream.get_element_by_name(name)
				if elem is None:
					# ignore missing gate elements
					if verbose:
						print("\tcould not find %s for %s '%s'" % (name, instrument, segtype), file = sys.stderr)
					continue
				if verbose:
					print("\tfound %s for %s '%s'" % (name, instrument, segtype), file = sys.stderr)
				elem.connect("start", self.gatehandler, (segtype, instrument, "on"))
				elem.connect("stop", self.gatehandler, (segtype, instrument, "off"))
				elem.set_property("emit-signals", True)
		if verbose:
			print("... done connecting segment handlers to gates", file = sys.stderr)

	def __gatehandler(self, elem, timestamp, seg_state_input):
		"""!
		A handler that intercepts gate state transitions.

		@param elem A reference to the lal_gate element or None
		(only used for verbosity)

		@param timestamp A gstreamer time stamp (integer
		nanoseconds) that marks the state transition

		@param segtype the class of segments this gate is defining,
		e.g., "datasegments", etc..

		@param instrument the instrument this state transition is to
		be attributed to, e.g., "H1", etc..

		@param new_state the state transition, must be either "on"
		or "off"

		Must be called with the lock held.
		"""
		# convert integer nanoseconds to LIGOTimeGPS
		timestamp = LIGOTimeGPS(0, timestamp)

		# unpack argument tuple
		segtype, instrument, new_state = seg_state_input

		if self.verbose:
			print("%s: %s '%s' state transition: %s @ %s" % ((elem.get_name() if elem is not None else "<internal>"), instrument, segtype, new_state, str(timestamp)), file = sys.stderr)

		if new_state == "off":
			# record end of segment
			self.seglistdicts[segtype][instrument] -= segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
			self.gate_history[segtype][instrument].append((float(timestamp), 0.))
		elif new_state == "on":
			# record start of new segment
			self.seglistdicts[segtype][instrument] += segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
			self.gate_history[segtype][instrument].append((float(timestamp), 1.))
		else:
			assert False, "impossible new_state '%s'" % new_state

	def gatehandler(self, elem, timestamp, seg_state_input):
		segtype, instrument, new_state = seg_state_input
		with self.lock:
			self.__gatehandler(elem, timestamp, (segtype, instrument, new_state))

	def terminate(self, timestamp):
		# terminate all segments
		with self.lock:
			for segtype, seglistdict in self.seglistdicts.items():
				for instrument in seglistdict:
					self.__gatehandler(None, timestamp, (segtype, instrument, "off"))

	def gen_segments_xmldoc(self):
		"""!
		A method to output the segment list in a valid ligolw xml
		format.

		Must be called with the lock held.
		"""
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
		with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments:
			for segtype, seglistdict in self.seglistdicts.items():
				ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot")
		process.set_end_time_now()
		return xmldoc

	def __T050017_filename(self, description, extension):
		"""!
		Must be called with the lock held.
		"""
		# input check
		if "-" in description:
			raise ValueError("invalid characters in description '%s'" % description)

		# determine current extent
		instruments = set(instrument for seglistdict in self.seglistdicts.values() for instrument in seglistdict)
		segs = segments.segmentlist(seglistdict.extent_all() for seglistdict in self.seglistdicts.values() if any(seglistdict.values()))
		if segs:
			start, end = segs.extent()
			if math.isinf(end):
				end = inspiral.now()
		else:
			# silence errors at start-up.
			# FIXME:  this is probably dumb.  who cares.
			start = end = inspiral.now()

		# construct and return filename
		start, end = int(math.floor(start)), int(math.ceil(end))
		return "%s-%s-%d-%d.%s" % ("".join(sorted(instruments)), description, start, end - start, extension)
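
	# For example (hypothetical values): with instruments {"H1", "L1"}, a
	# current extent of [1187000000, 1187000512), description
	# "TEST_SEGMENTS" and extension "xml.gz", the string constructed above
	# is
	#
	#	"H1L1-TEST_SEGMENTS-1187000000-512.xml.gz"
	#
	# i.e. the usual T050017 IFOS-DESCRIPTION-START-DURATION.EXT
	# convention.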

	def T050017_filename(self, description, extension):
		with self.lock:
			return self.__T050017_filename(description, extension)

	def flush_segments_to_disk(self, tag, timestamp):
		"""!
		Flush segments to disk, e.g., when checkpointing or shutting
		down an online pipeline.

		@param timestamp the LIGOTimeGPS timestamp of the current
		buffer in order to close off open segment intervals before
		writing to disk
		"""
		with self.lock:
			# make a copy of the current segmentlistdicts
			seglistdicts = dict((key, value.copy()) for key, value in self.seglistdicts.items())

			# keep everything before timestamp in the current
			# segmentlistdicts.  keep everything after
			# timestamp in the copy.  we need to apply the cut
			# this way around so that the T050017 filename
			# constructed below has the desired start and
			# duration
			for seglistdict in self.seglistdicts.values():
				seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(timestamp, segments.PosInfinity)]))
			for seglistdict in seglistdicts.values():
				seglistdict -= seglistdict.fromkeys(seglistdict, segments.segmentlist([segments.segment(segments.NegInfinity, timestamp)]))

			# write the current (clipped) segmentlistdicts to
			# disk
			fname = self.__T050017_filename("%s_SEGMENTS" % tag, "xml.gz")
			fname = os.path.join(subdir_from_T050017_filename(fname), fname)
			ligolw_utils.write_filename(self.gen_segments_xmldoc(), fname, verbose = self.verbose, trap_signals = None)

			# continue with the (clipped) copy
			self.seglistdicts = seglistdicts
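
	# Clipping example (hypothetical): if an instrument's
	# "whitehtsegments" list currently contains [1187000000, +inf) and
	# flush_segments_to_disk() is called with timestamp 1187000512, then
	# [1187000000, 1187000512) is what ends up in the file written to
	# disk, while the in-memory state continues from [1187000512, +inf) so
	# the next snapshot picks up where this one left off.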

	def web_get_segments_xml(self):
		"""!
		provide a bottle route to get segment information via a url
		"""
		with self.lock:
			output = io.StringIO()
			ligolw_utils.write_fileobj(self.gen_segments_xmldoc(), output)
			outstr = output.getvalue()
			output.close()
			return outstr

	def update_recent_segment_history(self):
		"""!
		A method to update the recent segment histories

		Must be called with the lock held.
		"""
		current_gps_time = lal.GPSTimeNow()
		interval_to_discard = segments.segmentlist((segments.segment(segments.NegInfinity, current_gps_time - self.segment_history_duration),))
		for segtype, seglistdict in self.recent_segment_histories.items():
			seglistdict.extend(self.seglistdicts[segtype])
			seglistdict.coalesce()
			for seglist in seglistdict.values():
				seglist -= interval_to_discard

	def gen_recent_segment_history_xmldoc(self):
		"""!
		Construct and return a LIGOLW XML tree containing the recent
		segment histories.

		Must be called with the lock held.
		"""
		self.update_recent_segment_history()
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {})
		with ligolw_segments.LigolwSegments(xmldoc, process) as ligolwsegments:
			for segtype, seglistdict in self.recent_segment_histories.items():
				ligolwsegments.insert_from_segmentlistdict(seglistdict, name = segtype, comment = "LLOID snapshot")
		process.set_end_time_now()
		return xmldoc

	def web_get_recent_segment_history_xml(self):
		"""!
		provide a bottle route to get recent segment history
		information via a url
		"""
		with self.lock:
			output = io.StringIO()
			ligolw_utils.write_fileobj(self.gen_recent_segment_history_xmldoc(), output)
			outstr = output.getvalue()
			output.close()
			return outstr


#
# =============================================================================
#
#                          Pipeline Latency Tracker
#
# =============================================================================
#


class LatencyTracker(object):
	def __init__(self, stream, instruments, verbose = False):
		self.lock = threading.Lock()
		self.verbose = verbose

		# recent pipeline stage latencies
		ifo_pipe_stages = ["datasource", "whitening", "snrSlice"]
		all_ifo_stages = ["itacac"]
		self.pipeline_history = {stage: {instrument: deque(maxlen = 20) for instrument in instruments} for stage in ifo_pipe_stages}
		for stage in all_ifo_stages:
			self.pipeline_history[stage] = {"all": deque(maxlen = 20)}

		# iterate over pipeline stages, look for the stage element
		# that should provide the latencies, and connect handlers to
		# collect the latencies
		if verbose:
			print("connecting pipeline latency handlers ...", file = sys.stderr)
		for stage in self.pipeline_history.keys():
			for instrument in self.pipeline_history[stage].keys():
				elem = stream.get_element_by_name("%s_%s_latency" % (instrument, stage))
				if elem is None:
					if verbose:
						print("\tcould not find %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file = sys.stderr)
					continue
				if verbose:
					print("\tfound %s_%s_latency element for pipeline latency monitoring" % (instrument, stage), file = sys.stderr)
				elem.connect("notify::current-latency", self.latencyhandler, (stage, instrument, "stop"))
		if verbose:
			print("... done connecting pipeline latency handlers", file = sys.stderr)

	def __latencyhandler(self, elem, timestamp, stage_latency_input):
		"""!
		A handler that intercepts pipeline latency calls.

		@param elem A reference to the lal_gate element or None
		(only used for verbosity)

		@param timestamp A gstreamer time stamp (integer
		nanoseconds) that marks the state transition

		@param stage_latency_input name, instrument and state of the
		stage from the pipeline

		Must be called with the lock held.
		"""
		# parse stage_latency_input
		stage, instrument, state = stage_latency_input

		# parse properties from element trigger
		latency = elem.get_property("current-latency")
		name = elem.get_property("name") if elem is not None else "<internal>"
		timestamp = elem.get_property("timestamp")

		if state == "stop":
			# record pipeline stage latency
			self.pipeline_history[stage][instrument].append((float(timestamp), float(latency)))
		else:
			assert False, "impossible state '%s'" % state

	def latencyhandler(self, elem, timestamp, stage_latency_input):
		with self.lock:
			self.__latencyhandler(elem, timestamp, stage_latency_input)


#
# =============================================================================
#
#                              Pipeline Handler
#
# =============================================================================
#


class LLOIDTracker:
	"""!
	Implements additional message handling for dealing with spectrum
	messages and checkpoints for the online analysis including periodic
	dumps of segment information, trigger files and background
	distribution statistics.
	"""
	def __init__(self, stream, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, sngls_snr_threshold = None, analysis_tag = "test", job_tag = "", kafka_server = "10.14.0.112:9092", cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, activation_counts = None, track_latency = False, template_id_time_map = None, background_collector_type = "normal", node_id = None, f_final = 1024., verbose = False):
		"""!
		@param dataclass A Data class instance

		@param job_tag The tag to use for naming file snapshots, e.g.
		the description will be "%s_LLOID" % tag

		@param verbose Be verbose
		"""
		#
		# initialize
		#

		self.lock = threading.Lock()
		self.stream = stream
		self.coincs_document = coincs_document
		self.analysis = analysis_tag
		self.tag = job_tag
		self.verbose = verbose
		# None to disable periodic snapshots, otherwise seconds
		self.likelihood_snapshot_interval = likelihood_snapshot_interval
		self.likelihood_snapshot_timestamp = None
		self.cluster = cluster
		self.cap_singles = cap_singles
		self.FAR_trialsfactor = FAR_trialsfactor
		self.activation_counts = activation_counts
		self.template_id_time_map = template_id_time_map

		self.gracedbwrapper = gracedbwrapper
		# FIXME:  detangle this
		self.gracedbwrapper.lock = self.lock

		#
		# setup segment list collection from gates
		#

		self.segmentstracker = SegmentsTracker(stream, rankingstat.instruments, verbose = verbose)

		#
		# setup latency collection from pipeline if requested
		#

		if track_latency:
			self.latencytracker = LatencyTracker(stream, rankingstat.instruments, verbose = verbose)
		else:
			self.latencytracker = None

		#
		# set up metric collection
		#

		self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, stream, self.segmentstracker, self.latencytracker, f_final, node_id = node_id)
		# FIXME:  detangle this
		self.eye_candy.lock = self.lock

		#
		# setup bottle routes (and rahwts)
		#

		bottle.route("/cumulative_segments.xml")(self.segmentstracker.web_get_recent_segment_history_xml)
		bottle.route("/psds.xml")(self.web_get_psd_xml)
		bottle.route("/rankingstat.xml")(self.web_get_rankingstat)
		bottle.route("/segments.xml")(self.segmentstracker.web_get_segments_xml)
		bottle.route("/sngls_snr_threshold.txt", method = "GET")(self.web_get_sngls_snr_threshold)
		bottle.route("/sngls_snr_threshold.txt", method = "POST")(self.web_set_sngls_snr_threshold)
		bottle.route("/zerolag_rankingstatpdf.xml")(self.web_get_zerolag_rankingstatpdf)

		#
		# attach a StreamThinca instance to ourselves
		#

		self.stream_thinca = streamthinca.StreamThinca(
			coincs_document.xmldoc,
			coincs_document.process_id,
			delta_t = rankingstat.delta_t,
			min_instruments = rankingstat.min_instruments,
			sngls_snr_threshold = sngls_snr_threshold,
			background_collector_type = background_collector_type
		)

		#
		# setup likelihood ratio book-keeping.
		#
		# in online mode, if ranking_stat_input_url is set then on
		# each snapshot interval, and before providing stream
		# thinca with its ranking statistic information, the
		# current rankingstat object is replaced with the contents
		# of that file.  this is intended to be used by trigger
		# generator jobs on the injection branch of an online
		# analysis to import ranking statistic information from
		# their non-injection cousins instead of using whatever
		# statistics they've collected internally.
		# ranking_stat_input_url is not used when running offline.
		#
		# ranking_stat_output_url provides the name of the file to
		# which the internally-collected ranking statistic
		# information is to be written whenever output is written
		# to disk.  if set to None, then only the trigger file will
		# be written, no ranking statistic information will be
		# written.  normally it is set to a non-null value, but
		# injection jobs might be configured to disable ranking
		# statistic output since they produce nonsense.
		#

		self.ranking_stat_input_url = ranking_stat_input_url
		self.ranking_stat_output_url = ranking_stat_output_url
		self.rankingstat = rankingstat
		bottle.route("/remove_counts.txt", method = "GET")(self.web_get_remove_counts)
		bottle.route("/remove_counts.txt", method = "POST")(self.web_set_remove_counts)

		#
		# if we have been supplied with external ranking statistic
		# information then use it to enable ranking statistic
		# assignment in streamthinca.
		#

		if self.ranking_stat_input_url is not None:
			if self.rankingstat.is_healthy(self.verbose):
				self.stream_thinca.ln_lr_from_triggers = far.OnlineFrankensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers
				if self.verbose:
					print("ranking statistic assignment ENABLED", file = sys.stderr)
			else:
				self.stream_thinca.ln_lr_from_triggers = None
				if self.verbose:
					print("ranking statistic assignment DISABLED", file = sys.stderr)
		elif False:
			# FIXME:  move sum-of-SNR^2 cut into this object's
			# .__call__() and then use as coinc sieve function
			# instead.  left here temporarily to remember how
			# to initialize it
			self.stream_thinca.ln_lr_from_triggers = far.DatalessRankingStat(
				template_ids = rankingstat.template_ids,
				instruments = rankingstat.instruments,
				min_instruments = rankingstat.min_instruments,
				delta_t = rankingstat.delta_t
			).finish().ln_lr_from_triggers
			if self.verbose:
				print("ranking statistic assignment ENABLED", file = sys.stderr)
		else:
			self.stream_thinca.ln_lr_from_triggers = None
			if self.verbose:
				print("ranking statistic assignment DISABLED", file = sys.stderr)

		#
		# zero_lag_ranking_stats is a RankingStatPDF object that is
		# used to accumulate a histogram of the likelihood ratio
		# values assigned to zero-lag candidates.  this is required
		# to implement the extinction model for low-significance
		# events during online running but otherwise is optional.
		#
		# FIXME:  if the file does not exist or is not readable,
		# the code silently initializes a new, empty, histogram.
		# it would be better to determine whether or not the file
		# is required and fail when it is missing
		#

		if zerolag_rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(zerolag_rankingstatpdf_url), os.R_OK):
			_, self.zerolag_rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(zerolag_rankingstatpdf_url, verbose = verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
			if self.zerolag_rankingstatpdf is None:
				raise ValueError("\"%s\" does not contain ranking statistic PDF data" % zerolag_rankingstatpdf_url)
		elif zerolag_rankingstatpdf_url is not None:
			# initialize an all-zeros set of PDFs
			self.zerolag_rankingstatpdf = far.RankingStatPDF(rankingstat, nsamples = 0)
		else:
			self.zerolag_rankingstatpdf = None
		self.zerolag_rankingstatpdf_url = zerolag_rankingstatpdf_url

		#
		# set horizon distance calculator
		#

		self.horizon_distance_func = horizon_distance_func

		#
		# rankingstatpdf contains the RankingStatPDF object (loaded
		# from rankingstatpdf_url) used to initialize the FAPFAR
		# object for on-the-fly FAP and FAR assignment.  except to
		# initialize the FAPFAR object it is not used for anything,
		# but is retained so that it can be exposed through the web
		# interface for diagnostic purposes and uploaded to gracedb
		# with candidates.  the extinction model is applied to
		# initialize the FAPFAR object but the original is retained
		# for upload to gracedb, etc.
		#

		self.rankingstatpdf_url = rankingstatpdf_url
		self.load_rankingstat_pdf()

		#
		# most recent PSDs
		#

		self.psds = {}

		#
		# use state vector elements to 0 horizon distances when
		# instruments are off
		#

		if verbose:
			print("connecting horizon distance handlers to gates ...", file = sys.stderr)
		self.absent_instruments = set()
		for instrument in rankingstat.instruments:
			name = "%s_ht_gate" % instrument
			elem = self.stream.get_element_by_name(name)
			if elem is None:
				# FIXME:  if there is no data for an
				# instrument for which we have ranking
				# statistic data then the horizon distance
				# record needs to indicate that it was off
				# for the entire segment.  should probably
				# 0 the horizon distance history at start
				# and stop of each stream, but there is a
				# statement in the EOS handler that we
				# don't do that, and I can remember why
				# that is.
				self.absent_instruments.add(instrument)
				continue
				raise ValueError("cannot find \"%s\" element for %s" % (name, instrument))
			if verbose:
				print("\tfound %s for %s" % (name, instrument), file = sys.stderr)
			elem.connect("start", self.horizgatehandler, (instrument, True))
			elem.connect("stop", self.horizgatehandler, (instrument, False))
			elem.set_property("emit-signals", True)
		if verbose:
			print("... done connecting horizon distance handlers to gates", file = sys.stderr)

	def web_get_remove_counts(self):
		with self.lock:
			yield json.dumps([gps_time for gps_time in self.rankingstat.denominator.remove_counts_times])

	def web_set_remove_counts(self):
		try:
			with self.lock:
				gps_time = bottle.request.forms["gps_time"]
				if gps_time is not None:
					if int(gps_time) not in self.rankingstat.denominator.remove_counts_times:
						self.rankingstat.denominator.remove_counts_times = numpy.append(self.rankingstat.denominator.remove_counts_times, int(gps_time))
					yield "OK: gps_time=%d\n" % gps_time
				else:
					yield "OK: gps_time=\n"
		except:
			yield "error\n"

	def on_spectrum_update(self, message):
		# get the instrument, psd, and timestamp.  the "stability" is
		# a measure of the fraction of the configured averaging
		# timescale used to obtain this measurement.
		# NOTE:  epoch is used for the timestamp, this is the middle
		# of the most recent FFT interval used to obtain this PSD
		instrument = message.src.get_name().split("_")[-1]
		psd = pipeio.parse_spectrum_message(message)
		timestamp = psd.epoch
		stability = float(message.src.get_property("n-samples")) / message.src.get_property("average-samples")

		# save
		self.psds[instrument] = psd

		# update horizon distance history.  if the whitener's average
		# is not satisfactorily converged, claim the horizon distance
		# is 0 (equivalent to claiming the instrument to be off at
		# this time).  this has the effect of vetoing candidates from
		# these times.
		if stability > 0.3:
			horizon_distance = self.horizon_distance_func(psd, 8.0)[0]
			assert not (math.isnan(horizon_distance) or math.isinf(horizon_distance))
		else:
			horizon_distance = 0.
		self.record_horizon_distance(instrument, float(timestamp), horizon_distance)
		return True
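
	# For example (hypothetical numbers): with the whitener's
	# "average-samples" property set to 64 and only 16 FFT intervals
	# accumulated so far, stability = 16 / 64 = 0.25 < 0.3, so a horizon
	# distance of 0 is recorded for that PSD epoch; once 20 or more
	# intervals have been accumulated (stability > 0.3) the horizon
	# distance computed from the PSD is recorded instead.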

	def on_checkpoint(self, message):
		self.checkpoint(LIGOTimeGPS(0, message.timestamp))
		return True

	def on_eos(self, message):
		with self.lock:
			# FIXME:  how to choose correct timestamp?  note that
			# EOS messages' timestamps are set to CLOCK_TIME_NONE
			# so they can't be used for this.
			try:
				# seconds
				timestamp = self.rankingstat.segmentlists.extent_all()[1]
			except ValueError:
				# no segments
				return False
			# convert to integer nanoseconds
			timestamp = LIGOTimeGPS(timestamp).ns()

			# terminate all segments
			self.segmentstracker.terminate(timestamp)

			# NOTE:  we do not zero the horizon distances in the
			# ranking statistic at EOS

			# NOTE:  never return True from the EOS code-path, so
			# as to not stop the parent class from doing
			# EOS-related stuff
			return False

	def load_rankingstat_pdf(self):
		# FIXME:  if the file can't be accessed the code silently
		# disables FAP/FAR assignment.  need to figure out when
		# failure is OK and when it's not OK and put a better check
		# here.
		if self.rankingstatpdf_url is not None and os.access(ligolw_utils.local_path_from_url(self.rankingstatpdf_url), os.R_OK):
			_, self.rankingstatpdf = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.rankingstatpdf_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
			if self.rankingstatpdf is None:
				raise ValueError("\"%s\" does not contain ranking statistic PDFs" % self.rankingstatpdf_url)
			if not self.rankingstat.template_ids <= self.rankingstatpdf.template_ids:
				raise ValueError("\"%s\" is for the wrong templates" % self.rankingstatpdf_url)
			if self.rankingstatpdf.is_healthy(self.verbose):
				self.fapfar = far.FAPFAR(self.rankingstatpdf.new_with_extinction())
				if self.verbose:
					print("false-alarm probability and rate assignment ENABLED", file = sys.stderr)
			else:
				self.fapfar = None
				if self.verbose:
					print("false-alarm probability and rate assignment DISABLED", file = sys.stderr)
		else:
			self.rankingstatpdf = None
			self.fapfar = None

	def on_buffer(self, buf):
		with self.lock:
			# retrieve triggers
			if buf.is_gap:
				events = ()
			else:
				events = buf.data

			# FIXME:  ugly way to get the instrument
			instruments = set([event.ifo for event in events])

			# FIXME calculate a chisq weighted SNR and store it in
			# the Gamma2 column
			for event in events:
				event.Gamma2 = event.snr / ((1 + max(1., event.chisq)**3)/2.0)**(1./5.)

			# extract segment.  move the segment's upper boundary
			# to include all triggers.  ARGH the 1 ns offset is
			# needed for the respective trigger to be "in" the
			# segment (segments are open from above)
			# FIXME:  is there another way?
			buf_seg = dict((instrument, segments.segment(buf.t0, max(buf.t0 + LIGOTimeGPS(0, buf.duration), max(event.end for event in events if event.ifo == instrument) + 0.000000001))) for instrument in instruments)

			# safety check end times.  we cannot allow trigger
			# times to go backwards.  they cannot precede the
			# buffer's start because, below, streamthinca will be
			# told the trigger list is complete up to this
			# buffer's time stamp.  this logic also requires this
			# method to be fed buffers in time order:  we must
			# never receive a buffer whose timestamp precedes the
			# timestamp of a buffer we have already received.
			# NOTE:  the trigger objects' comparison operators can
			# be used for this test directly, without explicitly
			# retrieving .end
			assert all(event >= buf.t0 for event in events)

			# assign IDs to triggers.  set all effective distances
			# to NaN.  gstlal_inspiral's effective distances are
			# incorrect, and the PE codes require us to either
			# provide correct effective distances or communicate
			# to them that they are incorrect.  they have explained
			# that setting them to NaN is sufficient for the
			# latter.
			# FIXME:  fix the effective distances
			for event in events:
				event.process_id = self.coincs_document.process_id
				event.event_id = self.coincs_document.get_next_sngl_id()
				event.eff_distance = NaN

			# update likelihood snapshot if needed
			if self.likelihood_snapshot_interval is not None and (self.likelihood_snapshot_timestamp is None or buf.t0 - self.likelihood_snapshot_timestamp >= self.likelihood_snapshot_interval):
				self.likelihood_snapshot_timestamp = buf.t0

				# post a checkpoint message.
				# FIXME:  make sure this triggers
				# self.snapshot_output_url() to be invoked.
				# lloidparts takes care of that for now, but
				# spreading the program logic around like that
				# isn't a good idea, this code should be
				# responsible for it somehow, no?
				# NOTE:  self.snapshot_output_url() writes the
				# current rankingstat object to the location
				# identified by .ranking_stat_output_url, so
				# if that is either not set or at least set to
				# a different name than
				# .ranking_stat_input_url the file that has
				# just been loaded above will not be
				# overwritten.
				self.stream.post_message("CHECKPOINT", timestamp = buf.t0.ns())

				# if a ranking statistic source url is set and
				# is not the same as the file to which we are
				# writing our ranking statistic data then
				# overwrite rankingstat with its contents.  the
				# use case is online injection jobs that need
				# to periodically grab new ranking statistic
				# data from their corresponding non-injection
				# partner
				if self.ranking_stat_input_url is not None and self.ranking_stat_input_url != self.ranking_stat_output_url:
					params_before = self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t
					for tries in range(10):
						try:
							self.rankingstat, _ = far.parse_likelihood_control_doc(ligolw_utils.load_url(self.ranking_stat_input_url, verbose = self.verbose, contenthandler = far.RankingStat.LIGOLWContentHandler))
						except OSError as e:
							print(f'Error in reading rank stat on try {tries}: {e}', file = sys.stderr)
							time.sleep(1)
						else:
							break
					else:
						raise RuntimeError('Exceeded retries, exiting.')
					if params_before != (self.rankingstat.template_ids, self.rankingstat.instruments, self.rankingstat.min_instruments, self.rankingstat.delta_t):
						raise ValueError("'%s' contains incompatible ranking statistic configuration" % self.ranking_stat_input_url)

				# update streamthinca's ranking statistic data
				if self.rankingstat.is_healthy(self.verbose):
					self.stream_thinca.ln_lr_from_triggers = far.OnlineFrankensteinRankingStat(self.rankingstat, self.rankingstat).finish().ln_lr_from_triggers
					self.rankingstat_upload = far.OnlineFrankensteinRankingStat(self.rankingstat, self.rankingstat)
					if self.verbose:
						print("ranking statistic assignment ENABLED", file = sys.stderr)
				else:
					self.stream_thinca.ln_lr_from_triggers = None
					self.rankingstat_upload = None
					if self.verbose:
						print("ranking statistic assignment DISABLED", file = sys.stderr)

				# optionally get updated ranking statistic PDF
				# data and enable FAP/FAR assignment
				self.load_rankingstat_pdf()

			# add triggers to trigger rate record.  this needs to
			# be done without any cuts on coincidence, etc., so
			# that the total trigger count agrees with the total
			# livetime from the SNR buffers.  we assume the density
			# of real signals is so small that this count is not
			# sensitive to their presence.  NOTE:  this is not true
			# locally.  a genuine signal, if loud, can
			# significantly increase the local density of triggers,
			# therefore the trigger rate measurement machinery must
			# take care to average the rate over a sufficiently
			# long period of time that the rate estimates are
			# insensitive to the presence of signals.  the current
			# implementation averages over whole science segments.
			# NOTE:  this must be done before running stream thinca
			# (below) so that the "how many instruments were on
			# test" is aware of this buffer.
			snr_min = self.rankingstat.snr_min
			if not buf.is_gap:
				for instrument in instruments:
					# FIXME At the moment, empty triggers
					# are added to inform the "how many
					# instruments were on test", the
					# correct thing to do is probably to
					# add metadata to the buffer containing
					# information about which instruments
					# were on
					self.rankingstat.denominator.triggerrates[instrument].add_ratebin(list(map(float, buf_seg[instrument])), len([event for event in events if event.snr >= snr_min and event.ifo == instrument and event.search == "signal_model"]))

			# FIXME At the moment, empty triggers are added to
			# inform the "how many instruments were on test", the
			# correct thing to do is probably to add metadata to
			# the buffer containing information about which
			# instruments were on
			real_events = []
			for event in events:
				if event.snr >= snr_min:
					real_events.append(event)
			events = real_events

			# run stream thinca.
			instruments |= self.absent_instruments
			instruments |= self.rankingstat.instruments
			store_counts_times = []
			for instrument in instruments:
				if not self.stream_thinca.push(instrument, [event for event in events if event.ifo == instrument], buf.t0):
					continue
				flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor, template_id_time_map = self.template_id_time_map)
				self.coincs_document.commit()

				# do GraceDB alerts and update eye candy
				gracedb_ids_times = self.__do_gracedb_alerts(self.stream_thinca.last_coincs)
				self.eye_candy.update(events, self.stream_thinca.last_coincs)
				# store count tracker counts around the GraceDB
				# times
				if gracedb_ids_times is not None:
					store_counts_times.extend(gracedb_ids_times[1])

				# after doing alerts, no longer need
				# per-trigger SNR data for the triggers that
				# are too old to be used to form candidates.
				for event in flushed_sngls:
					del event.H1_snr_time_series
					del event.L1_snr_time_series
					del event.V1_snr_time_series
					del event.K1_snr_time_series
			self.rankingstat.denominator.store_counts(store_counts_times)

	def _record_horizon_distance(self, instrument, timestamp, horizon_distance):
		"""
		timestamp can be a float or a slice with float boundaries.
		"""
		# retrieve the horizon history for this instrument
		horizon_history = self.rankingstat.numerator.horizon_history[instrument]
		# NOTE:  timestamps are floats here (or float slices), not
		# LIGOTimeGPS.  whitener should be reporting PSDs with
		# integer timestamps so the timestamps are not naturally
		# high precision, and, anyway, we don't need nanosecond
		# precision for the horizon distance history.
		horizon_history[timestamp] = horizon_distance

	def record_horizon_distance(self, instrument, timestamp, horizon_distance):
		"""
		timestamp can be a float or a slice with float boundaries.
		"""
		with self.lock:
			self._record_horizon_distance(instrument, timestamp, horizon_distance)

	def horizgatehandler(self, elem, timestamp, instrument_tpl):
		"""!
		A handler that intercepts h(t) gate state transitions to 0
		horizon distances.

		@param elem A reference to the lal_gate element or None (only
		used for verbosity)

		@param timestamp A gstreamer time stamp that marks the state
		transition (in nanoseconds)

		@param instrument the instrument this state transition is to
		be attributed to, e.g., "H1", etc..

		@param new_state the state transition, must be either True or
		False
		"""
		# essentially we want to set the horizon distance record to 0
		# at both on-to-off and off-to-on transitions so that the
		# horizon distance is reported to be 0 at all times within the
		# "off" period.  the problem is gaps due to glitch excision,
		# which is done using a gate that comes after the whitener.
		# the whitener sees data flowing during these periods and can
		# report PSDs (and thus trigger horizon distances to get
		# recorded) but because of the glitch excision the real
		# sensitivity of the instrument is 0.  there's not much we can
		# do to prevent the PSD from being reported, but we try to
		# retroactively correct the problem when it occurs by using a
		# slice to re-zero the entire "off" interval preceding each
		# off-to-on transition.  this requires access to the segment
		# data to get the timestamp for the start of the off interval,
		# but we could also record that number in this class here for
		# our own use if the entanglement with the segments tracker
		# becomes a problem.
		#
		# FIXME:  there is a potential race condition in that we're
		# relying on all spurious PSD messages that we are to override
		# with 0 to already have been reported when this method gets
		# invoked by the whitehtsegments, i.e., we're relying on the
		# whitehtsegments gate to report state transitions *after* any
		# potential whitener PSD messages have been generated.  this
		# is normally guaranteed because the whitener does not
		# generate PSD messages during gap intervals.

		timestamp = float(LIGOTimeGPS(0, timestamp))
		instrument, new_state = instrument_tpl

		if self.verbose:
			print("%s: %s horizon distance state transition: %s @ %s" % (elem.get_name(), instrument, ("on" if new_state else "off"), str(timestamp)), file = sys.stderr)

		with self.lock:
			# retrieve the horizon history for this instrument
			horizon_history = self.rankingstat.numerator.horizon_history[instrument]
			if new_state:
				# this if statement is checking if
				# ~self.seglistdicts[segtype][instrument] is
				# empty, and if not then it zeros the interval
				# spanned by the last segment in that list, but
				# what's done below avoids the explicit segment
				# arithmetic
				segtype = "whitehtsegments"
				if len(self.segmentstracker.seglistdicts[segtype][instrument]) > 1:
					self._record_horizon_distance(instrument, slice(float(self.segmentstracker.seglistdicts[segtype][instrument][-2][-1]), timestamp), 0.)
				else:
					self._record_horizon_distance(instrument, timestamp, 0.)
			elif self.rankingstat.numerator.horizon_history[instrument]:
				self._record_horizon_distance(instrument, slice(timestamp, None), 0.)
			else:
				self._record_horizon_distance(instrument, timestamp, 0.)
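
	# Concretely (hypothetical GPS values): if the h(t) gate for H1
	# switched off at 1187000000 and switches back on at 1187000060, the
	# off-to-on branch above re-zeros the whole off interval with a slice
	# assignment, roughly
	#
	#	horizon_history[slice(1187000000.0, 1187000060.0)] = 0.
	#
	# so any PSD-derived horizon distances recorded during the
	# glitch-excised gap are overwritten with 0, while the on-to-off
	# branch zeros the history from the transition time forward
	# (slice(timestamp, None)).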

	def checkpoint(self, timestamp):
		"""!
		Checkpoint, e.g., flush segments and triggers to disk.

		@param timestamp the LIGOTimeGPS timestamp of the current
		buffer in order to close off open segment intervals before
		writing to disk
		"""
		try:
			self.snapshot_output_url("%s_LLOID" % self.tag, "xml.gz", verbose = self.verbose)
		except TypeError as te:
			print("Warning: couldn't build output file on checkpoint, probably there aren't any triggers: %s" % te, file = sys.stderr)
		# FIXME:  the timestamp is used to close off open segments and
		# so should *not* be the timestamp of the current buffer,
		# necessarily, but rather a GPS time guaranteed to precede any
		# future state transitions of any segment list.  especially if
		# the pipeline is ever run in an "advanced warning"
		# configuration using the GPS time of a trigger buffer would
		# be an especially bad choice.
		self.segmentstracker.flush_segments_to_disk(self.tag, timestamp)

	def __get_psd_xmldoc(self):
		xmldoc = lal.series.make_psd_xmldoc(self.psds)
		ligolw_process.register_to_xmldoc(xmldoc, "gstlal_inspiral", {}, instruments = self.psds).set_end_time_now()
		return xmldoc

	def web_get_psd_xml(self):
		with self.lock:
			output = io.StringIO()
			ligolw_utils.write_fileobj(self.__get_psd_xmldoc(), output)
			outstr = output.getvalue()
			output.close()
			return outstr

	def __get_rankingstat_xmldoc(self, clipped = False, gracedb_upload = False):
		# generate a ranking statistic output document.  NOTE:  if we
		# are in possession of ranking statistic PDFs then those are
		# included in the output.  this allows a single document to be
		# uploaded to gracedb.  in an online analysis, those PDFs come
		# from the marginalization process and represent the full
		# distribution of ranking statistics across the search, and
		# include with them the analysis' total observed zero-lag
		# ranking statistic histogram --- everything required to
		# re-evaluate the FAP and FAR for an uploaded candidate.
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, instruments = self.rankingstat.instruments)
		# FIXME:  don't do this.  find a way to reduce the storage
		# requirements of the horizon distance history and then go
		# back to uploading the full file to gracedb
		# NOTE:  here it is referring to OnlineFrankensteinRankingStat
		# without KDE applied.
		if clipped:
			rankingstat = self.rankingstat.copy() if not gracedb_upload else self.rankingstat_upload.copy()
			try:
				endtime = rankingstat.numerator.horizon_history.maxkey()
			except ValueError:
				# empty horizon history
				pass
			else:
				# keep the last hour of history
				endtime -= 3600. * 1
				for history in rankingstat.numerator.horizon_history.values():
					del history[:endtime]
		else:
			rankingstat = self.rankingstat if not gracedb_upload else self.rankingstat_upload
		far.gen_likelihood_control_doc(xmldoc, rankingstat, self.rankingstatpdf)
		process.set_end_time_now()
		return xmldoc

	def __get_p_astro_json(self, lr, m1, m2, snr, far):
		return p_astro_gstlal.compute_p_astro(lr, m1, m2, snr, far, self.rankingstatpdf.copy(), activation_counts = self.activation_counts)

	def __get_rankingstat_xmldoc_for_gracedb(self):
		# FIXME:  remove this wrapper when the horizon history
		# encoding is replaced with something that uses less space
		return self.__get_rankingstat_xmldoc(clipped = True, gracedb_upload = True)

	def web_get_rankingstat(self):
		with self.lock:
			output = io.BytesIO()
			ligolw_utils.write_fileobj(self.__get_rankingstat_xmldoc(), output)
			outstr = output.getvalue()
			output.close()
			return outstr

	def __get_zerolag_rankingstatpdf_xmldoc(self):
		xmldoc = ligolw.Document()
		xmldoc.appendChild(ligolw.LIGO_LW())
		process = ligolw_process.register_to_xmldoc(xmldoc, u"gstlal_inspiral", paramdict = {}, instruments = self.rankingstat.instruments)
		far.gen_likelihood_control_doc(xmldoc, None, self.zerolag_rankingstatpdf)
		process.set_end_time_now()
		return xmldoc

	def web_get_zerolag_rankingstatpdf(self):
		with self.lock:
			output = io.BytesIO()
			ligolw_utils.write_fileobj(self.__get_zerolag_rankingstatpdf_xmldoc(), output)
			outstr = output.getvalue()
			output.close()
			return outstr

	def __flush(self):
		# run StreamThinca in flush mode.  forms candidates from
		# whatever triggers remain in the queues, and processes them
		flushed_sngls = self.stream_thinca.pull(self.rankingstat, fapfar = self.fapfar, zerolag_rankingstatpdf = self.zerolag_rankingstatpdf, coinc_sieve = self.rankingstat.fast_path_cut_from_triggers, flush = True, cluster = self.cluster, cap_singles = self.cap_singles, FAR_trialsfactor = self.FAR_trialsfactor, template_id_time_map = self.template_id_time_map)
		self.coincs_document.commit()

		# do GraceDB alerts
		gracedb_ids_times = self.__do_gracedb_alerts(self.stream_thinca.last_coincs)
		if gracedb_ids_times is not None and gracedb_ids_times[1] != []:
			self.rankingstat.denominator.store_counts(gracedb_ids_times[1])

		# after doing alerts, no longer need per-trigger SNR data for
		# the triggers that are too old to be used to form candidates.
		for event in flushed_sngls:
			del event.H1_snr_time_series
			del event.L1_snr_time_series
			del event.V1_snr_time_series
			del event.K1_snr_time_series

	def __do_gracedb_alerts(self, last_coincs):
		# check for no-op
		if self.rankingstatpdf is None or not self.rankingstatpdf.is_healthy():
			return

		# sanity check.  all code paths that result in an initialized
		# and healthy rankingstatpdf object also initialize a fapfar
		# object from it;  this is here to make sure that remains true
		assert self.fapfar is not None

		# do alerts
		gracedb_ids, gracedb_times = self.gracedbwrapper.do_alerts(last_coincs, self.psds, self.__get_rankingstat_xmldoc_for_gracedb, self.segmentstracker.seglistdicts, self.__get_p_astro_json, sim_inspiral_table = self.coincs_document.sim_inspiral_table)
		return gracedb_ids, gracedb_times

	def web_get_sngls_snr_threshold(self):
		with self.lock:
			if self.stream_thinca.sngls_snr_threshold is not None:
				yield "snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold
			else:
				yield "snr=\n"

	def web_set_sngls_snr_threshold(self):
		try:
			with self.lock:
				snr_threshold = bottle.request.forms["snr"]
				if snr_threshold:
					self.stream_thinca.sngls_snr_threshold = float(snr_threshold)
					yield "OK: snr=%.17g\n" % self.stream_thinca.sngls_snr_threshold
				else:
					self.stream_thinca.sngls_snr_threshold = None
					yield "OK: snr=\n"
		except:
			yield "error\n"

	def __write_output_url(self, url = None, verbose = False):
		self.__flush()
		if url is not None:
			self.coincs_document.url = url
		with self.segmentstracker.lock:
			self.coincs_document.write_output_url(seglistdicts = self.segmentstracker.seglistdicts, verbose = verbose)
		# can't be used anymore
		del self.coincs_document

	def __write_ranking_stat_url(self, url, description, snapshot = False, verbose = False):
		# write the ranking statistic file.
		ligolw_utils.write_url(self.__get_rankingstat_xmldoc(), url, verbose = verbose, trap_signals = None)
		# Snapshots get their own custom file and path
		if snapshot:
			fname = self.segmentstracker.T050017_filename(description + '_DISTSTATS', 'xml.gz')
			shutil.copy(ligolw_utils.local_path_from_url(url), os.path.join(subdir_from_T050017_filename(fname), fname))

	def write_output_url(self, url = None, description = "", verbose = False):
		with self.lock:
			if self.ranking_stat_output_url is not None:
				self.__write_ranking_stat_url(self.ranking_stat_output_url, description, verbose = verbose)
			self.__write_output_url(url = url, verbose = verbose)

	def snapshot_output_url(self, description, extension, verbose = False):
		with self.lock:
			fname = self.segmentstracker.T050017_filename(description, extension)
			fname = os.path.join(subdir_from_T050017_filename(fname), fname)
			if self.ranking_stat_output_url is not None:
				self.__write_ranking_stat_url(self.ranking_stat_output_url, description, snapshot = True, verbose = verbose)
			if self.zerolag_rankingstatpdf_url is not None:
				ligolw_utils.write_url(self.__get_zerolag_rankingstatpdf_xmldoc(), self.zerolag_rankingstatpdf_url, verbose = verbose, trap_signals = None)
			coincs_document = self.coincs_document.get_another()
			self.__write_output_url(url = fname, verbose = verbose)
			self.coincs_document = coincs_document
			# NOTE:  this operation requires stream_thinca to have
			# been flushed by calling .pull() with flush = True.
			# the .__write_output_url() code path does that, but
			# there is no check here to ensure that remains true
			# so be careful if you are editing these methods.
			self.stream_thinca.set_xmldoc(self.coincs_document.xmldoc, self.coincs_document.process_id)