# Copyright (C) 2009--2013  Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


__doc__ = """Gravitational wave datasource utilities, including abstractions for storing the required
information needed to connect to data sources. The main elements of the included API are:

    1. DataSourceInfo class for storing necessary information to connect to datasource
    2. Useful constants to help configuring a DataSourceInfo, like the Detector and DataSource classes
    3. Pipeline building utilities
    4. Command-line utilities for parsing and converting values into DataSource Info
"""

# Potential alternate copyright method
# from gstlal.utilities import admin
# admin.add_copyright(authors=['Kipp Cannon', 'Chad Hanna', 'Drew Keppel'], start_year=2009, end_year=2013)


import collections
import enum
import optparse
import os
from pathlib import Path
import tempfile
import time
import types
from typing import Union, Dict

import lal
from lal import LIGOTimeGPS
from ligo import segments
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst

try:
	import bottle
	if tuple(map(int, bottle.__version__.split("."))) < (0, 13):
		# FIXME:  see
		# https://git.ligo.org/lscsoft/gstlal/-/merge_requests/146
		# if the required patch is added to the distro-supplied
		# bottle before a 0.13 is released, update the version
		# check to the correct version
		raise ImportError
except ImportError:
	# FIXME:  remove after system-wide install can be relied on
	from gstlal import bottle
from gstlal import datafind
from gstlal import pipeparts
from gstlal.utilities import admin

GObject.threads_init()
Gst.init(None)

DEFAULT_BLOCK_SIZE = 16384 * 8 * 512  # 512 seconds of double-precision data at 16384 Hz, in bytes


################################################################################
#                               USEFUL CONSTANTS                               #
################################################################################


class Detector(str, enum.Enum):
	"""Enumeration of available detectors"""
	G1 = 'G1'
	H1 = 'H1'
	H2 = 'H2'
	K1 = 'K1'
	L1 = 'L1'
	V1 = 'V1'


class DataSource(str, enum.Enum):
	"""Enumeration of available data sources

	TODO: include descriptions of the options
	"""
	ALIGO = 'AdvLIGO'
	AVirgo = 'AdvVirgo'
	Frames = 'frames'
	FrameXMIT = 'framexmit'
	LIGO = 'LIGO'
	LVSHM = 'lvshm'
	DEVSHM = 'devshm'
	NDS = 'nds'
	Silence = 'silence'
	White = 'white'


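# A quick illustration (not part of the original API) of why these enums
# inherit from str: members compare equal to their plain string values, so the
# rest of this module can mix Detector/DataSource members and bare strings.
def _example_str_enum_equality():
	assert DataSource.Frames == 'frames'
	assert DataSource('lvshm') is DataSource.LVSHM
	assert Detector.H1 == 'H1'

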
KNOWN_DATASOURCES = [
	DataSource.ALIGO,
	DataSource.AVirgo,
	DataSource.Frames,
	DataSource.FrameXMIT,
	DataSource.LIGO,
	DataSource.LVSHM,
	DataSource.DEVSHM,
	DataSource.NDS,
	DataSource.Silence,
	DataSource.White,
]

KNOWN_LIVE_DATASOURCES = [
	DataSource.FrameXMIT,
	DataSource.LVSHM,
	DataSource.DEVSHM,
]

DEFAULT_SHARED_MEMORY_PARTITION = {
	Detector.H1.value: "LHO_Data",
	Detector.L1.value: "LLO_Data",
	Detector.V1.value: "VIRGO_Data"
}

DEFAULT_SHARED_MEMORY_DIR = {
	Detector.H1.value: "/dev/shm/kafka/H1_Data",
	Detector.L1.value: "/dev/shm/kafka/L1_Data",
	Detector.V1.value: "/dev/shm/kafka/V1_Data"
}

DEFAULT_STATE_CHANNEL = {detector.value: "LLD-DQ_VECTOR" for detector in Detector}
DEFAULT_DQ_CHANNEL = {detector.value: "DMT-DQ_VECTOR" for detector in Detector}

DEFAULT_FRAMXMIT_ADDR = {
	Detector.H1.value: ("224.3.2.1", 7096),
	Detector.L1.value: ("224.3.2.2", 7097),
	Detector.V1.value: ("224.3.2.3", 7098),
}

DEFAULT_STATE_VECTOR_ON_OFF = {
	Detector.H1.value: [0x7, 0x160],
	Detector.H2.value: [0x7, 0x160],
	Detector.L1.value: [0x7, 0x160],
	Detector.V1.value: [0x67, 0x100]
}

DEFAULT_DQ_VECTOR_ON_OFF = {
	Detector.H1.value: [0x7, 0x0],
	Detector.H2.value: [0x7, 0x0],
	Detector.L1.value: [0x7, 0x0],
	Detector.V1.value: [0x7, 0x0]
}

HostInfo = collections.namedtuple('DataFindServerInfo', 'host port')


class DataFindServer:
	"""Enumeration of available datafind servers"""
	GeneralProp = HostInfo('datafind.ligo.org', 443)


class DataSourceConfigError(ValueError):
	"""Error subclass for configuration errors"""


################################################################################
#                  Primary Datasource information abstraction                  #
################################################################################


class DataSourceInfo:
	"""A pythonic representation of a datasource with configured settings
	necessary for usage in pipelines
	"""
	def __init__(self,
			data_source: Union[str, DataSource],
			channel_name: Dict[Detector, str],
			gps_start_time: Union[int, LIGOTimeGPS] = None,
			gps_end_time: Union[int, LIGOTimeGPS] = None,
			shared_memory_partition: Dict[Detector, str] = None,
			shared_memory_dir: Dict[Detector, str] = None,
			frame_segments_name: str = None,
			state_vector_on_bits: Dict[Detector, str] = None,
			state_vector_off_bits: Dict[Detector, str] = None,
			dq_vector_on_bits: Dict[Detector, str] = None,
			dq_vector_off_bits: Dict[Detector, str] = None,
			frame_cache: Union[str, Path] = None,
			injections: Union[str, Path] = None,
			nds_host: str = None,
			nds_port: int = None,
			nds_channel_type: str = 'online',
			shared_memory_assumed_duration: int = 4,
			shared_memory_block_size: int = 4096,
			frame_segments_file: Union[str, Path] = None,
			block_size: int = DEFAULT_BLOCK_SIZE,
			frame_type: Dict[Detector, str] = None,
			data_find_server: Union[str, DataFindServer] = None,
			framexmit_addr: Dict[Detector, str] = None,
			framexmit_iface: str = None,
			state_channel_name: Dict[Detector, str] = None,
			dq_channel_name: Dict[Detector, str] = None,
			idq_channel_name: Dict[Detector, str] = None,
			idq_state_channel_name: Dict[Detector, str] = None):
		"""Create a DataSource information object that contains the necessary details
		to produce GStreamer elements for loading gravitational wave data from a
		variety of possible sources.

		Args:
			data_source:
				str or DataSource, the data source to read from, one of
				[frames|framexmit|lvshm|devshm|nds|silence|white]
			gps_start_time:
				int or LIGOTimeGPS, the start time of the segment to analyze in GPS
				seconds.  Required unless data_source is a live source
				(framexmit|lvshm|devshm)
			gps_end_time:
				int or LIGOTimeGPS, the end time of the segment to analyze in GPS
				seconds.  Required unless data_source is a live source
				(framexmit|lvshm|devshm)
			channel_name:
				Dict[Detector, str], the name of the channel to process for each
				detector
			framexmit_addr:
				Dict[Detector, str] or Dict[Detector, HostInfo], the address of the
				framexmit service
			framexmit_iface:
				str, the multicast interface address of the framexmit service
			state_channel_name:
				Dict[Detector, str], the name of the state vector channel.  This
				channel will be used to control the flow of data via the on/off bits
			dq_channel_name:
				Dict[Detector, str], the name of the data quality channel.  This
				channel will be used to control the flow of data via the on/off bits
			idq_channel_name:
				Dict[Detector, str], the name of the idq channel.  This channel will
				be used to create the idq_series information
			idq_state_channel_name:
				Dict[Detector, str], the name of the idq state channel.  This channel
				will be used to gate the idq_series information created by
				idq_channel_name
			shared_memory_partition:
				Dict[Detector, str], the name of the shared memory partition for a
				given detector
			shared_memory_dir:
				Dict[Detector, str], the name of the shared memory directory for a
				given detector
			frame_segments_name:
				str, the name of the segments to extract from the segment tables.
				Required iff frame_segments_file is given
			state_vector_on_bits:
				Dict[Detector, str], default None, the state vector on bits to process
				(optional).  The default is 0x7 for all detectors.  Override with
				{Detector: bits}.  Only currently has meaning for online (lvshm) data
			state_vector_off_bits:
				Dict[Detector, str], default None, the state vector off bits to process
				(optional).  The default is 0x160 for all detectors.  Override with
				{Detector: bits}.  Only currently has meaning for online (lvshm) data
			dq_vector_on_bits:
				Dict[Detector, str], default None, the dq vector on bits to process
				(optional).  The default is 0x7 for all detectors.  Override with
				{Detector: bits}.  Only currently has meaning for online (lvshm) data
			dq_vector_off_bits:
				Dict[Detector, str], default None, the dq vector off bits to process
				(optional).  The default is 0x0 for all detectors.  Override with
				{Detector: bits}.  Only currently has meaning for online (lvshm) data
			frame_cache:
				str or Path, the name of the LAL cache listing the LIGO-Virgo .gwf
				frame files
			injections:
				str or Path, default None, the name of the LIGO light-weight XML file
				from which to load injections (optional)
			nds_host:
				str, the NDS server address.  Only used if data_source=nds
			nds_port:
				int, the NDS server port.  Only used if data_source=nds
			nds_channel_type:
				str, default 'online', the NDS channel type.  Only used if
				data_source=nds
			shared_memory_assumed_duration:
				int, default 4, the assumed span of files in seconds
			shared_memory_block_size:
				int, default 4096, the byte size to read per buffer
			frame_segments_file:
				str or Path, the name of the LIGO light-weight XML file from which to
				load frame segments.  Optional iff data_source=frames
			block_size:
				int, default 16384 * 8 * 512 (512 seconds of double precision data at
				16384 Hz), data block size to read in bytes.  This parameter is only
				used if data_source is one of {white, silence, AdvVirgo, LIGO,
				AdvLIGO, nds}
			frame_type:
				Dict[Detector, str], default None, a dictionary setting the frame type
				(string) for each detector, e.g. {Detector.H1: 'H1_GWOSC_O2_16KHZ_R1'}.
				Used with data_source='frames'
			data_find_server:
				str or DataFindServer, default None, the data find server for LIGO
				data discovery.  Used with data_source='frames'
		"""
		self.data_source = data_source.value if isinstance(data_source, DataSource) else data_source
		self.block_size = block_size
		self.frame_cache = frame_cache.as_posix() if isinstance(frame_cache, Path) else frame_cache
		self.frame_type = {} if frame_type is None else frame_type
		self.data_find_server = '{}:{:d}'.format(data_find_server.host, data_find_server.port) if isinstance(data_find_server, HostInfo) else data_find_server
		self.gps_start_time = gps_start_time.gpsSeconds if isinstance(gps_start_time, LIGOTimeGPS) else gps_start_time
		self.gps_end_time = gps_end_time.gpsSeconds if isinstance(gps_end_time, LIGOTimeGPS) else gps_end_time
		self.injections = injections.as_posix() if isinstance(injections, Path) else injections
		self.channel_name = {} if channel_name is None else channel_name
		self.nds_host = nds_host
		self.nds_port = nds_port
		self.nds_channel_type = nds_channel_type
		self.framexmit_addr = DEFAULT_FRAMXMIT_ADDR.copy()
		self.framexmit_iface = framexmit_iface
		self.state_channel_name = DEFAULT_STATE_CHANNEL.copy()
		self.dq_channel_name = DEFAULT_DQ_CHANNEL.copy()
		self.idq_channel_name = {} if idq_channel_name is None else idq_channel_name
		self.idq_state_channel_name = {} if idq_state_channel_name is None else idq_state_channel_name
		self.shared_memory_partition = DEFAULT_SHARED_MEMORY_PARTITION.copy()
		self.shared_memory_assumed_duration = shared_memory_assumed_duration
		self.shared_memory_block_size = shared_memory_block_size
		self.shared_memory_dir = DEFAULT_SHARED_MEMORY_DIR.copy()
		self.frame_segments_file = frame_segments_file
		self.frame_segments_name = frame_segments_name
		self.state_vector_on_bits = {} if state_vector_on_bits is None else state_vector_on_bits
		self.state_vector_off_bits = {} if state_vector_off_bits is None else state_vector_off_bits
		self.dq_vector_on_bits = {} if dq_vector_on_bits is None else dq_vector_on_bits
		self.dq_vector_off_bits = {} if dq_vector_off_bits is None else dq_vector_off_bits

		if state_channel_name:
			self.state_channel_name.update(state_channel_name)
		if dq_channel_name:
			self.dq_channel_name.update(dq_channel_name)
		if framexmit_addr:
			self.framexmit_addr.update(framexmit_addr)
		if shared_memory_partition:
			self.shared_memory_partition.update(shared_memory_partition)
		if shared_memory_dir:
			self.shared_memory_dir.update(shared_memory_dir)

		self.validate()

		# Set legacy attribute aliases
		self.injection_filename = self.injections
		self.channel_dict = self.channel_name
		self.dq_channel_dict = self.dq_channel_name
		self.state_channel_dict = self.state_channel_name
		self.shm_part_dict = self.shared_memory_partition
		self.shm_assumed_duration = self.shared_memory_assumed_duration
		self.shm_block_size = self.shared_memory_block_size
		self.shm_dir_dict = self.shared_memory_dir

		self.seg = None
		if self.gps_start_time is not None:
			start = LIGOTimeGPS(self.gps_start_time)
			end = LIGOTimeGPS(self.gps_end_time)
			self.seg = segments.segment(start, end)

		if self.frame_segments_file is not None:
			self.frame_segments = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(self.frame_segments_file, contenthandler=ligolw_segments.LIGOLWContentHandler), self.frame_segments_name).coalesce()
			if self.seg is not None:
				# Clip frame segments to seek segment if it
				# exists (not required, just saves some
				# memory and I/O overhead)
				self.frame_segments = segments.segmentlistdict((detector, seglist & segments.segmentlist([self.seg])) for detector, seglist in self.frame_segments.items())
		else:
			## if no frame segments provided, set them to an empty segment list dictionary
			self.frame_segments = segments.segmentlistdict((detector, None) for detector in self.channel_dict)

		if self.data_source == DataSource.Frames and (self.frame_cache is None and self.frame_type):
			frame_type_dict = self.frame_type
			_frame_cache = datafind.load_frame_cache(start, end, frame_type_dict, host=self.data_find_server)
			## create a temporary cache file
			self._frame_cache_fileobj = tempfile.NamedTemporaryFile(suffix=".cache", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
			self.frame_cache = self._frame_cache_fileobj.name
			with open(self.frame_cache, "w") as f:
				for cacheentry in _frame_cache:
					print(str(cacheentry), file=f)

		self.state_vector_on_off_bits = zip_dict_values(self.state_vector_on_bits, self.state_vector_off_bits, defaults=DEFAULT_STATE_VECTOR_ON_OFF)
		self.dq_vector_on_off_bits = zip_dict_values(self.dq_vector_on_bits, self.dq_vector_off_bits, defaults=DEFAULT_DQ_VECTOR_ON_OFF)

	def validate(self):
		"""Validation of configuration"""
		# Validate data_source
		if self.data_source not in KNOWN_DATASOURCES:
			raise DataSourceConfigError('Unknown datasource {}, must be one of: {}'.format(self.data_source, ', '.join(KNOWN_DATASOURCES)))

		# Validate data_source == frames
		if self.data_source == DataSource.Frames and self.frame_cache is None and not self.frame_type:
			raise DataSourceConfigError("frame_cache or frame_type must be specified when using data_source='frames'")

		# Validate channel_name not empty
		if not self.channel_name:
			raise DataSourceConfigError("Must specify at least one channel as {Detector: name}")

		# Validate frame_segments_file
		if self.frame_segments_file is not None and self.data_source != DataSource.Frames:
			raise DataSourceConfigError("Can only give frame_segments_file if data_source='frames'")

		# Validate frame_segments_name
		if self.frame_segments_name is not None and self.frame_segments_file is None:
			raise DataSourceConfigError("Can only specify frame_segments_name if frame_segments_file is given")

		# Validate idq_channel_name comes with idq_state_channel_name
		if self.idq_channel_name and not self.idq_state_channel_name:
			raise DataSourceConfigError("Can only specify --idq-channel-name if --idq-state-channel-name is given")
		elif not self.idq_channel_name and self.idq_state_channel_name:
			raise DataSourceConfigError("Can only specify --idq-state-channel-name if --idq-channel-name is given")

		# Validate NDS arguments
		if self.data_source == DataSource.NDS:
			if self.nds_host is None or self.nds_port is None:
				raise DataSourceConfigError("Must specify nds_host and nds_port if data_source='nds'")

		# Validate start and end arguments
		if self.data_source in KNOWN_LIVE_DATASOURCES and (self.gps_start_time is not None or self.gps_end_time is not None):
			raise DataSourceConfigError("Cannot set gps_start_time or gps_end_time for live source: {}".format(self.data_source))
		if self.data_source not in KNOWN_LIVE_DATASOURCES and (self.gps_start_time is None or self.gps_end_time is None):
			raise DataSourceConfigError("Must set gps_start_time and gps_end_time for non-live source: {}".format(self.data_source))
		if self.gps_start_time is not None and self.gps_end_time is not None:
			try:
				start = LIGOTimeGPS(self.gps_start_time)
			except ValueError as e:
				raise DataSourceConfigError("invalid gps_start_time {}".format(self.gps_start_time)) from e
			try:
				end = LIGOTimeGPS(self.gps_end_time)
			except ValueError as e:
				raise DataSourceConfigError("invalid gps_end_time {}".format(self.gps_end_time)) from e
			if start >= end:
				raise DataSourceConfigError('Must specify gps_start_time < gps_end_time')

	@staticmethod
	def from_optparse(options: optparse.Values):
		"""Construct a DataSourceInfo object from parsed optparse options

		Args:
			options:
				optparse.Values, with all of the arguments defined in append_options

		Returns:
			DataSourceInfo object
		"""
		channel_dict = parse_list_to_dict(options.channel_name)
		frame_type = parse_list_to_dict(options.frame_type)
		shared_memory_part = parse_list_to_dict(options.shared_memory_partition)
		shared_memory_dir = parse_list_to_dict(options.shared_memory_dir)
		state_channel_name = parse_list_to_dict(options.state_channel_name)
		dq_channel_name = parse_list_to_dict(options.dq_channel_name)
		idq_channel_name = parse_list_to_dict(options.idq_channel_name)
		idq_state_channel_name = parse_list_to_dict(options.idq_state_channel_name)
		framexmit_addr = parse_list_to_dict(options.framexmit_addr, value_transform=parse_host)
		state_vector_on_bits = parse_list_to_dict(options.state_vector_on_bits, value_transform=parse_int)
		state_vector_off_bits = parse_list_to_dict(options.state_vector_off_bits, value_transform=parse_int)
		dq_vector_on_bits = parse_list_to_dict(options.dq_vector_on_bits, value_transform=parse_int)
		dq_vector_off_bits = parse_list_to_dict(options.dq_vector_off_bits, value_transform=parse_int)

		return DataSourceInfo(
			data_source=options.data_source,
			block_size=options.block_size,
			frame_cache=options.frame_cache,
			frame_type=frame_type,
			data_find_server=options.data_find_server,
			gps_start_time=options.gps_start_time,
			gps_end_time=options.gps_end_time,
			injections=options.injection_file,
			channel_name=channel_dict,
			nds_host=options.nds_host if options.data_source == DataSource.NDS else None,
			nds_port=options.nds_port if options.data_source == DataSource.NDS else None,
			nds_channel_type=options.nds_channel_type if options.data_source == DataSource.NDS else None,
			framexmit_addr=framexmit_addr,
			framexmit_iface=options.framexmit_iface,
			state_channel_name=state_channel_name,
			dq_channel_name=dq_channel_name,
			idq_channel_name=idq_channel_name,
			idq_state_channel_name=idq_state_channel_name,
			shared_memory_partition=shared_memory_part,
			shared_memory_assumed_duration=options.shared_memory_assumed_duration,
			shared_memory_block_size=options.shared_memory_block_size,
			shared_memory_dir=shared_memory_dir,
			frame_segments_file=options.frame_segments_file,
			frame_segments_name=options.frame_segments_name,
			state_vector_on_bits=state_vector_on_bits,
			state_vector_off_bits=state_vector_off_bits,
			dq_vector_on_bits=dq_vector_on_bits,
			dq_vector_off_bits=dq_vector_off_bits,
		)


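# Hedged usage sketch (not from the original module): constructing a
# DataSourceInfo directly for archived frame data.  The channel name, cache
# file and GPS interval are illustrative placeholders, not shipped defaults.
def _example_frames_info():
	return DataSourceInfo(
		data_source=DataSource.Frames,
		channel_name={Detector.H1.value: "GDS-CALIB_STRAIN"},
		gps_start_time=1187000000,
		gps_end_time=1187001000,
		frame_cache="H1.cache",  # assumed pre-built LAL cache file
	)

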
################################################################################
#                             CLI Parsing Utilities                            #
################################################################################


def parse_host(host: str):
	"""Value transform for use with parse_list_to_dict that splits host into name and port tuple"""
	name, port = host.split(':')
	return (name, int(port))


def parse_int(inp: str) -> int:
	"""Value transform for use with parse_list_to_dict that coerces string input to int"""
	if inp.startswith('0x'):
		return int(inp, 16)
	return int(inp)


def parse_list_to_dict(lst: list, value_transform: types.FunctionType = None, sep: str = '=', key_is_range: bool = False, range_sep: str = ':') -> dict:
	"""A general list to dict argument parsing coercion function

	Args:
		lst:
			list, a list of the form ['A=V1', 'B=V2', ...], where "=" only has to
			match the sep argument
		value_transform:
			Function, default None.  An optional transformation function to apply
			on values of the dictionary
		sep:
			str, default '=', the separator string between dict keys and values in
			list elements
		key_is_range:
			bool, default False.  If True, the keys of the list are compound and
			contain range information e.g. "start:stop:remaining,list,of,items"
		range_sep:
			str, default ':', the separator string for range key information

	Returns:
		dict of the form {'A': value_transform('V1'), ...}

	Examples:
		>>> parse_list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])  # doctest: +SKIP
		{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}

		>>> parse_list_to_dict(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"], key_is_range=True)  # doctest: +SKIP
		{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
	"""
	if lst is None:
		return
	coerced = {}
	if key_is_range:
		# this will produce tuples (start, stop, str-to-dict)
		splits = [e.split(range_sep) for e in lst]
		for start, stop, val in splits:
			for i in range(int(start), int(stop)):
				key = str(i).zfill(4)
				coerced[key] = parse_list_to_dict([l.strip() for l in val.split(',')], value_transform=value_transform, sep=sep, key_is_range=False)
	else:
		if len(lst) == 1 and sep not in lst[0]:
			# non-dict entry
			return lst[0]
		coerced = dict([e.split(sep) for e in lst])
		if value_transform is not None:
			for k in coerced:
				coerced[k] = value_transform(coerced[k])
	return coerced


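# Illustrative pairing (inputs are examples only, shaped like the CLI's
# --state-vector-on-bits options): parse_list_to_dict() with parse_int() as
# the value transform coerces per-detector bitmask strings to integers.
def _example_parse_bits():
	# returns {'H1': 7, 'L1': 352}; '0x160' is recognized as hex by parse_int()
	return parse_list_to_dict(["H1=0x7", "L1=0x160"], value_transform=parse_int)

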
def ravel_host(host: tuple) -> str:
	"""Value transform for use with ravel_dict_to_list that ravels a host name and port into a string"""
	return '{}:{}'.format(host[0], str(host[1]))


def ravel_dict_to_list(dct: dict, value_transform: types.FunctionType = str, sep: str = '=', key_is_range: bool = False, range_sep: str = ':', join_arg: str = None, unzip: bool = False, filter_keys: list = None, range_elem: int = None) -> list:
	"""Functional inverse of parse_list_to_dict.

	TODO: This function exists to work around pipeline.py's inability to give
	the same option more than once by producing a string to pass as an
	argument that encodes the other instances of the option.

	Args:
		dct:
			dict, the dict to transform into a list
		value_transform:
			Function, default str, the way to transform values of the dict before
			adding to list
		sep:
			str, default '=', separator character to join key-value pairs
		key_is_range:
			bool, default False.  If True, aggregate keys into ranges
		range_sep:
			str, default ':', the range separator to join range key min and max values
		join_arg:
			str, default None, if given join all but the first entry into a single
			string of repeated "--join_arg=" options
		unzip:
			bool, default False, if True unzip the dct arg and ravel each
			single-valued dict separately
		filter_keys:
			list, default None, if given restrict the ravel to these keys
		range_elem:
			int, default None, the range element to select when key_is_range is True

	Returns:
		List[str] the dict converted into lists of strings
	"""
	if key_is_range:
		if range_elem is None:
			raise NotImplementedError
		dct = dct[str(range_elem).zfill(4)]
	if filter_keys:
		dct = dict([(k, v) for k, v in dct.items() if k in filter_keys])
	if unzip:
		dicts = unzip_dict_values(dct)
		if not isinstance(join_arg, list):
			join_arg = len(dicts) * [join_arg]
		return [ravel_dict_to_list(d, value_transform=value_transform, sep=sep, key_is_range=key_is_range, range_sep=range_sep, join_arg=j, unzip=False) for d, j in zip(dicts, join_arg)]
	else:
		lst = ['{}{}{}'.format(k, sep, value_transform(v)) for k, v in sorted(dct.items(), key=lambda x: x[0])]
		if len(lst) == 0:
			return lst
		if join_arg is not None:
			if len(lst) == 1:
				return lst[0]
			lst = lst[0] + ' ' + ' '.join('--' + join_arg + '=' + l for l in lst[1:])
		return lst


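# Hedged round-trip sketch: for the simple (non-range, non-unzip) case,
# ravel_dict_to_list() inverts parse_list_to_dict().  Channel names here are
# placeholders.
def _example_ravel_roundtrip():
	channels = parse_list_to_dict(["H1=LSC-STRAIN", "L1=LSC-STRAIN"])
	# -> ['H1=LSC-STRAIN', 'L1=LSC-STRAIN'] (sorted by detector key)
	return ravel_dict_to_list(channels)

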
def zip_dict_values(*dicts, defaults: dict = None, key_union: bool = True):
	"""Zip dict values by matching keys

	Args:
		*dicts:
			Iterable[dict], a collection of dicts whose values will be grouped by
			key in the order given
		defaults:
			dict, default None, if specified fill in missing values with these
			defaults
		key_union:
			bool, default True, if True then use a union of keys, else use
			intersection

	Examples:
		>>> on_bit_list = parse_list_to_dict(["V1=7", "H1=7", "L1=7"], value_transform=int)  # doctest: +SKIP
		>>> off_bit_list = parse_list_to_dict(["V1=256", "H1=352", "L1=352"], value_transform=int)  # doctest: +SKIP
		>>> zip_dict_values(on_bit_list, off_bit_list, defaults=DEFAULT_STATE_VECTOR_ON_OFF)  # doctest: +SKIP
		{'V1': [7, 256], 'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352]}
		>>> zip_dict_values(on_bit_list, off_bit_list, {})  # doctest: +SKIP
		{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}

		>>> on_bit_list = parse_list_to_dict(["V1=0x7", "H1=0x7", "L1=0x7"], value_transform=lambda x: int(x, 16))  # doctest: +SKIP
		>>> off_bit_list = parse_list_to_dict(["V1=0x256", "H1=0x352", "L1=0x352"], value_transform=lambda x: int(x, 16))  # doctest: +SKIP
		>>> zip_dict_values(on_bit_list, off_bit_list, {})  # doctest: +SKIP
		{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
	"""
	sets = [set(d.keys()) for d in dicts]
	if defaults is not None:
		sets.append(set(defaults.keys()))
	if key_union:
		keys = set().union(*sets)
	else:
		keys = set.intersection(*sets) if sets else set()
	unified = {}
	for k in keys:
		ordered_value = [d[k] for d in dicts if k in d]
		if not ordered_value:
			# the only way this can happen is if k was in defaults but
			# none of dicts, so we can assume defaults is not None
			ordered_value = defaults[k]
		unified[k] = ordered_value
	return unified


def unzip_dict_values(dct: dict):
	"""Split multi-valued dict into ordered collection of single-valued dicts with common keys

	All values should be of same length.
	"""
	num_vals = len(list(dct.values())[0])
	if any(len(v) != num_vals for v in dct.values()):
		raise ValueError('All dict values must have same length.')
	dicts = [dict() for n in range(num_vals)]
	for k in sorted(list(dct.keys())):
		ordered_vals = list(dct[k])
		for n in range(num_vals):
			dicts[n][k] = ordered_vals[n]
	return dicts


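# Illustration (values are examples only): unzip_dict_values() splits the
# per-detector [on, off] pairs produced by zip_dict_values() back into
# separate on-bits and off-bits dictionaries.
def _example_unzip_bits():
	on_bits, off_bits = unzip_dict_values({"H1": [0x7, 0x160], "L1": [0x7, 0x160]})
	# on_bits == {'H1': 7, 'L1': 7}; off_bits == {'H1': 352, 'L1': 352}
	return on_bits, off_bits

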
def append_options(parser):
	"""
	Append generic data source options to an OptionParser object in order
	to have consistent and unified command lines and parsing throughout the
	project for applications that read GW data.

	- --data-source [string]
		Set the data source from [frames|framexmit|lvshm|nds|silence|white].

	- --block-size [int] (bytes)
		Data block size to read in bytes.  Default 16384 * 8 * 512 which is 512
		seconds of double precision data at 16384 Hz.  This parameter is only used
		if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.

	- --frame-cache [filename]
		Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files
		(optional).

	- --frame-type [string]
		Set the frame type for a given instrument.  Can be given multiple times as
		--frame-type=IFO=FRAME-TYPE.  Used with --data-source=frames.

	- --data-find-server [url]
		Set the data find server for LIGO data discovery.  Used with
		--data-source=frames.

	- --gps-start-time [int] (seconds)
		Set the start time of the segment to analyze in GPS seconds.  Required
		unless --data-source is a live source (framexmit|lvshm|devshm).

	- --gps-end-time [int] (seconds)
		Set the end time of the segment to analyze in GPS seconds.  Required
		unless --data-source is a live source (framexmit|lvshm|devshm).

	- --injection-file [filename]
		Set the name of the LIGO light-weight XML file from which to load
		injections (optional).

	- --channel-name [string]
		Set the name of the channels to process.  Can be given multiple times as
		--channel-name=IFO=CHANNEL-NAME

	- --idq-channel-name [string]
		Set the name of the iDQ channels to process.  Must be given together with
		--idq-state-channel-name.  Can be given multiple times as
		--idq-channel-name=IFO=IDQ-CHANNEL-NAME

	- --idq-state-channel-name [string]
		Set the name of the iDQ state channels to process.  Can be given multiple
		times as --idq-state-channel-name=IFO=IDQ-STATE-CHANNEL-NAME

	- --nds-host [hostname]
		Set the remote host or IP address that serves nds data.  This is required
		iff --data-source is nds.

	- --nds-port [portnumber]
		Set the port of the remote host that serves nds data, default = 31200.
		This is required iff --data-source is nds.

	- --nds-channel-type [string]
		Set the NDS channel type, default = 'online'.  Only used if --data-source
		is nds.

	- --framexmit-addr [string]
		Set the address of the framexmit service.  Can be given multiple times as
		--framexmit-addr=IFO=xxx.xxx.xxx.xxx:port

	- --framexmit-iface [string]
		Set the multicast interface address of the framexmit service.

	- --state-channel-name [string]
		Set the name of the state vector channel.  This channel will be used to
		control the flow of data via the on/off bits.  Can be given multiple times
		as --state-channel-name=IFO=STATE-CHANNEL-NAME

	- --dq-channel-name [string]
		Set the name of the data quality channel.  This channel will be used to
		control the flow of data via the on/off bits.  Can be given multiple times
		as --dq-channel-name=IFO=DQ-CHANNEL-NAME

	- --shared-memory-partition [string]
		Set the name of the shared memory partition for a given instrument.  Can
		be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME

	- --shared-memory-dir [string]
		Set the name of the shared memory directory for a given instrument.  Can
		be given multiple times as --shared-memory-dir=IFO=DIR-NAME

	- --shared-memory-assumed-duration [int]
		Set the assumed span of files in seconds.  Default = 4 seconds.

	- --shared-memory-block-size [int]
		Set the byte size to read per buffer.  Default = 4096 bytes.

	- --frame-segments-file [filename]
		Set the name of the LIGO light-weight XML file from which to load frame
		segments.  Optional iff --data-source is frames.

	- --frame-segments-name [string]
		Set the name of the segments to extract from the segment tables.
		Required iff --frame-segments-file is given.

	- --state-vector-on-bits [hex]
		Set the state vector on bits to process (optional).  The default is 0x7
		for all detectors.  Override with IFO=bits; can be given multiple times.
		Only currently has meaning for online (lvshm, framexmit) data.

	- --state-vector-off-bits [hex]
		Set the state vector off bits to process (optional).  The default is 0x160
		for all detectors.  Override with IFO=bits; can be given multiple times.
		Only currently has meaning for online (lvshm, framexmit) data.

	- --dq-vector-on-bits [hex]
		Set the DQ vector on bits to process (optional).  The default is 0x7 for
		all detectors.  Override with IFO=bits; can be given multiple times.  Only
		currently has meaning for online (lvshm, framexmit) data.

	- --dq-vector-off-bits [hex]
		Set the DQ vector off bits to process (optional).  The default is 0x0 for
		all detectors.  Override with IFO=bits; can be given multiple times.  Only
		currently has meaning for online (lvshm, framexmit) data.

	**Typical usage case examples**

	1. Reading data from frames::

		--data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \\
		--channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \\
		--frame-segments-name=datasegments

	2. Reading data from a fake LIGO source::

		--data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \\
		--channel-name=H1=FAIKE-STRAIN

	3. Reading online data via framexmit::

		--data-source=framexmit --channel-name=H1=FAIKE-STRAIN

	4. Many other combinations possible, please add some!
	"""
	group = optparse.OptionGroup(parser, "Data source options", "Use these options to set up the appropriate data source")
	group.add_option("--data-source", metavar="source", help="Set the data source from [frames|framexmit|lvshm|nds|silence|white].  Required.")
	group.add_option("--block-size", type="int", metavar="bytes", default=16384 * 8 * 512, help="Data block size to read in bytes.  Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz).  This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
	group.add_option("--frame-cache", metavar="filename", help="Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).")
	group.add_option("--frame-type", metavar="name", action="append", help="Set the frame type for a given instrument.  Can be given multiple times as --frame-type=IFO=FRAME-TYPE.  Used with --data-source=frames")
	group.add_option("--data-find-server", metavar="url", help="Set the data find server for LIGO data discovery.  Used with --data-source=frames")
	group.add_option("--gps-start-time", metavar="seconds", help="Set the start time of the segment to analyze in GPS seconds.  Required unless --data-source is a live source (framexmit|lvshm|devshm)")
	group.add_option("--gps-end-time", metavar="seconds", help="Set the end time of the segment to analyze in GPS seconds.  Required unless --data-source is a live source (framexmit|lvshm|devshm)")
	group.add_option("--injection-file", metavar="filename", help="Set the name of the LIGO light-weight XML file from which to load injections (optional).")
	group.add_option("--channel-name", metavar="name", action="append", help="Set the name of the channels to process.  Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
	group.add_option("--idq-channel-name", metavar="idqname", action="append", help="iDQ channel names to process.  Must also provide --idq-state-channel-name.  Can be given multiple times as --idq-channel-name=IFO=IDQ-CHANNEL-NAME")
	group.add_option("--idq-state-channel-name", metavar="idqstatename", action="append", help="iDQ state channel names to process.  Can be given multiple times as --idq-state-channel-name=IFO=IDQ-STATE-CHANNEL-NAME")
	group.add_option("--nds-host", metavar="hostname", help="Set the remote host or IP address that serves nds data.  This is required iff --data-source=nds")
	group.add_option("--nds-port", metavar="portnumber", type=int, default=31200, help="Set the port of the remote host that serves nds data.  This is required iff --data-source=nds")
	group.add_option("--nds-channel-type", metavar="type", default="online", help="Set the NDS channel type.  Only used if --data-source=nds.  default=online")
	group.add_option("--framexmit-addr", metavar="name", action="append", help="Set the address of the framexmit service.  Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
	group.add_option("--framexmit-iface", metavar="name", help="Set the multicast interface address of the framexmit service.")
	group.add_option("--state-channel-name", metavar="name", action="append", help="Set the name of the state vector channel.  This channel will be used to control the flow of data via the on/off bits.  Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME")
	group.add_option("--dq-channel-name", metavar="name", action="append", help="Set the name of the data quality channel.  This channel will be used to control the flow of data via the on/off bits.  Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME")
	group.add_option("--shared-memory-partition", metavar="name", action="append", help="Set the name of the shared memory partition for a given instrument.  Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
	group.add_option("--shared-memory-dir", metavar="name", action="append", help="Set the name of the shared memory directory for a given instrument.  Can be given multiple times as --shared-memory-dir=IFO=DIR-NAME")
	group.add_option("--shared-memory-assumed-duration", type="int", default=4, help="Set the assumed span of files in seconds.  Default = 4.")
	group.add_option("--shared-memory-block-size", type="int", default=4096, help="Set the byte size to read per buffer.  Default = 4096.")
	group.add_option("--frame-segments-file", metavar="filename", help="Set the name of the LIGO light-weight XML file from which to load frame segments.  Optional iff --data-source=frames")
	group.add_option("--frame-segments-name", metavar="name", help="Set the name of the segments to extract from the segment tables.  Required iff --frame-segments-file is given")
	group.add_option("--state-vector-on-bits", metavar="bits", default=[], action="append", help="Set the state vector on bits to process (optional).  The default is 0x7 for all detectors.  Override with IFO=bits; can be given multiple times.  Only currently has meaning for online (lvshm) data.")
	group.add_option("--state-vector-off-bits", metavar="bits", default=[], action="append", help="Set the state vector off bits to process (optional).  The default is 0x160 for all detectors.  Override with IFO=bits; can be given multiple times.  Only currently has meaning for online (lvshm) data.")
	group.add_option("--dq-vector-on-bits", metavar="bits", default=[], action="append", help="Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors.  Override with IFO=bits; can be given multiple times.  Only currently has meaning for online (lvshm) data.")
	group.add_option("--dq-vector-off-bits", metavar="bits", default=[], action="append", help="Set the DQ vector off bits to process (optional).  The default is 0x0 for all detectors.  Override with IFO=bits; can be given multiple times.  Only currently has meaning for online (lvshm) data.")
	parser.add_option_group(group)


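# Hedged sketch of the intended command-line flow: append_options() populates
# a parser, and DataSourceInfo.from_optparse() converts the parsed values.
# The argv below is illustrative, not a documented invocation.
def _example_cli():
	parser = optparse.OptionParser()
	append_options(parser)
	options, args = parser.parse_args([
		"--data-source=frames",
		"--frame-cache=H1.cache",
		"--gps-start-time=1187000000",
		"--gps-end-time=1187001000",
		"--channel-name=H1=GDS-CALIB_STRAIN",
	])
	return DataSourceInfo.from_optparse(options)

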
################################################################################
#                          Pipeline building utilities                         #
################################################################################


def pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags=Gst.SeekFlags.FLUSH):
	"""
	Create a new seek event, i.e., Gst.Event.new_seek() for a given
	gps_start_time and gps_end_time, with optional flags.

	@param gps_start_time start time as LIGOTimeGPS, double or float
	@param gps_end_time end time as LIGOTimeGPS, double or float
	"""
	def seek_args_for_gps(gps_time):
		"""!
		Convenience routine to convert a GPS time to a seek type and a
		GStreamer timestamp.
		"""
		if gps_time is None or gps_time == -1:
			return (Gst.SeekType.NONE, -1)  # -1 == Gst.CLOCK_TIME_NONE
		elif hasattr(gps_time, 'ns'):
			return (Gst.SeekType.SET, gps_time.ns())
		else:
			return (Gst.SeekType.SET, int(float(gps_time) * Gst.SECOND))

	start_type, start_time = seek_args_for_gps(gps_start_time)
	stop_type, stop_time = seek_args_for_gps(gps_end_time)

	# FIXME:  should seek whole pipeline, but there are several
	# problems preventing us from doing that.
	#
	# because the framecpp demuxer has no source pads until decoding
	# begins, the bottom halves of pipelines start out disconnected
	# from the top halves of pipelines, which means the seek events
	# (which are sent to sink elements) don't make it all the way to
	# the source elements.  dynamic pipeline building will not fix the
	# problem because the demuxer does not carry the "SINK" flag so even
	# though it starts with only a sink pad and no source pads it still
	# won't be sent the seek event.  gstreamer's own demuxers must
	# somehow have a solution to this problem, but I don't know what it
	# is.  I notice that many implement the send_event() method
	# override, and it's possible that's part of the solution.
	#
	# seeking the pipeline can only be done in the PAUSED state.  the
	# GstBaseSrc baseclass seeks itself to 0 when changing to the
	# paused state, and the preroll is performed before the seek event
	# we send to the pipeline is processed, so the preroll occurs with
	# whatever random data a seek to "0" causes source elements to
	# produce.  for us, when processing GW data, this leads to the
	# whitener element's initial spectrum estimate being initialized
	# from that random data, and a non-zero chance of even getting
	# triggers out of it, all of which is very bad.
	#
	# the only way we have at the moment to solve both problems --- to
	# ensure seek events arrive at source elements and to work around
	# GstBaseSrc's initial seek to 0 --- is to send seek events
	# directly to the source elements ourselves before putting the
	# pipeline into the PAUSED state.  the elements are happy to
	# receive seek events in the READY state, and GstBaseSrc updates its
	# current segment using that seek so that when it transitions to
	# the PAUSED state and does its initial seek it seeks to our
	# requested time, not to 0.
	#
	# So: this function needs to be called with the pipeline in the
	# READY state in order to guarantee the data stream starts at the
	# requested start time, and does not get prerolled with random
	# data.  For safety we include a check of the pipeline's current
	# state.
	#
	# if in the future we find some other solution to these problems
	# the story might change and the pipeline state required on entry
	# into this function might change.
	#
	#pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)

	if pipeline.current_state != Gst.State.READY:
		raise ValueError("pipeline must be in READY state")
	for elem in pipeline.iterate_sources():
		elem.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)


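# Hedged usage sketch, per the caveats above: the pipeline must already be in
# the READY state when seeking so sources start at the requested GPS time
# instead of prerolling from t=0.  Bus/error handling is omitted for brevity.
def _example_seek(pipeline, gps_start_time, gps_end_time):
	pipeline.set_state(Gst.State.READY)
	pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time)
	pipeline.set_state(Gst.State.PLAYING)

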
def mksegmentsrcgate(pipeline, src, segment_list, invert_output=False, rate=1, **kwargs):
	"""
	Takes a segment list and produces a gate driven by it.  Hook up your
	own input and output.

	@param kwargs passed through to pipeparts.mkgate(), e.g., used to set
	the gate's name.

	Gstreamer graph describing this function:

	.. graphviz::

	   digraph G {
	      compound=true;
	      node [shape=record fontsize=10 fontname="Verdana"];
	      rankdir=LR;
	      lal_segmentsrc;
	      lal_gate;
	      in [label="\<src\>"];
	      out [label="\<return value\>"];
	      in -> lal_gate -> out;
	      lal_segmentsrc -> lal_gate;
	   }
	"""
	return pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mkcapsfilter(pipeline, pipeparts.mksegmentsrc(pipeline, segment_list, invert_output=invert_output), caps="audio/x-raw, rate=%d" % rate), **kwargs)


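# Hedged sketch: gating an arbitrary stream with a hand-built segment list.
# The GPS interval and the element name are illustrative placeholders.
def _example_segments_gate(pipeline, src):
	seglist = segments.segmentlist([segments.segment(LIGOTimeGPS(1187000000), LIGOTimeGPS(1187000100))])
	return mksegmentsrcgate(pipeline, src, seglist, name="example_segments_gate")

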
def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose=False):
	"""
	All the conditionals and stupid pet tricks for reading real or
	simulated h(t) data in one place.

	Consult the append_options() function and the DataSourceInfo class.

	This src in general supports only one instrument although
	DataSourceInfo contains dictionaries of multi-instrument things.  By
	specifying the instrument when calling this function you will get only
	a single instrument source.  Code wishing to have multiple basicsrcs
	will need to call this function for each instrument.

	**Gstreamer Graph**

	.. graphviz::

	   digraph mkbasicsrc {
	      compound=true;
	      node [shape=record fontsize=10 fontname="Verdana"];
	      subgraph clusterfakesrc {
	         fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO"];
	         color=black;
	         label="Possible path #1";
	      }
	      subgraph clusterframes {
	         color=black;
	         frames_0 [label="lalcachesrc: frames"];
	         frames_1 [label="framecppchanneldemux"];
	         frames_2 [label="queue"];
	         frames_3 [label="gate (if user provides segments)", style=filled, color=lightgrey];
	         frames_4 [label="audiorate"];
	         frames_0 -> frames_1 -> frames_2 -> frames_3 -> frames_4;
	         label="Possible path #2";
	      }
	      subgraph clusteronline {
	         color=black;
	         online_0 [label="lvshmsrc|framexmit"];
	         online_1 [label="framecppchanneldemux"];
	         online_2a [label="strain queue"];
	         online_2b [label="statevector queue"];
	         online_3 [label="statevector"];
	         online_4 [label="gate"];
	         online_5 [label="audiorate"];
	         online_6 [label="queue"];
	         online_0 -> online_1;
	         online_1 -> online_2a;
	         online_1 -> online_2b;
	         online_2b -> online_3;
	         online_2a -> online_4;
	         online_3 -> online_4 -> online_5 -> online_6;
	         label="Possible path #3";
	      }
	      subgraph clusternds {
	         nds_0 [label="ndssrc"];
	         color=black;
	         label="Possible path #4";
	      }
	      audioconv [label="audioconvert"];
	      progress [label="progressreport (if verbose)", style=filled, color=lightgrey];
	      sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey];
	      queue [label="queue (if injections requested)", style=filled, color=lightgrey];

	      // The connections
	      fake_0 -> audioconv [ltail=clusterfakesrc];
	      frames_4 -> audioconv [ltail=clusterframes];
	      online_6 -> audioconv [ltail=clusteronline];
	      nds_0 -> audioconv [ltail=clusternds];
	      audioconv -> progress -> sim -> queue -> "?";
	   }
	"""
	statevector = dqvector = None
	idqseries = None
	idqstatevector = None

	# NOTE: timestamp_offset is a hack to allow seeking with fake
	# sources, a real solution should be fixing the general timestamp
	# problem which would allow seeking to work properly
	if gw_data_source_info.data_source == DataSource.White:
		src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size, volume=1.0, timestamp_offset=int(gw_data_source_info.seg[0]) * Gst.SECOND)
	elif gw_data_source_info.data_source == DataSource.Silence:
		src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size, wave=4, timestamp_offset=int(gw_data_source_info.seg[0]) * Gst.SECOND)
	elif gw_data_source_info.data_source == DataSource.LIGO:
		src = pipeparts.mkfakeLIGOsrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.ALIGO:
		src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.AVirgo:
		src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.Frames:
		if instrument == Detector.V1:
			# FIXME Hack because virgo often just uses "V" in
			# the file names rather than "V1".  We need to
			# sieve on "V"
			src = pipeparts.mklalcachesrc(pipeline, location=gw_data_source_info.frame_cache, cache_src_regex="V")
		else:
			src = pipeparts.mklalcachesrc(pipeline, location=gw_data_source_info.frame_cache, cache_src_regex=instrument[0], cache_dsc_regex=instrument)
		demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum=False, channel_list=list(map("%s:%s".__mod__, gw_data_source_info.channel_dict.items())))
		pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
		# allow frame reading and decoding to occur in a different thread
		src = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * Gst.SECOND)
		pipeparts.src_deferred_link(demux, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_static_pad("sink"))
		if gw_data_source_info.frame_segments[instrument] is not None:
			# FIXME:  make segmentsrc generate segment samples
			# at the sample rate of h(t)?
			# FIXME:  make gate leaky when I'm certain that
			# will work.
			src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]), name="%s_frame_segments_gate" % instrument)
			pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_static_pad("src"), gw_data_source_info.frame_segments[instrument])
		# FIXME:  remove this when pipeline can handle disconts
		src = pipeparts.mkaudiorate(pipeline, src, skip_to_first=True, silent=False)
	elif gw_data_source_info.data_source in (DataSource.FrameXMIT.value, DataSource.LVSHM.value, DataSource.DEVSHM.value):
		# See https://wiki.ligo.org/DAC/ER2DataDistributionPlan#LIGO_Online_DQ_Channel_Specifica
		state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
		dq_vector_on_bits, dq_vector_off_bits = gw_data_source_info.dq_vector_on_off_bits[instrument]

		if gw_data_source_info.data_source == DataSource.LVSHM:
			# FIXME make wait_time adjustable through web
			# interface or command line or both
			src = pipeparts.mklvshmsrc(pipeline, shm_name=gw_data_source_info.shm_part_dict[instrument], assumed_duration=gw_data_source_info.shm_assumed_duration, blocksize=gw_data_source_info.shm_block_size, wait_time=120)
		elif gw_data_source_info.data_source == DataSource.FrameXMIT:
			src = pipeparts.mkframexmitsrc(pipeline, multicast_iface=gw_data_source_info.framexmit_iface, multicast_group=gw_data_source_info.framexmit_addr[instrument][0], port=gw_data_source_info.framexmit_addr[instrument][1], wait_time=120)
		elif gw_data_source_info.data_source == DataSource.DEVSHM:
			src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=gw_data_source_info.shared_memory_dir[instrument], wait_time=60, watch_suffix='.gwf')
		else:
			# impossible code path
			raise ValueError(gw_data_source_info.data_source)

		# 10 minutes of buffering, then demux
		src = pipeparts.mkqueue(pipeline, src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 60 * 10)
		src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum=False, skip_bad_files=True)

		# extract state vector and DQ vector and convert to booleans
		if gw_data_source_info.dq_channel_dict[instrument] == gw_data_source_info.state_channel_dict[instrument]:
			dqstatetee = pipeparts.mktee(pipeline, None)
			statevectorelem = statevector = pipeparts.mkstatevector(pipeline, dqstatetee, required_on=state_vector_on_bits, required_off=state_vector_off_bits, name="%s_state_vector" % instrument)
			dqvectorelem = dqvector = pipeparts.mkstatevector(pipeline, dqstatetee, required_on=dq_vector_on_bits, required_off=dq_vector_off_bits, name="%s_dq_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), dqstatetee.get_static_pad("sink"))
		else:
			# DQ and state vector are distinct channels
			# first DQ
			dqvectorelem = dqvector = pipeparts.mkstatevector(pipeline, None, required_on=dq_vector_on_bits, required_off=dq_vector_off_bits, name="%s_dq_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), dqvector.get_static_pad("sink"))
			# then State
			statevectorelem = statevector = pipeparts.mkstatevector(pipeline, None, required_on=state_vector_on_bits, required_off=state_vector_off_bits, name="%s_state_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), statevector.get_static_pad("sink"))

		@bottle.route("/%s/statevector_on.txt" % instrument)
		def state_vector_on(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			on = elem.get_property("on-samples")
			return "%.9f %d" % (t, on)

		@bottle.route("/%s/statevector_off.txt" % instrument)
		def state_vector_off(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			off = elem.get_property("off-samples")
			return "%.9f %d" % (t, off)

		@bottle.route("/%s/statevector_gap.txt" % instrument)
		def state_vector_gap(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			gap = elem.get_property("gap-samples")
			return "%.9f %d" % (t, gap)

		@bottle.route("/%s/dqvector_on.txt" % instrument)
		def dq_vector_on(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			on = elem.get_property("on-samples")
			return "%.9f %d" % (t, on)

		@bottle.route("/%s/dqvector_off.txt" % instrument)
		def dq_vector_off(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			off = elem.get_property("off-samples")
			return "%.9f %d" % (t, off)

		@bottle.route("/%s/dqvector_gap.txt" % instrument)
		def dq_vector_gap(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			gap = elem.get_property("gap-samples")
			return "%.9f %d" % (t, gap)

		# extract strain with 1 buffer of buffering
		strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
		pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), strain.get_static_pad("sink"))
		pipeparts.framecpp_channeldemux_set_units(src, {"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]): "strain"})

		# extract idq series and idq state vector
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.idq_channel_name[instrument]), idqseries.get_static_pad("sink"))
		if instrument in gw_data_source_info.idq_state_channel_name:
			idqstatevector = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.idq_state_channel_name[instrument]), idqstatevector.get_static_pad("sink"))

		# fill in holes, skip duplicate data
		statevector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, statevector, skip_to_first=True, silent=False), block_duration=Gst.SECOND // 4)
		dqvector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, dqvector, skip_to_first=True, silent=False), block_duration=Gst.SECOND // 4)
		src = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, strain, skip_to_first=True, silent=False, name="%s_strain_audiorate" % instrument), block_duration=Gst.SECOND // 4)
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, idqseries, skip_to_first=True, silent=False), block_duration=Gst.SECOND // 4)
		if instrument in gw_data_source_info.idq_state_channel_name:
			idqstatevector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, idqstatevector, skip_to_first=True, silent=False), block_duration=Gst.SECOND // 4)

		@bottle.route("/%s/strain_dropped.txt" % instrument)
		# FIXME don't hard code the sample rate
		def strain_add(elem=src, rate=16384):
			t = float(lal.UTCToGPS(time.gmtime()))
			# yes I realize we are reading the "add" property for a
			# route called dropped.  That is because the data which
			# is "dropped" on route is "added" by the audiorate
			# element
			add = elem.get_property("add")
			return "%.9f %d" % (t, add // rate)

		# use state vector and DQ vector to gate strain.  the sizes
		# of the queues on the control inputs are not important.
		# they must be large enough to buffer the state vector
		# streams until they are needed, but the streams will be
		# consumed immediately when needed so there is no risk that
		# these queues actually fill up or add latency.  be
		# generous.
		statevector = pipeparts.mktee(pipeline, statevector)
		dqvector = pipeparts.mktee(pipeline, dqvector)
		src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mkqueue(pipeline, statevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0), default_state=False, name="%s_state_vector_gate" % instrument)
		src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mkqueue(pipeline, dqvector, max_size_buffers=0, max_size_bytes=0, max_size_time=0), default_state=False, name="%s_dq_vector_gate" % instrument)

		# use the idq state vector, state and dq vectors to gate the idq series
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, statevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0), default_state=False, name="%s_idq_state_vector_gate" % instrument)
			idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, dqvector, max_size_buffers=0, max_size_bytes=0, max_size_time=0), default_state=False, name="%s_idq_dq_vector_gate" % instrument)
			if instrument in gw_data_source_info.idq_state_channel_name:
				idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, idqstatevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0), default_state=False, name="%s_idq_idqstate_vector_gate" % instrument)
	elif gw_data_source_info.data_source == DataSource.NDS:
		src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type, blocksize=gw_data_source_info.block_size, port=gw_data_source_info.nds_port)
	else:
		raise ValueError("invalid data_source: %s" % gw_data_source_info.data_source)

	#
	# provide an audioconvert element to allow Virgo data (which is
	# single-precision) to be adapted into the pipeline
	#

	src = pipeparts.mkaudioconvert(pipeline, src)

	#
	# progress report
	#

	if verbose:
		src = pipeparts.mkprogressreport(pipeline, src, "progress_src_%s" % instrument)

	#
	# optional injections
	#

	if gw_data_source_info.injection_filename is not None:
		src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
		# let the injection code run in a different thread than the
		# whitener, etc.,
		src = pipeparts.mkqueue(pipeline, src, max_size_bytes=0, max_size_buffers=0, max_size_time=Gst.SECOND * 64)

	#
	# done
	#

	return src, statevector, dqvector, idqseries


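# Hedged end-to-end sketch combining the pieces above: build a pipeline, add a
# single-detector frames source, terminate it in a fakesink, and seek to the
# requested span.  Channel, cache and GPS values are illustrative placeholders,
# and a real application would also attach a GLib main loop and bus handlers.
def _example_basicsrc_pipeline():
	info = DataSourceInfo(
		data_source=DataSource.Frames,
		channel_name={"H1": "GDS-CALIB_STRAIN"},
		gps_start_time=1187000000,
		gps_end_time=1187001000,
		frame_cache="H1.cache",  # assumed pre-built LAL cache file
	)
	pipeline = Gst.Pipeline(name="example")
	src, statevector, dqvector, idqseries = mkbasicsrc(pipeline, info, "H1", verbose=True)
	pipeparts.mkfakesink(pipeline, src)
	# per pipeline_seek_for_gps(), seek while in the READY state
	pipeline.set_state(Gst.State.READY)
	pipeline_seek_for_gps(pipeline, info.gps_start_time, info.gps_end_time)
	pipeline.set_state(Gst.State.PLAYING)
	return pipeline

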
def mkhtgate(pipeline, src, control=None, threshold=8.0, attack_length=128, hold_length=128, **kwargs):
	"""
	A convenience function to provide thresholds on input data.  This can
	be used to remove large spikes / glitches etc.  Of course you can use
	it for other stuff by plugging whatever you want as input and output.

	NOTE: the queues constructed by this code assume the attack and hold
	lengths combined are less than 1 second in duration.

	**Gstreamer Graph**

	.. graphviz::

	   digraph G {
	      compound=true;
	      node [shape=record fontsize=10 fontname="Verdana"];
	      rankdir=LR;
	      tee;
	      inputqueue;
	      lal_gate;
	      in [label="\<src\>"];
	      out [label="\<return\>"];
	      in -> tee -> inputqueue -> lal_gate -> out;
	      tee -> lal_gate;
	   }
	"""
	# FIXME someday explore a good bandpass filter
	# src = pipeparts.mkaudiochebband(pipeline, src, low_frequency, high_frequency)
	if control is None:
		control = src = pipeparts.mktee(pipeline, src)
	src = pipeparts.mkqueue(pipeline, src, max_size_time=Gst.SECOND, max_size_bytes=0, max_size_buffers=0)
	return pipeparts.mkgate(pipeline, src, control=control, threshold=threshold, attack_length=-attack_length, hold_length=-hold_length, invert_control=True, **kwargs)


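# Minimal hedged example: threshold h(t) against itself to suppress loud
# glitches.  Because mkhtgate() inverts the control, data pass only while the
# control amplitude stays below the threshold.  The element name is a
# placeholder.
def _example_htgate(pipeline, src):
	return mkhtgate(pipeline, src, threshold=8.0, name="example_ht_gate")

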
################################################################################
#                         Legacy API Below, deprecated                         #
################################################################################
[docs]@admin.deprecated('replaced by parse_list_to_dict')
def channel_dict_from_channel_list(channel_list):
	"""
	Given a list of channels, produce a dictionary of channel names
	keyed by ifo.

	The list here typically comes from an option parser with options
	that specify the "append" action.

	Examples:

	>>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])  # doctest: +SKIP
	{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}

	"""
	return parse_list_to_dict(channel_list)
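
# A migration sketch for the deprecated helpers in this section:  each
# deprecation notice names its replacement, defined earlier in this module.
# For example, since the body above simply forwards to parse_list_to_dict,
# the equivalent direct call (assuming the basic one-argument form) is:
#
#	parse_list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
#	# {'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}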
[docs]@admin.deprecated('replaced by parse_list_to_dict with key_is_range=True')
def channel_dict_from_channel_list_with_node_range(channel_list):
	"""
	Given a list of channels with a range of mass bins, produce a
	dictionary keyed by node ID whose values are dictionaries of
	channel names keyed by ifo.

	The list here typically comes from an option parser with options
	that specify the "append" action.

	Examples:

	>>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"])  # doctest: +SKIP
	{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}

	"""
	outdict = {}
	for instrument_channel_full in channel_list:
		instrument_channel_split = instrument_channel_full.split(':')
		for ii in range(int(instrument_channel_split[0]), int(instrument_channel_split[1])):
			outdict[str(ii).zfill(4)] = dict(instrument_channel.split("=") for instrument_channel in instrument_channel_split[2].split(','))
	return outdict
[docs]@admin.deprecated('replaced by ravel_dict_to_list with join_arg="channel-name" and filter_keys=ifos')
def pipeline_channel_list_from_channel_dict(channel_dict, ifos=None, opt="channel-name"):
	"""
	Creates a string of channel-name options from a dictionary keyed by
	ifos.

	FIXME:  this function exists to work around pipeline.py's inability
	to give the same option more than once by producing a string to
	pass as an argument that encodes the other instances of the option.

	- override --channel-name with a different option by setting opt.
	- restrict the ifo keys to a subset of the channel_dict by setting
	  ifos.

	Examples:

	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'})  # doctest: +SKIP
	'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])  # doctest: +SKIP
	'H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string")  # doctest: +SKIP
	'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '

	"""
	outstr = ""
	if ifos is None:
		ifos = channel_dict.keys()
	for i, ifo in enumerate(ifos):
		if i == 0:
			outstr += "%s=%s " % (ifo, channel_dict[ifo])
		else:
			outstr += "--%s=%s=%s " % (opt, ifo, channel_dict[ifo])
	return outstr
[docs]@admin.deprecated('replaced by ravel_dict_to_list with key_is_range=True, range_elem=node, join_arg="channel-name", filter_keys=ifos')
def pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node=0, ifos=None, opt="channel-name"):
	"""
	Creates a string of channel-name options from a dictionary keyed by
	node ID, whose values are dictionaries of channel names keyed by
	ifo.

	FIXME:  this function exists to work around pipeline.py's inability
	to give the same option more than once by producing a string to
	pass as an argument that encodes the other instances of the option.

	- override --channel-name with a different option by setting opt.
	- restrict the ifo keys to a subset of the channel_dict by setting
	  ifos.

	Examples:

	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0)  # doctest: +SKIP
	'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"])  # doctest: +SKIP
	'H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, opt="test-string")  # doctest: +SKIP
	'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '

	"""
	outstr = ""
	node = str(node).zfill(4)
	if ifos is None:
		ifos = channel_dict[node].keys()
	for i, ifo in enumerate(ifos):
		if i == 0:
			outstr += "%s=%s " % (ifo, channel_dict[node][ifo])
		else:
			outstr += "--%s=%s=%s " % (opt, ifo, channel_dict[node][ifo])
	return outstr
[docs]@admin.deprecated('replaced by parse_list_to_dict with key_is_range=True')
def injection_dict_from_channel_list_with_node_range(injection_list):
	"""
	Given a list of injection xml files with a range of mass bins,
	produce a dictionary keyed by bin number.

	The list here typically comes from an option parser with options
	that specify the "append" action.

	Examples:

	>>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"])  # doctest: +SKIP
	{'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}

	"""
	outdict = {}
	for injection_name in injection_list:
		injection_name_split = injection_name.split(':')
		for ii in range(int(injection_name_split[0]), int(injection_name_split[1])):
			outdict[str(ii).zfill(4)] = injection_name_split[2]
	return outdict
[docs]@admin.deprecated('replaced by combination of zip_dict_values and parse_list_to_dict with value_transform=parse_int')
def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict=DEFAULT_STATE_VECTOR_ON_OFF):
	"""
	Produce a dictionary (keyed by detector) of on / off bit tuples
	from lists provided on the command line.  Takes default values from
	the module-level DEFAULT_STATE_VECTOR_ON_OFF if
	state_vector_on_off_dict is not given.  NOTE:  the dictionary
	passed in (including the module-level default) is modified in place
	and returned.

	Inputs must be given as base 10 or base 16 integers.

	Examples:

	>>> on_bit_list = ["V1=7", "H1=7", "L1=7"]  # doctest: +SKIP
	>>> off_bit_list = ["V1=256", "H1=352", "L1=352"]  # doctest: +SKIP
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list)  # doctest: +SKIP
	{'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [7, 256]}
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, {})  # doctest: +SKIP
	{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
	>>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]  # doctest: +SKIP
	>>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]  # doctest: +SKIP
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, {})  # doctest: +SKIP
	{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}

	"""
	for ifo, bits in [line.strip().split("=", 1) for line in on_bit_list]:
		bits = int(bits, 16) if bits.startswith("0x") else int(bits)
		try:
			state_vector_on_off_dict[ifo][0] = bits
		except KeyError:
			state_vector_on_off_dict[ifo] = [bits, 0]
	for ifo, bits in [line.strip().split("=", 1) for line in off_bit_list]:
		bits = int(bits, 16) if bits.startswith("0x") else int(bits)
		# no need to worry about KeyError here:  every ifo in
		# off_bit_list is expected to have appeared in on_bit_list
		state_vector_on_off_dict[ifo][1] = bits
	return state_vector_on_off_dict
[docs]@admin.deprecated('replaced by combination of ravel_dict_to_list with unzip=True and join_arg=["state-vector-on-bits", "state-vector-off-bits"]')
def state_vector_on_off_list_from_bits_dict(bit_dict):
	"""
	Produce a tuple of useful command lines from a dictionary of on /
	off state vector bits keyed by detector.

	FIXME:  this function exists to work around pipeline.py's inability
	to give the same option more than once by producing a string to
	pass as an argument that encodes the other instances of the option.

	Examples:

	>>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]}  # doctest: +SKIP
	>>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)  # doctest: +SKIP
	('H1=7 --state-vector-on-bits=H2=7 --state-vector-on-bits=L1=7 --state-vector-on-bits=V1=103 ', 'H1=352 --state-vector-off-bits=H2=352 --state-vector-off-bits=L1=352 --state-vector-off-bits=V1=256 ')

	"""
	onstr = ""
	offstr = ""
	for i, ifo in enumerate(bit_dict):
		if i == 0:
			onstr += "%s=%s " % (ifo, bit_dict[ifo][0])
			offstr += "%s=%s " % (ifo, bit_dict[ifo][1])
		else:
			onstr += "--state-vector-on-bits=%s=%s " % (ifo, bit_dict[ifo][0])
			offstr += "--state-vector-off-bits=%s=%s " % (ifo, bit_dict[ifo][1])
	return onstr, offstr
[docs]@admin.deprecated('replaced by parse_list_to_dict with value_transform=parse_host.')
def framexmit_dict_from_framexmit_list(framexmit_list):
	"""
	Given a list of framexmit addresses with ports, produce a
	dictionary keyed by ifo.

	The list here typically comes from an option parser with options
	that specify the "append" action.

	Examples:

	>>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"])  # doctest: +SKIP
	{'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097), 'V1': ('224.3.2.3', 7098)}

	"""
	out = []
	for instrument_addr in framexmit_list:
		ifo, addr_port = instrument_addr.split("=")
		addr, port = addr_port.split(':')
		out.append((ifo, (addr, int(port))))
	return dict(out)
[docs]@admin.deprecated('replaced by ravel_dict_to_list with value_transform=ravel_host.')
def framexmit_list_from_framexmit_dict(framexmit_dict, ifos=None, opt="framexmit-addr"):
	"""
	Creates a string of framexmit address options from a dictionary
	keyed by ifos.

	Examples:

	>>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)})  # doctest: +SKIP
	'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '

	"""
	outstr = ""
	if ifos is None:
		ifos = framexmit_dict.keys()
	for i, ifo in enumerate(ifos):
		if i == 0:
			outstr += "%s=%s:%s " % (ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1])
		else:
			outstr += "--%s=%s=%s:%s " % (opt, ifo, framexmit_dict[ifo][0], framexmit_dict[ifo][1])
	return outstr
[docs]@admin.deprecated('replaced by parse_list_to_dict.')
def frame_type_dict_from_frame_type_list(frame_type_list):
	"""
	Given a list of frame types, produce a dictionary keyed by ifo.

	The list here typically comes from an option parser with options
	that specify the "append" action.

	Examples:

	>>> frame_type_dict_from_frame_type_list(['H1=H1_GWOSC_O2_16KHZ_R1', 'L1=L1_GWOSC_O2_16KHZ_R1'])  # doctest: +SKIP
	{'H1': 'H1_GWOSC_O2_16KHZ_R1', 'L1': 'L1_GWOSC_O2_16KHZ_R1'}

	"""
	out = {}
	for frame_opt in frame_type_list:
		ifo, frame_type = frame_opt.split("=")
		out[ifo] = frame_type
	return out