# Copyright (C) 2010--2013 Kipp Cannon, Chad Hanna, Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
import math
import sys
import signal
from typing import Dict, Iterable, Optional, Tuple, Union
import numpy
import pandas
from scipy import interpolate
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from ligo.lw import utils as ligolw_utils
import lal
import lal.series
from lal import LIGOTimeGPS
import lalsimulation
from gstlal import datasource
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import simplehandler
__doc__ = """
**Review Status**
+-------------------------------------------------+------------------------------------------+------------+
| Names | Hash | Date |
+=================================================+==========================================+============+
| Florent, Sathya, Duncan Me., Jolien, Kipp, Chad | b3ef077fe87b597578000f140e4aa780f3a227aa | 2014-05-01 |
+-------------------------------------------------+------------------------------------------+------------+
"""
#
# =============================================================================
#
# PSD Measurement
#
# =============================================================================
#
#
# pipeline handler for PSD measurement
#
[docs]class PSDHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs):
self.psd = None
simplehandler.Handler.__init__(self, *args, **kwargs)
[docs] def do_on_message(self, bus, message):
if message.type == Gst.MessageType.ELEMENT and message.get_structure().get_name() == "spectrum":
self.psd = pipeio.parse_spectrum_message(message)
return True
return False
[docs]class PSDTracker:
def __init__(self):
self.psd = None
[docs] def on_spectrum_message(self, message):
self.psd = pipeio.parse_spectrum_message(message)
return True
#
# measure_psd()
#
[docs]def measure_psd(gw_data_source_info, instrument, rate, psd_fft_length = 8, verbose = False):
"""
**Gstreamer graph**
.. graphviz::
digraph G {
// graph properties
rankdir=LR;
compound=true;
node [shape=record fontsize=10 fontname="Verdana"];
edge [fontsize=8 fontname="Verdana"];
// nodes
"mkbasicsrc()" ;
capsfilter1 ;
resample ;
capsfilter2 ;
queue ;
whiten ;
fakesink ;
"mkbasicsrc()" -> capsfilter1 -> resample -> capsfilter2 -> queue -> whiten -> fakesink;
}
Stream Implementation: (future refactor)
stream = Stream.from_datasource(gw_data_source_info, instrument, verbose=verbose)
stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_message)
stream = stream.capsfilter(f"audio/x-raw, rate=[{rate:d},MAX]") # disallow upsampling
stream.resample(quality=9) \
.capsfilter(f"audio/x-raw, rate={rate:d}") \
.queue(max_size_buffers=8) \
.whiten(
psd_mode=0,
zero_pad=0,
fft_length=psd_fft_length,
average_samples=average_samples,
median_samples=7
) \
.fakesink()
"""
#
# 8 FFT-lengths is just a ball-parky estimate of how much data is
# needed for a good PSD, this isn't a requirement of the code (the
# code requires a minimum of 1)
#
if gw_data_source_info.seg is not None and float(abs(gw_data_source_info.seg)) < 8 * psd_fft_length:
raise ValueError("segment %s too short" % str(gw_data_source_info.seg))
#
# build pipeline
#
if verbose:
print("measuring PSD in segment %s" % str(gw_data_source_info.seg), file=sys.stderr)
print("building pipeline ...", file=sys.stderr)
mainloop = GObject.MainLoop()
pipeline = Gst.Pipeline(name="psd")
handler = PSDHandler(mainloop, pipeline)
head, _, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source_info, instrument, verbose = verbose)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=[%d,MAX]" % rate) # disallow upsampling
head = pipeparts.mkresample(pipeline, head, quality = 9)
head = pipeparts.mkcapsfilter(pipeline, head, "audio/x-raw, rate=%d" % rate)
head = pipeparts.mkqueue(pipeline, head, max_size_buffers = 8)
if gw_data_source_info.seg is not None:
average_samples = int(round(float(abs(gw_data_source_info.seg)) / (psd_fft_length / 2.) - 1.))
else:
#FIXME maybe let the user specify this
average_samples = 64
head = pipeparts.mkwhiten(pipeline, head, psd_mode = 0, zero_pad = 0, fft_length = psd_fft_length, average_samples = average_samples, median_samples = 7)
pipeparts.mkfakesink(pipeline, head)
#
# setup signal handler to shutdown pipeline for live data
#
if gw_data_source_info.data_source in ("lvshm", "framexmit"):# FIXME what about nds online?
simplehandler.OneTimeSignalHandler(pipeline)
#
# process segment
#
if verbose:
print("putting pipeline into READY state ...", file=sys.stderr)
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter READY state")
if gw_data_source_info.data_source not in ("lvshm", "framexmit"):# FIXME what about nds online?
datasource.pipeline_seek_for_gps(pipeline, *gw_data_source_info.seg)
if verbose:
print("putting pipeline into PLAYING state ...", file=sys.stderr)
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError("pipeline failed to enter PLAYING state")
if verbose:
print("running pipeline ...", file=sys.stderr)
mainloop.run()
#
# done
#
if verbose:
print("PSD measurement complete", file=sys.stderr)
return handler.psd
[docs]def read_psd(filename: str, verbose: Optional[bool] = False) -> Dict[str, lal.REAL8FrequencySeries]:
"""Reads in an XML-formatted PSD.
Args:
filename:
str, the file to read PSD(s) from
verbose:
bool, default False, whether to display logging messages
Returns:
a dictionary of lal FrequencySeries keyed by instrument
"""
return lal.series.read_psd_xmldoc(
ligolw_utils.load_filename(
filename,
verbose=verbose,
contenthandler=lal.series.PSDContentHandler
)
)
[docs]def write_psd(
filename: str,
psddict: Dict[str, lal.REAL8FrequencySeries],
trap_signals: Optional[Iterable[signal.Signals]] = None,
verbose: Optional[bool] = False,
) -> None:
"""Writes an XML-formatted PSD to disk.
Wrapper around make_psd_xmldoc() to write the XML document directly
to a named file.
Args:
filename:
str, the file to write PSD(s) to
psds:
Dict[str, lal.REAL8FrequencySeries], the PSD(s)
trap_signals:
Iterable[signal.Signal], optional, whether to attach extra signal
handlers on write
verbose:
bool, default False, whether to display logging messages
"""
ligolw_utils.write_filename(lal.series.make_psd_xmldoc(psddict), filename, verbose = verbose, trap_signals = trap_signals)
[docs]def read_asd_txt(
filename: str,
df: float = 0.25,
zero_pad: bool = False,
read_as_psd: bool = False,
) -> lal.REAL8FrequencySeries:
"""Reads in a text-formatted ASD as a PSD.
Args:
filename:
str, the file to read ASD(s) from
df (Hz):
float, default 0.25, the frequency resolution to interpolate to
zero_pad:
bool, default False, whether to zero-pad PSD to 0 Hz if needed.
read_as_psd:
bool, default False, whether to treat input as PSD rather than ASD
Returns:
lal.REAL8FrequencySeries, the PSD
"""
data = numpy.loadtxt(filename, comments="#")
psd_data = data[:, 1]
if not read_as_psd:
psd_data = numpy.power(psd_data, 2)
f = data[:, 0]
if zero_pad:
f_pad = numpy.arange(0, f[0], df)
psd_data = numpy.concatenate((numpy.ones(len(f_pad)) * psd_data[0], psd_data))
f = numpy.concatenate((f_pad, f))
uniformf = numpy.arange(f[0], f.max(), df)
psdinterp = interpolate.interp1d(f, psd_data)
psd_data = psdinterp(uniformf)
psd = lal.CreateREAL8FrequencySeries(
name = "PSD",
epoch = 0,
f0 = f[0],
deltaF = df,
sampleUnits = lal.Unit("s strain^2"),
length = len(psd_data),
)
psd.data.data = psd_data
return psd
[docs]def write_asd_txt(
filename: str,
psd: lal.REAL8FrequencySeries,
verbose: Optional[bool] = False,
) -> None:
"""Writes an text-formatted ASD to disk.
Args:
filename:
str, the file to write ASD to
psd:
lal.REAL8FrequencySeries, the PSD
verbose:
bool, default False, whether to display logging messages
"""
if verbose:
print("writing '%s' ..." % filename, file=sys.stderr)
with open(filename, "w") as f:
for i, x in enumerate(psd.data.data):
print("%.16g %.16g" % (psd.f0 + i * psd.deltaF, x**.5), file=f)
[docs]def interpolate_psd(psd: lal.REAL8FrequencySeries, deltaF: int) -> lal.REAL8FrequencySeries:
"""Interpolates a PSD to a target frequency resolution.
Args:
psd:
lal.REAL8FrequencySeries, the PSD to interpolate
deltaF:
int, the target frequency resolution to interpolate to
Returns:
lal.REAL8FrequencySeries, the interpolated PSD
"""
#
# no-op?
#
if deltaF == psd.deltaF:
return psd
#
# interpolate PSD by clipping/zero-padding time-domain impulse
# response of equivalent whitening filter
#
#from scipy import fftpack
#psd_data = psd.data.data
#x = numpy.zeros((len(psd_data) * 2 - 2,), dtype = "double")
#psd_data = numpy.where(psd_data, psd_data, float("inf"))
#x[0] = 1 / psd_data[0]**.5
#x[1::2] = 1 / psd_data[1:]**.5
#x = fftpack.irfft(x)
#if deltaF < psd.deltaF:
# x *= numpy.cos(numpy.arange(len(x)) * math.pi / (len(x) + 1))**2
# x = numpy.concatenate((x[:(len(x) / 2)], numpy.zeros((int(round(len(x) * psd.deltaF / deltaF)) - len(x),), dtype = "double"), x[(len(x) / 2):]))
#else:
# x = numpy.concatenate((x[:(int(round(len(x) * psd.deltaF / deltaF)) / 2)], x[-(int(round(len(x) * psd.deltaF / deltaF)) / 2):]))
# x *= numpy.cos(numpy.arange(len(x)) * math.pi / (len(x) + 1))**2
#x = 1 / fftpack.rfft(x)**2
#psd_data = numpy.concatenate(([x[0]], x[1::2]))
#
# interpolate log(PSD) with cubic spline. note that the PSD is
# clipped at 1e-300 to prevent nan's in the interpolator (which
# doesn't seem to like the occasional sample being -inf)
#
psd_data = psd.data.data
psd_data = numpy.where(psd_data, psd_data, 1e-300)
f = psd.f0 + numpy.arange(len(psd_data)) * psd.deltaF
interp = interpolate.splrep(f, numpy.log(psd_data), s = 0)
f = psd.f0 + numpy.arange(round((len(psd_data) - 1) * psd.deltaF / deltaF) + 1) * deltaF
psd_data = numpy.exp(interpolate.splev(f, interp, der = 0))
#
# return result
#
psd = lal.CreateREAL8FrequencySeries(
name = psd.name,
epoch = psd.epoch,
f0 = psd.f0,
deltaF = deltaF,
sampleUnits = psd.sampleUnits,
length = len(psd_data)
)
psd.data.data = psd_data
return psd
[docs]def movingaverage(psd: numpy.ndarray, window_size: int) -> numpy.ndarray:
"""Smoothen a PSD with a moving average.
Args:
psd:
numpy.ndarray, the PSD to smoothen
window_size:
int, the size of the window used for the moving median
Returns:
the smoothened PSD
"""
window = lal.CreateTukeyREAL8Window(window_size, 0.5).data.data
return numpy.convolve(psd, window, 'same')
[docs]def taperzero_fseries(
fseries: lal.REAL8FrequencySeries,
minfs: Optional[Tuple[float, float]] = (35.0, 40.0),
maxfs: Optional[Tuple[float, float]] = (1800., 2048.)
) -> lal.REAL8FrequencySeries:
"""Taper the PSD to infinity for given min/max frequencies.
Args:
psd:
lal.REAL8FrequencySeries, the PSD to taper
minfs (Hz):
Tuple[float, float], optional, the frequency boundaries over which to taper the
spectrum to infinity. i.e., frequencies below the first item in the tuple will
have an infinite spectrum, the second item in the tuple will not be changed.
A taper from 0 to infinity is applied in between.
maxfs (Hz):
Tuple[float, float], optional, the frequency boundaries over which to taper the
spectrum to infinity. i.e., frequencies above the second item in the tuple will
have an infinite spectrum, the first item in the tuple will not be changed.
A taper from 0 to infinity is applied in between.
Returns:
lal.REAL8FrequencySeries, the tapered PSD
"""
#
# store the psd horizon before tapering
#
data = fseries.data.data
norm_before = numpy.dot(data.conj(), data).real
#
# taper to infinity to turn this psd into an effective band pass filter
#
deltaF = fseries.deltaF
kmin = int(minfs[0] / deltaF)
kmax = int(minfs[1] / deltaF)
data[(len(data)//2 + 1) - kmin + 1:(len(data)//2 + 1) + kmin] = 0.
data[(len(data)//2 + 1) + kmin:(len(data)//2 + 1) + kmax] *= numpy.sin(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
data[(len(data)//2 + 1) - kmax:(len(data)//2 + 1) - kmin] *= numpy.cos(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
kmin = int(maxfs[0] / deltaF) - 1
kmax = int(maxfs[1] / deltaF) - 1
data[(len(data)//2 + 1) + kmax:] = data[:-(len(data)//2 + 1) - kmax] = 0.
data[(len(data)//2 + 1) + kmin:(len(data)//2 + 1) + kmax] *= numpy.cos(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
data[(len(data)//2 + 1) - kmax:(len(data)//2 + 1) - kmin] *= numpy.sin(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
#
# renormalize after tapering
#
fseries.data.data = data * math.sqrt(norm_before / numpy.dot(data.conj(), data).real)
#
# done
#
return fseries
[docs]def condition_psd(
psd: lal.REAL8FrequencySeries,
newdeltaF: int,
minfs: Optional[Tuple[int, int]] = (35.0, 40.0),
maxfs: Optional[Tuple[int, int]] = (1800., 2048.),
smoothing_frequency: Optional[float] = 4.,
fir_whiten: Optional[bool] = False
) -> lal.REAL8FrequencySeries:
"""Condition a PSD suitable for whitening waveforms.
Args:
psd:
lal.REAL8FrequencySeries, the PSD to taper
newdeltaF (Hz):
int, the target delta F to interpolate to
minfs (Hz):
Tuple[float, float], optional, the frequency boundaries over which to taper the
spectrum to infinity. i.e., frequencies below the first item in the tuple will
have an infinite spectrum, the second item in the tuple will not be changed.
A taper from 0 to infinity is applied in between.
maxfs (Hz):
Tuple[float, float], optional, the frequency boundaries over which to taper the
spectrum to infinity. i.e., frequencies above the second item in the tuple will
have an infinite spectrum, the first item in the tuple will not be changed.
A taper from 0 to infinity is applied in between.
smoothing_frequency (Hz):
float, default = 4 Hz, the target frequency resolution after smoothing. Lines with
bandwidths << smoothing_frequency are removed via a median calculation.
Remaining features will be blurred out to this resolution.
fir_whiten:
bool, default False, whether to enable causal whitening with a time-domain
whitening kernel vs. traditional acausal whitening
Returns:
lal.REAL8FrequencySeries, the conditioned PSD
"""
#
# store the psd horizon before conditioning
#
horizon_distance = HorizonDistance(minfs[1], maxfs[0], psd.deltaF, 1.4, 1.4)
horizon_before = horizon_distance(psd, 8.0)[0]
#
# interpolate to new \Delta f
#
psd = interpolate_psd(psd, newdeltaF)
#
# Smooth the psd
#
psddata = psd.data.data
avgwindow = int(smoothing_frequency / newdeltaF)
psddata = movingmedian(psddata, avgwindow)
psddata = movingaverage(psddata, avgwindow)
psd.data.data = psddata
#
# Tapering psd in either side up to infinity if a frequency-domain whitener is used, returns a psd without tapering otherwise.
# For a time-domain whitener, the tapering is effectively done as a part of deriving a frequency series of the FIR-whitner kernel
#
if not fir_whiten:
#
# Taper to infinity to turn this psd into an effective band pass filter
#
psddata = psd.data.data
kmin = int(minfs[0] / newdeltaF)
kmax = int(minfs[1] / newdeltaF)
psddata[:kmin+1] = numpy.inf
psddata[kmin:kmax] /= numpy.sin(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
kmin = int(maxfs[0] / newdeltaF)
kmax = int(maxfs[1] / newdeltaF)
psddata[kmax:] = numpy.inf
psddata[kmin:kmax] /= numpy.cos(numpy.arange(kmax-kmin) / (kmax-kmin-1.) * numpy.pi / 2.0)**4
psd.data.data = psddata
#
# compute the psd horizon after conditioning and renormalize
#
horizon_after = horizon_distance(psd, 8.0)[0]
psddata = psd.data.data
psd.data.data = psddata * (horizon_after / horizon_before)**2
#
# done
#
return psd
[docs]def polyfit(
psd: lal.REAL8FrequencySeries,
f_low: float,
f_high: float,
order: int,
verbose: Optional[bool] = False
) -> lal.REAL8FrequencySeries:
"""Fit a PSD to a polynomial.
Args:
psd:
lal.REAL8FrequencySeries, the PSD to fit
f_low (Hz):
float, the low frequency to begin fitting with
f_high (Hz):
float, the high frequency to stop fitting with
order:
int, the order of the fitting polynomial
verbose:
bool, default false, whether to display the fit
Returns:
lal.REAL8FrequencySeries, the PSD fitted to a polynomial
"""
minsample = int(f_low // psd.deltaF)
maxsample = int(f_high // psd.deltaF)
# f / f_min between f_min and f_max, i.e. f[0] here is 1
f = numpy.arange(maxsample - minsample) * psd.deltaF + 1
data = psd.data.data[minsample:maxsample]
logf = numpy.linspace(numpy.log(f[0]), numpy.log(f[-1]), 100000)
interp = interpolate.interp1d(numpy.log(f), numpy.log(data))
data = interp(logf)
p = numpy.poly1d(numpy.polyfit(logf, data, order))
if verbose:
print("\nFit polynomial is: \n\nlog(PSD) = \n", p, "\n\nwhere x = f / f_min\n", file=sys.stderr)
data = numpy.exp(p(numpy.log(f)))
olddata = psd.data.data
olddata[minsample:maxsample] = data
psd = lal.CreateREAL8FrequencySeries(
name = psd.name,
epoch = psd.epoch,
f0 = psd.f0,
deltaF = psd.deltaF,
sampleUnits = psd.sampleUnits,
length = len(olddata)
)
psd.data.data = olddata
return psd
[docs]def harmonic_mean(psddict: Dict[str, lal.REAL8FrequencySeries]) -> lal.REAL8FrequencySeries:
"""Take the harmonic mean of a dictionary of PSDs.
"""
refpsd = list(psddict.values())[0]
psd = lal.CreateREAL8FrequencySeries("psd", refpsd.epoch, 0., refpsd.deltaF, lal.Unit("strain^2 s"), refpsd.data.length)
psd.data.data[:] = 0.
for ifo in psddict:
psd.data.data[:] += 1. / psddict[ifo].data.data
psd.data.data[:] = len(psddict) / psd.data.data[:]
return psd
[docs]class HorizonDistance(object):
def __init__(self, f_min, f_max, delta_f, m1, m2, spin1 = (0., 0., 0.), spin2 = (0., 0., 0.), eccentricity = 0., inclination = 0., approximant = "IMRPhenomD"):
"""
Configures the horizon distance calculation for a specific
waveform model. The waveform is pre-computed and stored,
so this initialization step can be time-consuming but
computing horizon distances from measured PSDs will be
fast.
The waveform model's spectrum parameters, for example its
Nyquist and frequency resolution, need not match the
parameters for the PSDs that will ultimately be supplied
but there are some advantages to be had in getting them to
match. For example, computing the waveform with a smaller
delta_f than will be needed will require additional storage
and consume additional CPU time for the initialization,
while computing it with too low an f_max or too large a
delta_f might lead to inaccurate horizon distance
estimates.
f_min (Hertz) sets the frequency at which the waveform
model is to begin.
f_max (Hertz) sets the frequency upto which the waveform's
model is desired.
delta_f (Hertz) sets the frequency resolution of the
desired waveform model.
m1, m2 (solar masses) set the component masses of the
system to model.
spin1, spin2 (3-component vectors, geometric units) set the
spins of the component masses.
eccentricity [0, 1) sets the eccentricity of the system.
inclination (radians) sets the orbital inclination of the
system.
Example:
>>> # configure for non-spinning, circular, 1.4+1.4 BNS
>>> horizon_distance = HorizonDistance(10., 1024., 1./32., 1.4, 1.4)
>>> # populate a PSD for testing
>>> import lal, lalsimulation
>>> psd = lal.CreateREAL8FrequencySeries("psd", lal.LIGOTimeGPS(0), 0., 1./32., lal.Unit("strain^2 s"), horizon_distance.model.data.length)
>>> lalsimulation.SimNoisePSDaLIGODesignSensitivityP1200087(psd, 0.)
0
>>> # compute horizon distance
>>> D, (f, model) = horizon_distance(psd)
>>> print("%.4g Mpc" % D)
434.7 Mpc
>>> # compute distance and spectrum for SNR = 25
>>> D, (f, model) = horizon_distance(psd, 25.)
>>> "%.4g Mpc" % D
'139.1 Mpc'
>>> f
array([ 10. , 10.03125, 10.0625 , ..., 1023.9375 ,
1023.96875, 1024. ])
>>> model
array([ 8.05622865e-45, 7.99763234e-45, 7.93964216e-45, ...,
1.11824864e-49, 1.11815656e-49, 1.11806450e-49])
NOTE:
- Currently the SEOBNRv4_ROM waveform model is used, so its
limitations with respect to masses, spins, etc., apply.
The choice of waveform model is subject to change.
"""
self.f_min = f_min
self.f_max = f_max
self.m1 = m1
self.m2 = m2
self.spin1 = numpy.array(spin1)
self.spin2 = numpy.array(spin2)
self.inclination = inclination
self.eccentricity = eccentricity
self.approximant = approximant
# NOTE: the waveform models are computed up-to but not
# including the supplied f_max parameter so we need to pass
# (f_max + delta_f) if we want the waveform model defined
# in the f_max bin
hp, hc = lalsimulation.SimInspiralFD(
m1 * lal.MSUN_SI, m2 * lal.MSUN_SI,
spin1[0], spin1[1], spin1[2],
spin2[0], spin2[1], spin2[2],
1.0, # distance (m)
inclination,
0.0, # reference orbital phase (rad)
0.0, # longitude of ascending nodes (rad)
eccentricity,
0.0, # mean anomaly of periastron
delta_f,
f_min,
f_max + delta_f,
100., # reference frequency (Hz)
None, # LAL dictionary containing accessory parameters
lalsimulation.GetApproximantFromString(self.approximant)
)
assert hp.data.length > 0, "huh!? h+ has zero length!"
#
# store |h(f)|^2 for source at D = 1 m. see (5) in
# arXiv:1003.2481
#
self.model = lal.CreateREAL8FrequencySeries(
name = "signal spectrum",
epoch = LIGOTimeGPS(0),
f0 = hp.f0,
deltaF = hp.deltaF,
sampleUnits = hp.sampleUnits * hp.sampleUnits,
length = hp.data.length
)
self.model.data.data[:] = numpy.abs(hp.data.data)**2.
def __call__(self, psd, snr = 8.):
"""
Compute the horizon distance for the configured waveform
model given the PSD and the SNR at which the horizon is
defined (default = 8). Equivalently, from a PSD and an
observed SNR compute and return the amplitude of the
configured waveform's spectrum required to achieve that
SNR.
The return value is a two-element tuple. The first element
is the horizon distance in Mpc. The second element is,
itself, a two-element tuple containing two vectors giving
the frequencies and amplitudes of the waveform model's
spectrum scaled so as to have the given SNR. The vectors
are clipped to the range of frequencies that were used for
the SNR integral.
The parameters of the PSD, for example its Nyquist and
frequency resolution, need not match the parameters of the
configured waveform model. In the event of a mismatch, the
waveform model is resampled to the frequencies at which the
PSD has been measured.
The inspiral spectrum returned has the same units as the
PSD and is normalized so that the SNR is
SNR^2 = \int (inspiral_spectrum / psd) df
That is, the ratio of the inspiral spectrum to the PSD
gives the spectral density of SNR^2.
"""
#
# frequencies at which PSD has been measured
#
f = psd.f0 + numpy.arange(psd.data.length) * psd.deltaF
#
# nearest-neighbour interpolation of waveform model
# evaluated at PSD's frequency bins
#
indexes = ((f - self.model.f0) / self.model.deltaF).round().astype("int").clip(0, self.model.data.length - 1)
model = self.model.data.data[indexes]
#
# range of indexes for integration
#
kmin = (max(psd.f0, self.model.f0, self.f_min) - psd.f0) / psd.deltaF
kmin = int(round(kmin))
kmax = (min(psd.f0 + psd.data.length * psd.deltaF, self.model.f0 + self.model.data.length * self.model.deltaF, self.f_max) - psd.f0) / psd.deltaF
kmax = int(round(kmax)) + 1
assert kmin < kmax, "PSD and waveform model do not intersect"
#
# SNR for source at D = 1 m <--> D in m for source w/ SNR =
# 1. see (3) in arXiv:1003.2481
#
f = f[kmin:kmax]
model = model[kmin:kmax]
D = math.sqrt(4. * (model / psd.data.data[kmin:kmax]).sum() * psd.deltaF)
#
# distance at desired SNR
#
D /= snr
#
# scale inspiral spectrum by distance to achieve desired SNR
#
model *= 4. / D**2.
#
# D in Mpc for source with specified SNR, and waveform
# model
#
return D / (1e6 * lal.PC_SI), (f, model)
[docs]def effective_distance_factor(inclination, fp, fc):
"""
Returns the ratio of effective distance to physical distance for
compact binary mergers. Inclination is the orbital inclination of
the system in radians, fp and fc are the F+ and Fx antenna factors.
See lal.ComputeDetAMResponse() for a function to compute antenna
factors. The effective distance is given by
Deff = effective_distance_factor * D
See Equation (4.3) of arXiv:0705.1514.
"""
cos2i = math.cos(inclination)**2
return 1.0 / math.sqrt(fp**2 * (1+cos2i)**2 / 4 + fc**2 * cos2i)