datasource module¶
Gravitational-wave data source utilities, including abstractions for storing the information needed to connect to data sources. The main elements of the API are:
DataSourceInfo class for storing the information needed to connect to a data source
Useful constants to help configure a DataSourceInfo, such as the Detector and DataSource classes
Pipeline building utilities
Command-line utilities for parsing and converting values into a DataSourceInfo
- class datasource.DataFindServer[source]¶
Bases:
object
Enumeration of available datafind servers
- GeneralProp = DataFindServerInfo(host='datafind.ligo.org', port=443)¶
- class datasource.DataSource(value)[source]¶
Enumeration of available data sources
TODO: include descriptions of the options
- ALIGO = 'AdvLIGO'¶
- AVirgo = 'AdvVirgo'¶
- DEVSHM = 'devshm'¶
- FrameXMIT = 'framexmit'¶
- Frames = 'frames'¶
- LIGO = 'LIGO'¶
- LVSHM = 'lvshm'¶
- NDS = 'nds'¶
- Silence = 'silence'¶
- White = 'white'¶
- exception datasource.DataSourceConfigError[source]¶
Bases:
ValueError
Error subclass for configuration errors
- class datasource.DataSourceInfo(data_source: str, channel_name: Dict[Detector, str], gps_start_time: Optional[Union[int, LIGOTimeGPS]] = None, gps_end_time: Optional[Union[int, LIGOTimeGPS]] = None, shared_memory_partition: Optional[Dict[Detector, str]] = None, shared_memory_dir: Optional[Dict[Detector, str]] = None, frame_segments_name: Optional[str] = None, state_vector_on_bits: Optional[Dict[Detector, str]] = None, state_vector_off_bits: Optional[Dict[Detector, str]] = None, dq_vector_on_bits: Optional[str] = None, dq_vector_off_bits: Optional[str] = None, frame_cache: Optional[Union[str, Path]] = None, injections: Optional[Union[str, Path]] = None, nds_host: Optional[str] = None, nds_port: Optional[int] = None, nds_channel_type: str = 'online', shared_memory_assumed_duration: int = 4, shared_memory_block_size: int = 4096, frame_segments_file: Optional[Union[str, Path]] = None, block_size: int = 67108864, frame_type: Optional[Dict[Detector, str]] = None, data_find_server: Optional[str] = None, framexmit_addr: Optional[str] = None, framexmit_iface: Optional[str] = None, state_channel_name: Optional[Dict[Detector, str]] = None, dq_channel_name: Optional[Dict[Detector, str]] = None, idq_channel_name: Optional[Dict[Detector, str]] = None, idq_state_channel_name: Optional[Dict[Detector, str]] = None)[source]¶
Bases:
object
A pythonic representation of a datasource with configured settings necessary for usage in pipelines
- class datasource.Detector(value)[source]¶
Enumeration of available detectors
- G1 = 'G1'¶
- H1 = 'H1'¶
- H2 = 'H2'¶
- K1 = 'K1'¶
- L1 = 'L1'¶
- V1 = 'V1'¶
- datasource.HostInfo¶
alias of
DataFindServerInfo
- datasource.append_options(parser)[source]¶
Append generic data source options to an OptionParser object so that applications reading GW data share consistent, unified command lines and parsing throughout the project.
- --data-source [string]
Set the data source from [frames|framexmit|lvshm|nds|silence|white].
- --block-size [int] (bytes)
Data block size to read in bytes. Default 16384 * 8 * 512 (67108864 bytes), which is 512 seconds of double precision data at 16384 Hz. This parameter is only used if --data-source is one of white, silence, AdvVirgo, LIGO, AdvLIGO, nds.
- --frame-cache [filename]
Set the name of the LAL cache listing the LIGO-Virgo .gwf frame files (optional).
- --frame-type [string]
Set the frame type for a given instrument. Can be given multiple times as --frame-type=IFO=FRAME-TYPE
- --gps-start-time [int] (seconds)
Set the start time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit
- --gps-end-time [int] (seconds)
Set the end time of the segment to analyze in GPS seconds. Required unless --data-source is lvshm or framexmit
- --injection-file [filename]
Set the name of the LIGO light-weight XML file from which to load injections (optional).
- --channel-name [string]
Set the name of the channels to process. Can be given multiple times as --channel-name=IFO=CHANNEL-NAME
- --nds-host [hostname]
Set the remote host or IP address that serves nds data. This is required iff --data-source is nds
- --nds-port [portnumber]
Set the port of the remote host that serves nds data, default = 31200. This is required iff --data-source is nds
- --nds-channel-type [string]
FIXME please document. The DataSourceInfo default for nds_channel_type is 'online'.
- --framexmit-addr [string]
Set the address of the framexmit service. Can be given multiple times as --framexmit-addr=IFO=xxx.xxx.xxx.xxx:port
- --framexmit-iface [string]
Set the address of the framexmit interface.
- --state-channel-name [string]
Set the name of the state vector channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --state-channel-name=IFO=STATE-CHANNEL-NAME
- --dq-channel-name [string]
Set the name of the data quality channel. This channel will be used to control the flow of data via the on/off bits. Can be given multiple times as --dq-channel-name=IFO=DQ-CHANNEL-NAME
- --shared-memory-partition [string]
Set the name of the shared memory partition for a given instrument. Can be given multiple times as --shared-memory-partition=IFO=PARTITION-NAME
- --shared-memory-dir [string]
Set the name of the shared memory directory for a given instrument. Can be given multiple times as --shared-memory-dir=IFO=DIR-NAME
- --shared-memory-assumed-duration [int]
Set the assumed span of files in seconds. Default = 4 seconds.
- --shared-memory-block-size [int]
Set the byte size to read per buffer. Default = 4096 bytes.
- --frame-segments-file [filename]
Set the name of the LIGO light-weight XML file from which to load frame segments. Optional iff --data-source is frames
- --frame-segments-name [string]
Set the name of the segments to extract from the segment tables. Required iff --frame-segments-file is given
- --state-vector-on-bits [hex]
Set the state vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data
- --state-vector-off-bits [hex]
Set the state vector off bits to process (optional). The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data
- --dq-vector-on-bits [hex]
Set the dq vector on bits to process (optional). The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data
- --dq-vector-off-bits [hex]
Set the dq vector off bits to process (optional). The default is 0x0 for all detectors. Override with IFO=bits can be given multiple times. Only currently has meaning for online (lvshm, framexmit) data
Typical usage case examples
Reading data from frames:
--data-source=frames --gps-start-time=999999000 --gps-end-time=999999999 \
--channel-name=H1=LDAS-STRAIN --frame-segments-file=segs.xml \
--frame-segments-name=datasegments
Reading data from a fake LIGO source:
--data-source=LIGO --gps-start-time=999999000 --gps-end-time=999999999 \
--channel-name=H1=FAKE-STRAIN
Reading online data via framexmit:
--data-source=framexmit --channel-name=H1=FAKE-STRAIN
Many other combinations possible, please add some!
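The repeated IFO=VALUE options above depend on the option parser's "append" action. The following is a minimal sketch using the standard-library optparse module. It assumes a hypothetical build_parser() that stands in for a parser prepared by append_options() and reproduces only two of the documented options; channel_dict() is likewise an illustrative helper, not part of this module.

```python
from optparse import OptionParser


def build_parser():
    # Hypothetical stand-in for a parser after append_options(parser);
    # only two of the documented options are reproduced here.
    parser = OptionParser()
    parser.add_option("--data-source", metavar="source", default="frames")
    # action="append" collects every occurrence of the option into a list.
    parser.add_option("--channel-name", metavar="name", action="append")
    return parser


def channel_dict(channel_list):
    # Split each "IFO=CHANNEL-NAME" entry once on the first "=".
    return dict(item.split("=", 1) for item in (channel_list or []))


options, _ = build_parser().parse_args([
    "--data-source=frames",
    "--channel-name=H1=LSC-STRAIN",
    "--channel-name=L1=LSC-STRAIN",
])
channels = channel_dict(options.channel_name)
print(channels)
```

optparse splits a long option on its first "=", so the value stored for each occurrence is the full IFO=CHANNEL-NAME string.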
- datasource.channel_dict_from_channel_list(channel_list)[source]¶
Given a list of channels, produce a dictionary keyed by ifo of channel names:
The list here typically comes from an option parser with options that specify the “append” action.
- Examples:
>>> channel_dict_from_channel_list(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}
- datasource.channel_dict_from_channel_list_with_node_range(channel_list)[source]¶
Given a list of channels with a range of mass bins, produce a dictionary keyed by bin number whose values are dictionaries of channel names keyed by ifo:
The list here typically comes from an option parser with options that specify the “append” action.
- Examples:
>>> channel_dict_from_channel_list_with_node_range(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"])
{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
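The example output suggests that each "start:stop:" prefix is treated as a half-open range of bin numbers, with the same channel mapping copied to every bin in the range. The sketch below reproduces that behaviour under this assumption; expand_range_entries() is a hypothetical helper written for illustration and is not the module's implementation.

```python
def expand_range_entries(entries, sep="=", range_sep=":"):
    """Expand 'start:stop:IFO=CHAN,IFO=CHAN' entries into one dict per bin."""
    result = {}
    for entry in entries:
        # The first two fields are the range bounds; the rest is the payload.
        start, stop, rest = entry.split(range_sep, 2)
        inner = dict(item.split(sep, 1) for item in rest.split(","))
        width = len(start)  # preserve zero padding, e.g. '0000'
        # Assumption: the range is half-open, [start, stop).
        for node in range(int(start), int(stop)):
            result[str(node).zfill(width)] = dict(inner)
    return result


expanded = expand_range_entries([
    "0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2",
    "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4",
])
print(sorted(expanded))
```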
- datasource.frame_type_dict_from_frame_type_list(frame_type_list)[source]¶
Given a list of frame types, produce a dictionary keyed by ifo:
The list here typically comes from an option parser with options that specify the “append” action.
- Examples:
>>> frame_type_dict_from_frame_type_list(['H1=H1_GWOSC_O2_16KHZ_R1', 'L1=L1_GWOSC_O2_16KHZ_R1'])
{'H1': 'H1_GWOSC_O2_16KHZ_R1', 'L1': 'L1_GWOSC_O2_16KHZ_R1'}
- datasource.framexmit_dict_from_framexmit_list(framexmit_list)[source]¶
Given a list of framexmit addresses with ports, produce a dictionary keyed by ifo:
The list here typically comes from an option parser with options that specify the “append” action.
- Examples:
>>> framexmit_dict_from_framexmit_list(["H1=224.3.2.1:7096", "L1=224.3.2.2:7097", "V1=224.3.2.3:7098"])
{'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097), 'V1': ('224.3.2.3', 7098)}
- datasource.framexmit_list_from_framexmit_dict(framexmit_dict, ifos=None, opt='framexmit-addr')[source]¶
Creates a string of framexmit address options from a dictionary keyed by ifos.
- Examples:
>>> framexmit_list_from_framexmit_dict({'V1': ('224.3.2.3', 7098), 'H1': ('224.3.2.1', 7096), 'L1': ('224.3.2.2', 7097)})
'V1=224.3.2.3:7098 --framexmit-addr=H1=224.3.2.1:7096 --framexmit-addr=L1=224.3.2.2:7097 '
- datasource.injection_dict_from_channel_list_with_node_range(injection_list)[source]¶
Given a list of injection xml files with a range of mass bins, produce a dictionary keyed by bin number:
The list here typically comes from an option parser with options that specify the “append” action.
- Examples:
>>> injection_dict_from_channel_list_with_node_range(["0000:0002:Injection_1.xml", "0002:0004:Injection_2.xml"])  # doctest: +SKIP
{'0000': 'Injection_1.xml', '0001': 'Injection_1.xml', '0002': 'Injection_2.xml', '0003': 'Injection_2.xml'}
- datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose=False)[source]¶
All the conditionals and special cases for reading real or simulated h(t) data in one place.
Consult the append_options() function and the DataSourceInfo class.
This source supports only one instrument, although DataSourceInfo contains dictionaries of multi-instrument settings. Specifying the instrument when calling this function yields a single-instrument source. Code that needs multiple basic sources must call this function once per instrument.
Gstreamer Graph
- datasource.mkhtgate(pipeline, src, control=None, threshold=8.0, attack_length=128, hold_length=128, **kwargs)[source]¶
A convenience function to provide thresholds on input data. This can be used to remove large spikes, glitches, etc. It can also serve other purposes, since any element can be connected as input and output.
NOTE: the queues constructed by this code assume the attack and hold lengths combined are less than 1 second in duration.
Gstreamer Graph
- datasource.mksegmentsrcgate(pipeline, src, segment_list, invert_output=False, rate=1, **kwargs)[source]¶
Takes a segment list and produces a gate driven by it. Hook up your own input and output.
@param kwargs passed through to pipeparts.mkgate(), e.g., used to set the gate’s name.
Gstreamer Graph
- datasource.parse_host(host: str)[source]¶
Value transform for use with parse_list_to_dict that splits host into name and port tuple
- datasource.parse_int(inp: str) → int [source]¶
Value transform for use with parse_list_to_dict that coerces string input to int
- datasource.parse_list_to_dict(lst: list, value_transform: Optional[function] = None, sep: str = '=', key_is_range: bool = False, range_sep: str = ':') → dict [source]¶
A general list to dict argument parsing coercion function
- Args:
- lst:
list, a list of the form [‘A=V1’, ‘B=V2’, …], where “=” must match the sep argument
- value_transform:
Function, default None. An optional transformation function to apply on values of the dictionary
- sep:
str, default ‘=’, the separator string between dict keys and values in list elements
- key_is_range:
bool, default False. If True, the keys of the list are compound and contain range information e.g. “start:stop:remaining,list,of,items”
- range_sep:
str, default ‘:’ the separator string for range key information
- Returns:
dict of the form {‘A’: value_transform(‘V1’), …}
- Examples:
>>> parse_list_to_dict(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
{'H1': 'LSC-STRAIN', 'H2': 'SOMETHING-ELSE'}
>>> parse_list_to_dict(["0000:0002:H1=LSC_STRAIN_1,L1=LSC_STRAIN_2", "0002:0004:H1=LSC_STRAIN_3,L1=LSC_STRAIN_4", "0004:0006:H1=LSC_STRAIN_5,L1=LSC_STRAIN_6"], key_is_range=True)
{'0000': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0001': {'H1': 'LSC_STRAIN_1', 'L1': 'LSC_STRAIN_2'}, '0002': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0003': {'H1': 'LSC_STRAIN_3', 'L1': 'LSC_STRAIN_4'}, '0004': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}, '0005': {'H1': 'LSC_STRAIN_5', 'L1': 'LSC_STRAIN_6'}}
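The sep and value_transform arguments described above can be illustrated with a small stand-alone sketch. parse_pairs() is a hypothetical simplification that covers only the non-range case; it is not the module's implementation.

```python
def parse_pairs(lst, value_transform=None, sep="="):
    """Turn ['A=V1', 'B=V2'] into {'A': f('V1'), 'B': f('V2')}."""
    out = {}
    for item in lst:
        # Split once so that values may themselves contain the separator.
        key, value = item.split(sep, 1)
        out[key] = value_transform(value) if value_transform else value
    return out


plain = parse_pairs(["H1=LSC-STRAIN", "H2=SOMETHING-ELSE"])
as_int = parse_pairs(["V1=256", "H1=352"], value_transform=int)
custom_sep = parse_pairs(["H1:LSC-STRAIN"], sep=":")
print(plain, as_int, custom_sep)
```

Splitting only on the first separator is a deliberate choice here: it keeps values such as host:port strings intact when sep is "=".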
- datasource.pipeline_channel_list_from_channel_dict(channel_dict, ifos=None, opt='channel-name')[source]¶
Creates a string of channel names options from a dictionary keyed by ifos.
FIXME: This function exists to work around pipeline.py’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.
Override --channel-name with a different option by setting opt.
Restrict the ifo keys to a subset of the channel_dict by setting ifos.
- Examples:
>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'})
'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, ifos=["H1"])
'H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict({'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}, opt="test-string")
'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
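The output format in these examples can be reproduced with a short sketch. option_string() is a hypothetical helper; the reading that the first entry carries no option prefix because the caller supplies the leading option name is an inference from the examples, not a documented guarantee.

```python
def option_string(channel_dict, ifos=None, opt="channel-name"):
    """Encode a dict as 'A=x --opt=B=y ' for a single command-line argument."""
    keys = [ifo for ifo in channel_dict if ifos is None or ifo in ifos]
    out = ""
    for index, ifo in enumerate(keys):
        # Only the second and later entries repeat the option name.
        prefix = "" if index == 0 else "--%s=" % opt
        out += "%s%s=%s " % (prefix, ifo, channel_dict[ifo])
    return out


channels = {"H2": "SOMETHING-ELSE", "H1": "LSC-STRAIN"}
print(repr(option_string(channels)))
```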
- datasource.pipeline_channel_list_from_channel_dict_with_node_range(channel_dict, node=0, ifos=None, opt='channel-name')[source]¶
Creates a string of channel name options for a single node from a dictionary keyed by node and then by ifo.
FIXME: This function exists to work around pipeline.py’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.
Override --channel-name with a different option by setting opt.
Restrict the ifo keys to a subset of the channel_dict by setting ifos.
- Examples:
>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0)
'H2=SOMETHING-ELSE --channel-name=H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, ifos=["H1"])
'H1=LSC-STRAIN '
>>> pipeline_channel_list_from_channel_dict_with_node_range({'0000': {'H2': 'SOMETHING-ELSE', 'H1': 'LSC-STRAIN'}}, node=0, opt="test-string")
'H2=SOMETHING-ELSE --test-string=H1=LSC-STRAIN '
- datasource.pipeline_seek_for_gps(pipeline, gps_start_time, gps_end_time, flags=<flags GST_SEEK_FLAG_FLUSH of type Gst.SeekFlags>)[source]¶
Create a new seek event, i.e., Gst.Event.new_seek() for a given gps_start_time and gps_end_time, with optional flags.
@param gps_start_time start time as LIGOTimeGPS, double or float
@param gps_end_time end time as LIGOTimeGPS, double or float
- datasource.ravel_dict_to_list(dct: dict, value_transform: function = <class 'str'>, sep: str = '=', key_is_range: bool = False, range_sep: str = ':', join_arg: Optional[str] = None, unzip: bool = False, filter_keys: Optional[list] = None, range_elem: Optional[int] = None) → list [source]¶
Functional inverse of parse_list_to_dict.
TODO: This function exists to work around pipeline.py’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.
- Args:
- dct:
dict, the dict to transform into a list
- value_transform:
Function, default str, the way to transform values of the dict before adding to list
- sep:
str, default ‘=’ separator character to join key-value pairs
- key_is_range:
bool, default False. If True, aggregate keys into ranges
- range_sep:
str, default ‘:’, the range separator to join range key min and max values
- unzip:
bool, default False, if True unzip the dct arg and ravel each single-valued dict separately
- Returns:
List[str] the dict converted into lists of strings
- datasource.ravel_host(host: tuple) → str [source]¶
Value transform for use with ravel_dict_to_list that ravels a host name and port into a string
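A host value transform and its inverse can be sketched as a round trip. split_host() and join_host() are hypothetical stand-ins for parse_host and ravel_host, written on the assumption (taken from the framexmit examples) that a host is a (name, port) tuple with an integer port.

```python
def split_host(host):
    """'224.3.2.1:7096' -> ('224.3.2.1', 7096)."""
    # Split on the last colon so the port is always the final field.
    name, port = host.rsplit(":", 1)
    return name, int(port)


def join_host(host):
    """('224.3.2.1', 7096) -> '224.3.2.1:7096'."""
    name, port = host
    return "%s:%d" % (name, port)


original = "224.3.2.1:7096"
parsed = split_host(original)
print(parsed, join_host(parsed))
```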
- datasource.state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list, state_vector_on_off_dict={'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [103, 256]})[source]¶
Produce a dictionary (keyed by detector) of on / off bit tuples from a list provided on the command line.
Takes default values from module level datasource.state_vector_on_off_dict if state_vector_on_off_dict is not given
Inputs must be given as base 10 or 16 integers
- Examples:
>>> on_bit_list = ["V1=7", "H1=7", "L1=7"]
>>> off_bit_list = ["V1=256", "H1=352", "L1=352"]
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list)
{'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352], 'V1': [7, 256]}
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
>>> on_bit_list = ["V1=0x7", "H1=0x7", "L1=0x7"]
>>> off_bit_list = ["V1=0x256", "H1=0x352", "L1=0x352"]
>>> state_vector_on_off_dict_from_bit_lists(on_bit_list, off_bit_list,{})
{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
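Accepting both base 10 and base 16 inputs can be done with Python's int(value, 0), which infers the base from a "0x" prefix. The sketch below applies this while overriding per-detector defaults. on_off_bits() and DEFAULT_BITS are illustrative names, and starting an unknown detector at [0, 0] is an assumption of this sketch.

```python
# Default [on, off] bits copied from the documented signature above.
DEFAULT_BITS = {
    "H1": [0x7, 0x160],
    "H2": [0x7, 0x160],
    "L1": [0x7, 0x160],
    "V1": [0x67, 0x100],
}


def on_off_bits(on_list, off_list, defaults=None):
    """Merge 'IFO=bits' overrides into a dict of [on, off] pairs."""
    # Copy the defaults so the caller's dictionary is never mutated.
    result = {ifo: list(bits) for ifo, bits in (defaults or {}).items()}
    for index, bit_list in enumerate((on_list, off_list)):
        for item in bit_list:
            ifo, value = item.split("=", 1)
            # Base 0 accepts decimal ('352') and prefixed hex ('0x160').
            result.setdefault(ifo, [0, 0])[index] = int(value, 0)
    return result


decimal = on_off_bits(["V1=7"], ["V1=256"], DEFAULT_BITS)
hexadecimal = on_off_bits(["V1=0x7"], ["V1=0x256"])
print(decimal, hexadecimal)
```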
- datasource.state_vector_on_off_list_from_bits_dict(bit_dict)[source]¶
Produce a tuple of useful command lines from a dictionary of on / off state vector bits keyed by detector
FIXME: This function exists to work around pipeline.py’s inability to give the same option more than once by producing a string to pass as an argument that encodes the other instances of the option.
- Examples:
>>> state_vector_on_off_dict = {"H1":[0x7, 0x160], "H2":[0x7, 0x160], "L1":[0x7, 0x160], "V1":[0x67, 0x100]}
>>> state_vector_on_off_list_from_bits_dict(state_vector_on_off_dict)
('H1=7 --state-vector-on-bits=H2=7 --state-vector-on-bits=L1=7 --state-vector-on-bits=V1=103 ', 'H1=352 --state-vector-off-bits=H2=352 --state-vector-off-bits=L1=352 --state-vector-off-bits=V1=256 ')
- datasource.unzip_dict_values(dct: dict)[source]¶
Split a multi-valued dict into an ordered collection of single-valued dicts with common keys. All values should be of the same length.
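The splitting described above can be sketched as follows. unzip_values() is a hypothetical illustration; returning a tuple and raising ValueError on mismatched lengths are assumptions of this sketch rather than documented behaviour.

```python
def unzip_values(dct):
    """{'H1': [7, 352]} -> ({'H1': 7}, {'H1': 352})."""
    lengths = {len(values) for values in dct.values()}
    if len(lengths) > 1:
        raise ValueError("all values must have the same length")
    count = lengths.pop() if lengths else 0
    # Build one single-valued dict per position, preserving key order.
    return tuple(
        {key: values[i] for key, values in dct.items()} for i in range(count)
    )


on_bits, off_bits = unzip_values({"H1": [7, 352], "V1": [103, 256]})
print(on_bits, off_bits)
```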
- datasource.zip_dict_values(*dicts, defaults: Optional[dict] = None, key_union: bool = True)[source]¶
Zip dict values by matching keys
- Args:
- *dicts:
Iterable[dict], a collection of dicts whose values will be grouped by key in the order given
- defaults:
dict, default None, if specified fill in missing values with these defaults
- key_union:
bool, default True, if True then use a union of keys, else use intersection.
- Examples:
>>> on_bit_list = parse_list_to_dict(["V1=7", "H1=7", "L1=7"], value_transform=int)
>>> off_bit_list = parse_list_to_dict(["V1=256", "H1=352", "L1=352"], value_transform=int)
>>> zip_dict_values(on_bit_list, off_bit_list, defaults=DEFAULT_STATE_VECTOR_ON_OFF)
{'V1': [7, 256], 'H1': [7, 352], 'H2': [7, 352], 'L1': [7, 352]}
>>> zip_dict_values(on_bit_list, off_bit_list,{})
{'V1': [7, 256], 'H1': [7, 352], 'L1': [7, 352]}
>>> on_bit_list = parse_list_to_dict(["V1=0x7", "H1=0x7", "L1=0x7"], value_transform=lambda x: int(x, 16))
>>> off_bit_list = parse_list_to_dict(["V1=0x256", "H1=0x352", "L1=0x352"], value_transform=lambda x: int(x, 16))
>>> zip_dict_values(on_bit_list, off_bit_list,{})
{'V1': [7, 598], 'H1': [7, 850], 'L1': [7, 850]}
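The grouping of values by key, with optional defaults and a union or intersection of keys, can be sketched as below. zip_values() is a hypothetical illustration. Adding default keys only in union mode and filling an unresolvable slot with None are assumptions of this sketch, chosen to match the first example above.

```python
def zip_values(*dicts, defaults=None, key_union=True):
    """Group values from several dicts into per-key lists."""
    defaults = defaults or {}
    key_sets = [set(d) for d in dicts]
    if not key_sets:
        keys = set()
    elif key_union:
        keys = set().union(*key_sets)
    else:
        keys = set.intersection(*key_sets)
    if key_union:
        # Assumption: keys that appear only in defaults are kept in union mode.
        keys |= set(defaults)
    out = {}
    for key in sorted(keys):
        fallback = defaults.get(key)
        # Take each dict's value, else the matching default slot, else None.
        out[key] = [
            d[key] if key in d else (fallback[i] if fallback is not None else None)
            for i, d in enumerate(dicts)
        ]
    return out


on = {"V1": 7, "H1": 7, "L1": 7}
off = {"V1": 256, "H1": 352, "L1": 352}
defaults = {"H1": [7, 352], "H2": [7, 352], "L1": [7, 352], "V1": [103, 256]}
print(zip_values(on, off, defaults=defaults))
```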