Source code for pipeparts.transform

"""Module for general transformation elements

"""
from typing import Iterable, Tuple, List

import gi
import numpy
import os

gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst

GObject.threads_init()
Gst.init(None)
from ligo import segments
from gstlal import pipeio
from gstlal.pipeparts import pipetools, filters


# a macro to turn on mkchecktimestamps
if "GSTLAL_CHECK_TIMESTAMPS" in os.environ:
	GSTLAL_CHECK_TIMESTAMPS = True
else:
	GSTLAL_CHECK_TIMESTAMPS = False


[docs]def integrate(pipeline: pipetools.Pipeline, src: pipetools.Element, template_dur: float = 1.0, **properties) -> pipetools.Element: """Integrates audio channel temp_dur length into the past. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: Implementation: gstlal-ugly/gst/lal/gstlal_integrate.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_integrate", template_dur=template_dur, **properties)
[docs]def mean(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Compute mean Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element References: Implementation: gstlal-ugly/gst/lal/gstlal_mean.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_mean", **properties)
[docs]def abs_(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Compute absolute value Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "abs", **properties)
[docs]def pow(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Compute power Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "pow", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALSumSquares.html">lal_sumsquares</a> element to a pipeline with useful default properties
[docs]def sum_squares(pipeline: pipetools.Pipeline, src: pipetools.Element, weights: pipetools.ValueArray = None) -> pipetools.Element: """Computes the weighted sum-of-squares of the input channels. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element weights: ValueArray, default None, Vector of weights to use in sum. If no vector is provided weights of 1.0 are assumed, otherwise the number of input channels must equal the vector length. The incoming channels are first multiplied by the weights, then squared, then summed. References: Implementation gstlal/gstlal/gst/lal/gstlal_sumsquares.c Returns: Element """ if weights is not None: return pipetools.make_element_with_src(pipeline, src, "lal_sumsquares", weights=weights) else: return pipetools.make_element_with_src(pipeline, src, "lal_sumsquares")
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-taginject.html">taginject</a> element to a pipeline with useful default properties
[docs]def tag_inject(pipeline: pipetools.Pipeline, src: pipetools.Element, tags: str) -> pipetools.Element: """Element that injects new metadata tags, but passes incoming data through unmodified. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element tags: str, List of tags to inject into the target file References: [1] https://gstreamer.freedesktop.org/documentation/debug/taginject.html?gi-language=python Returns: Element, unmodified data with new tags """ return pipetools.make_element_with_src(pipeline, src, "taginject", tags=tags)
## Adds a <a href="@gstlalgtkdoc/GSTLALShift.html">lal_shift</a> element to a pipeline with useful default properties
[docs]def shift(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Adjust segment events by +shift Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: Implementation: gstlal/gst/lal/gstlal_shift.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_shift", **properties)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audioamplify.html">audioamplify</a> element to a pipeline with useful default properties
[docs]def amplify(pipeline: pipetools.Pipeline, src: pipetools.Element, amplification: float) -> pipetools.Element: """Amplifies an audio stream by a given factor and allows the selection of different clipping modes. The difference between the clipping modes is best evaluated by testing. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element amplification: float, Factor of amplification References: [1] https://gstreamer.freedesktop.org/documentation/audiofx/audioamplify.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "audioamplify", clipping_method=3, amplification=amplification)
## Adds a <a href="@gstlalgtkdoc/GSTLALAudioUnderSample.html">lal_audioundersample</a> element to a pipeline with useful default properties
[docs]def undersample(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element: """Undersamples an audio stream. Undersampling downsamples by taking every n-th sample, with no antialiasing or low-pass filter. For data confined to a narrow frequency band, this transformation simultaneously downconverts and downsamples the data (otherwise it does weird things). This element's output sample rate must be an integer divisor of its input sample rate. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element References: Implementation: gstlal/gst/lal/gstlal_audioundersample.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_audioundersample")
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioresample.html">audioresample</a> element to a pipeline with useful default properties
[docs]def resample(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Resamples raw audio buffers to different sample rates using a configurable windowing function to enhance quality. By default, the resampler uses a reduced sinc table, with cubic interpolation filling in the gaps. This ensures that the table does not become too big. However, the interpolation increases the CPU usage considerably. As an alternative, a full sinc table can be used. Doing so can drastically reduce CPU usage (4x faster with 44.1 -> 48 kHz conversions for example), at the cost of increased memory consumption, plus the sinc table takes longer to initialize when the element is created. A third mode exists, which uses the full table unless said table would become too large, in which case the interpolated one is used instead. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: [1] https://gstreamer.freedesktop.org/documentation/audioresample/index.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "audioresample", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALInterpolator.html">lal_interpolator</a> element to a pipeline with useful default properties
[docs]def interpolator(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Interpolates multichannel audio data using BLAS Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: Implementation: gstlal-ugly/gst/lal/gstlal_interpolator.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_interpolator", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALWhiten.html">lal_whiten</a> element to a pipeline with useful default properties
[docs]def whiten(pipeline: pipetools.Pipeline, src: pipetools.Element, psd_mode: int = 0, zero_pad: int = 0, fft_length: int = 8, average_samples: int = 64, median_samples: int = 7, **properties) -> pipetools.Element: """A PSD estimator and time series whitener. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element psd_mode: int, default 0, PSD estimation mode. Options are: "GSTLAL_PSDMODE_RUNNING_AVERAGE", Use running average for PSD "GSTLAL_PSDMODE_FIXED", Use fixed spectrum for PSD zero_pad: int, default 0, Length of the zero-padding to include on both sides of the FFT in seconds fft_length: int, default 8, Total length of the FFT convolution (including zero padding) in seconds average_samples: int, default 64, Number of FFTs to be used in PSD average median_samples: int, default 7, Number of FFTs to be used in PSD median history **properties: References: Implementation: gstlal/gst/lal/gstlal_whiten.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_whiten", psd_mode=psd_mode, zero_pad=zero_pad, fft_length=fft_length, average_samples=average_samples, median_samples=median_samples, **properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-tee.html">tee</a> element to a pipeline with useful default properties
[docs]def tee(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element: """Split data to multiple pads. Branching the data flow is useful when e.g. capturing a video where the video is shown on the screen and also encoded and written to a file. Another example is playing music and hooking up a visualisation module. One needs to use separate queue elements (or a multiqueue) in each branch to provide separate threads for each branch. Otherwise a blocked dataflow in one branch would stall the other branches. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element References: [1] https://gstreamer.freedesktop.org/documentation/coreelements/tee.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "tee")
## Adds a <a href="@gstdoc/GstLALAdder.html">lal_adder</a> element to a pipeline configured for synchronous "sum" mode mixing.
[docs]def adder(pipeline: pipetools.Pipeline, srcs: Iterable[pipetools.Element], sync: bool = True, mix_mode: str = "sum", **properties) -> pipetools.Element: """The adder allows to mix several streams into one by adding the data. Mixed data is clamped to the min/max values of the data format. If the element's sync property is TRUE the streams are mixed with the timestamps synchronized. If the sync property is FALSE (the default, to be compatible with older versions), then the first samples from each stream are added to produce the first sample of the output, the second samples are added to produce the second sample of the output, and so on. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added srcs: Iterable[Gst.Element], the source elements sync: bool, default True, Align the time stamps of input streams mix_mode: str, default 'sum', Algorithm for mixing the input streams, options: "sum", "product" **properties: References: Implementation: gstlal/gst/lal/gstadder.c Returns: Element """ elem = pipetools.make_element_with_src(pipeline, None, "lal_adder", sync=sync, mix_mode=mix_mode, **properties) if srcs is not None: for src in srcs: src.link(elem) return elem
## Adds a <a href="@gstdoc/GstLALAdder.html">lal_adder</a> element to a pipeline configured for synchronous "product" mode mixing.
[docs]def multiplier(pipeline: pipetools.Pipeline, srcs: Iterable[pipetools.Element], sync: bool = True, mix_mode: str = "product", **properties) -> pipetools.Element: """Helper function around adder that defaults to a mix mode of "product" Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added srcs: Iterable[Gst.Element], the source elements sync: bool, default True, Align the time stamps of input streams mix_mode: str, default 'product', Algorithm for mixing the input streams, options: "sum", "product" **properties: References: Implementation: gstlal/gst/lal/gstadder.c Returns: Element """ return adder(pipeline, srcs, sync=sync, mix_mode=mix_mode, **properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-queue.html">queue</a> element to a pipeline with useful default properties
[docs]def queue(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Data is queued until one of the limits specified by the , and/or properties has been reached. Any attempt to push more buffers into the queue will block the pushing thread until more space becomes available. The queue will create a new thread on the source pad to decouple the processing on sink and source pad. You can query how many buffers are queued by reading the property. You can track changes by connecting to the notify::current-level-buffers signal (which like all signals will be emitted from the streaming thread). The same applies to the and properties. The default queue size limits are 200 buffers, 10MB of data, or one second worth of data, whichever is reached first. As said earlier, the queue blocks by default when one of the specified maximums (bytes, time, buffers) has been reached. You can set the property to specify that instead of blocking it should leak (drop) new or old buffers. The signal is emitted when the queue has less data than the specified minimum thresholds require (by default: when the queue is empty). The signal is emitted when the queue is filled up. Both signals are emitted from the context of the streaming thread. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: [1] https://gstreamer.freedesktop.org/documentation/coreelements/queue.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "queue", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALFIRBank.html">lal_firbank</a> element to a pipeline with useful default properties
[docs]def fir_bank(pipeline: pipetools.Pipeline, src: pipetools.Element, latency: int = None, fir_matrix: numpy.ndarray = None, time_domain: bool = None, block_stride: int = None) -> pipetools.Element: """Projects a single audio channel onto a bank of FIR filters to produce a multi-channel output Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element latency: int, default None, Filter latency in samples. fir_matrix: numpy.ndarray, default None, Array of impulse response vectors. Number of vectors (rows) in matrix sets number of output channels. All filters must have the same length. time_domain: bool, default None, Set to true to use time-domain (a.k.a. direct) convolution, set to false to use FFT-based convolution. For long filters FFT-based convolution is usually significantly faster than time-domain convolution but incurs a higher processing latency and requires more RAM. block_stride: int, default None, When using FFT convolutions, this many samples will be produced from each block. Smaller values decrease latency but increase computational cost. If very small values are desired, consider using time-domain convolution mode instead. References: Implementation: gstlal/gst/lal/gstlal_firbank.c Returns: Element """ properties = dict((name, value) for name, value in zip(("latency", "fir_matrix", "time_domain", "block_stride"), (latency, fir_matrix, time_domain, block_stride)) if value is not None) return pipetools.make_element_with_src(pipeline, src, "lal_firbank", **properties)
[docs]def td_whiten(pipeline: pipetools.Pipeline, src: pipetools.Element, latency: int = None, kernel: numpy.ndarray = None, taper_length: int = None): """Generic audio FIR filter with custom filter kernel and smooth kernel updates Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element latency: int, default None, Filter latency in samples. kernel: array, default None, The newest kernel. taper_length: int, default None, Number of samples for kernel transition. References: Implementation: gstlal-ugly/gst/lal/gstlal_tdwhiten.c Returns: Element """ # a taper length of 1/4 kernel length mimics the default # configuration of the FFT whitener if taper_length is None and kernel is not None: taper_length = len(kernel) // 4 properties = dict((name, value) for name, value in zip(("latency", "kernel", "taper_length"), (latency, kernel, taper_length)) if value is not None) return pipetools.make_element_with_src(pipeline, src, "lal_tdwhiten", **properties)
[docs]def trim(pipeline: pipetools.Pipeline, src: pipetools.Element, initial_offset: int = None, final_offset: int = None, inverse: bool = None) -> pipetools.Element: """Pass data only inside a region and mark everything else as gaps. The offsets are media-type specific. For audio buffers, it's the number of samples produced so far. For video buffers, it's generally the frame number. For compressed data, it could be the byte offset in a source or destination file. If inverse=true is set, only data *outside* of the specified region will pass, and data in the inside will be marked as gaps. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element initial_offset: int, default None, Only let data with offset bigger than this value pass. final_offset: int, default None, Only let data with offset smaller than this value pass inverse: bool, default None, If True only data *outside* the region will pass. References: Implementation: gstlal-ugly/gst/lal/gstlal_trim.c Returns: Element """ properties = dict((name, value) for name, value in zip(("initial-offset", "final-offset", "inverse"), (initial_offset, final_offset, inverse)) if value is not None) return pipetools.make_element_with_src(pipeline, src, "lal_trim", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALReblock.html">lal_reblock</a> element to a pipeline with useful default properties
[docs]def reblock(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Chop audio buffers into smaller pieces to enforce a maximum allowed buffer duration Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: block_duration: int, Maximum output buffer duration in nanoseconds. Buffers may be smaller than this. References: Implementation: gstlal/gst/lal/gstlal_reblock.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_reblock", **properties)
[docs]def bit_vector_gen(pipeline: pipetools.Pipeline, src: pipetools.Element, bit_vector: int, **properties) -> pipetools.Element: """Generate a bit vector stream based on the value of a control input Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element bit_vector: int, Value to generate when output is \"on\" (output is 0 otherwise). Only as many low-order bits as are needed by the output word size will be used. **properties: References: Implementation: gstlal-ugly/gst/lal/gstlal_bitvectorgen.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_bitvectorgen", bit_vector=bit_vector, **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALMatrixMixer.html">lal_matrixmixer</a> element to a pipeline with useful default properties
[docs]def matrix_mixer(pipeline: pipetools.Pipeline, src: pipetools.Element, matrix: numpy.ndarray = None) -> pipetools.Element: """A many-to-many mixer Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element matrix: array, Matrix of mixing coefficients. Number of rows in matrix sets number of input channels, number of columns sets number of output channels. References: Implementation: gstlal/gst/lal/gstlal_matrixmixer.c Returns: Element """ if matrix is not None: return pipetools.make_element_with_src(pipeline, src, "lal_matrixmixer", matrix=matrix) else: return pipetools.make_element_with_src(pipeline, src, "lal_matrixmixer")
## Adds a <a href="@gstlalgtkdoc/GSTLALToggleComplex.html">lal_togglecomplex</a> element to a pipeline with useful default properties
[docs]def toggle_complex(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element: """Replace float caps with complex (with half the channels), complex with float (with twice the channels). Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element References: Implementation: gstlal/gst/lal/gstlal_togglecomplex.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_togglecomplex")
## Adds a <a href="@gstlalgtkdoc/GSTLALAutoChiSq.html">lal_autochisq</a> element to a pipeline with useful default properties
[docs]def auto_chisq(pipeline: pipetools.Pipeline, src: pipetools.Element, autocorrelation_matrix: numpy.ndarray = None, mask_matrix=None, latency: int = 0, snr_thresh: int = 0) -> pipetools.Element: """Computes the chisquared time series from a filter's autocorrelation Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element autocorrelation_matrix: array, default None, Array of complex autocorrelation vectors. Number of vectors (rows) in matrix sets number of channels. All vectors must have the same length. mask_matrix: array, default None, Array of integer mask vectors. Matrix must be the same size as the autocorrelation matrix. Only autocorrelation vector samples corresponding to non-zero samples in these vectors will be used to construct the \\chi^{2} statistic. If this matrix is not supplied, all autocorrelation samples are used. latency: int, default 0, Filter latency in samples. Must be in (-autocorrelation length, 0]. snr_thresh: float, default 0, SNR Threshold that determines a trigger. References: Implementation: gstlal/gst/lal/gstlal_autochisq.c Returns: Element """ properties = {} if autocorrelation_matrix is not None: properties.update({ "autocorrelation_matrix": pipeio.repack_complex_array_to_real(autocorrelation_matrix), "latency": latency, "snr_thresh": snr_thresh }) if mask_matrix is not None: properties["autocorrelation_mask_matrix"] = mask_matrix return pipetools.make_element_with_src(pipeline, src, "lal_autochisq", **properties)
[docs]def colorspace(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element: """Convert video frames between a great variety of video formats. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element References: Pre gstreamer-1.0 docs: (ffmpegcolorspace) https://www.freedesktop.org/software/gstreamer-sdk/data/docs/2012.5/gst-plugins-base-plugins-0.10/gst-plugins-base-plugins-ffmpegcolorspace.html Post gstreamer-1.0 docs: (videoconvert) https://gstreamer.freedesktop.org/documentation/videoconvert/index.html?gi-language=python Migration: https://gstreamer.freedesktop.org/documentation/application-development/appendix/porting-1-0.html?gi-language=c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "videoconvert")
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioconvert.html">audioconvert</a> element to a pipeline with useful default properties
[docs]def audio_convert(pipeline: pipetools.Pipeline, src: pipetools.Element, caps_string: str = None) -> pipetools.Element: """Audioconvert converts raw audio buffers between various possible formats. It supports integer to float conversion, width/depth conversion, signedness and endianness conversion and channel transformations (ie. upmixing and downmixing), as well as dithering and noise-shaping. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element caps_string: str, Caps string References: [1] https://gstreamer.freedesktop.org/documentation/audioconvert/index.html?gi-language=python Returns: Element """ elem = pipetools.make_element_with_src(pipeline, src, "audioconvert") if caps_string is not None: elem = filters.caps(pipeline, elem, caps_string) return elem
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiorate.html">audiorate</a> element to a pipeline with useful default properties
[docs]def audio_rate(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """This element takes an incoming stream of timestamped raw audio frames and produces a perfect stream by inserting or dropping samples as needed. This operation may be of use to link to elements that require or otherwise implicitly assume a perfect stream as they do not store timestamps, but derive this by some means (e.g. bitrate for some AVI cases). The properties , , and can be read to obtain information about number of input samples, output samples, dropped samples (i.e. the number of unused input samples) and inserted samples (i.e. the number of samples added to stream). When the property is set to FALSE, a GObject property notification will be emitted whenever one of the or values changes. This can potentially cause performance degradation. Note that property notification will happen from the streaming thread, so applications should be prepared for this. If the property is non-zero, and an incoming buffer's timestamp deviates less than the property indicates from what would make a 'perfect time', then no samples will be added or dropped. Note that the output is still guaranteed to be a perfect stream, which means that the incoming data is then simply shifted (by less than the indicated tolerance) to a perfect time. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: [1] https://gstreamer.freedesktop.org/documentation/audiorate/index.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "audiorate", **properties)
[docs]def deglitch(pipeline: pipetools.Pipeline, src: pipetools.Element, segment_list: List[Tuple[pipetools.TimeGPS, pipetools.TimeGPS]]) -> pipetools.Element: """Removes glitches based on a segment list. Must be coalesced. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element segment_list: Iterable[Tuple[TimeGPS, TimeGPS]], list of segment start / stop times References: Implementation: gstlal-ugly/gst/lal/gstlaldeglitchfilter.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_deglitcher", segment_list=segments.segmentlist(segments.segment(a.ns(), b.ns()) for a, b in segment_list))
[docs]def check_timestamps(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str = None, silent: bool = True, timestamp_fuzz: int = 1) -> pipetools.Element: """Timestamp Checker Pass-Through Element Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element name: str, name of check silent: bool, default True, Only report errors. timestamp_fuzz: int, Number of nanoseconds of timestamp<-->offset discrepancy to accept before reporting it. Timestamp<-->offset discrepancies of 1/2 a sample or more are always reported. References: Implementation: gstlal/gst/python/lal_checktimestamps.py Returns: Element """ if GSTLAL_CHECK_TIMESTAMPS == True: return pipetools.make_element_with_src(pipeline, src, "lal_checktimestamps", name=name, silent=silent, timestamp_fuzz=timestamp_fuzz) else: return src
## Adds a <a href="@gstlalgtkdoc/GSTLALPeak.html">lal_peak</a> element to a pipeline with useful default properties
[docs]def peak(pipeline: pipetools.Pipeline, src: pipetools.Element, n: int) -> pipetools.Element: """Find peaks in a time series every n samples Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element n: int, number of samples over which to identify peaks References: Implementation: gstlal/gst/lal/gstlal_peak.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_peak", n=n)
[docs]def denoise(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element: """Separate out stationary/non-stationary components from signals. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element **properties: References: Implementation: gstlal-ugly/gst/lal/gstlal_denoiser.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_denoiser", **properties)
[docs]def clean(pipeline: pipetools.Pipeline, src: pipetools.Element, threshold: float = 1.0) -> pipetools.Element: """Helper function for denoise that cleans for a stationary, threshold Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element threshold: float, default 1.0, The threshold in which to allow non-stationary signals in stationary component Returns: Element """ return denoise(pipeline, src, stationary=True, threshold=threshold)
[docs]def latency(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str = None, silent: bool = False) -> pipetools.Element: """Outputs the current GPS time at time of data flow Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element name: str, name silent: bool, if True run silent References: Implementation: gstlal-ugly/gst/lal/gstlal_latency.c Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "lal_latency", name=name, silent=silent)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-capssetter.html">capssetter</a> element to a pipeline with useful default properties
[docs]def set_caps(pipeline: pipetools.Pipeline, src: pipetools.Element, caps: pipetools.Caps, **properties) -> pipetools.Element: """Sets or merges caps on a stream's buffers. That is, a buffer's caps are updated using (fields of) “caps”. Note that this may contain multiple structures (though not likely recommended), but each of these must be fixed (or will otherwise be rejected). If “join” is TRUE, then the incoming caps' mime-type is compared to the mime-type(s) of provided caps and only matching structure(s) are considered for updating. If “replace” is TRUE, then any caps update is preceded by clearing existing fields, making provided fields (as a whole) replace incoming ones. Otherwise, no clearing is performed, in which case provided fields are added/merged onto incoming caps Although this element might mainly serve as debug helper, it can also practically be used to correct a faulty pixel-aspect-ratio, or to modify a yuv fourcc value to effectively swap chroma components or such alike. Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element caps: Gst.Caps, the caps **properties: References: [1] https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good/html/gst-plugins-good-plugins-capssetter.html Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "capssetter", caps=Gst.Caps.from_string(caps), **properties)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-progressreport.html">progress_report</a> element to a pipeline with useful default properties
[docs]def progress_report(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str): """The progressreport element can be put into a pipeline to report progress, which is done by doing upstream duration and position queries in regular (real-time) intervals. Both the interval and the preferred query format can be specified via the and the property. Element messages containing a "progress" structure are posted on the bus whenever progress has been queried (since gst-plugins-good 0.10.6 only). Since the element was originally designed for debugging purposes, it will by default also print information about the current progress to the terminal. This can be prevented by setting the property to True. This element is most useful in transcoding pipelines or other situations where just querying the pipeline might not lead to the wanted result. For progress in TIME format, the element is best placed in a 'raw stream' section of the pipeline (or after any demuxers/decoders/parsers). Three more things should be pointed out: First, the element will only query progress when data flow happens. If data flow is stalled for some reason, no progress messages will be posted. Second, there are other elements (like qtdemux, for example) that may also post "progress" element messages on the bus. Applications should check the source of any element messages they receive, if needed. Third, applications should not take action on receiving notification of progress being 100%, they should only take action when they receive an EOS message (since the progress reported is in reference to an internal point of a pipeline and not the pipeline as a whole). Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added src: Gst.Element, the source element name: str, the name References: [1] https://gstreamer.freedesktop.org/documentation/debug/progressreport.html?gi-language=python Returns: Element """ return pipetools.make_element_with_src(pipeline, src, "progressreport", do_query=False, name=name)
# TODO move to calibration specific module
[docs]def lho_coherent_null(pipeline: pipetools.Pipeline, H1src: pipetools.Element, H2src: pipetools.Element, H1_impulse, H1_latency, H2_impulse, H2_latency, srate: int) -> pipetools.Element: """LHO Coherent and Null Streams Args: pipeline: Gst.Pipeline, the pipeline to which the new element will be added H1src: Element, h1 source H2src: Element, h2 source H1_impulse: impulse response for H1 H1_latency: latency for H1 H2_impulse: impulse response for H2 H2_latency: latency for H2 srate: int, block stride for fir bank References: Implementation: gstlal/gst/python/lal_lho_coherent_null.py Returns: Element """ elem = pipetools.make_element_with_src(pipeline, None, "lal_lho_coherent_null", block_stride=srate, H1_impulse=H1_impulse, H2_impulse=H2_impulse, H1_latency=H1_latency, H2_latency=H2_latency) for peer, padname in ((H1src, "H1sink"), (H2src, "H2sink")): if isinstance(peer, Gst.Pad): peer.get_parent_element().link_pads(peer, elem, padname) elif peer is not None: peer.link_pads(None, elem, padname) return elem
# TODO move to calibration specific module
[docs]def mkcomputegamma(pipeline, dctrl, exc, cos, sin, **properties): """Compute Gamma Args: pipeline: dctrl: exc: cos: sin: **properties: References: Implementation: gstlal-calibration/gst/python/lal_compute_gamma.py Returns: Element """ elem = pipetools.make_element_with_src(pipeline, None, "lal_compute_gamma", **properties) for peer, padname in ((dctrl, "dctrl_sink"), (exc, "exc_sink"), (cos, "cos"), (sin, "sin")): if isinstance(peer, Gst.Pad): peer.get_parent_element().link_pads(peer, elem, padname) elif peer is not None: peer.link_pads(None, elem, padname) return elem
# TODO find source for lal_odc_to_dqv or delete
[docs]def mkodctodqv(pipeline, src, **properties): return pipetools.make_element_with_src(pipeline, src, "lal_odc_to_dqv", **properties)
# TODO move this somewhere else, it's not a transform element
[docs]def audioresample_variance_gain(quality: int, num: int, den: int) -> float: """Calculate the output gain of GStreamer's stock audioresample element. The audioresample element has a frequency response of unity "almost" all the way up the Nyquist frequency. However, for an input of unit variance Gaussian noise, the output will have a variance very slighly less than 1. The return value is the variance that the filter will produce for a given "quality" setting and sample rate. @param den The denomenator of the ratio of the input and output sample rates @param num The numerator of the ratio of the input and output sample rates @return The variance of the output signal for unit variance input The following example shows how to apply the correction factor using an audioamplify element. >>> from gstlal.pipeutil import * >>> from gstlal.pipeparts import audioresample_variance_gain >>> from gstlal import pipeio >>> import numpy >>> nsamples = 2 ** 17 >>> num = 2 >>> den = 1 >>> def handoff_handler(element, buffer, pad, (quality, filt_len, num, den)): ... out_latency = numpy.ceil(float(den) / num * filt_len) ... buf = pipeio.array_from_audio_buffer(buffer).flatten() ... std = numpy.std(buf[out_latency:-out_latency]) ... print "quality=%2d, filt_len=%3d, num=%d, den=%d, stdev=%.2f" % ( ... quality, filt_len, num, den, std) ... >>> for quality in range(11): ... pipeline = Gst.Pipeline() ... correction = 1/numpy.sqrt(audioresample_variance_gain(quality, num, den)) ... elems = mkelems_in_bin(pipeline, ... ('audiotestsrc', {'wave':'gaussian-noise','volume':1}), ... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,format=F64LE,rate=%d' % num)}), ... ('audioresample', {'quality':quality}), ... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,width=F64LE,rate=%d' % den)}), ... ('audioamplify', {'amplification':correction,'clipping-method':'none'}), ... ('fakesink', {'signal-handoffs':True, 'num-buffers':1}) ... ) ... filt_len = elems[2].get_property('filter-length') ... elems[0].set_property('samplesperbuffer', 2 * filt_len + nsamples) ... if elems[-1].connect_after('handoff', handoff_handler, (quality, filt_len, num, den)) < 1: ... raise RuntimeError ... try: ... if pipeline.set_state(Gst.State.PLAYING) is not Gst.State.CHANGE_ASYNC: ... raise RuntimeError ... if not pipeline.get_bus().poll(Gst.MessageType.EOS, -1): ... raise RuntimeError ... finally: ... if pipeline.set_state(Gst.State.NULL) is not Gst.StateChangeReturn.SUCCESS: ... raise RuntimeError ... quality= 0, filt_len= 8, num=2, den=1, stdev=1.00 quality= 1, filt_len= 16, num=2, den=1, stdev=1.00 quality= 2, filt_len= 32, num=2, den=1, stdev=1.00 quality= 3, filt_len= 48, num=2, den=1, stdev=1.00 quality= 4, filt_len= 64, num=2, den=1, stdev=1.00 quality= 5, filt_len= 80, num=2, den=1, stdev=1.00 quality= 6, filt_len= 96, num=2, den=1, stdev=1.00 quality= 7, filt_len=128, num=2, den=1, stdev=1.00 quality= 8, filt_len=160, num=2, den=1, stdev=1.00 quality= 9, filt_len=192, num=2, den=1, stdev=1.00 quality=10, filt_len=256, num=2, den=1, stdev=1.00 """ # These constants were measured with 2**22 samples. if num > den: # downsampling return den * ( 0.7224862140943990596, 0.7975021342935247892, 0.8547537598970208483, 0.8744072146753004704, 0.9075294214410336568, 0.9101523813406768859, 0.9280549396020538744, 0.9391809530012216189, 0.9539276644089494939, 0.9623083437067311285, 0.9684700588501590213 )[quality] / num elif num < den: # upsampling return ( 0.7539740617648067467, 0.8270076656536116122, 0.8835072979478705291, 0.8966758456219333651, 0.9253434087537378838, 0.9255866674042573239, 0.9346487800036394900, 0.9415331868209220190, 0.9524608799160205752, 0.9624372769883490220, 0.9704505626409354324 )[quality] else: # no change in sample rate return 1.