"""Module for producing source elements
"""
import pathlib
import sys
import os
from typing import List, Tuple, Union, Iterable, Optional
from ligo import segments
from gstlal.pipeparts import pipetools, transform, filters, mux
from gstlal.utilities import laltools
# Endianness suffix of the host machine ('LE' or 'BE'); appended to the raw
# audio format name when building caps strings (see fake()).
BYTE_ORDER = 'BE' if sys.byteorder != "little" else 'LE'
class NDSChannelType:
    """String constants naming the NDS channel types.

    The attributes are plain ``str`` values, suitable for passing as the
    ``channel_type`` argument of :func:`nds`.

    References:
        Implementation: gstlal-ugly/gst/nds/ndssrc.c
    """
    Unknown = 'unknown'

    # full-rate data
    Online = 'online'
    Raw = 'raw'
    Reduced = 'reduced'

    # trend data
    SecondTrend = 's-trend'
    MinuteTrend = 'm-trend'

    TestPoint = 'test-pt'
class SrcDeferredLink:
    """Link a source pad that does not exist yet to a sink pad once it appears.

    Given an element, the name of a source pad expected to show up on that
    element, and a sink pad belonging to a different element, this class
    subscribes to the element's "pad-added" signal and links the named pad
    to the sink pad when it is created.

    The element's "no-more-pads" signal is watched as well: if it fires
    before the requested pad has been seen, ValueError is raised.  Both
    handlers are disconnected as soon as the requested pad is found.
    """

    def __init__(self, element, srcpadname, sinkpad):
        """Install the two signal handlers on ``element``.

        Args:
            element:
                the element whose new source pads are watched
            srcpadname:
                str, name of the source pad to wait for
            sinkpad:
                the sink pad to link the source pad to when it appears
        """
        giveup_id = element.connect("no-more-pads", self.no_more_pads, srcpadname)
        assert giveup_id > 0

        # The user-data list is handed to connect() by reference, so the
        # "pad-added" handler id can still be appended after connect() has
        # returned it; the handler unpacks all four entries when it runs.
        handler_data = [srcpadname, sinkpad, giveup_id]
        watch_id = element.connect("pad-added", self.pad_added, handler_data)
        assert watch_id > 0
        handler_data.append(watch_id)

    @staticmethod
    def pad_added(element, pad, src_sink_ids):
        """"pad-added" handler: link ``pad`` if it is the one being waited for.

        Args:
            element:
                the element that emitted the signal
            pad:
                the newly added pad
            src_sink_ids:
                sequence of (source pad name, sink pad, "no-more-pads"
                handler id, "pad-added" handler id)
        """
        wanted_name, target_pad, giveup_id, watch_id = src_sink_ids
        if pad.get_name() != wanted_name:
            # some other pad; keep waiting
            return
        # Stop watching before linking: the job is done after this pad.
        element.handler_disconnect(giveup_id)
        element.handler_disconnect(watch_id)
        pad.link(target_pad)

    @staticmethod
    def no_more_pads(element, srcpadname):
        """"no-more-pads" handler: the requested pad never appeared.

        Raises:
            ValueError: always
        """
        raise ValueError("<%s>: no pad named '%s'" % (element.get_name(), srcpadname))
def fake_ligo(pipeline: pipetools.Pipeline, instrument: Optional[str] = None, channel_name: Optional[str] = None,
              blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Fake LIGO Source

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None, forwarded as the ``instrument`` property only when not None
        channel_name:
            str, default None, forwarded as the ``channel_name`` property only when not None
        blocksize:
            int, default 16384 * 8 * 1, forwarded as the ``blocksize`` property.
            NOTE(review): the default reads as 16384 samples/s * 8 bytes/sample * 1 s,
            i.e. a size in bytes rather than a sample count -- confirm against the element.

    References:
        Implementation gstlal/gst/python/lal_fakeligosrc.py

    Returns:
        Element
    """
    properties = {"blocksize": blocksize}
    # Leave unset properties out entirely so the element keeps its own defaults.
    if instrument is not None:
        properties["instrument"] = instrument
    if channel_name is not None:
        properties["channel_name"] = channel_name
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeligosrc", **properties)
def fake_aligo(pipeline: pipetools.Pipeline, instrument: Optional[str] = None, channel_name: Optional[str] = None,
               blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Fake Advanced LIGO Source

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None, forwarded as the ``instrument`` property only when not None
        channel_name:
            str, default None, forwarded as the ``channel_name`` property only when not None
        blocksize:
            int, default 16384 * 8 * 1, forwarded as the ``blocksize`` property.
            NOTE(review): the default reads as 16384 samples/s * 8 bytes/sample * 1 s,
            i.e. a size in bytes rather than a sample count -- confirm against the element.

    References:
        Implementation gstlal/gst/python/lal_fakeadvligosrc.py

    Returns:
        Element
    """
    properties = {"blocksize": blocksize}
    # Leave unset properties out entirely so the element keeps its own defaults.
    if instrument is not None:
        properties["instrument"] = instrument
    if channel_name is not None:
        properties["channel_name"] = channel_name
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeadvligosrc", **properties)
def fake_avirgo(pipeline: pipetools.Pipeline, instrument: Optional[str] = None, channel_name: Optional[str] = None,
                blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Fake Advanced Virgo Source

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None, forwarded as the ``instrument`` property only when not None
        channel_name:
            str, default None, forwarded as the ``channel_name`` property only when not None
        blocksize:
            int, default 16384 * 8 * 1, forwarded as the ``blocksize`` property.
            NOTE(review): the default reads as 16384 samples/s * 8 bytes/sample * 1 s,
            i.e. a size in bytes rather than a sample count -- confirm against the element.

    References:
        Implementation gstlal/gst/python/lal_fakeadvvirgosrc.py

    Returns:
        Element
    """
    properties = {"blocksize": blocksize}
    # Leave unset properties out entirely so the element keeps its own defaults.
    if instrument is not None:
        properties["instrument"] = instrument
    if channel_name is not None:
        properties["channel_name"] = channel_name
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeadvvirgosrc", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALSegmentSrc.html">lal_segmentsrc</a> element to a pipeline with useful default properties
def segment(pipeline: pipetools.Pipeline, segment_list: List[Tuple[pipetools.TimeGPS, pipetools.TimeGPS]], blocksize: int = 4096 * 1 * 1,
            invert_output: bool = False) -> pipetools.Element:
    """Add a lal_segmentsrc element whose output is a buffer of boolean
    values specifying when a list of segments are on and off.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        segment_list:
            Iterable[Tuple[TimeGPS, TimeGPS]], list of segment start / stop times.
            Each boundary is converted with its ``ns()`` method before being
            handed to the element.
        blocksize:
            int, default blocksize is 4096 seconds of unsigned integers at 1 Hz, e.g. segments without nanoseconds
        invert_output:
            bool, default False, False = output is high in segments (default), True = output is low in segments

    References:
        Implementation: gstlal/gst/lal/gstlal_segmentsrc.c

    Returns:
        Element
    """
    # Re-express every (start, stop) pair via ns() before building the list.
    ns_segments = segments.segmentlist(
        [segments.segment(start.ns(), stop.ns()) for start, stop in segment_list]
    )
    return pipetools.make_element_with_src(
        pipeline, None, "lal_segmentsrc",
        blocksize=blocksize,
        segment_list=ns_segments,
        invert_output=invert_output,
    )
## Adds a <a href="@gstlalgtkdoc/GstLALCacheSrc.html">lal_cachesrc</a> element to a pipeline with useful default properties
def cache(pipeline: pipetools.Pipeline, location: str, use_mmap: bool = True, **properties) -> pipetools.Element:
    """Add a lal_cachesrc element, which retrieves frame files from the
    locations recorded in a LAL cache file.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        location:
            str, Path to LAL cache file.
        use_mmap:
            bool, default True, if True Use mmap() instead of read()
        **properties:
            any further element properties, forwarded unchanged

    References:
        Implementation: gstlal/gst/lal/gstlal_cachesrc.c

    Returns:
        Element
    """
    # Named arguments first, caller-supplied extras after them.
    element_properties = {"location": location, "use_mmap": use_mmap, **properties}
    return pipetools.make_element_with_src(pipeline, None, "lal_cachesrc", **element_properties)
def lvshm(pipeline: pipetools.Pipeline, shm_name: str, **properties) -> pipetools.Element:
    """Add a gds_lvshmsrc element (LIGO-Virgo shared memory frame file source).

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        shm_name:
            str, Shared memory partition name. Suggestions: LHO_Data, LLO_Data, VIRGO_Data
        **properties:
            any further element properties, forwarded unchanged

    References:
        Implementation: gstlal-ugly/gst/gds/lvshmsrc.cc

    Returns:
        Element
    """
    element_properties = {"shm_name": shm_name, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_lvshmsrc", **element_properties)
def devshm(pipeline: pipetools.Pipeline, shm_dirname: str, **properties) -> pipetools.Element:
    """Add a gds_devshmsrc element (LIGO-Virgo /dev/shm frame file source).

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        shm_dirname:
            str, Shared memory directory name (full path). Suggestion: /dev/shm/kafka/L1_O3ReplayMDC
        **properties:
            any further element properties, forwarded unchanged

    References:
        Implementation: gstlal-ugly/gst/gds/devshmsrc.cc

    Returns:
        Element
    """
    element_properties = {"shm_dirname": shm_dirname, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_devshmsrc", **element_properties)
def framexmit(pipeline: pipetools.Pipeline, multicast_group: str = '0.0.0.0', port: int = 0, **properties) -> pipetools.Element:
    """Add a gds_framexmitsrc element (FrameXMIT based source).

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        multicast_group:
            str, default "0.0.0.0", The address of multicast group to join. If no multicast address is supplied,
            the receiver will listen for UDP/IP broadcast transmissions at the specified port.
        port:
            int, default 0, The local port on which to receive broadcasts (0 = allocate). These ports can be
            reused by multiple applications.
        **properties:
            any further element properties, forwarded unchanged

    References:
        Implementation: gstlal-ugly/gst/gds/framexmitsrc.cc

    Returns:
        Element
    """
    element_properties = {"multicast_group": multicast_group, "port": port, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_framexmitsrc", **element_properties)
def nds(pipeline: pipetools.Pipeline, host: str, instrument: str, channel_name: str, channel_type: str,
        blocksize: int = 16384 * 8 * 1, port: int = 31200) -> pipetools.Element:
    """Add an ndssrc element (NDS-based source).

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        host:
            str, NDS1 or NDS2 remote host name or IP address
        instrument:
            str, name of instrument; joined with ``channel_name`` as "instrument:channel_name"
        channel_name:
            str, Name of the desired NDS channel.
        channel_type:
            str, Type of the desired NDS channel.
        blocksize:
            int, default 16384 * 8 * 1, blocksize
        port:
            int, default 31200, NDS1 or NDS2 remote host port

    References:
        Implementation: gstlal-ugly/gst/nds/ndssrc.c

    Returns:
        Element
    """
    # default blocksize is 1 second of double precision floats at
    # 16384 Hz, e.g., LIGO h(t)
    qualified_channel = "%s:%s" % (instrument, channel_name)
    return pipetools.make_element_with_src(
        pipeline, None, "ndssrc",
        blocksize=blocksize, port=port, host=host,
        channel_name=qualified_channel, channel_type=channel_type,
    )
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiotestsrc.html">audiotestsrc</a> element to a pipeline with useful default properties
def audio_test(pipeline: pipetools.Pipeline, freq: float = 440, volume: float = 0.8, wave: int = AudioTestWaveform.Sine, samples_per_buffer: int = 1024,
               **properties) -> pipetools.Element:
    """Add an audiotestsrc element, which can be used to generate basic audio
    signals. It supports several different waveforms and allows setting the
    base frequency and volume. Some waveforms might use additional properties.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        freq:
            float, default 440, Frequency of test signal. The sample rate needs to be at least 2 times higher.
        volume:
            float, default 0.8, Volume of test signal
        wave:
            int, default AudioTestWaveform.Sine, the type of waveform to produce. Options are:
                sine (0) – Sine
                square (1) – Square
                saw (2) – Saw
                triangle (3) – Triangle
                silence (4) – Silence
                white-noise (5) – White uniform noise
                pink-noise (6) – Pink noise
                sine-table (7) – Sine table
                ticks (8) – Periodic Ticks
                gaussian-noise (9) – White Gaussian noise
                red-noise (10) – Red (brownian) noise
                blue-noise (11) – Blue noise
                violet-noise (12) – Violet noise
        samples_per_buffer:
            int, default 1024, Number of samples in each outgoing buffer
        **properties:
            any further element properties, forwarded unchanged. The legacy
            keyword ``samplesperbuffer`` is accepted here and, when present,
            takes precedence over ``samples_per_buffer``.

    References:
        [1] https://gstreamer.freedesktop.org/documentation/audiotestsrc/index.html?gi-language=python

    Returns:
        Element
    """
    # support legacy argument name: pull it out of the extras so it is not
    # passed twice, falling back to the modern argument when absent
    samples_per_buffer = properties.pop('samplesperbuffer', samples_per_buffer)
    return pipetools.make_element_with_src(
        pipeline, None, "audiotestsrc",
        freq=freq, volume=volume, wave=wave,
        samplesperbuffer=samples_per_buffer,
        **properties
    )
## see documentation for transform.tag_inject(), filters.caps() and audio_test()
def fake(pipeline: pipetools.Pipeline, instrument: str, channel_name: str, blocksize: Optional[int] = None, volume: float = 1e-20,
         is_live: bool = False, wave: int = AudioTestWaveform.GaussianNoise, rate: int = 16384, **properties) -> pipetools.Element:
    """Create an audio_test source with several additional, lal-specific caps specified

    The chain built is audio_test -> caps filter (F64 samples in native byte
    order at ``rate``) -> tag inject (instrument, channel-name, units=strain).

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, name of instrument, written to the ``instrument`` tag
        channel_name:
            str, name of input channel, written to the ``channel-name`` tag
        blocksize:
            int, default None, buffer size in bytes; None means
            1 second * rate samples/second * 8 bytes/sample. The audio_test
            source receives ``blocksize / 8`` as its samples per buffer.
        volume:
            float, default 1e-20, the sample volume
        is_live:
            bool, default False, whether or not audio_test source will behave like live source
        wave:
            int, default AudioTestWaveform.GaussianNoise, see AudioTestWaveform enum for options
        rate:
            int, default 16384, sample rate
        **properties:
            any further properties, forwarded unchanged to audio_test

    Returns:
        Element, the tag-inject element at the end of the chain
    """
    if blocksize is None:
        # default blocksize is 1 second * rate samples/second * 8
        # bytes/sample (assume double-precision floats)
        blocksize = 1 * rate * 8

    # 8 bytes per double-precision sample
    source = audio_test(pipeline, samples_per_buffer=int(blocksize / 8), wave=wave,
                        volume=volume, is_live=is_live, **properties)
    caps = filters.caps(pipeline, source, "audio/x-raw, format=F64%s, rate=%d" % (BYTE_ORDER, rate))
    return transform.tag_inject(pipeline, caps, "instrument=%s,channel-name=%s,units=strain" % (instrument, channel_name))
def files(pipeline: pipetools.Pipeline, paths: Iterable[Union[str, pathlib.Path]], instrument: str, channel_name: str,
          cache_path: Optional[Union[str, pathlib.Path]] = None) -> pipetools.Element:
    """Create a source from a list of file paths

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        paths:
            Iterable[Path or str], the full paths to the frame files
        instrument:
            str, name of instrument; combined with ``channel_name`` as
            "instrument:channel_name" to select the channel to demux
        channel_name:
            str, name of the channel to read
        cache_path:
            Path or str, default None, the path to write out the cache file if specified, else write to temporary directory

    Notes:
        This is a convenience utility around cache source and framecppdemux that creates a cache file
        from a list of file paths

    Returns:
        Element, a queue whose sink pad is linked to the demuxer's channel
        pad once that pad appears
    """
    channel = "%s:%s" % (instrument, channel_name)

    # cache file -> cache source -> channel demuxer
    written_cache = laltools.create_cache(entries=paths, cache_path=cache_path)
    cache_src = cache(pipeline, location=written_cache.as_posix())
    demux = mux.framecpp_channel_demux(pipeline, cache_src, do_file_checksum=False, channel_list=[channel])
    units = dict.fromkeys(demux.get_property("channel-list"), "strain")
    mux.FrameCPPChannelDemuxSetUnitsHandler(demux, units)

    # allow frame reading and decoding to occur in a different thread
    queue = transform.queue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * pipetools.Gst.SECOND)

    # The demuxer's channel pad only exists once data flows, so link lazily.
    SrcDeferredLink(demux, channel, queue.get_static_pad("sink"))
    return queue