# Copyright (C) 2010--2013 Kipp Cannon, Chad Hanna, Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import math
from typing import Optional, Tuple
import numpy
try:
from pyfftw.interfaces import scipy_fftpack as fftpack
except ImportError:
from scipy import fftpack
import lal
from lal import LIGOTimeGPS
[docs]class PSDFirKernel(object):
def __init__(self):
self.revplan = None
self.fwdplan = None
self.target_phase = None
self.target_phase_mask = None
[docs] def set_phase(
self,
psd: lal.REAL8FrequencySeries,
f_low: float = 10.0,
m1: float = 1.4,
m2: float = 1.4
) -> None:
"""
Compute the phase response of zero-latency whitening filter
given a reference PSD.
"""
kernel, latency, sample_rate = self.psd_to_linear_phase_whitening_fir_kernel(psd)
kernel, phase = self.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate)
# get merger model for SNR = 1.
f_psd = psd.f0 + numpy.arange(len(psd.data.data)) * psd.deltaF
horizon_distance = HorizonDistance(f_low, f_psd[-1], psd.deltaF, m1, m2)
f_model, model= horizon_distance(psd, 1.)[1]
# find the range of frequency bins covered by the merger
# model
kmin, kmax = f_psd.searchsorted(f_model[0]), f_psd.searchsorted(f_model[-1]) + 1
# compute SNR=1 model's (d SNR^2 / df) spectral density
unit_snr2_density = numpy.zeros_like(phase)
unit_snr2_density[kmin:kmax] = model / psd.data.data[kmin:kmax]
# integrate across each frequency bin, converting to
# snr^2/bin. NOTE: this step is here for the record, but
# is commented out because it has no effect on the result
# given the renormalization that occurs next.
#unit_snr2_density *= psd.deltaF
# take 16th root, then normalize so max=1. why? I don't
# know, just feels good, on the whole.
unit_snr2_density = unit_snr2_density**(1./16)
unit_snr2_density /= unit_snr2_density.max()
# record phase vector and SNR^2 density vector
self.target_phase = phase
self.target_phase_mask = unit_snr2_density
[docs] def psd_to_linear_phase_whitening_fir_kernel(
self,
psd: lal.REAL8FrequencySeries,
invert: Optional[bool] = True,
nyquist: Optional[float] = None
) -> Tuple[numpy.ndarray, int, int]:
"""
Compute an acausal finite impulse-response filter kernel
from a power spectral density conforming to the LAL
normalization convention, such that if colored Gaussian
random noise with the given PSD is fed into an FIR filter
using the kernel the filter's output will be zero-mean
unit-variance Gaussian random noise. The PSD must be
provided as a lal.REAL8FrequencySeries object.
The phase response of this filter is 0, just like whitening
done in the frequency domain.
Args:
psd:
lal.REAL8FrequencySeries, the reference PSD
invert:
bool, default true, whether to invert the kernel
nyquist:
float, disabled by default, whether to change
the Nyquist frequency.
Returns:
Tuple[numpy.ndarray, int, int], the kernel, latency,
sample rate pair. The kernel is a numpy array containing
the filter kernel, the latency is the filter latency in
samples and the sample rate is in Hz. The kernel and
latency can be used, for example, with gstreamer's stock
audiofirfilter element.
"""
#
# this could be relaxed with some work
#
assert psd.f0 == 0.0
#
# extract the PSD bins and determine sample rate for kernel
#
data = psd.data.data / 2
sample_rate = 2 * int(round(psd.f0 + (len(data) - 1) * psd.deltaF))
#
# remove LAL normalization
#
data *= sample_rate
#
# change Nyquist frequency if requested. round to nearest
# available bin
#
if nyquist is not None:
i = int(round((nyquist - psd.f0) / psd.deltaF))
assert i < len(data)
data = data[:i + 1]
sample_rate = 2 * int(round(psd.f0 + (len(data) - 1) * psd.deltaF))
#
# compute the FIR kernel. it always has an odd number of
# samples and no DC offset.
#
data[0] = data[-1] = 0.0
if invert:
data_nonzeros = (data != 0.)
data[data_nonzeros] = 1./data[data_nonzeros]
# repack data: data[0], data[1], 0, data[2], 0, ....
tmp = numpy.zeros((2 * len(data) - 1,), dtype = data.dtype)
tmp[len(data)-1:] = data
#tmp[:len(data)] = data
data = tmp
kernel_fseries = lal.CreateCOMPLEX16FrequencySeries(
name = "double sided psd",
epoch = LIGOTimeGPS(0),
f0 = 0.0,
deltaF = psd.deltaF,
length = len(data),
sampleUnits = lal.Unit("strain s")
)
kernel_tseries = lal.CreateCOMPLEX16TimeSeries(
name = "timeseries of whitening kernel",
epoch = LIGOTimeGPS(0.),
f0 = 0.,
deltaT = 1.0 / sample_rate,
length = len(data),
sampleUnits = lal.Unit("strain")
)
# FIXME check for change in length
if self.revplan is None:
self.revplan = lal.CreateReverseCOMPLEX16FFTPlan(len(data), 1)
kernel_fseries.data.data = numpy.sqrt(data) + 0.j
lal.COMPLEX16FreqTimeFFT(kernel_tseries, kernel_fseries, self.revplan)
kernel = kernel_tseries.data.data.real
kernel = numpy.roll(kernel, (len(data) - 1) // 2) / sample_rate * 2
#
# apply a Tukey window whose flat bit is 50% of the kernel.
# preserve the FIR kernel's square magnitude
#
norm_before = numpy.dot(kernel, kernel)
kernel *= lal.CreateTukeyREAL8Window(len(data), .5).data.data
kernel *= math.sqrt(norm_before / numpy.dot(kernel, kernel))
#
# the kernel's latency
#
latency = (len(data) - 1) // 2
#
# done
#
return kernel, latency, sample_rate
[docs] def linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(
self,
linear_phase_kernel: numpy.ndarray,
sample_rate: int
) -> Tuple[numpy.ndarray, numpy.ndarray]:
"""
Compute the minimum-phase response filter (zero latency)
associated with a linear-phase response filter (latency
equal to half the filter length).
From "Design of Optimal Minimum-Phase Digital FIR Filters
Using Discrete Hilbert Transforms", IEEE Trans. Signal
Processing, vol. 48, pp. 1491-1495, May 2000.
Args:
linear_phase_kernel:
numpy.ndarray, the kernel to compute the minimum-phase kernel with
sample_rate:
int, the sample rate
Returns:
Tuple[numpy.ndarray. numpy.ndarray], the kernel and the phase response.
The kernel is a numpy array containing the filter kernel. The kernel
can be used, for example, with gstreamer's stock audiofirfilter element.
"""
#
# compute abs of FFT of kernel
#
# FIXME check for change in length
if self.fwdplan is None:
self.fwdplan = lal.CreateForwardCOMPLEX16FFTPlan(len(linear_phase_kernel), 1)
if self.revplan is None:
self.revplan = lal.CreateReverseCOMPLEX16FFTPlan(len(linear_phase_kernel), 1)
deltaT = 1. / sample_rate
deltaF = 1. / (len(linear_phase_kernel) * deltaT)
working_length = len(linear_phase_kernel)
kernel_tseries = lal.CreateCOMPLEX16TimeSeries(
name = "timeseries of whitening kernel",
epoch = LIGOTimeGPS(0.),
f0 = 0.,
deltaT = deltaT,
length = working_length,
sampleUnits = lal.Unit("strain")
)
kernel_tseries.data.data = linear_phase_kernel
absX = lal.CreateCOMPLEX16FrequencySeries(
name = "absX",
epoch = LIGOTimeGPS(0),
f0 = 0.0,
deltaF = deltaF,
length = working_length,
sampleUnits = lal.Unit("strain s")
)
logabsX = lal.CreateCOMPLEX16FrequencySeries(
name = "absX",
epoch = LIGOTimeGPS(0),
f0 = 0.0,
deltaF = deltaF,
length = working_length,
sampleUnits = lal.Unit("strain s")
)
cepstrum = lal.CreateCOMPLEX16TimeSeries(
name = "cepstrum",
epoch = LIGOTimeGPS(0.),
f0 = 0.,
deltaT = deltaT,
length = working_length,
sampleUnits = lal.Unit("strain")
)
theta = lal.CreateCOMPLEX16FrequencySeries(
name = "theta",
epoch = LIGOTimeGPS(0),
f0 = 0.0,
deltaF = deltaF,
length = working_length,
sampleUnits = lal.Unit("strain s")
)
min_phase_kernel = lal.CreateCOMPLEX16TimeSeries(
name = "min phase kernel",
epoch = LIGOTimeGPS(0.),
f0 = 0.,
deltaT = deltaT,
length = working_length,
sampleUnits = lal.Unit("strain")
)
lal.COMPLEX16TimeFreqFFT(absX, kernel_tseries, self.fwdplan)
absX.data.data[:] = abs(absX.data.data)
#
# compute the cepstrum of the kernel (i.e., the iFFT of the
# log of the abs of the FFT of the kernel)
#
logabsX.data.data[:] = numpy.log(absX.data.data)
lal.COMPLEX16FreqTimeFFT(cepstrum, logabsX, self.revplan)
#
# multiply cepstrum by sgn
#
cepstrum.data.data[0] = 0.
cepstrum.data.data[working_length // 2] = 0.
cepstrum.data.data[working_length // 2 + 1:] = -cepstrum.data.data[working_length // 2 + 1:]
#
# compute theta
#
lal.COMPLEX16TimeFreqFFT(theta, cepstrum, self.fwdplan)
#
# compute the gain and phase of the zero-phase
# approximation relative to the original linear-phase
# filter
#
theta_data = theta.data.data[working_length // 2:]
#gain = numpy.exp(theta_data.real)
phase = -theta_data.imag
#
# apply optional masked phase adjustment
#
if self.target_phase is not None:
# compute phase adjustment for +ve frequencies
phase_adjustment = (self.target_phase - phase) * self.target_phase_mask
# combine with phase adjustment for -ve frequencies
phase_adjustment = numpy.concatenate((phase_adjustment[1:][-1::-1].conj(), phase_adjustment))
# apply adjustment. phase adjustment is what we
# wish to add to the phase. theta's imaginary
# component contains the negative of the phase, so
# we need to add -phase to theta's imaginary
# component
theta.data.data += -1.j * phase_adjustment
# report adjusted phase
#phase = -theta.data.data[working_length // 2:].imag
#
# compute minimum phase kernel
#
absX.data.data *= numpy.exp(theta.data.data)
lal.COMPLEX16FreqTimeFFT(min_phase_kernel, absX, self.revplan)
kernel = min_phase_kernel.data.data.real
#
# this kernel needs to be reversed to follow conventions
# used with the audiofirfilter and lal_firbank elements
#
kernel = kernel[-1::-1]
#
# done
#
return kernel, phase
[docs]def fir_whitener_kernel(
length: int,
duration: float,
sample_rate: int,
psd: lal.REAL8FrequencySeries
) -> lal.COMPLEX16FrequencySeries:
"""Create an FIR whitener kernel.
"""
assert psd
#
# Add another COMPLEX16TimeSeries and COMPLEX16FrequencySeries for kernel's FFT (Leo)
#
# Add another FFT plan for kernel FFT (Leo)
fwdplan_kernel = lal.CreateForwardCOMPLEX16FFTPlan(length, 1)
kernel_tseries = lal.CreateCOMPLEX16TimeSeries(
name = "timeseries of whitening kernel",
epoch = LIGOTimeGPS(0.),
f0 = 0.,
deltaT = 1.0 / sample_rate,
length = length,
sampleUnits = lal.Unit("strain")
)
kernel_fseries = lal.CreateCOMPLEX16FrequencySeries(
name = "freqseries of whitening kernel",
epoch = LIGOTimeGPS(0),
f0 = 0.0,
deltaF = 1.0 / duration,
length = length,
sampleUnits = lal.Unit("strain s")
)
#
# Obtain a kernel of zero-latency whitening filter and
# adjust its length (Leo)
#
psd_fir_kernel = PSDFirKernel()
(kernel, latency, fir_rate) = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd, nyquist = sample_rate / 2.0)
(kernel, theta) = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, fir_rate)
kernel = kernel[-1::-1]
# FIXME this is off by one sample, but shouldn't be. Look at the miminum phase function
# assert len(kernel) == length
if len(kernel) < length:
kernel = numpy.append(kernel, numpy.zeros(length - len(kernel)))
else:
kernel = kernel[:length]
kernel_tseries.data.data = kernel
#
# FFT of the kernel
#
lal.COMPLEX16TimeFreqFFT(kernel_fseries, kernel_tseries, fwdplan_kernel) #FIXME
return kernel_fseries
[docs]def one_second_highpass_kernel(rate: int, cutoff: int = 12) -> numpy.ndarray:
"""Create a one second high-pass kernel.
Args:
rate (Hz):
int, the sampling rate
cutoff (Hz):
int, the high-pass cutoff
Returns:
numpy.ndarray, the high-passed kernel
"""
highpass_filter_fd = numpy.ones(rate, dtype=complex)
highpass_filter_fd[:int(cutoff)] = 0.
highpass_filter_fd[-int(cutoff):] = 0.
highpass_filter_fd[(rate // 2 - 1):(rate // 2 + 1)] = 0.
highpass_filter_td = fftpack.ifft(highpass_filter_fd)
highpass_filter_td = numpy.roll(highpass_filter_td.real, rate // 2)
highpass_filter_kernel = numpy.zeros(len(highpass_filter_td) + 1)
highpass_filter_kernel[:-1] = highpass_filter_td[:]
x = numpy.arange(len(highpass_filter_kernel))
mid = len(x) / 2.
highpass_filter_kernel *= 1. - (x - mid)**2 / mid**2
return highpass_filter_kernel
[docs]def fixed_duration_bandpass_kernel(
rate: int,
flow: float = 0,
fhigh: float = numpy.inf,
duration: float = 1.0
) -> numpy.ndarray:
"""Create a fixed-duration band-pass kernel.
Args:
rate (Hz):
int, the sampling rate
flow (Hz):
float, default 0, the low frequency of the pass band
fhigh (Hz):
float, default +inf, the high frequency of the pass band
duration (s):
float, default 1.0, the duration of the kernel
Returns:
numpy.ndarray, the band-passed kernel
"""
deltaF = 1. / duration
nsamps = int(rate * duration) + 1
f = numpy.arange(nsamps) * deltaF - rate / 2.
filt = numpy.ones(len(f))
ix1 = numpy.logical_and(f <= -flow, f >= -fhigh)
ix2 = numpy.logical_and(f >= flow, f <= fhigh)
filt[numpy.logical_not(numpy.logical_or(ix1, ix2))] = 0.
filt = numpy.real(fftpack.ifft(fftpack.ifftshift(filt))) / nsamps
window = numpy.sinc(2 * f / rate)
out = numpy.roll(filt, nsamps // 2) * window
out /= (out**2).sum()**.5
return out