# Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Module docstring is assigned explicitly so the license header can stay at the
# very top of the file. (Removed the commented-out "alternate copyright" code
# that previously followed this assignment — dead code, never executed.)
__doc__ = """Gravitational wave datasource utilities, including abstractions for storing the required
information needed to connect to data sources. The main elements of the included API are:

1. DataSourceInfo class for storing the necessary information to connect to a datasource
2. Useful constants to help configure a DataSourceInfo, like the Detector and DataSource classes
3. Pipeline building utilities
4. Command-line utilities for parsing and converting values into DataSourceInfo objects
"""
import collections
import enum
import optparse
import os
from pathlib import Path
import tempfile
import time
import types
from typing import Union, Dict
import lal
from lal import LIGOTimeGPS
from ligo import segments
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import segments as ligolw_segments
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
try:
import bottle
if tuple(map(int, bottle.__version__.split("."))) < (0, 13):
# FIXME: see
# https://git.ligo.org/lscsoft/gstlal/-/merge_requests/146
# if the required patch is added to the distro-supplied
# bottle before a 0.13 is released, update the version
# check to the correct version
raise ImportError
except ImportError:
# FIXME: remove after system-wide isntall can be relied on
from gstlal import bottle
from gstlal import datafind
from gstlal import pipeparts
from gstlal.utilities import admin
# NOTE(review): GObject.threads_init() has been a deprecated no-op since
# PyGObject 3.11 — confirm the minimum supported PyGObject before removing.
GObject.threads_init()
# Initialize the GStreamer library (no command-line arguments forwarded).
Gst.init(None)
# Default read size in bytes: 512 seconds of double-precision (8-byte)
# samples at 16384 Hz.
DEFAULT_BLOCK_SIZE = 16384 * 8 * 512
################################################################################
# USEFUL CONSTANTS #
################################################################################
class Detector(str, enum.Enum):
	"""Enumeration of available detectors

	Members are the conventional two-character interferometer prefixes used
	in channel names and frame files. Inheriting from ``str`` makes members
	compare equal to (and usable as) their plain-string values.
	"""
	G1 = 'G1'  # GEO600
	H1 = 'H1'  # LIGO Hanford
	H2 = 'H2'  # LIGO Hanford 2 km instrument (historical)
	K1 = 'K1'  # KAGRA
	L1 = 'L1'  # LIGO Livingston
	V1 = 'V1'  # Virgo
class DataSource(str, enum.Enum):
	"""Enumeration of available data sources

	Values are the strings accepted by the ``--data-source`` command-line
	option. Inheriting from ``str`` makes members compare equal to their
	plain-string values, so e.g. ``'frames' == DataSource.Frames``.

	Brief option descriptions (see append_options for the CLI help text;
	presumed roles marked for confirmation):
	"""
	ALIGO = 'AdvLIGO'        # simulated Advanced LIGO noise source — TODO confirm
	AVirgo = 'AdvVirgo'      # simulated Advanced Virgo noise source — TODO confirm
	Frames = 'frames'        # .gwf frame files via a LAL cache or datafind (see frame_cache/frame_type)
	FrameXMIT = 'framexmit'  # live data over framexmit multicast (see DEFAULT_FRAMXMIT_ADDR)
	LIGO = 'LIGO'            # simulated initial-LIGO noise source — TODO confirm
	LVSHM = 'lvshm'          # live data from a shared-memory partition (see DEFAULT_SHARED_MEMORY_PARTITION)
	DEVSHM = 'devshm'        # live data from files under /dev/shm (see DEFAULT_SHARED_MEMORY_DIR)
	NDS = 'nds'              # data served by an NDS server (see nds_host/nds_port)
	Silence = 'silence'      # synthetic zero-amplitude test stream — TODO confirm
	White = 'white'          # synthetic white-noise test stream — TODO confirm
# Every data source accepted by DataSourceInfo.validate(). Enum iteration
# yields the members in definition order, so this is exactly the full set.
KNOWN_DATASOURCES = list(DataSource)

# Data sources that stream live data; these take no GPS start/stop times.
KNOWN_LIVE_DATASOURCES = [
	DataSource.FrameXMIT,
	DataSource.LVSHM,
	DataSource.DEVSHM,
]
# Default shared-memory partition name per detector prefix (lvshm source).
DEFAULT_SHARED_MEMORY_PARTITION = {
	Detector.H1.value: "LHO_Data",
	Detector.L1.value: "LLO_Data",
	Detector.V1.value: "VIRGO_Data"
}
# Default shared-memory directory per detector prefix (devshm source).
DEFAULT_SHARED_MEMORY_DIR = {
	Detector.H1.value: "/dev/shm/kafka/H1_Data",
	Detector.L1.value: "/dev/shm/kafka/L1_Data",
	Detector.V1.value: "/dev/shm/kafka/V1_Data"
}
# Default state-vector / data-quality channel names, one entry per detector.
DEFAULT_STATE_CHANNEL = {detector.value: "LLD-DQ_VECTOR" for detector in Detector}
DEFAULT_DQ_CHANNEL = {detector.value: "DMT-DQ_VECTOR" for detector in Detector}
# Default (multicast address, port) per detector for the framexmit source.
# NOTE(review): "FRAMXMIT" (sic, missing 'E') is kept as-is — the name is part
# of the module's public API.
DEFAULT_FRAMXMIT_ADDR = {
	Detector.H1.value: ("224.3.2.1", 7096),
	Detector.L1.value: ("224.3.2.2", 7097),
	Detector.V1.value: ("224.3.2.3", 7098),
}
# Default [on_bits, off_bits] masks used to gate data flow from the state
# vector channel (values in hex; see the --state-vector-*-bits CLI options).
DEFAULT_STATE_VECTOR_ON_OFF = {
	Detector.H1.value: [0x7, 0x160],
	Detector.H2.value: [0x7, 0x160],
	Detector.L1.value: [0x7, 0x160],
	Detector.V1.value: [0x67, 0x100]
}
# Default [on_bits, off_bits] masks for the data-quality vector channel.
DEFAULT_DQ_VECTOR_ON_OFF = {
	Detector.H1.value: [0x7, 0x0],
	Detector.H2.value: [0x7, 0x0],
	Detector.L1.value: [0x7, 0x0],
	Detector.V1.value: [0x7, 0x0]
}
# Simple (host, port) pair describing a network service endpoint.
# FIX: the namedtuple typename now matches the bound name (it was
# 'DataFindServerInfo'), so repr() is accurate and instances can be pickled
# (pickle resolves the class by its typename at module scope).
HostInfo = collections.namedtuple('HostInfo', 'host port')
class DataFindServer:
	"""Enumeration of available datafind servers"""

	# General-purpose production datafind endpoint for LIGO data discovery.
	GeneralProp = HostInfo('datafind.ligo.org', 443)
class DataSourceConfigError(ValueError):
	"""Raised when a DataSourceInfo is configured with invalid or inconsistent settings."""
################################################################################
# Primary Datasource information abstraction #
################################################################################
class DataSourceInfo:
	"""A pythonic representation of a datasource with configured settings necessary for usage in pipelines"""

	def __init__(self, data_source: Union[str, DataSource], channel_name: Dict[Detector, str],
			gps_start_time: Union[int, LIGOTimeGPS] = None, gps_end_time: Union[int, LIGOTimeGPS] = None,
			shared_memory_partition: Dict[Detector, str] = None, shared_memory_dir: Dict[Detector, str] = None,
			frame_segments_name: str = None, state_vector_on_bits: Dict[Detector, str] = None,
			state_vector_off_bits: Dict[Detector, str] = None, dq_vector_on_bits: str = None, dq_vector_off_bits: str = None, frame_cache: Union[str, Path] = None,
			injections: Union[str, Path] = None, nds_host: str = None, nds_port: int = None, nds_channel_type: str = 'online', shared_memory_assumed_duration: int = 4,
			shared_memory_block_size: int = 4096, frame_segments_file: Union[str, Path] = None, block_size: int = DEFAULT_BLOCK_SIZE, frame_type: Dict[Detector, str] = None,
			data_find_server: Union[str, DataFindServer] = None, framexmit_addr: str = None, framexmit_iface: str = None,
			state_channel_name: Dict[Detector, str] = None, dq_channel_name: Dict[Detector, str] = None,
			idq_channel_name: Dict[Detector, str] = None,
			idq_state_channel_name: Dict[Detector, str] = None):
		"""Create a DataSource information object that contains the necessary details to produce GStreamer elements
		for loading gravitational wave data from a variety of possible sources.

		Args:
			data_source:
				str or DataSource, the data source from [frames|framexmit|lvshm|nds|silence|white] which to download data
			gps_start_time:
				int or LIGOTimeGPS, the start time of the segment to analyze in GPS seconds. Required unless data_source=lvshm
			gps_end_time:
				int or LIGOTimeGPS, the end time of the segment to analyze in GPS seconds. Required unless data_source=lvshm
			channel_name:
				Dict[Detector, str] or Dict[Detector, HostInfo], the name of the channels to process per detector
			framexmit_addr:
				Dict[Detector, str] or Dict[Detector, HostInfo], the address of the framexmit service.
			framexmit_iface:
				str, the multicast interface address of the framexmit service.
			state_channel_name:
				Dict[Detector, str], the name of the state vector channel. This channel will be used to control the flow of data via the on/off bits.
			dq_channel_name:
				Dict[Detector, str], the name of the data quality channel. This channel will be used to control the flow of data via the on/off bits.
			idq_channel_name:
				Dict[Detector, str], the name of the idq channel. This channel will be used to create the idq_series information.
			idq_state_channel_name:
				Dict[Detector, str], the name of the idq state channel. This channel will be used to gate the idq_series information created by idq_channel_name.
			shared_memory_partition:
				Dict[Detector, str], the name of the shared memory partition for a given detector.
			shared_memory_dir:
				Dict[Detector, str], the name of the shared memory directory for a given detector.
			frame_segments_name:
				str, the name of the segments to extract from the segment tables. Required iff frame_segments_file is given
			state_vector_on_bits:
				Dict[Detector, str], default None, the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with {Detector:bits} Only
				currently has meaning for online (lvshm) data.
			state_vector_off_bits:
				Dict[Detector, str], default None, the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with {Detector:bits} Only
				currently has meaning for online (lvshm) data.
			dq_vector_on_bits:
				Dict[Detector, str], default None, the dq vector on bits to process (optional). The default is 0x7 for all detectors. Override with {Detector:bits} Only
				currently has meaning for online (lvshm) data.
			dq_vector_off_bits:
				Dict[Detector, str], default None, the dq vector off bits to process (optional). The default is 0x0 for all detectors. Override with {Detector:bits} Only
				currently has meaning for online (lvshm) data.
			frame_cache:
				str or Path, the name of the LAL cache listing the LIGO-Virgo .gwf frame files
			injections:
				str or Path, default None, the name of the LIGO light-weight XML file from which to load injections (optional)
			nds_host:
				str, the NDS server address. only used if data_source=nds
			nds_port:
				int, the NDS server port. only used if data_source=nds
			nds_channel_type:
				str, default 'online', the NDS channel type. Only used if data_source=nds
			shared_memory_assumed_duration:
				int, default 4, the assumed span of files in seconds.
			shared_memory_block_size:
				int, default 4096, the byte size to read per buffer.
			frame_segments_file:
				str or Path, the name of the LIGO light-weight XML file from which to load frame segments. Optional iff data_source=frames
			block_size:
				int, default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz), Data block size to read in bytes. This parameter is only used if
				data_source is one of {white, silence, AdvVirgo, LIGO, AdvLIGO, nds}.
			frame_type:
				Dict[Detector, str], default None, a dictionary setting the frame type (string) for each detector, e.g. {Detector.H1: 'H1_GWOSC_O2_16KHZ_R1'}. Used with data_source='frames'.
			data_find_server:
				str, default None, the data find server for LIGO data discovery. Used with data_source=frames.

		Raises:
			DataSourceConfigError: if the configuration is invalid or inconsistent (see validate()).
		"""
		# FIX: the original tested isinstance(data_source, DataSourceInfo) and read
		# .name, but DataSourceInfo defines no .name attribute — the type that can
		# actually be passed here is a DataSource enum member. Store its .value so
		# self.data_source is always the plain string form; DataSource members
		# compare equal to their values, so downstream comparisons are unchanged.
		self.data_source = data_source.value if isinstance(data_source, DataSource) else data_source
		self.block_size = block_size
		self.frame_cache = frame_cache.as_posix() if isinstance(frame_cache, Path) else frame_cache
		self.frame_type = {} if frame_type is None else frame_type
		# DataFindServer members are HostInfo tuples; flatten them to "host:port".
		self.data_find_server = '{}:{:d}'.format(data_find_server.host, data_find_server.port) if isinstance(data_find_server, HostInfo) else data_find_server
		self.gps_start_time = gps_start_time.gpsSeconds if isinstance(gps_start_time, LIGOTimeGPS) else gps_start_time
		self.gps_end_time = gps_end_time.gpsSeconds if isinstance(gps_end_time, LIGOTimeGPS) else gps_end_time
		self.injections = injections.as_posix() if isinstance(injections, Path) else injections
		self.channel_name = {} if channel_name is None else channel_name
		self.nds_host = nds_host
		self.nds_port = nds_port
		self.nds_channel_type = nds_channel_type
		# Per-detector settings start from the module defaults; user-supplied
		# overrides are merged on top below.
		self.framexmit_addr = DEFAULT_FRAMXMIT_ADDR.copy()
		self.framexmit_iface = framexmit_iface
		self.state_channel_name = DEFAULT_STATE_CHANNEL.copy()
		self.dq_channel_name = DEFAULT_DQ_CHANNEL.copy()
		self.idq_channel_name = {} if idq_channel_name is None else idq_channel_name
		self.idq_state_channel_name = {} if idq_state_channel_name is None else idq_state_channel_name
		self.shared_memory_partition = DEFAULT_SHARED_MEMORY_PARTITION.copy()
		self.shared_memory_assumed_duration = shared_memory_assumed_duration
		self.shared_memory_block_size = shared_memory_block_size
		self.shared_memory_dir = DEFAULT_SHARED_MEMORY_DIR.copy()
		self.frame_segments_file = frame_segments_file
		self.frame_segments_name = frame_segments_name
		self.state_vector_on_bits = {} if state_vector_on_bits is None else state_vector_on_bits
		self.state_vector_off_bits = {} if state_vector_off_bits is None else state_vector_off_bits
		self.dq_vector_on_bits = {} if dq_vector_on_bits is None else dq_vector_on_bits
		self.dq_vector_off_bits = {} if dq_vector_off_bits is None else dq_vector_off_bits
		if state_channel_name:
			self.state_channel_name.update(state_channel_name)
		if dq_channel_name:
			self.dq_channel_name.update(dq_channel_name)
		if framexmit_addr:
			self.framexmit_addr.update(framexmit_addr)
		if shared_memory_partition:
			self.shared_memory_partition.update(shared_memory_partition)
		if shared_memory_dir:
			self.shared_memory_dir.update(shared_memory_dir)
		self.validate()
		# Set legacy attribute aliases
		self.injection_filename = self.injections
		self.channel_dict = self.channel_name
		self.dq_channel_dict = self.dq_channel_name
		self.state_channel_dict = self.state_channel_name
		self.shm_part_dict = self.shared_memory_partition
		self.shm_assumed_duration = self.shared_memory_assumed_duration
		self.shm_block_size = self.shared_memory_block_size
		self.shm_dir_dict = self.shared_memory_dir
		# Analysis segment; None for live sources (validate() guarantees both
		# GPS times are present for non-live sources).
		self.seg = None
		if self.gps_start_time is not None:
			start = LIGOTimeGPS(self.gps_start_time)
			end = LIGOTimeGPS(self.gps_end_time)
			self.seg = segments.segment(start, end)
		if self.frame_segments_file is not None:
			self.frame_segments = ligolw_segments.segmenttable_get_by_name(
				ligolw_utils.load_filename(self.frame_segments_file, contenthandler=ligolw_segments.LIGOLWContentHandler), self.frame_segments_name).coalesce()
			if self.seg is not None:
				# Clip frame segments to seek segment if it
				# exists (not required, just saves some
				# memory and I/O overhead)
				self.frame_segments = segments.segmentlistdict((detector, seglist & segments.segmentlist([self.seg])) for detector, seglist in self.frame_segments.items())
		else:
			## if no frame segments provided, set them to an empty segment list dictionary
			self.frame_segments = segments.segmentlistdict((detector, None) for detector in self.channel_dict)
		if self.data_source == DataSource.Frames and (self.frame_cache is None and self.frame_type):
			# No cache given: discover frame files via datafind. start/end are
			# defined here because validate() requires GPS times for 'frames'.
			frame_type_dict = self.frame_type
			_frame_cache = datafind.load_frame_cache(start, end, frame_type_dict, host=self.data_find_server)
			## create a temporary cache file
			self._frame_cache_fileobj = tempfile.NamedTemporaryFile(suffix=".cache", dir=os.getenv("_CONDOR_SCRATCH_DIR", tempfile.gettempdir()))
			self.frame_cache = self._frame_cache_fileobj.name
			with open(self.frame_cache, "w") as f:
				for cacheentry in _frame_cache:
					print(str(cacheentry), file=f)
		self.state_vector_on_off_bits = zip_dict_values(self.state_vector_on_bits, self.state_vector_off_bits, defaults=DEFAULT_STATE_VECTOR_ON_OFF)
		self.dq_vector_on_off_bits = zip_dict_values(self.dq_vector_on_bits, self.dq_vector_off_bits, defaults=DEFAULT_DQ_VECTOR_ON_OFF)

	def validate(self):
		"""Validation of configuration

		Raises:
			DataSourceConfigError: on any invalid or inconsistent setting.
		"""
		# Validate data_source
		if self.data_source not in KNOWN_DATASOURCES:
			raise DataSourceConfigError('Unknown datasource {}, must be one of: {}'.format(self.data_source, ', '.join(KNOWN_DATASOURCES)))
		# Validate data_source == frames
		# FIX: __init__ coerces frame_type to {} so it is never None; the old
		# `self.frame_type is None` test could never fire. Use truthiness.
		if self.data_source == DataSource.Frames and (self.frame_cache is None and not self.frame_type):
			raise DataSourceConfigError("frame_cache or frame_type must be specified when using data_source='frames'")
		# Validate channel_name not empty
		if not self.channel_name:
			raise DataSourceConfigError("Must specify at least one channel as {Detector: name}")
		# Validate frame_segments_file
		if self.frame_segments_file is not None and self.data_source != DataSource.Frames:
			raise DataSourceConfigError("Can only give frame_segments_file if data_source='frames'")
		# Validate frame_segments_name
		if self.frame_segments_name is not None and self.frame_segments_file is None:
			raise DataSourceConfigError("Can only specify frame_segments_name if frame_segments_file is given")
		# Validate idq_channel_name comes with idq_state_channel_name
		# FIX: __init__ coerces both to {} so the old `is (not) None` checks
		# could never fire. Use truthiness instead.
		if self.idq_channel_name and not self.idq_state_channel_name:
			raise DataSourceConfigError("Can only specify --idq-channel-name if --idq-state-channel-name is given")
		elif self.idq_state_channel_name and not self.idq_channel_name:
			raise DataSourceConfigError("Can only specify --idq-state-channel-name if --idq-channel-name is given")
		# Validate NDS arguments
		if self.data_source == DataSource.NDS:
			if self.nds_host is None or self.nds_port is None:
				raise DataSourceConfigError("Must specify nds_host and nds_port if data_source='nds'")
		# Validate start and end arguments
		if self.data_source in KNOWN_LIVE_DATASOURCES and (self.gps_start_time is not None or self.gps_end_time is not None):
			raise DataSourceConfigError("Cannot set gps_start_time or gps_end_time for live source: {}".format(self.data_source))
		if self.data_source not in KNOWN_LIVE_DATASOURCES and (self.gps_start_time is None or self.gps_end_time is None):
			raise DataSourceConfigError("Must set gps_start_time and gps_end_time for non-live source: {}".format(self.data_source))
		if self.gps_start_time is not None and self.gps_end_time is not None:
			# FIX: the error messages used '{:d}', which itself raises when the
			# GPS time is a string (as delivered by optparse) — use '{}'.
			try:
				start = LIGOTimeGPS(self.gps_start_time)
			except ValueError as e:
				raise DataSourceConfigError("invalid gps_start_time {}".format(self.gps_start_time)) from e
			try:
				end = LIGOTimeGPS(self.gps_end_time)
			except ValueError as e:
				raise DataSourceConfigError("invalid gps_end_time {}".format(self.gps_end_time)) from e
			if start >= end:
				raise DataSourceConfigError('Must specify gps_start_time < gps_end_time')

	@staticmethod
	def from_optparse(options: optparse.Values):
		"""Construct a DataSourceInfo object from an optparse.OptionParser

		Args:
			options:
				Values, with all of the arguments defined in append_options

		Returns:
			DataSourceInfo object
		"""
		channel_dict = parse_list_to_dict(options.channel_name)
		frame_type = parse_list_to_dict(options.frame_type)
		shared_memory_part = parse_list_to_dict(options.shared_memory_partition)
		shared_memory_dir = parse_list_to_dict(options.shared_memory_dir)
		state_channel_name = parse_list_to_dict(options.state_channel_name)
		dq_channel_name = parse_list_to_dict(options.dq_channel_name)
		idq_channel_name = parse_list_to_dict(options.idq_channel_name)
		idq_state_channel_name = parse_list_to_dict(options.idq_state_channel_name)
		state_vector_on_bits = parse_list_to_dict(options.state_vector_on_bits, value_transform=parse_int)
		state_vector_off_bits = parse_list_to_dict(options.state_vector_off_bits, value_transform=parse_int)
		dq_vector_on_bits = parse_list_to_dict(options.dq_vector_on_bits, value_transform=parse_int)
		dq_vector_off_bits = parse_list_to_dict(options.dq_vector_off_bits, value_transform=parse_int)
		return DataSourceInfo(data_source=options.data_source,
			block_size=options.block_size,
			frame_cache=options.frame_cache,
			frame_type=frame_type,
			data_find_server=options.data_find_server,
			gps_start_time=options.gps_start_time,
			gps_end_time=options.gps_end_time,
			injections=options.injection_file,
			channel_name=channel_dict,
			nds_host=options.nds_host if options.data_source == DataSource.NDS else None,
			nds_port=options.nds_port if options.data_source == DataSource.NDS else None,
			nds_channel_type=options.nds_channel_type if options.data_source == DataSource.NDS else None,
			framexmit_addr=options.framexmit_addr,
			framexmit_iface=options.framexmit_iface,
			state_channel_name=state_channel_name,
			dq_channel_name=dq_channel_name,
			idq_channel_name=idq_channel_name,
			idq_state_channel_name=idq_state_channel_name,
			shared_memory_partition=shared_memory_part,
			shared_memory_assumed_duration=options.shared_memory_assumed_duration,
			shared_memory_block_size=options.shared_memory_block_size,
			shared_memory_dir=shared_memory_dir,
			frame_segments_file=options.frame_segments_file,
			frame_segments_name=options.frame_segments_name,
			state_vector_on_bits=state_vector_on_bits,
			state_vector_off_bits=state_vector_off_bits,
			dq_vector_on_bits=dq_vector_on_bits,
			dq_vector_off_bits=dq_vector_off_bits)
################################################################################
# CLI Parsing Utilities #
################################################################################
def parse_host(host: str):
	"""Value transform for use with parse_list_to_dict that splits host into name and port tuple"""
	name, _, port = host.partition(':')
	return (name, int(port))
def parse_int(inp: str) -> int:
	"""Value transform for use with parse_list_to_dict that coerces string input to int.

	Strings carrying a hexadecimal prefix ('0x'/'0X', optionally signed,
	surrounding whitespace tolerated) are parsed in base 16; everything
	else is parsed in base 10. Generalized from the original, which only
	recognized an exact leading lowercase '0x'.

	Raises:
		ValueError: if the string is not a valid integer in either base.
	"""
	if inp.strip().lower().lstrip('+-').startswith('0x'):
		return int(inp, 16)
	return int(inp)
def parse_list_to_dict(lst: list, value_transform: types.FunctionType = None, sep: str = '=', key_is_range: bool = False, range_sep: str = ':', ) -> dict:
	"""A general list to dict argument parsing coercion function

	Args:
		lst:
			list, a list of the form ['A=V1', 'B=V2', ...], where "=" only has to match the sep argument
		value_transform:
			Function, default None. An optional transformation function to apply on values of the dictionary
		sep:
			str, default '=', the separator string between dict keys and values in list elements
		key_is_range:
			bool, default False. If True, the keys of the list are compound and contain range information e.g. "start:stop:remaining,list,of,items"
		range_sep:
			str, default ':' the separator string for range key information

	Returns:
		dict of the form {'A': value_transform('V1'), ...}; None if lst is None;
		the bare element when lst holds a single entry without a separator

	Examples:
		>>> parse_list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"]) # doctest: +SKIP
		{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}
		>>> parse_list_to_dict(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2"], key_is_range=True) # doctest: +SKIP
		{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}}
	"""
	if lst is None:
		return
	if key_is_range:
		out = {}
		for entry in lst:
			# Each entry looks like "start<range_sep>stop<range_sep>payload".
			start, stop, payload = entry.split(range_sep)
			for idx in range(int(start), int(stop)):
				# Re-parse per index so every key gets its own dict object.
				out[str(idx).zfill(4)] = parse_list_to_dict([item.strip() for item in payload.split(',')], value_transform=value_transform, sep=sep, key_is_range=False)
		return out
	if len(lst) == 1 and sep not in lst[0]:
		# A lone separator-free entry is passed through unchanged.
		return lst[0]
	out = dict(entry.split(sep) for entry in lst)
	if value_transform is not None:
		out = {key: value_transform(val) for key, val in out.items()}
	return out
def ravel_host(host: tuple) -> str:
	"""Value transform for use with ravel_dict_to_list that ravels a host name and port into a string"""
	return f'{host[0]}:{host[1]}'
def ravel_dict_to_list(dct: dict, value_transform: types.FunctionType = str, sep: str = '=', key_is_range: bool = False, range_sep: str = ':', join_arg: str = None,
		unzip: bool = False, filter_keys: list = None, range_elem: int = None) -> list:
	"""Functional inverse of parse_list_to_dict.

	TODO: This function exists to work around pipeline.py's inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.

	Args:
		dct:
			dict, the dict to transform into a list
		value_transform:
			Function, default str, the way to transform values of the dict before adding to list
		sep:
			str, default '=' separator character to join key-value pairs
		key_is_range:
			bool, default False. If True, dct is keyed by zero-padded range indices and only the entry selected by range_elem is ravelled
		range_sep:
			str, default ':', the range separator to join range key min and max values
		join_arg:
			str or list, default None, option name used to fold all but the first pair into one "--<join_arg>=<pair>" string
		unzip:
			bool, default False, if True unzip the dct arg and ravel each single-valued dict separately
		filter_keys:
			list, default None, if given only these keys of dct are ravelled
		range_elem:
			int, default None, index of the range entry to select when key_is_range is True

	Returns:
		List[str] the dict converted into lists of strings (a bare str when
		join_arg collapses the result to a single argument)
	"""
	if key_is_range:
		# Only ravelling a single explicitly-selected range entry is supported;
		# keys are zero-padded indices, matching parse_list_to_dict(key_is_range=True).
		if range_elem is None:
			raise NotImplementedError
		dct = dct[str(range_elem).zfill(4)]
	if filter_keys:
		dct = dict([(k, v) for k, v in dct.items() if k in filter_keys])
	if unzip:
		# Split the multi-valued dict into single-valued dicts and ravel each
		# one on its own, pairing it with its own join_arg.
		dicts = unzip_dict_values(dct)
		if not isinstance(join_arg, list):
			join_arg = len(dicts) * [join_arg]
		# NOTE(review): key_is_range is forwarded without range_elem, so a
		# recursive call with key_is_range=True would always raise
		# NotImplementedError — confirm this combination is never used.
		return [ravel_dict_to_list(d, value_transform=value_transform, sep=sep, key_is_range=key_is_range, range_sep=range_sep, join_arg=j, unzip=False) for d, j in
				zip(dicts, join_arg)]
	else:
		# Deterministic output: pairs are emitted in sorted key order.
		lst = ['{}{}{}'.format(k, sep, value_transform(v)) for k, v in sorted(dct.items(), key=lambda x: x[0])]
		if len(lst) == 0:
			return lst
		if join_arg is not None:
			# Collapse to one command-line argument: the first pair followed by
			# "--<join_arg>=<pair>" for each remaining pair (a str, not a list).
			if len(lst) == 1:
				return lst[0]
			lst = lst[0] + ' ' + ' '.join('--' + join_arg + '=' + l for l in lst[1:])
		return lst
def zip_dict_values(*dicts, defaults: dict = None, key_union: bool = True):
	"""Zip dict values by matching keys

	Args:
		*dicts:
			Iterable[dict], a collection of dicts whose values will be grouped by key in the order given
		defaults:
			dict, default None, if specified fill in missing values with these defaults. A default is
			used only when the key appears in none of the given dicts.
		key_union:
			bool, default True, if True then use a union of keys, else use intersection.

	Examples:
		>>> on_bit_list = parse_list_to_dict(["V1=7", "H1=7", "L1=7"], value_transform=int) # doctest: +SKIP
		>>> off_bit_list = parse_list_to_dict(["V1=256", "H1=352", "L1=352"], value_transform=int) # doctest: +SKIP
		>>> zip_dict_values(on_bit_list, off_bit_list, defaults=DEFAULT_STATE_VECTOR_ON_OFF) # doctest: +SKIP
		{'V1': [7, 256], 'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352]}
		>>> zip_dict_values(on_bit_list, off_bit_list) # doctest: +SKIP
		{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
	"""
	key_sets = [set(d.keys()) for d in dicts]
	if defaults is not None:
		key_sets.append(set(defaults.keys()))
	if key_union:
		keys = set().union(*key_sets)
	else:
		# FIX: the original computed set().intersection(*key_sets), which is the
		# intersection WITH an empty set and therefore always empty — key_union=False
		# silently returned {}. Intersect the key sets themselves instead.
		keys = set.intersection(*key_sets) if key_sets else set()
	unified = {}
	for k in keys:
		# Collect this key's value from every dict that has it, in argument order.
		ordered_value = [d[k] for d in dicts if k in d]
		if not ordered_value:
			# k was contributed by defaults alone, so defaults is not None here.
			ordered_value = defaults[k]
		unified[k] = ordered_value
	return unified
def unzip_dict_values(dct: dict):
	"""Split multi-valued dict into ordered collection of single-valued dicts with common keys

	All values must have the same length.

	Args:
		dct:
			dict, mapping keys to same-length sequences of values

	Returns:
		list of dicts, one per value position, each mapping every key of dct
		to its value at that position (keys filled in sorted order)

	Raises:
		ValueError: if the values of dct do not all have the same length
	"""
	# FIX: an empty dict used to raise IndexError on list(dct.values())[0];
	# there is nothing to unzip, so return an empty list instead.
	if not dct:
		return []
	num_vals = len(next(iter(dct.values())))
	if any(len(v) != num_vals for v in dct.values()):
		raise ValueError('All dict values must have same length.')
	dicts = [dict() for _ in range(num_vals)]
	for k in sorted(dct):
		for n, value in enumerate(list(dct[k])):
			dicts[n][k] = value
	return dicts
[docs]def append_options(parser):
"""
Append generic data source options to an OptionParser object in order
to have consistent an unified command lines and parsing throughout the project
for applications that read GW data.
- --data-source [string]
Set the data source from [frames|framexmit|lvshm|nds|silence|white].
- --block-size [int] (bytes)
Data block size to read in bytes. Default 16384 * 8 * 512 which is 512 seconds of double
precision data at 16384 Hz. This parameter is only used if --data-source is one of
white, silence, AdvVirgo, LIGO, AdvLIGO, nds.
- --frame-cache [filename]
Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
- --frame-type [string]
Set the frame type for a given instrument.
Can be given multiple times as --frame-type=IFO=FRAME-TYPE
- --gps-start-time [int] (seconds)
Set the start time of the segment to analyze in GPS seconds.
Required unless --data-source is lvshm or framexmit
- --gps-end-time [int] (seconds)
Set the end time of the segment to analyze in GPS seconds.
Required unless --data-source in lvshm,framexmit
- --injection-file [filename]
Set the name of the LIGO light-weight XML file from which to load injections (optional).
- --channel-name [string]
Set the name of the channels to process.
Can be given multiple times as --channel-name=IFO=CHANNEL-NAME
- --nds-host [hostname]
Set the remote host or IP address that serves nds data.
This is required iff --data-source is nds
- --nds-port [portnumber]
Set the port of the remote host that serves nds data, default = 31200.
This is required iff --data-source is nds
- --nds-channel-type [string] type
FIXME please document
- --framexmit-addr [string]
Set the address of the framexmit service. Can be given
multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port
- --framexmit-iface [string]
Set the address of the framexmit interface.
- --state-channel-name [string]
Set the name of the state vector channel.
This channel will be used to control the flow of data via the on/off bits.
Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME
- --dq-channel-name [string]
Set the name of the data quality channel.
This channel will be used to control the flow of data via the on/off bits.
Can be given multiple times as --state-channel-name=IFO=DQ-CHANNEL-NAME
- --shared-memory-partition [string]
Set the name of the shared memory partition for a given instrument.
Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME
- --shared-memory-dir [string]
Set the name of the shared memory directory for a given instrument.
Can be given multiple times as --shared-memory-dir=IFO=DIR-NAME
- --shared-memory-assumed-duration [int]
Set the assumed span of files in seconds. Default = 4 seconds.
- --shared-memory-block-size [int]
Set the byte size to read per buffer. Default = 4096 bytes.
- --frame-segments-file [filename]
Set the name of the LIGO light-weight XML file from which to load frame segments.
Optional iff --data-source is frames
- --frame-segments-name [string]
Set the name of the segments to extract from the segment tables.
Required iff --frame-segments-file is given
- --state-vector-on-bits [hex]
Set the state vector on bits to process (optional).
The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
Only currently has meaning for online (lvshm, framexmit) data
- --state-vector-off-bits [hex]
Set the state vector off bits to process (optional).
The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times.
Only currently has meaning for online (lvshm, framexmit) data
- --dq-vector-on-bits [hex]
Set the state vector on bits to process (optional).
The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times.
Only currently has meaning for online (lvshm, framexmit) data
- --dq-vector-off-bits [hex]
Set the dq vector off bits to process (optional).
The default is 0x0 for all detectors. Override with IFO=bits can be given multiple times.
Only currently has meaning for online (lvshm, framexmit) data
**Typical usage case examples**
1. Reading data from frames::
--data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \\
--channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \\
--frame-segments-name=datasegments
2. Reading data from a fake LIGO source::
--data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \\
--channel-name=H1=FAIKE-STRAIN
3. Reading online data via framexmit::
--data-source=framexmit --channel-name=H1=FAIKE-STRAIN
4. Many other combinations possible, please add some!
"""
group = optparse.OptionGroup(parser, "Data source options", "Use these options to set up the appropriate data source")
group.add_option("--data-source", metavar="source", help="Set the data source from [frames|framexmit|lvshm|nds|silence|white]. Required.")
group.add_option("--block-size", type="int", metavar="bytes", default=16384 * 8 * 512,
help="Data block size to read in bytes. Default 16384 * 8 * 512 (512 seconds of double precision data at 16384 Hz. This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.")
group.add_option("--frame-cache", metavar="filename", help="Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).")
group.add_option("--frame-type", metavar="name", action="append",
help="Set the frame type for a given instrument. Can be given multiple times as --frame-type=IFO=FRAME-TYPE. Used with --data-source=frames")
group.add_option("--data-find-server", metavar="url", help="Set the data find server for LIGO data discovery. Used with --data-source=frames")
group.add_option("--gps-start-time", metavar="seconds", help="Set the start time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")
group.add_option("--gps-end-time", metavar="seconds", help="Set the end time of the segment to analyze in GPS seconds. Required unless --data-source=lvshm")
group.add_option("--injection-file", metavar="filename", help="Set the name of the LIGO light-weight XML file from which to load injections (optional).")
group.add_option("--channel-name", metavar="name", action="append",
help="Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
group.add_option("--idq-channel-name", metavar="idqname", action="append", help="iDQ channel names to process. Must also provide idq-state-channel-name. Can be given multiple times as --idq-channel-name=IFO=IDQ-CHANNEL-NAME")
group.add_option("--idq-state-channel-name", metavar="idqstatename", action="append", help="iDQ state channel names to process. Can be given multiple times as --idq-state-channel-name=IFO=IDQ-STATE-CHANNEL-NAME")
group.add_option("--nds-host", metavar="hostname", help="Set the remote host or IP address that serves nds data. This is required iff --data-source=nds")
group.add_option("--nds-port", metavar="portnumber", type=int, default=31200,
help="Set the port of the remote host that serves nds data. This is required iff --data-source=nds")
group.add_option("--nds-channel-type", metavar="type", default="online",
help="Set the port of the remote host that serves nds data. This is required only if --data-source=nds. default==online")
group.add_option("--framexmit-addr", metavar="name", action="append",
help="Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port")
group.add_option("--framexmit-iface", metavar="name", help="Set the multicast interface address of the framexmit service.")
group.add_option("--state-channel-name", metavar="name", action="append",
help="Set the name of the state vector channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
group.add_option("--dq-channel-name", metavar="name", action="append",
help="Set the name of the data quality channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME")
group.add_option("--shared-memory-partition", metavar="name", action="append",
help="Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME")
group.add_option("--shared-memory-dir", metavar="name", action="append",
help="Set the name of the shared memory directory for a given instrument. Can be given multiple times as --shared-memory-dir=IFO=DIR-NAME")
group.add_option("--shared-memory-assumed-duration", type="int", default=4, help="Set the assumed span of files in seconds. Default = 4.")
group.add_option("--shared-memory-block-size", type="int", default=4096, help="Set the byte size to read per buffer. Default = 4096.")
group.add_option("--frame-segments-file", metavar="filename",
help="Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source=frames")
group.add_option("--frame-segments-name", metavar="name", help="Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given")
group.add_option("--state-vector-on-bits", metavar="bits", default=[], action="append",
help="Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
group.add_option("--state-vector-off-bits", metavar="bits", default=[], action="append",
help="Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
group.add_option("--dq-vector-on-bits", metavar="bits", default=[], action="append",
help="Set the DQ vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
group.add_option("--dq-vector-off-bits", metavar="bits", default=[], action="append",
help="Set the DQ vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm) data.")
parser.add_option_group(group)
################################################################################
# Pipeline building utilities #
################################################################################
def pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags=Gst.SeekFlags.FLUSH):
	"""
	Create a new seek event, i.e., Gst.Event.new_seek() for a given
	gps_start_time and gps_end_time, with optional flags, and send it
	directly to every source element of the pipeline.

	@param pipeline the Gst.Pipeline to seek; must be in the READY state
	@param gps_start_time start time as LIGOTimeGPS, double or float
	@param gps_end_time end time as LIGOTimeGPS, double or float
	@param flags Gst.SeekFlags to use for the seek (default: FLUSH)

	Raises ValueError if the pipeline is not in the READY state.
	"""
	def seek_args_for_gps(gps_time):
		"""!
		Convenience routine to convert a GPS time to a seek type and a
		GStreamer timestamp.
		"""
		if gps_time is None or gps_time == -1:
			# no boundary requested at this end
			return (Gst.SeekType.NONE, -1)	# -1 == Gst.CLOCK_TIME_NONE
		elif hasattr(gps_time, 'ns'):
			# LIGOTimeGPS-like object: already provides nanoseconds
			return (Gst.SeekType.SET, gps_time.ns())
		else:
			# numeric seconds: convert to GStreamer nanoseconds
			return (Gst.SeekType.SET, int(float(gps_time) * Gst.SECOND))

	start_type, start_time = seek_args_for_gps(gps_start_time)
	stop_type, stop_time = seek_args_for_gps(gps_end_time)

	# FIXME:  should seek whole pipeline, but there are several
	# problems preventing us from doing that.
	#
	# because the framecpp demuxer has no source pads until decoding
	# begins, the bottom halves of pipelines start out disconnected
	# from the top halves of pipelines, which means the seek events
	# (which are sent to sink elements) don't make it all the way to
	# the source elements.  dynamic pipeline building will not fix the
	# problem because the demuxer does not carry the "SINK" flag so even
	# though it starts with only a sink pad and no source pads it still
	# won't be sent the seek event.  gstreamer's own demuxers must
	# somehow have a solution to this problem, but I don't know what it
	# is.  I notice that many implement the send_event() method
	# override, and it's possible that's part of the solution.
	#
	# seeking the pipeline can only be done in the PAUSED state.  the
	# GstBaseSrc baseclass seeks itself to 0 when changing to the
	# paused state, and the preroll is performed before the seek event
	# we send to the pipeline is processed, so the preroll occurs with
	# whatever random data a seek to "0" causes source elements to
	# produce.  for us, when processing GW data, this leads to the
	# whitener element's initial spectrum estimate being initialized
	# from that random data, and a non-zero chance of even getting
	# triggers out of it, all of which is very bad.
	#
	# the only way we have at the moment to solve both problems --- to
	# ensure seek events arrive at source elements and to work around
	# GstBaseSrc's initial seek to 0 --- is to send seek events
	# directly to the source elements ourselves before putting the
	# pipeline into the PAUSED state.  the elements are happy to
	# receive seek events in the READY state, and GstBaseSrc updates its
	# current segment using that seek so that when it transitions to
	# the PAUSED state and does its initial seek it seeks to our
	# requested time, not to 0.
	#
	# So:  this function needs to be called with the pipeline in the
	# READY state in order to guarantee the data stream starts at the
	# requested start time, and does not get prerolled with random
	# data.  For safety we include a check of the pipeline's current
	# state.
	#
	# if in the future we find some other solution to these problems
	# the story might change and the pipeline state required on entry
	# into this function might change.

	# pipeline.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)
	if pipeline.current_state != Gst.State.READY:
		raise ValueError("pipeline must be in READY state")
	for elem in pipeline.iterate_sources():
		elem.seek(1.0, Gst.Format(Gst.Format.TIME), flags, start_type, start_time, stop_type, stop_time)
def mksegmentsrcgate(pipeline, src, segment_list, invert_output=False, rate=1, **kwargs):
	"""
	Build a gate whose control input is driven by a segment list; the
	caller hooks up the input and output themselves.

	@param segment_list the segments that open the gate
	@param invert_output if True, the segment sense is inverted
	@param rate sample rate of the control stream (Hz)
	@param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate's name.

	Gstreamer graph describing this function:

	.. graphviz::

	   digraph G {
		compound=true;
		node [shape=record fontsize=10 fontname="Verdana"];
		rankdir=LR;
		lal_segmentsrc;
		lal_gate;
		in [label="\<src\>"];
		out [label="\<return value\>"];
		in -> lal_gate -> out;
		lal_segmentsrc -> lal_gate;
	   }
	"""
	# the segment stream that will open/close the gate
	segsrc = pipeparts.mksegmentsrc(pipeline, segment_list, invert_output=invert_output)
	# pin its sample rate so it matches the stream being gated
	control = pipeparts.mkcapsfilter(pipeline, segsrc, caps="audio/x-raw, rate=%d" % rate)
	return pipeparts.mkgate(pipeline, src, threshold=1, control=control, **kwargs)
def mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose=False):
	"""
	All the conditionals and stupid pet tricks for reading real or
	simulated h(t) data in one place.

	Consult the append_options() function and the GWDataSourceInfo class

	This src in general supports only one instrument although
	GWDataSourceInfo contains dictionaries of multi-instrument things. By
	specifying the instrument when calling this function you will get ony a single
	instrument source. A code wishing to have multiple basicsrcs will need to call
	this function for each instrument.

	@param pipeline the GStreamer pipeline into which elements are placed
	@param gw_data_source_info object describing the data source (see GWDataSourceInfo)
	@param instrument the instrument (e.g. "H1") to construct the source for
	@param verbose if True, insert a progressreport element on the stream

	Returns a (src, statevector, dqvector, idqseries) tuple; the last three
	are None unless the online code path below populates them.

	**Gstreamer Graph**

	.. graphviz::

	   digraph mkbasicsrc {
		compound=true;
		node [shape=record fontsize=10 fontname="Verdana"];
		subgraph clusterfakesrc {
			fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO"];
			color=black;
			label="Possible path #1";
		}
		subgraph clusterframes {
			color=black;
			frames_0 [label="lalcachesrc: frames"];
			frames_1 [label ="framecppchanneldemux"];
			frames_2 [label ="queue"];
			frames_3 [label ="gate (if user provides segments)", style=filled, color=lightgrey];
			frames_4 [label ="audiorate"];
			frames_0 -> frames_1 -> frames_2 -> frames_3 ->frames_4;
			label="Possible path #2";
		}
		subgraph clusteronline {
			color=black;
			online_0 [label="lvshmsrc|framexmit"];
			online_1 [label ="framecppchanneldemux"];
			online_2a [label ="strain queue"];
			online_2b [label ="statevector queue"];
			online_3 [label ="statevector"];
			online_4 [label ="gate"];
			online_5 [label ="audiorate"];
			online_6 [label ="queue"];
			online_0 -> online_1;
			online_1 -> online_2a;
			online_1 -> online_2b;
			online_2b -> online_3;
			online_2a -> online_4;
			online_3 -> online_4 -> online_5 -> online_6;
			label="Possible path #3";
		}
		subgraph clusternds {
			nds_0 [label="ndssrc"];
			color=black;
			label="Possible path #4";
		}
		audioconv [label="audioconvert"];
		progress [label="progressreport (if verbose)", style=filled, color=lightgrey];
		sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey];
		queue [label="queue (if injections requested)", style=filled, color=lightgrey];
		// The connections
		fake_0 -> audioconv [ltail=clusterfakesrc];
		frames_4 -> audioconv [ltail=clusterframes];
		online_6 -> audioconv [ltail=clusteronline];
		nds_0 -> audioconv [ltail=clusternds];
		audioconv -> progress -> sim -> queue -> "?";
	   }
	"""
	# outputs that only the online code path fills in
	statevector = dqvector = None
	idqseries = None
	idqstatevector = None
	# NOTE: timestamp_offset is a hack to allow seeking with fake
	# sources, a real solution should be fixing the general timestamp
	# problem which would allow seeking to work properly
	if gw_data_source_info.data_source == DataSource.White:
		src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size, volume=1.0,
			timestamp_offset=int(gw_data_source_info.seg[0]) * Gst.SECOND)
	elif gw_data_source_info.data_source == DataSource.Silence:
		src = pipeparts.mkfakesrc(pipeline, instrument, gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size, wave=4,
			timestamp_offset=int(gw_data_source_info.seg[0]) * Gst.SECOND)
	elif gw_data_source_info.data_source == DataSource.LIGO:
		src = pipeparts.mkfakeLIGOsrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.ALIGO:
		src = pipeparts.mkfakeadvLIGOsrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.AVirgo:
		src = pipeparts.mkfakeadvvirgosrc(pipeline, instrument=instrument, channel_name=gw_data_source_info.channel_dict[instrument], blocksize=gw_data_source_info.block_size)
	elif gw_data_source_info.data_source == DataSource.Frames:
		if instrument == Detector.V1:
			# FIXME Hack because virgo often just uses "V" in
			# the file names rather than "V1".  We need to
			# sieve on "V"
			src = pipeparts.mklalcachesrc(pipeline, location=gw_data_source_info.frame_cache, cache_src_regex="V")
		else:
			src = pipeparts.mklalcachesrc(pipeline, location=gw_data_source_info.frame_cache, cache_src_regex=instrument[0], cache_dsc_regex=instrument)
		demux = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum=False, channel_list=list(map("%s:%s".__mod__, gw_data_source_info.channel_dict.items())))
		pipeparts.framecpp_channeldemux_set_units(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))
		# allow frame reading and decoding to occur in a different
		# thread
		src = pipeparts.mkqueue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * Gst.SECOND)
		pipeparts.src_deferred_link(demux, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), src.get_static_pad("sink"))
		if gw_data_source_info.frame_segments[instrument] is not None:
			# FIXME:  make segmentsrc generate segment samples
			# at the sample rate of h(t)?
			# FIXME:  make gate leaky when I'm certain that
			# will work.
			src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mksegmentsrc(pipeline, gw_data_source_info.frame_segments[instrument]),
				name="%s_frame_segments_gate" % instrument)
			pipeparts.framecpp_channeldemux_check_segments.set_probe(src.get_static_pad("src"), gw_data_source_info.frame_segments[instrument])
		# FIXME:  remove this when pipeline can handle disconts
		src = pipeparts.mkaudiorate(pipeline, src, skip_to_first=True, silent=False)
	elif gw_data_source_info.data_source in (DataSource.FrameXMIT.value, DataSource.LVSHM.value, DataSource.DEVSHM.value):
		# NOTE(review): this membership test compares against the
		# members' .value while the branches below compare against
		# the DataSource members themselves; that is only consistent
		# if DataSource is a value-comparable enum (e.g. a str/int
		# mixin) -- confirm against the DataSource definition.
		# See https://wiki.ligo.org/DAC/ER2DataDistributionPlan#LIGO_Online_DQ_Channel_Specifica
		state_vector_on_bits, state_vector_off_bits = gw_data_source_info.state_vector_on_off_bits[instrument]
		dq_vector_on_bits, dq_vector_off_bits = gw_data_source_info.dq_vector_on_off_bits[instrument]
		if gw_data_source_info.data_source == DataSource.LVSHM:
			# FIXME make wait_time adjustable through web
			# interface or command line or both
			src = pipeparts.mklvshmsrc(pipeline, shm_name=gw_data_source_info.shm_part_dict[instrument], assumed_duration=gw_data_source_info.shm_assumed_duration,
				blocksize=gw_data_source_info.shm_block_size, wait_time=120)
		elif gw_data_source_info.data_source == DataSource.FrameXMIT:
			src = pipeparts.mkframexmitsrc(pipeline, multicast_iface=gw_data_source_info.framexmit_iface, multicast_group=gw_data_source_info.framexmit_addr[instrument][0],
				port=gw_data_source_info.framexmit_addr[instrument][1], wait_time=120)
		elif gw_data_source_info.data_source == DataSource.DEVSHM:
			src = pipeparts.mkdevshmsrc(pipeline, shm_dirname=gw_data_source_info.shared_memory_dir[instrument], wait_time=60, watch_suffix='.gwf')
		else:
			# impossible code path
			raise ValueError(gw_data_source_info.data_source)
		# 10 minutes of buffering, then demux
		src = pipeparts.mkqueue(pipeline, src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND * 60 * 10)
		src = pipeparts.mkframecppchanneldemux(pipeline, src, do_file_checksum=False, skip_bad_files=True)
		# extract state vector and DQ vector and convert to
		# booleans
		if gw_data_source_info.dq_channel_dict[instrument] == gw_data_source_info.state_channel_dict[instrument]:
			# state and DQ vector share one channel: tee it and
			# drive both statevector elements from the tee
			dqstatetee = pipeparts.mktee(pipeline, None)
			statevectorelem = statevector = pipeparts.mkstatevector(pipeline, dqstatetee, required_on=state_vector_on_bits, required_off=state_vector_off_bits,
				name="%s_state_vector" % instrument)
			dqvectorelem = dqvector = pipeparts.mkstatevector(pipeline, dqstatetee, required_on=dq_vector_on_bits, required_off=dq_vector_off_bits,
				name="%s_dq_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), dqstatetee.get_static_pad("sink"))
		else:
			# DQ and state vector are distinct channels
			# first DQ
			dqvectorelem = dqvector = pipeparts.mkstatevector(pipeline, None, required_on=dq_vector_on_bits, required_off=dq_vector_off_bits, name="%s_dq_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.dq_channel_dict[instrument]), dqvector.get_static_pad("sink"))
			# then State
			statevectorelem = statevector = pipeparts.mkstatevector(pipeline, None, required_on=state_vector_on_bits, required_off=state_vector_off_bits,
				name="%s_state_vector" % instrument)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.state_channel_dict[instrument]), statevector.get_static_pad("sink"))
		# web monitoring routes reporting on/off/gap sample counts.
		# the elem=... default argument binds the element at
		# definition time, so each instrument's route sees its own
		# element even though the function names repeat.
		@bottle.route("/%s/statevector_on.txt" % instrument)
		def state_vector_state(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			on = elem.get_property("on-samples")
			return "%.9f %d" % (t, on)
		@bottle.route("/%s/statevector_off.txt" % instrument)
		def state_vector_state(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			off = elem.get_property("off-samples")
			return "%.9f %d" % (t, off)
		@bottle.route("/%s/statevector_gap.txt" % instrument)
		def state_vector_state(elem=statevectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			gap = elem.get_property("gap-samples")
			return "%.9f %d" % (t, gap)
		@bottle.route("/%s/dqvector_on.txt" % instrument)
		def dq_vector_state(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			on = elem.get_property("on-samples")
			return "%.9f %d" % (t, on)
		@bottle.route("/%s/dqvector_off.txt" % instrument)
		def dq_vector_state(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			off = elem.get_property("off-samples")
			return "%.9f %d" % (t, off)
		@bottle.route("/%s/dqvector_gap.txt" % instrument)
		def dq_vector_state(elem=dqvectorelem):
			t = float(lal.UTCToGPS(time.gmtime()))
			gap = elem.get_property("gap-samples")
			return "%.9f %d" % (t, gap)
		# extract strain with 1 buffer of buffering
		strain = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
		pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]), strain.get_static_pad("sink"))
		pipeparts.framecpp_channeldemux_set_units(src, {"%s:%s" % (instrument, gw_data_source_info.channel_dict[instrument]): "strain"})
		# extract idq series and idqstatevector
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.idq_channel_name[instrument]), idqseries.get_static_pad("sink"))
		if instrument in gw_data_source_info.idq_state_channel_name:
			idqstatevector = pipeparts.mkqueue(pipeline, None, max_size_buffers=1, max_size_bytes=0, max_size_time=0)
			pipeparts.src_deferred_link(src, "%s:%s" % (instrument, gw_data_source_info.idq_state_channel_name[instrument]), idqstatevector.get_static_pad("sink"))
		# fill in holes, skip duplicate data
		statevector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, statevector, skip_to_first=True, silent=False), block_duration = Gst.SECOND // 4)
		dqvector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, dqvector, skip_to_first=True, silent=False), block_duration = Gst.SECOND // 4)
		src = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, strain, skip_to_first=True, silent=False, name="%s_strain_audiorate" % instrument), block_duration = Gst.SECOND // 4)
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, idqseries, skip_to_first=True, silent=False), block_duration = Gst.SECOND // 4)
		if instrument in gw_data_source_info.idq_state_channel_name:
			idqstatevector = pipeparts.mkreblock(pipeline, pipeparts.mkaudiorate(pipeline, idqstatevector, skip_to_first=True, silent=False), block_duration = Gst.SECOND // 4)
		@bottle.route("/%s/strain_dropped.txt" % instrument)
		# FIXME don't hard code the sample rate
		def strain_add(elem=src, rate=16384):
			t = float(lal.UTCToGPS(time.gmtime()))
			# yes I realize we are reading the "add" property for a
			# route called dropped.  That is because the data which
			# is "dropped" on route is "added" by the audiorate
			# element
			add = elem.get_property("add")
			return "%.9f %d" % (t, add // rate)
		# use state vector and DQ vector to gate strain.  the sizes
		# of the queues on the control inputs are not important.
		# they must be large enough to buffer the state vector
		# streams until they are needed, but the streams will be
		# consumed immediately when needed so there is no risk that
		# these queues actually fill up or add latency.  be
		# generous.
		statevector = pipeparts.mktee(pipeline, statevector)
		dqvector = pipeparts.mktee(pipeline, dqvector)
		src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mkqueue(pipeline, statevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0),
			default_state=False, name="%s_state_vector_gate" % instrument)
		src = pipeparts.mkgate(pipeline, src, threshold=1, control=pipeparts.mkqueue(pipeline, dqvector, max_size_buffers=0, max_size_bytes=0, max_size_time=0),
			default_state=False, name="%s_dq_vector_gate" % instrument)
		# extract idq state vector
		# use the idq state vector, state and dq vectors to gate idq series.
		if instrument in gw_data_source_info.idq_channel_name:
			idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, statevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0),
				default_state=False, name="%s_idq_state_vector_gate" % instrument)
			idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, dqvector, max_size_buffers=0, max_size_bytes=0, max_size_time=0),
				default_state=False, name="%s_idq_dq_vector_gate" % instrument)
			if instrument in gw_data_source_info.idq_state_channel_name:
				idqseries = pipeparts.mkgate(pipeline, idqseries, threshold=1, control=pipeparts.mkqueue(pipeline, idqstatevector, max_size_buffers=0, max_size_bytes=0, max_size_time=0),
					default_state=False, name="%s_idq_idqstate_vector_gate" % instrument)
	elif gw_data_source_info.data_source == DataSource.NDS:
		src = pipeparts.mkndssrc(pipeline, gw_data_source_info.nds_host, instrument, gw_data_source_info.channel_dict[instrument], gw_data_source_info.nds_channel_type,
			blocksize=gw_data_source_info.block_size, port=gw_data_source_info.nds_port)
	else:
		raise ValueError("invalid data_source: %s" % gw_data_source_info.data_source)

	#
	# provide an audioconvert element to allow Virgo data (which is
	# single-precision) to be adapted into the pipeline
	#

	src = pipeparts.mkaudioconvert(pipeline, src)

	#
	# progress report
	#

	if verbose:
		src = pipeparts.mkprogressreport(pipeline, src, "progress_src_%s" % instrument)

	#
	# optional injections
	#

	if gw_data_source_info.injection_filename is not None:
		src = pipeparts.mkinjections(pipeline, src, gw_data_source_info.injection_filename)
		# let the injection code run in a different thread than the
		# whitener, etc.,
		src = pipeparts.mkqueue(pipeline, src, max_size_bytes=0, max_size_buffers=0, max_size_time=Gst.SECOND * 64)

	#
	# done
	#

	return src, statevector, dqvector, idqseries
def mkhtgate(pipeline, src, control=None, threshold=8.0, attack_length=128, hold_length=128, **kwargs):
	"""
	A convenience function that thresholds a stream against a control
	signal, e.g., to remove large spikes / glitches from input data.
	Anything can be plugged in as input and output.

	NOTE: the queues constructed by this code assume the attack and
	hold lengths combined are less than 1 second in duration.

	**Gstreamer Graph**

	.. graphviz::

	   digraph G {
		compound=true;
		node [shape=record fontsize=10 fontname="Verdana"];
		rankdir=LR;
		tee ;
		inputqueue ;
		lal_gate ;
		in [label="\<src\>"];
		out [label="\<return\>"];
		in -> tee -> inputqueue -> lal_gate -> out;
		tee -> lal_gate;
	   }
	"""
	# FIXME someday explore a good bandpass filter
	# src = pipeparts.mkaudiochebband(pipeline, src, low_frequency, high_frequency)
	if control is None:
		# no control signal given:  tee the input so it gates itself
		src = pipeparts.mktee(pipeline, src)
		control = src
	# buffer the data path while the control stream is evaluated
	buffered = pipeparts.mkqueue(pipeline, src, max_size_buffers=0, max_size_bytes=0, max_size_time=Gst.SECOND)
	# negative attack/hold lengths + inverted control veto a window
	# around each above-threshold control sample
	return pipeparts.mkgate(pipeline, buffered, control=control, threshold=threshold, attack_length=-attack_length, hold_length=-hold_length, invert_control=True, **kwargs)
################################################################################
# Legacy API Below, deprecated #
################################################################################
@admin.deprecated('replaced by parse_list_to_dict')
def channel_dict_from_channel_list(channel_list):
	"""
	Convert a list of "IFO=CHANNEL-NAME" strings into a dictionary of
	channel names keyed by ifo.  The list here typically comes from an
	option parser with options that specify the "append" action.

	Examples:

	>>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"]) # doctest: +SKIP
	{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}
	"""
	# thin shim over the replacement helper
	channel_dict = parse_list_to_dict(channel_list)
	return channel_dict
@admin.deprecated('replaced by parse_list_to_dict with key_is_range=True')
def channel_dict_from_channel_list_with_node_range(channel_list):
	"""
	Convert a list of "START:STOP:IFO=CHANNEL,..." strings into a
	dictionary of per-ifo channel dictionaries keyed by zero-padded node
	number.  The list here typically comes from an option parser with
	options that specify the "append" action.

	Examples:

	>>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"]) # doctest: +SKIP
	{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
	"""
	outdict = {}
	for entry in channel_list:
		fields = entry.split(':')
		first_node, last_node = int(fields[0]), int(fields[1])
		for node in range(first_node, last_node):
			# build a fresh dict per node so entries don't alias
			outdict[str(node).zfill(4)] = dict(pair.split("=") for pair in fields[2].split(','))
	return outdict
@admin.deprecated('replaced by ravel_dict_to_list with join_arg="channel-name" and filter_keys=ifos')
def pipeline_channel_list_from_channel_dict(channel_dict, ifos=None, opt="channel-name"):
	"""
	Creates a string of channel names options from a dictionary keyed by ifos.

	FIXME: This function exists to work around pipeline.py's inability to
	give the same option more than once by producing a string to pass as an argument
	that encodes the other instances of the option.

	- override --channel-name with a different option by setting opt.
	- restrict the ifo keys to a subset of the channel_dict by
	  setting ifos

	Examples:

	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}) # doctest: +SKIP
	'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"]) # doctest: +SKIP
	'H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string") # doctest: +SKIP
	'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
	"""
	if ifos is None:
		ifos = channel_dict.keys()
	pieces = []
	for index, ifo in enumerate(ifos):
		assignment = "%s=%s" % (ifo, channel_dict[ifo])
		# only the first entry is bare; the rest re-state the option
		if index:
			assignment = "--%s=%s" % (opt, assignment)
		pieces.append(assignment + " ")
	return "".join(pieces)
@admin.deprecated('replaced by ravel_dict_to_list with key_is_range=True, range_elem=node, join_arg="channel-name", filter_keys=ifos')
def pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node=0, ifos=None, opt="channel-name"):
	"""
	Creates a string of channel names options from a node-keyed dictionary
	of per-ifo channel dictionaries.

	FIXME: This function exists to work around pipeline.py's inability to
	give the same option more than once by producing a string to pass as an argument
	that encodes the other instances of the option.

	- override --channel-name with a different option by setting opt.
	- restrict the ifo keys to a subset of the channel_dict by
	  setting ifos

	Examples:

	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0) # doctest: +SKIP
	'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"]) # doctest: +SKIP
	'H1=LSC-STRAIN '
	>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, opt="test-string") # doctest: +SKIP
	'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
	"""
	# node numbers are stored as zero-padded four digit strings
	node = str(node).zfill(4)
	if ifos is None:
		ifos = channel_dict[node].keys()
	pieces = []
	for index, ifo in enumerate(ifos):
		assignment = "%s=%s" % (ifo, channel_dict[node][ifo])
		# only the first entry is bare; the rest re-state the option
		if index:
			assignment = "--%s=%s" % (opt, assignment)
		pieces.append(assignment + " ")
	return "".join(pieces)
@admin.deprecated('replaced by parse_list_to_dict with key_is_range=True')
def injection_dict_from_channel_list_with_node_range(injection_list):
	"""
	Convert a list of "START:STOP:FILENAME" injection xml specifications
	into a dictionary of filenames keyed by zero-padded node number.  The
	list here typically comes from an option parser with options that
	specify the "append" action.

	Examples:

	>>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"]) # doctest: +SKIP
	{'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}
	"""
	# expand each START:STOP range; later entries win on key collisions
	return {
		str(node).zfill(4): fields[2]
		for fields in (entry.split(':') for entry in injection_list)
		for node in range(int(fields[0]), int(fields[1]))
	}
@admin.deprecated('replaced by combination of zip_dict_values and parse_list_to_dict with value_transform=parse_int')
def state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict=None):
	"""
	Produce a dictionary (keyed by detector) of on / off bit tuples from a
	list provided on the command line.

	Takes default values from a copy of the module level
	DEFAULT_STATE_VECTOR_ON_OFF if state_vector_on_off_dict is not given.
	(A copy is used so repeated calls never mutate the module-level
	default -- passing a dict explicitly still updates that dict in
	place, as before.)

	Inputs must be given as base 10 or 16 integers

	Examples:

	>>> on_bit_list = ["V1=7", "H1=7", "L1=7"] # doctest: +SKIP
	>>> off_bit_list = ["V1=256", "H1=352", "L1=352"] # doctest: +SKIP
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list) # doctest: +SKIP
	{'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [7, 256]}
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{}) # doctest: +SKIP
	{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
	>>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"] # doctest: +SKIP
	>>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"] # doctest: +SKIP
	>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{}) # doctest: +SKIP
	{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
	"""
	if state_vector_on_off_dict is None:
		# deep-enough copy: fresh inner lists so the module-level
		# defaults are never mutated by this call (the previous
		# mutable default argument accumulated state across calls)
		state_vector_on_off_dict = {ifo: list(bits) for ifo, bits in DEFAULT_STATE_VECTOR_ON_OFF.items()}
	for ifo, bits in (line.strip().split("=", 1) for line in on_bit_list):
		# accept base 10 or base 16
		bits = int(bits, 16) if bits.startswith("0x") else int(bits)
		try:
			state_vector_on_off_dict[ifo][0] = bits
		except KeyError:
			state_vector_on_off_dict[ifo] = [bits, 0]
	for ifo, bits in (line.strip().split("=", 1) for line in off_bit_list):
		bits = int(bits, 16) if bits.startswith("0x") else int(bits)
		try:
			state_vector_on_off_dict[ifo][1] = bits
		except KeyError:
			# ifo appeared only in the off list; previously this
			# raised an unhandled KeyError
			state_vector_on_off_dict[ifo] = [0, bits]
	return state_vector_on_off_dict
@admin.deprecated('replaced by combination of ravel_dict_to_list with unzip=True and join_arg=["state-vector-on-bits", "state-vector-off-bits"]')
def state_vector_on_off_list_from_bits_dict(bit_dict):
	"""
	Produce a tuple of useful command lines from a dictionary of on / off state
	vector bits keyed by detector

	FIXME: This function exists to work around pipeline.py's inability to
	give the same option more than once by producing a string to pass as an argument
	that encodes the other instances of the option.

	Examples:

	>>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]} # doctest: +SKIP
	>>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict) # doctest: +SKIP
	('H1=7 --state-vector-on-bits=H2=7 --state-vector-on-bits=L1=7 --state-vector-on-bits=V1=103 ', 'H1=352 --state-vector-off-bits=H2=352 --state-vector-off-bits=L1=352 --state-vector-off-bits=V1=256 ')
	"""
	on_pieces = []
	off_pieces = []
	for index, ifo in enumerate(bit_dict):
		entry = bit_dict[ifo]
		# only the first entry is bare; the rest re-state the option
		if index:
			on_pieces.append("--state-vector-on-bits=%s=%s " % (ifo, entry[0]))
			off_pieces.append("--state-vector-off-bits=%s=%s " % (ifo, entry[1]))
		else:
			on_pieces.append("%s=%s " % (ifo, entry[0]))
			off_pieces.append("%s=%s " % (ifo, entry[1]))
	return "".join(on_pieces), "".join(off_pieces)
@admin.deprecated('replaced by parse_list_to_dict with value_transform=parse_host.')
def framexmit_dict_from_framexmit_list(framexmit_list):
	"""
	Given a list of framexmit addresses with ports, produce a dictionary
	keyed by ifo.

	The list here typically comes from an option parser with options that
	specify the "append" action.

	Examples:

	>>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"]) # doctest: +SKIP
	{'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097), 'V1': ('224.3.2.3', 7098)}
	"""
	framexmit_dict = {}
	for entry in framexmit_list:
		# each entry has the form "IFO=ADDR:PORT"
		ifo, addr_port = entry.split("=")
		addr, port = addr_port.split(":")
		framexmit_dict[ifo] = (addr, int(port))
	return framexmit_dict
@admin.deprecated('replaced by ravel_dict_to_list with value_transform=ravel_host.')
def framexmit_list_from_framexmit_dict(framexmit_dict, ifos=None, opt="framexmit-addr"):
	"""
	Creates a string of framexmit address options from a dictionary keyed
	by ifos.

	Examples:

	>>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)}) # doctest: +SKIP
	'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '
	"""
	if ifos is None:
		ifos = framexmit_dict.keys()
	fragments = []
	for index, ifo in enumerate(ifos):
		addr, port = framexmit_dict[ifo]
		# the first instrument's value is given bare; subsequent ones
		# carry the option flag so the consumer sees a repeated option
		prefix = "" if index == 0 else "--%s=" % opt
		fragments.append("%s%s=%s:%s " % (prefix, ifo, addr, port))
	return "".join(fragments)
@admin.deprecated('replaced by parse_list_to_dict.')
def frame_type_dict_from_frame_type_list(frame_type_list):
	"""
	Given a list of "IFO=FRAME_TYPE" strings, produce a dictionary keyed
	by ifo.

	The list here typically comes from an option parser with options that
	specify the "append" action.

	Examples:

	>>> frame_type_dict_from_frame_type_list(['H1=H1_GWOSC_O2_16KHZ_R1', 'L1=L1_GWOSC_O2_16KHZ_R1']) # doctest: +SKIP
	{'H1': 'H1_GWOSC_O2_16KHZ_R1', 'L1': 'L1_GWOSC_O2_16KHZ_R1'}
	"""
	# the two-target unpacking preserves the original's ValueError on
	# entries that do not contain exactly one "="
	return {ifo: frame_type for ifo, frame_type in (entry.split("=") for entry in frame_type_list)}