"""Module for general transformation elements
"""
from typing import Iterable, Tuple, List
import gi
import numpy
import os
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from ligo import segments
from gstlal import pipeio
from gstlal.pipeparts import pipetools, filters
# a macro to turn on mkchecktimestamps
if "GSTLAL_CHECK_TIMESTAMPS" in os.environ:
GSTLAL_CHECK_TIMESTAMPS = True
else:
GSTLAL_CHECK_TIMESTAMPS = False
[docs]def integrate(pipeline: pipetools.Pipeline, src: pipetools.Element, template_dur: float = 1.0, **properties) -> pipetools.Element:
"""Integrates audio channel temp_dur length into the past.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
Implementation: gstlal-ugly/gst/lal/gstlal_integrate.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_integrate", template_dur=template_dur, **properties)
[docs]def mean(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Compute mean
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
References:
Implementation: gstlal-ugly/gst/lal/gstlal_mean.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_mean", **properties)
[docs]def abs_(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Compute absolute value
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "abs", **properties)
[docs]def pow(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Compute power
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "pow", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALSumSquares.html">lal_sumsquares</a> element to a pipeline with useful default properties
[docs]def sum_squares(pipeline: pipetools.Pipeline, src: pipetools.Element, weights: pipetools.ValueArray = None) -> pipetools.Element:
"""Computes the weighted sum-of-squares of the input channels.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
weights:
ValueArray, default None, Vector of weights to use in sum. If no vector is provided weights of 1.0 are assumed,
otherwise the number of input channels must equal the vector length. The incoming channels are first multiplied
by the weights, then squared, then summed.
References:
Implementation gstlal/gstlal/gst/lal/gstlal_sumsquares.c
Returns:
Element
"""
if weights is not None:
return pipetools.make_element_with_src(pipeline, src, "lal_sumsquares", weights=weights)
else:
return pipetools.make_element_with_src(pipeline, src, "lal_sumsquares")
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-taginject.html">taginject</a> element to a pipeline with useful default properties
[docs]def tag_inject(pipeline: pipetools.Pipeline, src: pipetools.Element, tags: str) -> pipetools.Element:
"""Element that injects new metadata tags, but passes incoming data through unmodified.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
tags:
str, List of tags to inject into the target file
References:
[1] https://gstreamer.freedesktop.org/documentation/debug/taginject.html?gi-language=python
Returns:
Element, unmodified data with new tags
"""
return pipetools.make_element_with_src(pipeline, src, "taginject", tags=tags)
## Adds a <a href="@gstlalgtkdoc/GSTLALShift.html">lal_shift</a> element to a pipeline with useful default properties
[docs]def shift(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Adjust segment events by +shift
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
Implementation: gstlal/gst/lal/gstlal_shift.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_shift", **properties)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-audioamplify.html">audioamplify</a> element to a pipeline with useful default properties
[docs]def amplify(pipeline: pipetools.Pipeline, src: pipetools.Element, amplification: float) -> pipetools.Element:
"""Amplifies an audio stream by a given factor and allows the selection of different clipping modes. The
difference between the clipping modes is best evaluated by testing.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
amplification:
float, Factor of amplification
References:
[1] https://gstreamer.freedesktop.org/documentation/audiofx/audioamplify.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "audioamplify", clipping_method=3, amplification=amplification)
## Adds a <a href="@gstlalgtkdoc/GSTLALAudioUnderSample.html">lal_audioundersample</a> element to a pipeline with useful default properties
[docs]def undersample(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
"""Undersamples an audio stream. Undersampling downsamples by taking every n-th sample, with no antialiasing or
low-pass filter. For data confined to a narrow frequency band, this transformation simultaneously downconverts
and downsamples the data (otherwise it does weird things). This element's output sample rate must be an integer
divisor of its input sample rate.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
References:
Implementation: gstlal/gst/lal/gstlal_audioundersample.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_audioundersample")
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioresample.html">audioresample</a> element to a pipeline with useful default properties
[docs]def resample(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Resamples raw audio buffers to different sample rates using a configurable windowing function to enhance quality. By default,
the resampler uses a reduced sinc table, with cubic interpolation filling in the gaps. This ensures that the table does not
become too big. However, the interpolation increases the CPU usage considerably. As an alternative, a full sinc table can be
used. Doing so can drastically reduce CPU usage (4x faster with 44.1 -> 48 kHz conversions for example), at the cost of increased
memory consumption, plus the sinc table takes longer to initialize when the element is created. A third mode exists, which uses
the full table unless said table would become too large, in which case the interpolated one is used instead.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
[1] https://gstreamer.freedesktop.org/documentation/audioresample/index.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "audioresample", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALInterpolator.html">lal_interpolator</a> element to a pipeline with useful default properties
[docs]def interpolator(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Interpolates multichannel audio data using BLAS
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
Implementation: gstlal-ugly/gst/lal/gstlal_interpolator.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_interpolator", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALWhiten.html">lal_whiten</a> element to a pipeline with useful default properties
[docs]def whiten(pipeline: pipetools.Pipeline, src: pipetools.Element, psd_mode: int = 0, zero_pad: int = 0, fft_length: int = 8,
average_samples: int = 64, median_samples: int = 7, **properties) -> pipetools.Element:
"""A PSD estimator and time series whitener.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
psd_mode:
int, default 0, PSD estimation mode. Options are:
"GSTLAL_PSDMODE_RUNNING_AVERAGE", Use running average for PSD
"GSTLAL_PSDMODE_FIXED", Use fixed spectrum for PSD
zero_pad:
int, default 0, Length of the zero-padding to include on both sides of the FFT in seconds
fft_length:
int, default 8, Total length of the FFT convolution (including zero padding) in seconds
average_samples:
int, default 64, Number of FFTs to be used in PSD average
median_samples:
int, default 7, Number of FFTs to be used in PSD median history
**properties:
References:
Implementation: gstlal/gst/lal/gstlal_whiten.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_whiten", psd_mode=psd_mode, zero_pad=zero_pad, fft_length=fft_length, average_samples=average_samples,
median_samples=median_samples,
**properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-tee.html">tee</a> element to a pipeline with useful default properties
[docs]def tee(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
"""Split data to multiple pads. Branching the data flow is useful when e.g. capturing a video where the video is shown on
the screen and also encoded and written to a file. Another example is playing music and hooking up a visualisation module.
One needs to use separate queue elements (or a multiqueue) in each branch to provide separate threads for each branch.
Otherwise a blocked dataflow in one branch would stall the other branches.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
References:
[1] https://gstreamer.freedesktop.org/documentation/coreelements/tee.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "tee")
## Adds a <a href="@gstdoc/GstLALAdder.html">lal_adder</a> element to a pipeline configured for synchronous "sum" mode mixing.
[docs]def adder(pipeline: pipetools.Pipeline, srcs: Iterable[pipetools.Element], sync: bool = True, mix_mode: str = "sum", **properties) -> pipetools.Element:
"""The adder allows to mix several streams into one by adding the data. Mixed data is clamped to the min/max
values of the data format. If the element's sync property is TRUE the streams are mixed with the timestamps
synchronized. If the sync property is FALSE (the default, to be compatible with older versions), then the
first samples from each stream are added to produce the first sample of the output, the second samples are
added to produce the second sample of the output, and so on.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
srcs:
Iterable[Gst.Element], the source elements
sync:
bool, default True, Align the time stamps of input streams
mix_mode:
str, default 'sum', Algorithm for mixing the input streams, options: "sum", "product"
**properties:
References:
Implementation: gstlal/gst/lal/gstadder.c
Returns:
Element
"""
elem = pipetools.make_element_with_src(pipeline, None, "lal_adder", sync=sync, mix_mode=mix_mode, **properties)
if srcs is not None:
for src in srcs:
src.link(elem)
return elem
## Adds a <a href="@gstdoc/GstLALAdder.html">lal_adder</a> element to a pipeline configured for synchronous "product" mode mixing.
[docs]def multiplier(pipeline: pipetools.Pipeline, srcs: Iterable[pipetools.Element], sync: bool = True, mix_mode: str = "product", **properties) -> pipetools.Element:
"""Helper function around adder that defaults to a mix mode of "product"
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
srcs:
Iterable[Gst.Element], the source elements
sync:
bool, default True, Align the time stamps of input streams
mix_mode:
str, default 'product', Algorithm for mixing the input streams, options: "sum", "product"
**properties:
References:
Implementation: gstlal/gst/lal/gstadder.c
Returns:
Element
"""
return adder(pipeline, srcs, sync=sync, mix_mode=mix_mode, **properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-queue.html">queue</a> element to a pipeline with useful default properties
[docs]def queue(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Data is queued until one of the limits specified by the , and/or properties has been reached. Any attempt to push
more buffers into the queue will block the pushing thread until more space becomes available. The queue will create a
new thread on the source pad to decouple the processing on sink and source pad. You can query how many buffers are
queued by reading the property. You can track changes by connecting to the notify::current-level-buffers signal (which
like all signals will be emitted from the streaming thread). The same applies to the and properties. The default queue
size limits are 200 buffers, 10MB of data, or one second worth of data, whichever is reached first. As said earlier, the
queue blocks by default when one of the specified maximums (bytes, time, buffers) has been reached. You can set the
property to specify that instead of blocking it should leak (drop) new or old buffers. The signal is emitted when the
queue has less data than the specified minimum thresholds require (by default: when the queue is empty). The signal is
emitted when the queue is filled up. Both signals are emitted from the context of the streaming thread.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
[1] https://gstreamer.freedesktop.org/documentation/coreelements/queue.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "queue", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALFIRBank.html">lal_firbank</a> element to a pipeline with useful default properties
[docs]def fir_bank(pipeline: pipetools.Pipeline, src: pipetools.Element, latency: int = None, fir_matrix: numpy.ndarray = None, time_domain: bool = None,
block_stride: int = None) -> pipetools.Element:
"""Projects a single audio channel onto a bank of FIR filters to produce a multi-channel output
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
latency:
int, default None, Filter latency in samples.
fir_matrix:
numpy.ndarray, default None, Array of impulse response vectors. Number of vectors (rows) in matrix sets number of output channels. All filters
must have the same length.
time_domain:
bool, default None, Set to true to use time-domain (a.k.a. direct) convolution, set to false to use FFT-based convolution.
For long filters FFT-based convolution is usually significantly faster than time-domain convolution but incurs a higher processing
latency and requires more RAM.
block_stride:
int, default None, When using FFT convolutions, this many samples will be produced from each block. Smaller values decrease latency
but increase computational cost. If very small values are desired, consider using time-domain convolution mode instead.
References:
Implementation: gstlal/gst/lal/gstlal_firbank.c
Returns:
Element
"""
properties = dict((name, value) for name, value in zip(("latency", "fir_matrix", "time_domain", "block_stride"),
(latency, fir_matrix, time_domain, block_stride)) if value is not None)
return pipetools.make_element_with_src(pipeline, src, "lal_firbank", **properties)
[docs]def td_whiten(pipeline: pipetools.Pipeline, src: pipetools.Element, latency: int = None, kernel: numpy.ndarray = None, taper_length: int = None):
"""Generic audio FIR filter with custom filter kernel and smooth kernel updates
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
latency:
int, default None, Filter latency in samples.
kernel:
array, default None, The newest kernel.
taper_length:
int, default None, Number of samples for kernel transition.
References:
Implementation: gstlal-ugly/gst/lal/gstlal_tdwhiten.c
Returns:
Element
"""
# a taper length of 1/4 kernel length mimics the default
# configuration of the FFT whitener
if taper_length is None and kernel is not None:
taper_length = len(kernel) // 4
properties = dict((name, value) for name, value in zip(("latency", "kernel", "taper_length"),
(latency, kernel, taper_length)) if value is not None)
return pipetools.make_element_with_src(pipeline, src, "lal_tdwhiten", **properties)
[docs]def trim(pipeline: pipetools.Pipeline, src: pipetools.Element, initial_offset: int = None, final_offset: int = None, inverse: bool = None) -> pipetools.Element:
"""Pass data only inside a region and mark everything else as gaps. The offsets are media-type specific. For audio
buffers, it's the number of samples produced so far. For video buffers, it's generally the frame number. For compressed
data, it could be the byte offset in a source or destination file. If inverse=true is set, only data *outside* of the
specified region will pass, and data in the inside will be marked as gaps.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
initial_offset:
int, default None, Only let data with offset bigger than this value pass.
final_offset:
int, default None, Only let data with offset smaller than this value pass
inverse:
bool, default None, If True only data *outside* the region will pass.
References:
Implementation: gstlal-ugly/gst/lal/gstlal_trim.c
Returns:
Element
"""
properties = dict((name, value) for name, value in zip(("initial-offset", "final-offset", "inverse"),
(initial_offset, final_offset, inverse)) if value is not None)
return pipetools.make_element_with_src(pipeline, src, "lal_trim", **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALReblock.html">lal_reblock</a> element to a pipeline with useful default properties
[docs]def reblock(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Chop audio buffers into smaller pieces to enforce a maximum allowed buffer duration
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
block_duration:
int, Maximum output buffer duration in nanoseconds. Buffers may be smaller than this.
References:
Implementation: gstlal/gst/lal/gstlal_reblock.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_reblock", **properties)
[docs]def bit_vector_gen(pipeline: pipetools.Pipeline, src: pipetools.Element, bit_vector: int, **properties) -> pipetools.Element:
"""Generate a bit vector stream based on the value of a control input
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
bit_vector:
int, Value to generate when output is \"on\" (output is 0 otherwise). Only as many
low-order bits as are needed by the output word size will be used.
**properties:
References:
Implementation: gstlal-ugly/gst/lal/gstlal_bitvectorgen.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_bitvectorgen", bit_vector=bit_vector, **properties)
## Adds a <a href="@gstlalgtkdoc/GSTLALMatrixMixer.html">lal_matrixmixer</a> element to a pipeline with useful default properties
[docs]def matrix_mixer(pipeline: pipetools.Pipeline, src: pipetools.Element, matrix: numpy.ndarray = None) -> pipetools.Element:
"""A many-to-many mixer
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
matrix:
array, Matrix of mixing coefficients. Number of rows in matrix sets number of input channels,
number of columns sets number of output channels.
References:
Implementation: gstlal/gst/lal/gstlal_matrixmixer.c
Returns:
Element
"""
if matrix is not None:
return pipetools.make_element_with_src(pipeline, src, "lal_matrixmixer", matrix=matrix)
else:
return pipetools.make_element_with_src(pipeline, src, "lal_matrixmixer")
## Adds a <a href="@gstlalgtkdoc/GSTLALToggleComplex.html">lal_togglecomplex</a> element to a pipeline with useful default properties
[docs]def toggle_complex(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
"""Replace float caps with complex (with half the channels), complex with float (with twice the channels).
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
References:
Implementation: gstlal/gst/lal/gstlal_togglecomplex.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_togglecomplex")
## Adds a <a href="@gstlalgtkdoc/GSTLALAutoChiSq.html">lal_autochisq</a> element to a pipeline with useful default properties
[docs]def auto_chisq(pipeline: pipetools.Pipeline, src: pipetools.Element, autocorrelation_matrix: numpy.ndarray = None, mask_matrix=None,
latency: int = 0, snr_thresh: int = 0) -> pipetools.Element:
"""Computes the chisquared time series from a filter's autocorrelation
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
autocorrelation_matrix:
array, default None, Array of complex autocorrelation vectors. Number of vectors (rows) in matrix sets
number of channels. All vectors must have the same length.
mask_matrix:
array, default None, Array of integer mask vectors. Matrix must be the same size as the autocorrelation
matrix. Only autocorrelation vector samples corresponding to non-zero samples in these vectors will be
used to construct the \\chi^{2} statistic. If this matrix is not supplied, all autocorrelation samples
are used.
latency:
int, default 0, Filter latency in samples. Must be in (-autocorrelation length, 0].
snr_thresh:
float, default 0, SNR Threshold that determines a trigger.
References:
Implementation: gstlal/gst/lal/gstlal_autochisq.c
Returns:
Element
"""
properties = {}
if autocorrelation_matrix is not None:
properties.update({
"autocorrelation_matrix": pipeio.repack_complex_array_to_real(autocorrelation_matrix),
"latency": latency,
"snr_thresh": snr_thresh
})
if mask_matrix is not None:
properties["autocorrelation_mask_matrix"] = mask_matrix
return pipetools.make_element_with_src(pipeline, src, "lal_autochisq", **properties)
[docs]def colorspace(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
"""Convert video frames between a great variety of video formats.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
References:
Pre gstreamer-1.0 docs: (ffmpegcolorspace) https://www.freedesktop.org/software/gstreamer-sdk/data/docs/2012.5/gst-plugins-base-plugins-0.10/gst-plugins-base-plugins-ffmpegcolorspace.html
Post gstreamer-1.0 docs: (videoconvert) https://gstreamer.freedesktop.org/documentation/videoconvert/index.html?gi-language=python
Migration: https://gstreamer.freedesktop.org/documentation/application-development/appendix/porting-1-0.html?gi-language=c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "videoconvert")
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audioconvert.html">audioconvert</a> element to a pipeline with useful default properties
[docs]def audio_convert(pipeline: pipetools.Pipeline, src: pipetools.Element, caps_string: str = None) -> pipetools.Element:
"""Audioconvert converts raw audio buffers between various possible formats. It supports integer to float conversion,
width/depth conversion, signedness and endianness conversion and channel transformations (ie. upmixing and downmixing),
as well as dithering and noise-shaping.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
caps_string:
str, Caps string
References:
[1] https://gstreamer.freedesktop.org/documentation/audioconvert/index.html?gi-language=python
Returns:
Element
"""
elem = pipetools.make_element_with_src(pipeline, src, "audioconvert")
if caps_string is not None:
elem = filters.caps(pipeline, elem, caps_string)
return elem
## Adds a <a href="@gstpluginsbasedoc/gst-plugins-base-plugins-audiorate.html">audiorate</a> element to a pipeline with useful default properties
[docs]def audio_rate(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""This element takes an incoming stream of timestamped raw audio frames and produces a perfect stream by
inserting or dropping samples as needed. This operation may be of use to link to elements that require or
otherwise implicitly assume a perfect stream as they do not store timestamps, but derive this by some means
(e.g. bitrate for some AVI cases). The properties , , and can be read to obtain information about number of
input samples, output samples, dropped samples (i.e. the number of unused input samples) and inserted samples
(i.e. the number of samples added to stream). When the property is set to FALSE, a GObject property notification
will be emitted whenever one of the or values changes. This can potentially cause performance degradation. Note
that property notification will happen from the streaming thread, so applications should be prepared for this. If
the property is non-zero, and an incoming buffer's timestamp deviates less than the property indicates from what
would make a 'perfect time', then no samples will be added or dropped. Note that the output is still guaranteed to
be a perfect stream, which means that the incoming data is then simply shifted (by less than the indicated tolerance)
to a perfect time.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
[1] https://gstreamer.freedesktop.org/documentation/audiorate/index.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "audiorate", **properties)
[docs]def deglitch(pipeline: pipetools.Pipeline, src: pipetools.Element, segment_list: List[Tuple[pipetools.TimeGPS, pipetools.TimeGPS]]) -> pipetools.Element:
"""Removes glitches based on a segment list. Must be coalesced.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
segment_list:
Iterable[Tuple[TimeGPS, TimeGPS]], list of segment start / stop times
References:
Implementation: gstlal-ugly/gst/lal/gstlaldeglitchfilter.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_deglitcher", segment_list=segments.segmentlist(segments.segment(a.ns(), b.ns()) for a, b in segment_list))
[docs]def check_timestamps(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str = None, silent: bool = True, timestamp_fuzz: int = 1) -> pipetools.Element:
"""Timestamp Checker Pass-Through Element
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
name:
str, name of check
silent:
bool, default True, Only report errors.
timestamp_fuzz:
int, Number of nanoseconds of timestamp<-->offset discrepancy to accept before reporting it. Timestamp<-->offset discrepancies
of 1/2 a sample or more are always reported.
References:
Implementation: gstlal/gst/python/lal_checktimestamps.py
Returns:
Element
"""
if GSTLAL_CHECK_TIMESTAMPS == True:
return pipetools.make_element_with_src(pipeline, src, "lal_checktimestamps", name=name, silent=silent, timestamp_fuzz=timestamp_fuzz)
else:
return src
## Adds a <a href="@gstlalgtkdoc/GSTLALPeak.html">lal_peak</a> element to a pipeline with useful default properties
[docs]def peak(pipeline: pipetools.Pipeline, src: pipetools.Element, n: int) -> pipetools.Element:
"""Find peaks in a time series every n samples
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
n:
int, number of samples over which to identify peaks
References:
Implementation: gstlal/gst/lal/gstlal_peak.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_peak", n=n)
[docs]def denoise(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
"""Separate out stationary/non-stationary components from signals.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
**properties:
References:
Implementation: gstlal-ugly/gst/lal/gstlal_denoiser.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_denoiser", **properties)
[docs]def clean(pipeline: pipetools.Pipeline, src: pipetools.Element, threshold: float = 1.0) -> pipetools.Element:
"""Helper function for denoise that cleans for a stationary, threshold
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
threshold:
float, default 1.0, The threshold in which to allow non-stationary signals in stationary component
Returns:
Element
"""
return denoise(pipeline, src, stationary=True, threshold=threshold)
[docs]def latency(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str = None, silent: bool = False) -> pipetools.Element:
"""Outputs the current GPS time at time of data flow
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
name:
str, name
silent:
bool, if True run silent
References:
Implementation: gstlal-ugly/gst/lal/gstlal_latency.c
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "lal_latency", name=name, silent=silent)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-capssetter.html">capssetter</a> element to a pipeline with useful default properties
[docs]def set_caps(pipeline: pipetools.Pipeline, src: pipetools.Element, caps: pipetools.Caps, **properties) -> pipetools.Element:
"""Sets or merges caps on a stream's buffers. That is, a buffer's caps are updated using (fields of) “caps”. Note that this
may contain multiple structures (though not likely recommended), but each of these must be fixed (or will otherwise be rejected).
If “join” is TRUE, then the incoming caps' mime-type is compared to the mime-type(s) of provided caps and only matching
structure(s) are considered for updating.
If “replace” is TRUE, then any caps update is preceded by clearing existing fields, making provided fields (as a whole)
replace incoming ones. Otherwise, no clearing is performed, in which case provided fields are added/merged onto incoming caps
Although this element might mainly serve as debug helper, it can also practically be used to correct a faulty pixel-aspect-ratio,
or to modify a yuv fourcc value to effectively swap chroma components or such alike.
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
caps:
Gst.Caps, the caps
**properties:
References:
[1] https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-good/html/gst-plugins-good-plugins-capssetter.html
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "capssetter", caps=Gst.Caps.from_string(caps), **properties)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-progressreport.html">progress_report</a> element to a pipeline with useful default properties
[docs]def progress_report(pipeline: pipetools.Pipeline, src: pipetools.Element, name: str):
"""The progressreport element can be put into a pipeline to report progress, which is done by doing upstream
duration and position queries in regular (real-time) intervals. Both the interval and the preferred query format
can be specified via the and the property.
Element messages containing a "progress" structure are posted on the bus whenever progress has been queried
(since gst-plugins-good 0.10.6 only).
Since the element was originally designed for debugging purposes, it will by default also print information
about the current progress to the terminal. This can be prevented by setting the property to True.
This element is most useful in transcoding pipelines or other situations where just querying the pipeline might
not lead to the wanted result. For progress in TIME format, the element is best placed in a 'raw stream' section
of the pipeline (or after any demuxers/decoders/parsers).
Three more things should be pointed out:
First, the element will only query progress when data flow happens. If data flow is stalled for some reason, no
progress messages will be posted.
Second, there are other elements (like qtdemux, for example) that may also post "progress" element messages on the bus.
Applications should check the source of any element messages they receive, if needed.
Third, applications should not take action on receiving notification of progress being 100%, they should only take action
when they receive an EOS message (since the progress reported is in reference to an internal point of a pipeline and
not the pipeline as a whole).
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
src:
Gst.Element, the source element
name:
str, the name
References:
[1] https://gstreamer.freedesktop.org/documentation/debug/progressreport.html?gi-language=python
Returns:
Element
"""
return pipetools.make_element_with_src(pipeline, src, "progressreport", do_query=False, name=name)
# TODO move to calibration specific module
[docs]def lho_coherent_null(pipeline: pipetools.Pipeline, H1src: pipetools.Element, H2src: pipetools.Element, H1_impulse, H1_latency, H2_impulse, H2_latency,
srate: int) -> pipetools.Element:
"""LHO Coherent and Null Streams
Args:
pipeline:
Gst.Pipeline, the pipeline to which the new element will be added
H1src:
Element, h1 source
H2src:
Element, h2 source
H1_impulse:
impulse response for H1
H1_latency:
latency for H1
H2_impulse:
impulse response for H2
H2_latency:
latency for H2
srate:
int, block stride for fir bank
References:
Implementation: gstlal/gst/python/lal_lho_coherent_null.py
Returns:
Element
"""
elem = pipetools.make_element_with_src(pipeline, None, "lal_lho_coherent_null", block_stride=srate, H1_impulse=H1_impulse, H2_impulse=H2_impulse, H1_latency=H1_latency,
H2_latency=H2_latency)
for peer, padname in ((H1src, "H1sink"), (H2src, "H2sink")):
if isinstance(peer, Gst.Pad):
peer.get_parent_element().link_pads(peer, elem, padname)
elif peer is not None:
peer.link_pads(None, elem, padname)
return elem
# TODO move to calibration specific module
[docs]def mkcomputegamma(pipeline, dctrl, exc, cos, sin, **properties):
"""Compute Gamma
Args:
pipeline:
dctrl:
exc:
cos:
sin:
**properties:
References:
Implementation: gstlal-calibration/gst/python/lal_compute_gamma.py
Returns:
Element
"""
elem = pipetools.make_element_with_src(pipeline, None, "lal_compute_gamma", **properties)
for peer, padname in ((dctrl, "dctrl_sink"), (exc, "exc_sink"), (cos, "cos"), (sin, "sin")):
if isinstance(peer, Gst.Pad):
peer.get_parent_element().link_pads(peer, elem, padname)
elif peer is not None:
peer.link_pads(None, elem, padname)
return elem
# TODO find source for lal_odc_to_dqv or delete
[docs]def mkodctodqv(pipeline, src, **properties):
return pipetools.make_element_with_src(pipeline, src, "lal_odc_to_dqv", **properties)
# TODO move this somewhere else, it's not a transform element
[docs]def audioresample_variance_gain(quality: int, num: int, den: int) -> float:
"""Calculate the output gain of GStreamer's stock audioresample element.
The audioresample element has a frequency response of unity "almost" all the
way up the Nyquist frequency. However, for an input of unit variance
Gaussian noise, the output will have a variance very slighly less than 1.
The return value is the variance that the filter will produce for a given
"quality" setting and sample rate.
@param den The denomenator of the ratio of the input and output sample rates
@param num The numerator of the ratio of the input and output sample rates
@return The variance of the output signal for unit variance input
The following example shows how to apply the correction factor using an
audioamplify element.
>>> from gstlal.pipeutil import *
>>> from gstlal.pipeparts import audioresample_variance_gain
>>> from gstlal import pipeio
>>> import numpy
>>> nsamples = 2 ** 17
>>> num = 2
>>> den = 1
>>> def handoff_handler(element, buffer, pad, (quality, filt_len, num, den)):
... out_latency = numpy.ceil(float(den) / num * filt_len)
... buf = pipeio.array_from_audio_buffer(buffer).flatten()
... std = numpy.std(buf[out_latency:-out_latency])
... print "quality=%2d, filt_len=%3d, num=%d, den=%d, stdev=%.2f" % (
... quality, filt_len, num, den, std)
...
>>> for quality in range(11):
... pipeline = Gst.Pipeline()
... correction = 1/numpy.sqrt(audioresample_variance_gain(quality, num, den))
... elems = mkelems_in_bin(pipeline,
... ('audiotestsrc', {'wave':'gaussian-noise','volume':1}),
... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,format=F64LE,rate=%d' % num)}),
... ('audioresample', {'quality':quality}),
... ('capsfilter', {'caps':Gst.Caps.from_string('audio/x-raw,width=F64LE,rate=%d' % den)}),
... ('audioamplify', {'amplification':correction,'clipping-method':'none'}),
... ('fakesink', {'signal-handoffs':True, 'num-buffers':1})
... )
... filt_len = elems[2].get_property('filter-length')
... elems[0].set_property('samplesperbuffer', 2 * filt_len + nsamples)
... if elems[-1].connect_after('handoff', handoff_handler, (quality, filt_len, num, den)) < 1:
... raise RuntimeError
... try:
... if pipeline.set_state(Gst.State.PLAYING) is not Gst.State.CHANGE_ASYNC:
... raise RuntimeError
... if not pipeline.get_bus().poll(Gst.MessageType.EOS, -1):
... raise RuntimeError
... finally:
... if pipeline.set_state(Gst.State.NULL) is not Gst.StateChangeReturn.SUCCESS:
... raise RuntimeError
...
quality= 0, filt_len= 8, num=2, den=1, stdev=1.00
quality= 1, filt_len= 16, num=2, den=1, stdev=1.00
quality= 2, filt_len= 32, num=2, den=1, stdev=1.00
quality= 3, filt_len= 48, num=2, den=1, stdev=1.00
quality= 4, filt_len= 64, num=2, den=1, stdev=1.00
quality= 5, filt_len= 80, num=2, den=1, stdev=1.00
quality= 6, filt_len= 96, num=2, den=1, stdev=1.00
quality= 7, filt_len=128, num=2, den=1, stdev=1.00
quality= 8, filt_len=160, num=2, den=1, stdev=1.00
quality= 9, filt_len=192, num=2, den=1, stdev=1.00
quality=10, filt_len=256, num=2, den=1, stdev=1.00
"""
# These constants were measured with 2**22 samples.
if num > den: # downsampling
return den * (
0.7224862140943990596,
0.7975021342935247892,
0.8547537598970208483,
0.8744072146753004704,
0.9075294214410336568,
0.9101523813406768859,
0.9280549396020538744,
0.9391809530012216189,
0.9539276644089494939,
0.9623083437067311285,
0.9684700588501590213
)[quality] / num
elif num < den: # upsampling
return (
0.7539740617648067467,
0.8270076656536116122,
0.8835072979478705291,
0.8966758456219333651,
0.9253434087537378838,
0.9255866674042573239,
0.9346487800036394900,
0.9415331868209220190,
0.9524608799160205752,
0.9624372769883490220,
0.9704505626409354324
)[quality]
else: # no change in sample rate
return 1.