Source code for pipeparts.source

"""Module for producing source elements

"""
import pathlib
import sys
import os
from typing import List, Tuple, Union, Iterable, Optional

from ligo import segments

from gstlal.pipeparts import pipetools, transform, filters, mux
from gstlal.utilities import laltools

# Endianness suffix of the host machine; fake() appends it to the raw audio
# format name ("F64LE" / "F64BE") when building its caps string.
if sys.byteorder == "little":
    BYTE_ORDER = 'LE'
else:
    BYTE_ORDER = 'BE'


class AudioTestWaveform:
    """Integer codes selecting the waveform produced by audiotestsrc.

    These values are used as the ``wave`` argument of audio_test() and fake().

    References:
        [1] https://gstreamer.freedesktop.org/documentation/audiotestsrc/index.html?gi-language=python#GstAudioTestSrcWave
    """
    Sine = 0
    Square = 1
    Saw = 2
    Triangle = 3
    Silence = 4
    WhiteNoise = 5  # white uniform noise
    PinkNoise = 6
    SineTable = 7
    Ticks = 8  # periodic ticks
    GaussianNoise = 9  # white Gaussian noise
    RedNoise = 10  # red (brownian) noise
    BlueNoise = 11
    VioletNoise = 12
class NDSChannelType:
    """String identifiers for the NDS channel types.

    These are the values intended for the ``channel_type`` argument of nds().

    References:
        Implementation: gstlal-ugly/gst/nds/ndssrc.c
    """
    Unknown = 'unknown'
    Online = 'online'
    Raw = 'raw'
    Reduced = 'reduced'
    SecondTrend = 's-trend'
    MinuteTrend = 'm-trend'
    TestPoint = 'test-pt'
def fake_ligo(pipeline: pipetools.Pipeline, instrument: str = None, channel_name: str = None, blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Add a lal_fakeligosrc (fake LIGO) source element to the pipeline.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None. Only forwarded to the element when not None.
        channel_name:
            str, default None. Only forwarded to the element when not None.
        blocksize:
            int, default 16384 * 8 * 1, Number of samples in each outgoing buffer

    References:
        Implementation gstlal/gst/python/lal_fakeligosrc.py

    Returns:
        Element
    """
    element_properties = {"blocksize": blocksize}
    # Leave the optional properties unset on the element unless the caller
    # supplied a value for them.
    if instrument is not None:
        element_properties["instrument"] = instrument
    if channel_name is not None:
        element_properties["channel_name"] = channel_name
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeligosrc", **element_properties)
def fake_aligo(pipeline: pipetools.Pipeline, instrument: str = None, channel_name: str = None, blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Add a lal_fakeadvligosrc (fake Advanced LIGO) source element to the pipeline.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None. Only forwarded to the element when not None.
        channel_name:
            str, default None. Only forwarded to the element when not None.
        blocksize:
            int, default 16384 * 8 * 1, Number of samples in each outgoing buffer

    References:
        Implementation gstlal/gst/python/lal_fakeadvligosrc.py

    Returns:
        Element
    """
    element_properties = {"blocksize": blocksize}
    optional_settings = (("instrument", instrument), ("channel_name", channel_name))
    for key, value in optional_settings:
        # skip anything the caller did not provide
        if value is None:
            continue
        element_properties[key] = value
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeadvligosrc", **element_properties)
def fake_avirgo(pipeline: pipetools.Pipeline, instrument: str = None, channel_name: str = None, blocksize: int = 16384 * 8 * 1) -> pipetools.Element:
    """Add a lal_fakeadvvirgosrc (fake Advanced Virgo) source element to the pipeline.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, default None. Only forwarded to the element when not None.
        channel_name:
            str, default None. Only forwarded to the element when not None.
        blocksize:
            int, default 16384 * 8 * 1, Number of samples in each outgoing buffer

    References:
        Implementation gstlal/gst/python/lal_fakeadvvirgosrc.py

    Returns:
        Element
    """
    candidates = (("instrument", instrument), ("channel_name", channel_name))
    element_properties = {"blocksize": blocksize}
    # keep only the optional properties that were actually given
    element_properties.update({key: value for key, value in candidates if value is not None})
    return pipetools.make_element_with_src(pipeline, None, "lal_fakeadvvirgosrc", **element_properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALSegmentSrc.html">lal_segmentsrc</a> element to a pipeline with useful default properties
def segment(pipeline: pipetools.Pipeline, segment_list: List[Tuple[pipetools.TimeGPS, pipetools.TimeGPS]], blocksize: int = 4096 * 1 * 1, invert_output: bool = False) -> pipetools.Element:
    """Add a lal_segmentsrc element whose output is a buffer of boolean values
    specifying when a list of segments are on and off.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        segment_list:
            Iterable[Tuple[TimeGPS, TimeGPS]], list of segment start / stop times
        blocksize:
            int, default blocksize is 4096 seconds of unsigned integers at
            1 Hz, e.g. segments without nanoseconds
        invert_output:
            bool, default False, False = output is high in segments (default),
            True = output is low in segments

    References:
        Implementation: gstlal/gst/lal/gstlal_segmentsrc.c

    Returns:
        Element
    """
    # Convert every (start, stop) pair to a segment via each endpoint's
    # ns() value before handing the list to the element.
    ns_segments = segments.segmentlist()
    for start, stop in segment_list:
        ns_segments.append(segments.segment(start.ns(), stop.ns()))
    return pipetools.make_element_with_src(
        pipeline,
        None,
        "lal_segmentsrc",
        blocksize=blocksize,
        segment_list=ns_segments,
        invert_output=invert_output,
    )
## Adds a <a href="@gstlalgtkdoc/GstLALCacheSrc.html">lal_cachesrc</a> element to a pipeline with useful default properties
def cache(pipeline: pipetools.Pipeline, location: str, use_mmap: bool = True, **properties) -> pipetools.Element:
    """Add a lal_cachesrc element, which retrieves frame files from locations
    recorded in a LAL cache file.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        location:
            str, Path to LAL cache file.
        use_mmap:
            bool, default True, if True Use mmap() instead of read()
        **properties:
            additional keyword arguments, forwarded unchanged to the element

    References:
        Implementation: gstlal/gst/lal/gstlal_cachesrc.c

    Returns:
        Element
    """
    element_properties = {"location": location, "use_mmap": use_mmap, **properties}
    return pipetools.make_element_with_src(pipeline, None, "lal_cachesrc", **element_properties)
def lvshm(pipeline: pipetools.Pipeline, shm_name: str, **properties) -> pipetools.Element:
    """Add a gds_lvshmsrc (LIGO-Virgo shared memory frame file) source element.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        shm_name:
            str, Shared memory partition name. Suggestions: LHO_Data,
            LLO_Data, VIRGO_Data
        **properties:
            additional keyword arguments, forwarded unchanged to the element

    References:
        Implementation: gstlal-ugly/gst/gds/lvshmsrc.cc

    Returns:
        Element
    """
    element_properties = {"shm_name": shm_name, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_lvshmsrc", **element_properties)
def devshm(pipeline: pipetools.Pipeline, shm_dirname: str, **properties) -> pipetools.Element:
    """Add a gds_devshmsrc (LIGO-Virgo /dev/shm frame file) source element.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        shm_dirname:
            str, Shared memory directory name (full path).
            Suggestion: /dev/shm/kafka/L1_O3ReplayMDC
        **properties:
            additional keyword arguments, forwarded unchanged to the element

    References:
        Implementation: gstlal-ugly/gst/gds/devshmsrc.cc

    Returns:
        Element
    """
    element_properties = {"shm_dirname": shm_dirname, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_devshmsrc", **element_properties)
def framexmit(pipeline: pipetools.Pipeline, multicast_group: str = '0.0.0.0', port: int = 0, **properties) -> pipetools.Element:
    """Add a gds_framexmitsrc (FrameXMIT based) source element.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        multicast_group:
            str, default "0.0.0.0", The address of multicast group to join.
            If no multicast address is supplied, the receiver will listen for
            UDP/IP broadcast transmissions at the specified port.
        port:
            int, default 0, The local port on which to receive broadcasts
            (0 = allocate). These ports can be reused by multiple applications.
        **properties:
            additional keyword arguments, forwarded unchanged to the element

    References:
        Implementation: gstlal-ugly/gst/gds/framexmitsrc.cc

    Returns:
        Element
    """
    element_properties = {"multicast_group": multicast_group, "port": port, **properties}
    return pipetools.make_element_with_src(pipeline, None, "gds_framexmitsrc", **element_properties)
def nds(pipeline: pipetools.Pipeline, host: str, instrument: str, channel_name: str, channel_type: str, blocksize: int = 16384 * 8 * 1, port: int = 31200) -> pipetools.Element:
    """Add an ndssrc (NDS-based) source element.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        host:
            str, NDS1 or NDS2 remote host name or IP address
        instrument:
            str, name of instrument
        channel_name:
            str, Name of the desired NDS channel.
        channel_type:
            str, Type of the desired NDS channel.
        blocksize:
            int, default 16384 * 8 * 1, i.e. 1 second of double precision
            floats at 16384 Hz, e.g., LIGO h(t)
        port:
            int, default 31200, NDS1 or NDS2 remote host port

    References:
        Implementation: gstlal-ugly/gst/nds/ndssrc.c

    Returns:
        Element
    """
    # the element is given the channel as "<instrument>:<channel_name>"
    qualified_channel = "%s:%s" % (instrument, channel_name)
    return pipetools.make_element_with_src(
        pipeline,
        None,
        "ndssrc",
        blocksize=blocksize,
        port=port,
        host=host,
        channel_name=qualified_channel,
        channel_type=channel_type,
    )
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiotestsrc.html">audiotestsrc</a> element to a pipeline with useful default properties
def audio_test(pipeline: pipetools.Pipeline, freq: float = 440, volume: float = 0.8, wave: int = AudioTestWaveform.Sine, samples_per_buffer: int = 1024, **properties) -> pipetools.Element:
    """Add an audiotestsrc element, which can be used to generate basic audio signals.

    It supports several different waveforms and allows setting the base
    frequency and volume. Some waveforms might use additional properties.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        freq:
            float, default 440, Frequency of test signal. The sample rate
            needs to be at least 2 times higher.
        volume:
            float, default 0.8, Volume of test signal
        wave:
            int, default 0, the type of waveform to produce. Options are:
                sine (0) - Sine
                square (1) - Square
                saw (2) - Saw
                triangle (3) - Triangle
                silence (4) - Silence
                white-noise (5) - White uniform noise
                pink-noise (6) - Pink noise
                sine-table (7) - Sine table
                ticks (8) - Periodic Ticks
                gaussian-noise (9) - White Gaussian noise
                red-noise (10) - Red (brownian) noise
                blue-noise (11) - Blue noise
                violet-noise (12) - Violet noise
        samples_per_buffer:
            int, default 1024, Number of samples in each outgoing buffer.
        **properties:
            additional keyword arguments, forwarded unchanged to the element.
            A legacy ``samplesperbuffer`` keyword, when present, overrides
            ``samples_per_buffer``.

    References:
        [1] https://gstreamer.freedesktop.org/documentation/audiotestsrc/index.html?gi-language=python

    Returns:
        Element
    """
    # Support the legacy argument name: if it was passed, it wins and is
    # removed from the pass-through properties so it is not supplied twice.
    samples_per_buffer = properties.pop('samplesperbuffer', samples_per_buffer)
    return pipetools.make_element_with_src(
        pipeline,
        None,
        "audiotestsrc",
        freq=freq,
        volume=volume,
        wave=wave,
        samplesperbuffer=samples_per_buffer,
        **properties,
    )
## see documentation for transform.tag_inject(), filters.caps() and audio_test()
def fake(pipeline: pipetools.Pipeline, instrument: str, channel_name: str, blocksize: int = None, volume: float = 1e-20, is_live: bool = False, wave: int = AudioTestWaveform.GaussianNoise, rate: int = 16384, **properties) -> pipetools.Element:
    """Create an audio_test source with several additional, lal-specific caps specified.

    The audio_test source is wrapped in a caps filter that fixes the sample
    format and rate, and the result is passed through a tag injector that
    sets the instrument, channel-name and units tags.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        instrument:
            str, name of instrument
        channel_name:
            str, name of input channel
        blocksize:
            int, default 1 second * rate samples/second * 8
        volume:
            float, default 1e-20, the sample volume
        is_live:
            bool, default False, whether or not audio_test source will behave
            like live source
        wave:
            int, default 9 (Gaussian Noise), see AudioTestWaveform enum for options
        rate:
            int, default 16384, sample rate
        **properties:
            additional keyword arguments, forwarded to audio_test()

    Returns:
        Element
    """
    if blocksize is None:
        # default blocksize is 1 second * rate samples/second * 8
        # bytes/sample (assume double-precision floats)
        blocksize = 1 * rate * 8

    # blocksize is counted in bytes of 8-byte samples, audio_test wants samples
    test_src = audio_test(
        pipeline,
        samples_per_buffer=int(blocksize / 8),
        wave=wave,
        volume=volume,
        is_live=is_live,
        **properties,
    )
    caps_string = "audio/x-raw, format=F64%s, rate=%d" % (BYTE_ORDER, rate)
    capped_src = filters.caps(pipeline, test_src, caps_string)

    tags = "instrument=%s,channel-name=%s,units=strain" % (instrument, channel_name)
    return transform.tag_inject(pipeline, capped_src, tags)
def files(pipeline: pipetools.Pipeline, paths: Iterable[Union[str, pathlib.Path]], instrument: str, channel_name: str, cache_path: Optional[Union[str, pathlib.Path]] = None) -> pipetools.Element:
    """Create a source from a list of file paths.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        paths:
            Iterable[Path or str], the full paths to the frame files
        instrument:
            str, name of instrument; combined with channel_name as
            "<instrument>:<channel_name>" to select the channel to demux
        channel_name:
            str, name of the channel to read from the frame files
        cache_path:
            Path or str, default None, the path to write out the cache file
            if specified, else write to temporary directory

    Notes:
        This is a convenience utility around cache source and framecppdemux
        that creates a cache file from a list of file paths

    Returns:
        Element, the queue element that the demuxed channel is linked into
    """
    qualified_channel = "%s:%s" % (instrument, channel_name)

    cache_path = laltools.create_cache(entries=paths, cache_path=cache_path)
    src = cache(pipeline, location=cache_path.as_posix())
    demux = mux.framecpp_channel_demux(pipeline, src, do_file_checksum=False, channel_list=[qualified_channel])
    mux.FrameCPPChannelDemuxSetUnitsHandler(demux, dict.fromkeys(demux.get_property("channel-list"), "strain"))

    # allow frame reading and decoding to occur in a different thread
    src = transform.queue(pipeline, None, max_size_buffers=0, max_size_bytes=0, max_size_time=8 * pipetools.Gst.SECOND)

    # NOTE(review): the bare name SrcDeferredLink is neither imported nor
    # defined in this module, so calling it raised NameError. It is assumed
    # to be provided by pipetools -- confirm against gstlal.pipeparts.pipetools.
    pipetools.SrcDeferredLink(demux, qualified_channel, src.get_static_pad("sink"))
    return src