Source code for kernels

# Copyright (C) 2010--2013  Kipp Cannon, Chad Hanna, Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


import math
from typing import Optional, Tuple

import numpy
try:
	from pyfftw.interfaces import scipy_fftpack as fftpack
except ImportError:
	from scipy import fftpack

import lal
from lal import LIGOTimeGPS


[docs]class PSDFirKernel(object): def __init__(self): self.revplan = None self.fwdplan = None self.target_phase = None self.target_phase_mask = None
[docs] def set_phase( self, psd: lal.REAL8FrequencySeries, f_low: float = 10.0, m1: float = 1.4, m2: float = 1.4 ) -> None: """ Compute the phase response of zero-latency whitening filter given a reference PSD. """ kernel, latency, sample_rate = self.psd_to_linear_phase_whitening_fir_kernel(psd) kernel, phase = self.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, sample_rate) # get merger model for SNR = 1. f_psd = psd.f0 + numpy.arange(len(psd.data.data)) * psd.deltaF horizon_distance = HorizonDistance(f_low, f_psd[-1], psd.deltaF, m1, m2) f_model, model= horizon_distance(psd, 1.)[1] # find the range of frequency bins covered by the merger # model kmin, kmax = f_psd.searchsorted(f_model[0]), f_psd.searchsorted(f_model[-1]) + 1 # compute SNR=1 model's (d SNR^2 / df) spectral density unit_snr2_density = numpy.zeros_like(phase) unit_snr2_density[kmin:kmax] = model / psd.data.data[kmin:kmax] # integrate across each frequency bin, converting to # snr^2/bin. NOTE: this step is here for the record, but # is commented out because it has no effect on the result # given the renormalization that occurs next. #unit_snr2_density *= psd.deltaF # take 16th root, then normalize so max=1. why? I don't # know, just feels good, on the whole. unit_snr2_density = unit_snr2_density**(1./16) unit_snr2_density /= unit_snr2_density.max() # record phase vector and SNR^2 density vector self.target_phase = phase self.target_phase_mask = unit_snr2_density
[docs] def psd_to_linear_phase_whitening_fir_kernel( self, psd: lal.REAL8FrequencySeries, invert: Optional[bool] = True, nyquist: Optional[float] = None ) -> Tuple[numpy.ndarray, int, int]: """ Compute an acausal finite impulse-response filter kernel from a power spectral density conforming to the LAL normalization convention, such that if colored Gaussian random noise with the given PSD is fed into an FIR filter using the kernel the filter's output will be zero-mean unit-variance Gaussian random noise. The PSD must be provided as a lal.REAL8FrequencySeries object. The phase response of this filter is 0, just like whitening done in the frequency domain. Args: psd: lal.REAL8FrequencySeries, the reference PSD invert: bool, default true, whether to invert the kernel nyquist: float, disabled by default, whether to change the Nyquist frequency. Returns: Tuple[numpy.ndarray, int, int], the kernel, latency, sample rate pair. The kernel is a numpy array containing the filter kernel, the latency is the filter latency in samples and the sample rate is in Hz. The kernel and latency can be used, for example, with gstreamer's stock audiofirfilter element. """ # # this could be relaxed with some work # assert psd.f0 == 0.0 # # extract the PSD bins and determine sample rate for kernel # data = psd.data.data / 2 sample_rate = 2 * int(round(psd.f0 + (len(data) - 1) * psd.deltaF)) # # remove LAL normalization # data *= sample_rate # # change Nyquist frequency if requested. round to nearest # available bin # if nyquist is not None: i = int(round((nyquist - psd.f0) / psd.deltaF)) assert i < len(data) data = data[:i + 1] sample_rate = 2 * int(round(psd.f0 + (len(data) - 1) * psd.deltaF)) # # compute the FIR kernel. it always has an odd number of # samples and no DC offset. # data[0] = data[-1] = 0.0 if invert: data_nonzeros = (data != 0.) data[data_nonzeros] = 1./data[data_nonzeros] # repack data: data[0], data[1], 0, data[2], 0, .... tmp = numpy.zeros((2 * len(data) - 1,), dtype = data.dtype) tmp[len(data)-1:] = data #tmp[:len(data)] = data data = tmp kernel_fseries = lal.CreateCOMPLEX16FrequencySeries( name = "double sided psd", epoch = LIGOTimeGPS(0), f0 = 0.0, deltaF = psd.deltaF, length = len(data), sampleUnits = lal.Unit("strain s") ) kernel_tseries = lal.CreateCOMPLEX16TimeSeries( name = "timeseries of whitening kernel", epoch = LIGOTimeGPS(0.), f0 = 0., deltaT = 1.0 / sample_rate, length = len(data), sampleUnits = lal.Unit("strain") ) # FIXME check for change in length if self.revplan is None: self.revplan = lal.CreateReverseCOMPLEX16FFTPlan(len(data), 1) kernel_fseries.data.data = numpy.sqrt(data) + 0.j lal.COMPLEX16FreqTimeFFT(kernel_tseries, kernel_fseries, self.revplan) kernel = kernel_tseries.data.data.real kernel = numpy.roll(kernel, (len(data) - 1) // 2) / sample_rate * 2 # # apply a Tukey window whose flat bit is 50% of the kernel. # preserve the FIR kernel's square magnitude # norm_before = numpy.dot(kernel, kernel) kernel *= lal.CreateTukeyREAL8Window(len(data), .5).data.data kernel *= math.sqrt(norm_before / numpy.dot(kernel, kernel)) # # the kernel's latency # latency = (len(data) - 1) // 2 # # done # return kernel, latency, sample_rate
[docs] def linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel( self, linear_phase_kernel: numpy.ndarray, sample_rate: int ) -> Tuple[numpy.ndarray, numpy.ndarray]: """ Compute the minimum-phase response filter (zero latency) associated with a linear-phase response filter (latency equal to half the filter length). From "Design of Optimal Minimum-Phase Digital FIR Filters Using Discrete Hilbert Transforms", IEEE Trans. Signal Processing, vol. 48, pp. 1491-1495, May 2000. Args: linear_phase_kernel: numpy.ndarray, the kernel to compute the minimum-phase kernel with sample_rate: int, the sample rate Returns: Tuple[numpy.ndarray. numpy.ndarray], the kernel and the phase response. The kernel is a numpy array containing the filter kernel. The kernel can be used, for example, with gstreamer's stock audiofirfilter element. """ # # compute abs of FFT of kernel # # FIXME check for change in length if self.fwdplan is None: self.fwdplan = lal.CreateForwardCOMPLEX16FFTPlan(len(linear_phase_kernel), 1) if self.revplan is None: self.revplan = lal.CreateReverseCOMPLEX16FFTPlan(len(linear_phase_kernel), 1) deltaT = 1. / sample_rate deltaF = 1. / (len(linear_phase_kernel) * deltaT) working_length = len(linear_phase_kernel) kernel_tseries = lal.CreateCOMPLEX16TimeSeries( name = "timeseries of whitening kernel", epoch = LIGOTimeGPS(0.), f0 = 0., deltaT = deltaT, length = working_length, sampleUnits = lal.Unit("strain") ) kernel_tseries.data.data = linear_phase_kernel absX = lal.CreateCOMPLEX16FrequencySeries( name = "absX", epoch = LIGOTimeGPS(0), f0 = 0.0, deltaF = deltaF, length = working_length, sampleUnits = lal.Unit("strain s") ) logabsX = lal.CreateCOMPLEX16FrequencySeries( name = "absX", epoch = LIGOTimeGPS(0), f0 = 0.0, deltaF = deltaF, length = working_length, sampleUnits = lal.Unit("strain s") ) cepstrum = lal.CreateCOMPLEX16TimeSeries( name = "cepstrum", epoch = LIGOTimeGPS(0.), f0 = 0., deltaT = deltaT, length = working_length, sampleUnits = lal.Unit("strain") ) theta = lal.CreateCOMPLEX16FrequencySeries( name = "theta", epoch = LIGOTimeGPS(0), f0 = 0.0, deltaF = deltaF, length = working_length, sampleUnits = lal.Unit("strain s") ) min_phase_kernel = lal.CreateCOMPLEX16TimeSeries( name = "min phase kernel", epoch = LIGOTimeGPS(0.), f0 = 0., deltaT = deltaT, length = working_length, sampleUnits = lal.Unit("strain") ) lal.COMPLEX16TimeFreqFFT(absX, kernel_tseries, self.fwdplan) absX.data.data[:] = abs(absX.data.data) # # compute the cepstrum of the kernel (i.e., the iFFT of the # log of the abs of the FFT of the kernel) # logabsX.data.data[:] = numpy.log(absX.data.data) lal.COMPLEX16FreqTimeFFT(cepstrum, logabsX, self.revplan) # # multiply cepstrum by sgn # cepstrum.data.data[0] = 0. cepstrum.data.data[working_length // 2] = 0. cepstrum.data.data[working_length // 2 + 1:] = -cepstrum.data.data[working_length // 2 + 1:] # # compute theta # lal.COMPLEX16TimeFreqFFT(theta, cepstrum, self.fwdplan) # # compute the gain and phase of the zero-phase # approximation relative to the original linear-phase # filter # theta_data = theta.data.data[working_length // 2:] #gain = numpy.exp(theta_data.real) phase = -theta_data.imag # # apply optional masked phase adjustment # if self.target_phase is not None: # compute phase adjustment for +ve frequencies phase_adjustment = (self.target_phase - phase) * self.target_phase_mask # combine with phase adjustment for -ve frequencies phase_adjustment = numpy.concatenate((phase_adjustment[1:][-1::-1].conj(), phase_adjustment)) # apply adjustment. phase adjustment is what we # wish to add to the phase. theta's imaginary # component contains the negative of the phase, so # we need to add -phase to theta's imaginary # component theta.data.data += -1.j * phase_adjustment # report adjusted phase #phase = -theta.data.data[working_length // 2:].imag # # compute minimum phase kernel # absX.data.data *= numpy.exp(theta.data.data) lal.COMPLEX16FreqTimeFFT(min_phase_kernel, absX, self.revplan) kernel = min_phase_kernel.data.data.real # # this kernel needs to be reversed to follow conventions # used with the audiofirfilter and lal_firbank elements # kernel = kernel[-1::-1] # # done # return kernel, phase
[docs]def fir_whitener_kernel( length: int, duration: float, sample_rate: int, psd: lal.REAL8FrequencySeries ) -> lal.COMPLEX16FrequencySeries: """Create an FIR whitener kernel. """ assert psd # # Add another COMPLEX16TimeSeries and COMPLEX16FrequencySeries for kernel's FFT (Leo) # # Add another FFT plan for kernel FFT (Leo) fwdplan_kernel = lal.CreateForwardCOMPLEX16FFTPlan(length, 1) kernel_tseries = lal.CreateCOMPLEX16TimeSeries( name = "timeseries of whitening kernel", epoch = LIGOTimeGPS(0.), f0 = 0., deltaT = 1.0 / sample_rate, length = length, sampleUnits = lal.Unit("strain") ) kernel_fseries = lal.CreateCOMPLEX16FrequencySeries( name = "freqseries of whitening kernel", epoch = LIGOTimeGPS(0), f0 = 0.0, deltaF = 1.0 / duration, length = length, sampleUnits = lal.Unit("strain s") ) # # Obtain a kernel of zero-latency whitening filter and # adjust its length (Leo) # psd_fir_kernel = PSDFirKernel() (kernel, latency, fir_rate) = psd_fir_kernel.psd_to_linear_phase_whitening_fir_kernel(psd, nyquist = sample_rate / 2.0) (kernel, theta) = psd_fir_kernel.linear_phase_fir_kernel_to_minimum_phase_whitening_fir_kernel(kernel, fir_rate) kernel = kernel[-1::-1] # FIXME this is off by one sample, but shouldn't be. Look at the miminum phase function # assert len(kernel) == length if len(kernel) < length: kernel = numpy.append(kernel, numpy.zeros(length - len(kernel))) else: kernel = kernel[:length] kernel_tseries.data.data = kernel # # FFT of the kernel # lal.COMPLEX16TimeFreqFFT(kernel_fseries, kernel_tseries, fwdplan_kernel) #FIXME return kernel_fseries
[docs]def one_second_highpass_kernel(rate: int, cutoff: int = 12) -> numpy.ndarray: """Create a one second high-pass kernel. Args: rate (Hz): int, the sampling rate cutoff (Hz): int, the high-pass cutoff Returns: numpy.ndarray, the high-passed kernel """ highpass_filter_fd = numpy.ones(rate, dtype=complex) highpass_filter_fd[:int(cutoff)] = 0. highpass_filter_fd[-int(cutoff):] = 0. highpass_filter_fd[(rate // 2 - 1):(rate // 2 + 1)] = 0. highpass_filter_td = fftpack.ifft(highpass_filter_fd) highpass_filter_td = numpy.roll(highpass_filter_td.real, rate // 2) highpass_filter_kernel = numpy.zeros(len(highpass_filter_td) + 1) highpass_filter_kernel[:-1] = highpass_filter_td[:] x = numpy.arange(len(highpass_filter_kernel)) mid = len(x) / 2. highpass_filter_kernel *= 1. - (x - mid)**2 / mid**2 return highpass_filter_kernel
[docs]def fixed_duration_bandpass_kernel( rate: int, flow: float = 0, fhigh: float = numpy.inf, duration: float = 1.0 ) -> numpy.ndarray: """Create a fixed-duration band-pass kernel. Args: rate (Hz): int, the sampling rate flow (Hz): float, default 0, the low frequency of the pass band fhigh (Hz): float, default +inf, the high frequency of the pass band duration (s): float, default 1.0, the duration of the kernel Returns: numpy.ndarray, the band-passed kernel """ deltaF = 1. / duration nsamps = int(rate * duration) + 1 f = numpy.arange(nsamps) * deltaF - rate / 2. filt = numpy.ones(len(f)) ix1 = numpy.logical_and(f <= -flow, f >= -fhigh) ix2 = numpy.logical_and(f >= flow, f <= fhigh) filt[numpy.logical_not(numpy.logical_or(ix1, ix2))] = 0. filt = numpy.real(fftpack.ifft(fftpack.ifftshift(filt))) / nsamps window = numpy.sinc(2 * f / rate) out = numpy.roll(filt, nsamps // 2) * window out /= (out**2).sum()**.5 return out