datasource module

Gravitational wave datasource utilities, including abstractions for storing the information needed to connect to data sources. The main elements of the API are:

  1. DataSourceInfo class for storing the information needed to connect to a data source

  2. Useful constants to help configure a DataSourceInfo, like the Detector and DataSource classes

  3. Pipeline building utilities

  4. Command-line utilities for parsing and converting values into a DataSourceInfo

class datasource.DataFindServer[source]

Bases: object

Enumeration of available datafind servers

GeneralProp = DataFindServerInfo(host='', port=443)
class datasource.DataSource(value)[source]

Bases: str, Enum

Enumeration of available data sources

TODO: include descriptions of the options

AVirgo = 'AdvVirgo'
DEVSHM = 'devshm'
FrameXMIT = 'framexmit'
Frames = 'frames'
LVSHM = 'lvshm'
NDS = 'nds'
Silence = 'silence'
White = 'white'
exception datasource.DataSourceConfigError[source]

Bases: ValueError

Error subclass for configuration errors

class datasource.DataSourceInfo(data_source: str, channel_name: Dict[Detector, str], gps_start_time: Optional[Union[int, LIGOTimeGPS]] = None, gps_end_time: Optional[Union[int, LIGOTimeGPS]] = None, shared_memory_partition: Optional[Dict[Detector, str]] = None, shared_memory_dir: Optional[Dict[Detector, str]] = None, frame_segments_name: Optional[str] = None, state_vector_on_bits: Optional[Dict[Detector, str]] = None, state_vector_off_bits: Optional[Dict[Detector, str]] = None, dq_vector_on_bits: Optional[str] = None, dq_vector_off_bits: Optional[str] = None, frame_cache: Optional[Union[str, Path]] = None, injections: Optional[Union[str, Path]] = None, nds_host: Optional[str] = None, nds_port: Optional[int] = None, nds_channel_type: str = 'online', shared_memory_assumed_duration: int = 4, shared_memory_block_size: int = 4096, frame_segments_file: Optional[Union[str, Path]] = None, block_size: int = 67108864, frame_type: Optional[Dict[Detector, str]] = None, data_find_server: Optional[str] = None, framexmit_addr: Optional[str] = None, framexmit_iface: Optional[str] = None, state_channel_name: Optional[Dict[Detector, str]] = None, dq_channel_name: Optional[Dict[Detector, str]] = None, idq_channel_name: Optional[Dict[Detector, str]] = None, idq_state_channel_name: Optional[Dict[Detector, str]] = None)[source]

Bases: object

A pythonic representation of a datasource with configured settings necessary for usage in pipelines

static from_optparse(options: Values)[source]

Construct a DataSourceInfo object from the Values parsed by an optparse.OptionParser

options: Values, with all of the arguments defined in append_options

Returns: DataSourceInfo object


Validation of configuration

class datasource.Detector(value)[source]

Bases: str, Enum

Enumeration of available detectors

G1 = 'G1'
H1 = 'H1'
H2 = 'H2'
K1 = 'K1'
L1 = 'L1'
V1 = 'V1'
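
Because Detector derives from both str and Enum, its members can be built from, and compared with, plain strings. The following is a minimal sketch of that behaviour; it redefines a stand-in class with the members listed above rather than importing the real module, and the channels dictionary is a made-up example.

```python
from enum import Enum


class Detector(str, Enum):
    """Stand-in for datasource.Detector, copied from the member list above."""
    G1 = "G1"
    H1 = "H1"
    H2 = "H2"
    K1 = "K1"
    L1 = "L1"
    V1 = "V1"


# Look up a member from the string an option parser would hand over.
ifo = Detector("H1")

# The str mixin makes a member equal to (and hash like) its string value,
# so it can index a dictionary that was built with plain-string keys.
channels = {"H1": "LSC-STRAIN"}  # hypothetical channel mapping
channel = channels[ifo]
```

An unknown name such as Detector("X1") raises ValueError, which gives a cheap validation step for instrument names read from the command line.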

alias of DataFindServerInfo


Append generic data source options to an OptionParser object in order to have consistent and unified command lines and parsing throughout the project for applications that read GW data.

  • --data-source [string]

    Set the data source from [frames|framexmit|lvshm|nds|silence|white].

  • --block-size [int] (bytes)

    Data block size to read in bytes. Default 16384 * 8 * 512, which is 512 seconds of double precision data at 16384 Hz. This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.

  • --frame-cache [filename]

    Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).

  • --frame-type [string]

    Set the frame type for a given instrument. Can be given multiple times as --frame-type=IFO=FRAME-TYPE

  • --gps-start-time [int] (seconds)

    Set the start time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit

  • --gps-end-time [int] (seconds)

    Set the end time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit

  • --injection-file [filename]

    Set the name of the LIGO light-weight XML file from which to load injections (optional).

  • --channel-name [string]

    Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME

  • --nds-host [hostname]

    Set the remote host or IP address that serves nds data. This is required iff --data-source is nds

  • --nds-port [portnumber]

    Set the port of the remote host that serves nds data, default = 31200. This is required iff --data-source is nds

  • --nds-channel-type [string]

    FIXME please document

  • --framexmit-addr [string]

    Set the address of the framexmit service. Can be given multiple times as --

  • --framexmit-iface [string]

    Set the address of the framexmit interface.

  • --state-channel-name [string]

    Set the name of the state vector channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME

  • --dq-channel-name [string]

    Set the name of the data quality channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME

  • --shared-memory-partition [string]

    Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME

  • --shared-memory-dir [string]

    Set the name of the shared memory directory for a given instrument. Can be given multiple times as --shared-memory-dir=IFO=DIR-NAME

  • --shared-memory-assumed-duration [int]

    Set the assumed span of files in seconds. Default = 4 seconds.

  • --shared-memory-block-size [int]

    Set the byte size to read per buffer. Default = 4096 bytes.

  • --frame-segments-file [filename]

    Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source is frames

  • --frame-segments-name [string]

    Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given

  • --state-vector-on-bits [hex]

    Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Overrides of the form IFO=bits can be given multiple times. Currently only has meaning for online (lvshm, framexmit) data

  • --state-vector-off-bits [hex]

    Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Overrides of the form IFO=bits can be given multiple times. Currently only has meaning for online (lvshm, framexmit) data

  • --dq-vector-on-bits [hex]

    Set the dq vector on bits to process (optional). The default is 0x7 for all detectors. Overrides of the form IFO=bits can be given multiple times. Currently only has meaning for online (lvshm, framexmit) data

  • --dq-vector-off-bits [hex]

    Set the dq vector off bits to process (optional). The default is 0x0 for all detectors. Overrides of the form IFO=bits can be given multiple times. Currently only has meaning for online (lvshm, framexmit) data

    Typical usage case examples

    1. Reading data from frames:

      --data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \
      --channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \
    2. Reading data from a fake LIGO source:

      --data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \
    3. Reading online data via framexmit:

      --data-source=framexmit --channel-name=H1=FAIKE-STRAIN
    4. Many other combinations possible, please add some!
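
To illustrate how repeated IFO=VALUE options reach the parsing helpers, here is a minimal sketch using only the standard-library optparse module. The parser below is a stand-in with three of the options described above, not the one built by this module, and the channel names are placeholders.

```python
from optparse import OptionParser

# Stand-in parser: only a few of the documented options are declared here.
parser = OptionParser()
parser.add_option("--data-source", default="frames")
# action="append" collects every occurrence of the option into a list.
parser.add_option("--channel-name", action="append")
# The documented default: 512 s of 8-byte samples at 16384 Hz.
parser.add_option("--block-size", type="int", default=16384 * 8 * 512)

options, _ = parser.parse_args([
    "--data-source=frames",
    "--channel-name=H1=LDAS-STRAIN",
    "--channel-name=L1=LDAS-STRAIN",
])

# optparse splits a long option at the first "=" only, so each collected
# value still has the form IFO=CHANNEL-NAME and can be split once more.
channel_dict = dict(item.split("=", 1) for item in options.channel_name or [])
```

The default block size evaluates to 67108864 bytes, which matches the block_size default in the DataSourceInfo signature.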


Given a list of channels, produce a dictionary keyed by ifo of channel names:

The list here typically comes from an option parser with options that specify the “append” action.

>>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])  

Given a list of channels with a range of mass bins, produce a dictionary keyed by ifo of channel names:

The list here typically comes from an option parser with options that specify the “append” action.

>>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"])  
{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
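
The expansion shown in the output above can be sketched as follows. This is an illustration inferred from that example only, not the module's implementation: it assumes each item has the form start:stop:IFO=NAME,IFO=NAME, that the range is half-open, and that node keys keep the zero padding of the start field. The function name expand_node_ranges is hypothetical.

```python
def expand_node_ranges(items, sep="=", range_sep=":"):
    """Expand 'start:stop:K=V,K=V' items into one dict entry per node."""
    result = {}
    for item in items:
        # Split off the two range fields; the rest is the key/value list.
        start, stop, rest = item.split(range_sep, 2)
        mapping = dict(pair.split(sep, 1) for pair in rest.split(","))
        width = len(start)  # preserve the zero padding of the input
        # Assumption: the stop value is exclusive, as the example suggests.
        for node in range(int(start), int(stop)):
            result[f"{node:0{width}d}"] = dict(mapping)
    return result


expanded = expand_node_ranges([
    "0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2",
    "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4",
])
```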

Given a list of frame types, produce a dictionary keyed by ifo:

The list here typically comes from an option parser with options that specify the “append” action.

>>> frame_type_dict_from_frame_type_list(['H1=H1_GWOSC_O2_16KHZ_R1', 'L1=L1_GWOSC_O2_16KHZ_R1'])  
{'H1': 'H1_GWOSC_O2_16KHZ_R1', 'L1': 'L1_GWOSC_O2_16KHZ_R1'}

Given a list of framexmit addresses with ports, produce a dictionary keyed by ifo:

The list here typically comes from an option parser with options that specify the “append” action.

>>> framexmit_dict_from_framexmit_list(["H1=", "L1=", "V1="])  
{'H1': ('', 7096), 'L1': ('', 7097), 'V1': ('', 7098)}
datasource.framexmit_list_from_framexmit_dict(framexmit_dict, ifos=None, opt='framexmit-addr')[source]

Creates a string of framexmit address options from a dictionary keyed by ifos.

>>> framexmit_list_from_framexmit_dict({'V1': ('', 7098), 'H1': ('', 7096), 'L1': ('', 7097)})  
'V1= --framexmit-addr=H1= --framexmit-addr=L1= '

Given a list of injection xml files with a range of mass bins, produce a dictionary keyed by bin number:

The list here typically comes from an option parser with options that specify the “append” action.

>>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"])  # doctest: +SKIP
{'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}

datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose=False)[source]

All the conditionals and stupid pet tricks for reading real or simulated h(t) data in one place.

Consult the append_options() function and the GWDataSourceInfo class

This src in general supports only one instrument, although GWDataSourceInfo contains dictionaries of multi-instrument things. By specifying the instrument when calling this function you will get only a single-instrument source. Code that needs multiple basicsrcs must call this function once for each instrument.

Gstreamer Graph

digraph mkbasicsrc {
     node [shape=record fontsize=10 fontname="Verdana"];
     subgraph clusterfakesrc {
             fake_0 [label="fakesrc: white, silence, AdvVirgo, LIGO, AdvLIGO"];
             label="Possible path #1";
     }
     subgraph clusterframes {
             frames_0 [label="lalcachesrc: frames"];
             frames_1 [label ="framecppchanneldemux"];
             frames_2 [label ="queue"];
             frames_3 [label ="gate (if user provides segments)", style=filled, color=lightgrey];
             frames_4 [label ="audiorate"];
             frames_0 -> frames_1 -> frames_2 -> frames_3 ->frames_4;
             label="Possible path #2";
     }
     subgraph clusteronline {
             online_0 [label="lvshmsrc|framexmit"];
             online_1 [label ="framecppchanneldemux"];
             online_2a [label ="strain queue"];
             online_2b [label ="statevector queue"];
             online_3 [label ="statevector"];
             online_4 [label ="gate"];
             online_5 [label ="audiorate"];
             online_6 [label ="queue"];
             online_0 -> online_1;
             online_1 -> online_2a;
             online_1 -> online_2b;
             online_2b -> online_3;
             online_2a -> online_4;
             online_3 -> online_4 -> online_5 -> online_6;
             label="Possible path #3";
     }
     subgraph clusternds {
             nds_0 [label="ndssrc"];
             label="Possible path #4";
     }
     audioconv [label="audioconvert"];
     progress [label="progressreport (if verbose)", style=filled, color=lightgrey];
     sim [label="lalsimulation (if injections requested)", style=filled, color=lightgrey];
     queue [label="queue (if injections requested)", style=filled, color=lightgrey];

     // The connections
     fake_0 -> audioconv [ltail=clusterfakesrc];
     frames_4 -> audioconv [ltail=clusterframes];
     online_6 -> audioconv [ltail=clusteronline];
     nds_0 -> audioconv [ltail=clusternds];
     audioconv -> progress -> sim -> queue -> "?";
}
datasource.mkhtgate(pipeline, src, control=None, threshold=8.0, attack_length=128, hold_length=128, **kwargs)[source]

A convenience function to provide thresholds on input data. This can be used to remove large spikes, glitches, etc. You can also use it for other purposes by plugging in whatever you want as input and output.

NOTE: the queues constructed by this code assume the attack and hold lengths combined are less than 1 second in duration.

Gstreamer Graph

digraph G {
     node [shape=record fontsize=10 fontname="Verdana"];
     tee ;
     inputqueue ;
     lal_gate ;
     in [label="\<src\>"];
     out [label="\<return\>"];
     in -> tee -> inputqueue -> lal_gate -> out;
     tee -> lal_gate;
}
datasource.mksegmentsrcgate(pipeline, src, segment_list, invert_output=False, rate=1, **kwargs)[source]

Takes a segment list and produces a gate driven by it. Hook up your own input and output.

@param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate’s name.

Gstreamer graph describing this function:

digraph G {
      node [shape=record fontsize=10 fontname="Verdana"];
      in [label="\<src\>"];
      out [label="\<return value\>"];
      in -> lal_gate -> out;
      lal_segmentsrc -> lal_gate;
}
datasource.parse_host(host: str)[source]

Value transform for use with parse_list_to_dict that splits host into name and port tuple
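
A value transform of this kind might look like the sketch below. It is an assumption about the behaviour rather than the module's code: the input is taken to be a host:port string, the port is converted to int, and the name split_host and the example address are hypothetical.

```python
def split_host(host):
    """Split 'name:port' into a (name, port) tuple with an integer port."""
    # rpartition splits at the last ":" so the port is always the tail.
    name, sep, port = host.rpartition(":")
    if not sep:
        raise ValueError(f"expected 'host:port', got {host!r}")
    return name, int(port)


address = split_host("example.org:7096")  # placeholder address
```

Applied as value_transform to a list such as ["H1=example.org:7096"], a transform like this would yield the (host, port) tuples seen in the framexmit examples.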

datasource.parse_int(inp: str) → int[source]

Value transform for use with parse_list_to_dict that coerces string input to int

datasource.parse_list_to_dict(lst: list, value_transform: Optional[function] = None, sep: str = '=', key_is_range: bool = False, range_sep: str = ':') → dict[source]

A general list to dict argument parsing coercion function


lst: list, a list of the form ['A=V1', 'B=V2', …], where "=" only has to match the sep argument

value_transform: Function, default None. An optional transformation function to apply on values of the dictionary

sep: str, default '=', the separator string between dict keys and values in list elements

key_is_range: bool, default False. If True, the keys of the list are compound and contain range information, e.g. "start:stop:remaining,list,of,items"

range_sep: str, default ':', the separator string for range key information

Returns: dict of the form {'A': value_transform('V1'), …}

>>> parse_list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])  
>>> parse_list_to_dict(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"], key_is_range=True)  
{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
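
For the simple case without ranges, the coercion described above can be sketched in a few lines. This is an illustrative reimplementation under the stated parameter descriptions, not the module's code; the name list_to_dict is hypothetical and splitting only at the first separator is an assumption.

```python
def list_to_dict(lst, value_transform=None, sep="="):
    """Turn ['A=V1', 'B=V2'] into {'A': V1, 'B': V2}."""
    result = {}
    for item in lst:
        # Split at the first separator only, so values may contain it.
        key, value = item.split(sep, 1)
        result[key] = value_transform(value) if value_transform else value
    return result


channels = list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
bits = list_to_dict(["V1=7", "H1=352"], value_transform=int)
```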
datasource.pipeline_channel_list_from_channel_dict(channel_dict, ifos=None, opt='channel-name')[source]

Creates a string of channel names options from a dictionary keyed by ifos.

FIXME: This function exists to work around’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.

  • override –channel-name with a different option by setting opt.

  • restrict the ifo keys to a subset of the channel_dict by setting ifos

>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'})  
'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])  
>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string")  
'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
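
The string format in these examples can be reproduced with a short sketch. It is inferred from the documented outputs only: the first entry carries no option prefix, every later entry is prefixed with the option, and each entry ends in a space. The name option_string is hypothetical, and the ifos handling is an assumption because the corresponding output is not shown above.

```python
def option_string(channel_dict, ifos=None, opt="channel-name"):
    """Encode a dict as 'A=x --opt=B=y ' for use as one option value."""
    # Assumption: ifos, when given, selects and orders the keys to emit.
    keys = list(channel_dict) if ifos is None else ifos
    parts = [f"{ifo}={channel_dict[ifo]} " for ifo in keys]
    # Joining with the option prefix leaves the first entry bare.
    return f"--{opt}=".join(parts)


encoded = option_string({"H2": "SOMETHING-ELSE", "H1": "LSC-STRAIN"})
```

Since the result starts with a bare IFO=CHANNEL entry, it is presumably meant to be appended to a single leading --channel-name= on the command line.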
datasource.pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node=0, ifos=None, opt='channel-name')[source]

Creates a string of channel names options from a dictionary keyed by ifos.

FIXME: This function exists to work around’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.

  • override –channel-name with a different option by setting opt.

  • restrict the ifo keys to a subset of the channel_dict by setting ifos

>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0)  
'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"])  
>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, opt="test-string")  
'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
datasource.pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags=<flags GST_SEEK_FLAG_FLUSH of type Gst.SeekFlags>)[source]

Create a new seek event, i.e., Gst.Event.new_seek() for a given gps_start_time and gps_end_time, with optional flags.

@param gps_start_time start time as LIGOTimeGPS, double or float

@param gps_end_time end time as LIGOTimeGPS, double or float

datasource.ravel_dict_to_list(dct: dict, value_transform: function = <class 'str'>, sep: str = '=', key_is_range: bool = False, range_sep: str = ':', join_arg: Optional[str] = None, unzip: bool = False, filter_keys: Optional[list] = None, range_elem: Optional[int] = None) → list[source]

Functional inverse of parse_list_to_dict.

TODO: This function exists to work around’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.


dct: dict, the dict to transform into a list

value_transform: Function, default str, the way to transform values of the dict before adding to list

sep: str, default '=', separator character to join key-value pairs

key_is_range: bool, default False. If True, aggregate keys into ranges

range_sep: str, default ':', the range separator to join range key min and max values

unzip: bool, default False, if True unzip the dct arg and ravel each single-valued dict separately

Returns: List[str], the dict converted into lists of strings
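
The simplest case, a flat dictionary with no ranges and no unzipping, can be sketched as below. This illustrates the inverse relationship only and is not the module's implementation; the name dict_to_list is hypothetical.

```python
def dict_to_list(dct, value_transform=str, sep="="):
    """Turn {'A': 1, 'B': 2} into ['A=1', 'B=2'], in insertion order."""
    return [f"{key}{sep}{value_transform(value)}" for key, value in dct.items()]


raveled = dict_to_list({"H1": 7, "L1": 352})

# Round trip: splitting each item at the first separator and converting
# the values back to int recovers the original dictionary.
restored = {k: int(v) for k, v in (item.split("=", 1) for item in raveled)}
```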

datasource.ravel_host(host: tuple) → str[source]

Value transform for use with ravel_dict_to_list that ravels a host name and port into a string

datasource.state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict={'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [103, 256]})[source]

Produce a dictionary (keyed by detector) of on / off bit tuples from a list provided on the command line.

Takes default values from module level datasource.state_vector_on_off_dict if state_vector_on_off_dict is not given

Inputs must be given as base 10 or 16 integers

>>> on_bit_list = ["V1=7", "H1=7", "L1=7"]  
>>> off_bit_list  = ["V1=256", "H1=352", "L1=352"]  
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list)  
{'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [7, 256]}
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})  
{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
>>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]  
>>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]  
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})  
{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
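
One way to accept both decimal and hexadecimal input is int(text, 0), which infers the base from an optional 0x prefix. The sketch below reproduces the documented results under that assumption; it is not the module's implementation, and the name on_off_dict_from_bit_lists is hypothetical.

```python
def on_off_dict_from_bit_lists(on_bit_list, off_bit_list, defaults=None):
    """Build {ifo: [on_bits, off_bits]} from two lists of 'IFO=bits'."""
    # Copy the defaults so the caller's dictionary is never mutated.
    result = {ifo: list(bits) for ifo, bits in (defaults or {}).items()}
    for index, bit_list in enumerate((on_bit_list, off_bit_list)):
        for item in bit_list:
            ifo, text = item.split("=", 1)
            # Base 0 reads "352" as decimal and "0x352" as hexadecimal.
            result.setdefault(ifo, [None, None])[index] = int(text, 0)
    return result


defaults = {"H1": [7, 352], "H2": [7, 352], "L1": [7, 352], "V1": [103, 256]}
merged = on_off_dict_from_bit_lists(["V1=7"], ["V1=256"], defaults)
from_hex = on_off_dict_from_bit_lists(["V1=0x7"], ["V1=0x256"])
```

Note that 0x256 is 598 in decimal, so the same digits give different masks depending on whether the prefix is present.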

Produce a tuple of useful command lines from a dictionary of on / off state vector bits keyed by detector

FIXME: This function exists to work around’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.

>>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]}  
>>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)  
('H1=7 --state-vector-on-bits=H2=7 --state-vector-on-bits=L1=7 --state-vector-on-bits=V1=103 ', 'H1=352 --state-vector-off-bits=H2=352 --state-vector-off-bits=L1=352 --state-vector-off-bits=V1=256 ')
datasource.unzip_dict_values(dct: dict)[source]

Split a multi-valued dict into an ordered collection of single-valued dicts with common keys. All values should be of the same length.
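
As an illustration of the splitting described above, here is a minimal sketch. It assumes the values are sequences and returns a tuple of dicts; the return type, the error on unequal lengths, and the name unzip_values are assumptions, not taken from the module.

```python
def unzip_values(dct):
    """Split {k: [a, b]} into ({k: a}, {k: b}), one dict per position."""
    lengths = {len(values) for values in dct.values()}
    if len(lengths) > 1:
        raise ValueError("all values must have the same length")
    count = lengths.pop() if lengths else 0
    return tuple({key: values[i] for key, values in dct.items()} for i in range(count))


on_bits, off_bits = unzip_values({"H1": [7, 352], "V1": [103, 256]})
```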

datasource.zip_dict_values(*dicts, defaults: Optional[dict] = None, key_union: bool = True)[source]

Zip dict values by matching keys


dicts: Iterable[dict], a collection of dicts whose values will be grouped by key in the order given

defaults: dict, default None, if specified fill in missing values with these defaults

key_union: bool, default True, if True then use a union of keys, else use intersection.


>>> on_bit_list = parse_list_to_dict(["V1=7", "H1=7", "L1=7"], value_transform=int) 
>>> off_bit_list  = parse_list_to_dict(["V1=256", "H1=352", "L1=352"], value_transform=int) 
>>> zip_dict_values(on_bit_list, off_bit_list, defaults=DEFAULT_STATE_VECTOR_ON_OFF) 
{'V1': [7, 256], 'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352]}
>>> zip_dict_values(on_bit_list, off_bit_list,{})  
{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
>>> on_bit_list = parse_list_to_dict(["V1=0x7", "H1=0x7", "L1=0x7"], value_transform=lambda x: int(x, 16))  
>>> off_bit_list = parse_list_to_dict(["V1=0x256", "H1=0x352", "L1=0x352"], value_transform=lambda x: int(x, 16))  
>>> zip_dict_values(on_bit_list, off_bit_list,{})  
{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
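
The grouping behaviour can be sketched as follows. This is an illustrative reimplementation based on the parameter descriptions and the first two examples above, not the module's code. Several details are assumptions: keys are emitted in sorted order, keys present only in defaults are included in union mode, and a value with no default is filled with None. The name zip_values is hypothetical.

```python
def zip_values(*dicts, defaults=None, key_union=True):
    """Group the values of several dicts into lists, matched by key."""
    defaults = defaults or {}
    key_sets = [set(d) for d in dicts]
    if key_union:
        # Assumption: keys that only appear in defaults are kept too.
        keys = set(defaults).union(*key_sets)
    else:
        keys = set.intersection(*key_sets) if key_sets else set()
    zipped = {}
    for key in sorted(keys):
        # Per-position fallback values for this key (None if no default).
        fallback = defaults.get(key, [None] * len(dicts))
        zipped[key] = [d.get(key, fallback[i]) for i, d in enumerate(dicts)]
    return zipped


on_bits = {"V1": 7, "H1": 7, "L1": 7}
off_bits = {"V1": 256, "H1": 352, "L1": 352}
defaults = {"H1": [7, 352], "H2": [7, 352], "L1": [7, 352], "V1": [103, 256]}
with_defaults = zip_values(on_bits, off_bits, defaults=defaults)
```

With key_union=False only keys present in every input dict survive, which avoids the None fill entirely.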