# Copyright (C) 2009--2013 Kipp Cannon, Chad Hanna, Drew Keppel
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import sys
import os
import optparse
import math
import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version("GstAudio", "1.0")
from gi.repository import GObject
from gi.repository import Gst, GstAudio
import lal
from ligo import segments
from gstlal import kernels
from gstlal import pipeparts
from gstlal.psd import interpolate_psd
from gstlal import pipeio
__doc__ = """
**Review Status**
+------------------------------------------------+------------------------------------------+------------+
| Names | Hash | Date |
+================================================+==========================================+============+
| Florent, Sathya, Duncan Me, Jolien, Kipp, Chad | 8a6ea41398be79c00bdc27456ddeb1b590b0f68e | 2014-06-18 |
+------------------------------------------------+------------------------------------------+------------+
"""
# a macro to switch between a conventional whitener and a fir whitener below
try:
if int(os.environ["GSTLAL_FIR_WHITEN"]):
FIR_WHITENER = True
else:
FIR_WHITENER = False
except KeyError as e:
print("You must set the environment variable GSTLAL_FIR_WHITEN to either 0 or 1. 1 enables causal whitening. 0 is the traditional acausal whitening filter", file=sys.stderr)
raise
[docs]def mkcondition(pipeline, src, target_rate, instrument, psd = None, psd_fft_length = 32, ht_gate_threshold = float("inf"), idq_gate_threshold = float("inf"), veto_segments = None, nxydump_segment = None, track_psd = False, block_duration = Gst.SECOND // 4, zero_pad = None, width = 64, statevector = None, dqvector = None, idq_series = None, idq_channel_name = None, fir_whiten_reference_psd = None, track_latency = False):
"""
Build pipeline stage to whiten and downsample h(t).
* pipeline: the gstreamer pipeline to add this to
* src: the gstreamer element that will be providing data to this
* target_rate: the requested sample rate.
* instrument: the instrument to process
* psd: a psd frequency series
* psd_fft_length: length of fft used for whitening
* ht_gate_threshold: gate h(t) if it crosses this value
* veto_segments: segments to mark as gaps after whitening
* track_psd: decide whether to dynamically track the spectrum or use the fixed spectrum provided
* width: type convert to either 32 or 64 bit float
* fir_whiten_reference_psd: when using FIR whitener, use this PSD to define desired desired phase response
**Gstreamer graph describing this function**
.. graphviz::
digraph mkbasicsrc {
rankdir = LR;
compound=true;
node [shape=record fontsize=10 fontname="Verdana"];
edge [fontsize=8 fontname="Verdana"];
capsfilter1 ;
audioresample ;
capsfilter2 ;
reblock ;
whiten ;
audioconvert ;
capsfilter3 ;
"segmentsrcgate()" [label="segmentsrcgate() \\n [iff veto segment list provided]", style=filled, color=lightgrey];
tee ;
audioamplifyr1 ;
capsfilterr1 ;
htgater1 [label="htgate() \\n [iff ht gate specified]", style=filled, color=lightgrey];
tee1 ;
audioamplifyr2 ;
capsfilterr2 ;
htgater2 [label="htgate() \\n [iff ht gate specified]", style=filled, color=lightgrey];
tee2 ;
audioamplify_rn ;
capsfilter_rn ;
htgate_rn [style=filled, color=lightgrey, label="htgate() \\n [iff ht gate specified]"];
tee ;
// nodes
"\<src\>" -> capsfilter1 -> audioresample;
audioresample -> capsfilter2;
capsfilter2 -> reblock;
reblock -> whiten;
whiten -> audioconvert;
audioconvert -> capsfilter3;
capsfilter3 -> "segmentsrcgate()";
"segmentsrcgate()" -> tee;
tee -> audioamplifyr1 [label="Rate 1"];
audioamplifyr1 -> capsfilterr1;
capsfilterr1 -> htgater1;
htgater1 -> tee1 -> "\<return\> 1";
tee -> audioamplifyr2 [label="Rate 2"];
audioamplifyr2 -> capsfilterr2;
capsfilterr2 -> htgater2;
htgater2 -> tee2 -> "\<return\> 2";
tee -> audioamplify_rn [label="Rate N"];
audioamplify_rn -> capsfilter_rn;
capsfilter_rn -> htgate_rn;
htgate_rn -> tee_n -> "\<return\> 3";
}
"""
#
# input sanity checks
#
if psd is None and not track_psd:
raise ValueError("must enable track_psd when psd is None")
if int(psd_fft_length) != psd_fft_length:
raise ValueError("psd_fft_length must be an integer")
psd_fft_length = int(psd_fft_length)
if int(block_duration) != block_duration:
raise ValueError("block duration must be an integer")
#
# set default whitener zero-padding if needed
#
if zero_pad is None:
if FIR_WHITENER:
# in this configuration we are not asking the
# whitener to reassemble an output time series
# (that we care about) so we disable zero-padding
# to get the most information from the whitener's
# FFT blocks.
zero_pad = 0
elif psd_fft_length % 4:
raise ValueError("default whitener zero-padding requires psd_fft_length to be multiple of 4")
else:
zero_pad = psd_fft_length // 4
#
# down-sample to highest of target sample rates. we include a caps
# filter upstream of the resampler to ensure that this is, infact,
# *down*-sampling. if the source time series has a lower sample
# rate than the highest target sample rate the resampler will
# become an upsampler, and the result will likely interact poorly
# with the whitener as it tries to ampify the non-existant
# high-frequency components, possibly adding significant numerical
# noise to its output. if you see errors about being unable to
# negotiate a format from this stage in the pipeline, it is because
# you are asking for output sample rates that are higher than the
# sample rate of your data source.
#
head = pipeparts.mkcapsfilter(pipeline, src, f"audio/x-raw, rate=[{target_rate:d},MAX]")
head = pipeparts.mkinterpolator(pipeline, head)
head = pipeparts.mkaudioconvert(pipeline, head)
head = pipeparts.mkchecktimestamps(pipeline, head, f"{instrument}_timestamps_{target_rate:d}_hoft")
#
# construct whitener.
#
if FIR_WHITENER:
head = pipeparts.mktee(pipeline, head)
whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "lal_whiten_%s" % instrument)
pipeparts.mkfakesink(pipeline, whiten)
# high pass filter
kernel = kernels.one_second_highpass_kernel(max(rates), cutoff = 12)
block_stride = block_duration * target_rate // Gst.SECOND
assert len(kernel) % 2 == 1, "high-pass filter length is not odd"
head = pipeparts.mkfirbank(pipeline, head, fir_matrix = numpy.array(kernel, ndmin = 2), block_stride = block_stride, time_domain = False, latency = (len(kernel) - 1) // 2)
# FIR filter for whitening kernel
head = pipeparts.mktdwhiten(pipeline, head, kernel = numpy.zeros(1 + target_rate * psd_fft_length, dtype=numpy.float64), latency = 0)
# compute whitening kernel from PSD
def set_fir_psd(whiten, pspec, firelem, psd_fir_kernel):
psd_data = numpy.array(whiten.get_property("mean-psd"))
psd = lal.CreateREAL8FrequencySeries(
name = "psd",
epoch = lal.LIGOTimeGPS(0),
f0 = 0.0,
deltaF = whiten.get_property("delta-f"),
sampleUnits = lal.Unit(whiten.get_property("psd-units")),
length = len(psd_data)
)
psd.data.data = psd_data
kernel, latency, sample_rate = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd)
kernel, phase = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate)
firelem.set_property("kernel", pipeio.format_property(kernel))
firkernel = kernels.PSDFirKernel()
if fir_whiten_reference_psd is not None:
assert fir_whiten_reference_psd.f0 == 0.
# interpolate the reference phase PSD if its
# resolution doesn't match what we'll eventually
# require it to be.
if psd_fft_length != round(1. / fir_whiten_reference_psd.deltaF):
fir_whiten_reference_psd = interpolate_psd(fir_whiten_reference_psd, 1. / psd_fft_length)
# confirm that the reference phase PSD's Nyquist is
# sufficiently high, then reduce it to the required
# Nyquist if needed.
assert (psd_fft_length * target_rate) // 2 + 1 <= len(fir_whiten_reference_psd.data.data), "fir_whiten_reference_psd Nyquist too low"
if (psd_fft_length * target_rate) // 2 + 1 < len(fir_whiten_reference_psd.data.data):
fir_whiten_reference_psd = lal.CutREAL8FrequencySeries(fir_whiten_reference_psd, 0, (psd_fft_length * target_rate) // 2 + 1)
# set the reference phase PSD
firkernel.set_phase(fir_whiten_reference_psd)
whiten.connect_after("notify::mean-psd", set_fir_psd, head, firkernel)
# Gate after gaps. the queue sizes on the control inputs
# need only be large enough to hold the state vector
# streams until they are required. the streams will be
# consumed immediately when needed, so there is no risk
# that these queues add to the latency, so make them
# generously large.
# FIXME the -target_rate extra padding is for the high pass
# filter: NOTE it also needs to be big enough for the
# downsampling filter, but that is typically smaller than the
# HP filter (192 samples at Qual 9)
# FIXME: this first queue should not be needed. what is
# going on!?
if statevector is not None or dqvector is not None:
head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 0, max_size_bytes = 0, max_size_time = Gst.SECOND * (psd_fft_length + 2))
if statevector is not None:
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, statevector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1))
if dqvector is not None:
head = pipeparts.mkgate(pipeline, head, control = pipeparts.mkqueue(pipeline, dqvector, max_size_buffers = 0, max_size_bytes = 0, max_size_time = 0), default_state = False, threshold = 1, hold_length = -target_rate, attack_length = -target_rate * (psd_fft_length + 1))
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_fir" % instrument)
#head = pipeparts.mknxydumpsinktee(pipeline, head, filename = "after_mkfirbank.txt")
else:
# FIXME: we should require fir_whiten_reference_psd to be
# None in this code path for safety, but that's hard to do
# since the calling code would need to know what
# environment variable is being used to select the mode,
# and we don't want to be duplicating that code all over
# the place
#
# add a reblock element. the whitener's gap support isn't
# 100% yet and giving it smaller input buffers works around
# the remaining weaknesses (namely that when it sees a gap
# buffer large enough to drain its internal history, it
# doesn't know enough to produce a short non-gap buffer to
# drain its history followed by a gap buffer, it just
# produces one huge non-gap buffer that's mostly zeros).
# this is not required in the FIR-whitener case because
# there we don't use the whitener's output time series for
# anything.
#
head = pipeparts.mkreblock(pipeline, head, block_duration = block_duration)
head = whiten = pipeparts.mkwhiten(pipeline, head, fft_length = psd_fft_length, zero_pad = zero_pad, average_samples = 64, median_samples = 7, expand_gaps = True, name = "lal_whiten_%s" % instrument)
# make the buffers going downstream smaller, this can
# really help with RAM
head = pipeparts.mkreblock(pipeline, head, block_duration = block_duration)
#
# enable/disable PSD tracking
#
whiten.set_property("psd-mode", 0 if track_psd else 1)
#
# install signal handler to retrieve \Delta f and f_{Nyquist}
# whenever they are known and/or change, resample the user-supplied
# PSD, and install it into the whitener.
#
if psd is not None:
def psd_units_or_resolution_changed(elem, pspec, psd):
# make sure units are set, compute scale factor
units = lal.Unit(elem.get_property("psd-units"))
if units == lal.DimensionlessUnit:
return
scale = float(psd.sampleUnits / units)
# get frequency resolution and number of bins
delta_f = elem.get_property("delta-f")
n = int(round(elem.get_property("f-nyquist") / delta_f) + 1)
# interpolate, rescale, and install PSD
psd = interpolate_psd(psd, delta_f)
elem.set_property("mean-psd", pipeio.format_property(psd.data.data[:n] * scale))
whiten.connect_after("notify::f-nyquist", psd_units_or_resolution_changed, psd)
whiten.connect_after("notify::delta-f", psd_units_or_resolution_changed, psd)
whiten.connect_after("notify::psd-units", psd_units_or_resolution_changed, psd)
#
# convert to desired precision
#
head = pipeparts.mkaudioconvert(pipeline, head)
if width == 64:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F64)))
elif width == 32:
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d, format=%s" % (target_rate, GstAudio.AudioFormat.to_string(GstAudio.AudioFormat.F32)))
else:
raise ValueError("invalid width: %d" % width)
head = pipeparts.mkchecktimestamps(pipeline, head, "%s_timestamps_%d_whitehoft" % (instrument, target_rate))
#
# optionally add vetoes
#
if veto_segments is not None:
long_veto_segments = segments.segmentlist(veto_segments).protract(0.25).coalesce()
head = pipeparts.mkdeglitcher(pipeline, head, long_veto_segments)
#
# optional gate on whitened h(t) amplitude controlled by iDQ timeseries.
# Must come before the h(t) self gating b/c output of h(t) gating is
# used to define state of instruments in analysis.
# attack and hold are made to be 1/2 second or 1 sample, whichever is larger
#
idq_gate_window = max(target_rate // 4, 1) # samples
if idq_series is not None and idq_gate_threshold is not None:
if idq_channel_name:
_, channel_type = idq_channel_name.split('-')
invert_idq_control = channel_type.split('_')[0] != 'FAP'
#control = pipeparts.mkintegrate(pipeline, idq_series, 1.0, name='gstlalintegrate_%s'%instrument)
control = pipeparts.mkqueue(pipeline, idq_series, max_size_time = 12 * Gst.SECOND, max_size_bytes = 0, max_size_buffers = 0)
head = mkhtgate(pipeline, head, control = control, threshold = idq_gate_threshold, hold_length = idq_gate_window, attack_length = idq_gate_window, name = "%s_idq_gate" % instrument, invert_control = invert_idq_control)
#
# optional gate on whitened h(t) amplitude. attack and hold are
# made to be 1/2 second or 1 sample, whichever is larger
#
# FIXME: this could be omitted if ht_gate_threshold is None, but
# we need to collect whitened h(t) segments, however something
# could be done to collect those if these gates aren't here.
ht_gate_window = max(target_rate // 2, 1) # samples
head = mkhtgate(pipeline, head, threshold = ht_gate_threshold if ht_gate_threshold is not None else float("+inf"), hold_length = ht_gate_window, attack_length = ht_gate_window, name = "%s_ht_gate" % instrument)
# emit signals so that a user can latch on to them
head.set_property("emit-signals", True)
if track_latency:
head = pipeparts.mklatency(pipeline, head, name = "%s_whitening_latency" % instrument, silent = True)
return head
[docs]def mkmultiband(pipeline, head, rates, instrument = None, unit_normalize = True):
"""
Build pipeline stage to multiband a stream.
* rates: a list of the requested sample rates, e.g., [512,1024].
"""
#
# tee for highest sample rate stream
#
head = {max(rates): pipeparts.mktee(pipeline, head)}
#
# down-sample whitened time series to remaining target sample rates
# while applying an amplitude correction to adjust for low-pass
# filter roll-off. we also scale by \sqrt{original rate / new
# rate}. this is done to preserve the square magnitude of the time
# series --- the inner product of the time series with itself.
# really what we want is for
#
# \int v_{1}(t) v_{2}(t) \diff t
# \approx \sum v_{1}(t) v_{2}(t) \Delta t
#
# to be preserved across different sample rates, i.e. for different
# \Delta t. what we do is rescale the time series and ignore
# \Delta t, so we put 1/2 factor of the ratio of the \Delta t's
# into the h(t) time series here, and, later, another 1/2 factor
# into the template when it gets downsampled.
#
# by design, the output of the whitener is a unit-variance time
# series. however, downsampling it reduces the variance due to the
# removal of some frequency components. we require the input to
# the orthogonal filter banks to be unit variance, therefore a
# correction factor is applied via an audio amplify element to
# adjust for the reduction in variance due to the downsampler.
#
for rate in sorted(set(rates))[:-1]:
# downsample. make sure each output stream is unit
# normalized, otherwise the audio resampler removes power
# according to the rate difference and filter rolloff
if unit_normalize:
# NOTE the interpolator is about as good as the
# audioresampler at quality 10, hence the 10.
head[rate] = pipeparts.mkaudioamplify(pipeline, head[max(rates)], 1. / math.sqrt(pipeparts.audioresample_variance_gain(10, max(rates), rate)))
else:
head[rate] = head[max(rates)]
head[rate] = pipeparts.mkcapsfilter(pipeline, pipeparts.mkinterpolator(pipeline, head[rate]), caps = "audio/x-raw, rate=%d" % rate)
head[rate] = pipeparts.mkchecktimestamps(pipeline, head[rate], "%s_timestamps_%d_whitehoft" % (instrument, rate))
head[rate] = pipeparts.mktee(pipeline, head[rate])
#
# done. return value is a dictionary of tee elements indexed by
# sample rate
#
return head
[docs]def mkhtgate(pipeline, src, control = None, threshold = 8.0, attack_length = 128, hold_length = 128, invert_control = True, **kwargs):
"""
A convenience function to provide thresholds on input data. This can
be used to remove large spikes / glitches etc. Of course you can use it for
other stuff by plugging whatever you want as input and ouput
NOTE: the queues constructed by this code assume the attack and
hold lengths combined are less than 1 second in duration.
**Gstreamer Graph**
.. graphviz::
digraph G {
compound=true;
node [shape=record fontsize=10 fontname="Verdana"];
rankdir=LR;
tee ;
inputqueue ;
lal_gate ;
in [label="\<src\>"];
out [label="\<return\>"];
in -> tee -> inputqueue -> lal_gate -> out;
tee -> lal_gate;
}
"""
# FIXME someday explore a good bandpass filter
# src = pipeparts.mkaudiochebband(pipeline, src, low_frequency, high_frequency)
if control is None:
control = src = pipeparts.mktee(pipeline, src)
src = pipeparts.mkqueue(pipeline, src, max_size_time = Gst.SECOND, max_size_bytes = 0, max_size_buffers = 0)
return pipeparts.mkgate(pipeline, src, control = control, threshold = threshold, attack_length = -attack_length, hold_length = -hold_length, invert_control = invert_control, **kwargs)