# Licensed under an MIT style license -- see LICENSE.md
import numpy as np
import lalsimulation as lalsim
from lalsimulation import (
SimInspiralGetSpinFreqFromApproximant, SIM_INSPIRAL_SPINS_CASEBYCASE,
SIM_INSPIRAL_SPINS_FLOW
)
from pesummary.utils.utils import iterator, logger
from pesummary.utils.exceptions import EvolveSpinError
__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]
_python_module_from_approximant_name = {
"SEOBNRv5": "lalsimulation.gwsignal.models.pyseobnr_model"
}
def _get_spin_freq_from_approximant(approximant):
"""Determine whether the reference frequency is the starting frequency
for a given approximant string.
Parameters
----------
approximant: str
Name of the approximant you wish to check
"""
try:
# default to using LAL code
approx = getattr(lalsim, approximant)
return SimInspiralGetSpinFreqFromApproximant(approx)
except AttributeError:
from lalsimulation import (
SIM_INSPIRAL_SPINS_NONPRECESSING, SIM_INSPIRAL_SPINS_F_REF
)
# check to see if approximant is in gwsignal
approx = _gwsignal_generator_from_string(approximant)
meta = approx.metadata
if meta["type"] == "aligned_spin":
return SIM_INSPIRAL_SPINS_NONPRECESSING
elif meta["type"] == "precessing_spin":
if meta["f_ref_spin"]:
return SIM_INSPIRAL_SPINS_F_REF
return SIM_INSPIRAL_SPINS_FLOW
raise EvolveSpinError(
"Unable to evolve spins as '{}' does not have a set frequency "
"at which the spins are defined".format(approximant)
)
def _get_start_freq_from_approximant(approximant, f_low, f_ref):
"""Determine the starting frequency to use when evolving the spins for
a given approximant string.
Parameters
----------
approximant: str
Name of the approximant you wish to check
f_low: float
Low frequency used when generating the posterior samples
f_ref: float
Reference frequency used when generating the posterior samples
"""
try:
spinfreq_enum = _get_spin_freq_from_approximant(approximant)
except ValueError: # raised when approximant is not in gwsignal
raise EvolveSpinError(
"Unable to evolve spins as '{}' is unknown to lalsimulation "
"and gwsignal".format(approximant)
)
if spinfreq_enum == SIM_INSPIRAL_SPINS_CASEBYCASE:
_msg = (
"Unable to evolve spins as '{}' does not have a set frequency "
"at which the spins are defined".format(approximant)
)
logger.warning(_msg)
raise EvolveSpinError(_msg)
return float(np.where(
np.array(spinfreq_enum == SIM_INSPIRAL_SPINS_FLOW), f_low, f_ref
))
def _check_approximant_from_string(approximant):
"""Check to see if the approximant is known to lalsimulation and/or
gwsignal
Parameters
----------
approximant: str
approximant you wish to check
"""
if hasattr(lalsim, approximant):
return True
else:
try:
_ = _gwsignal_generator_from_string(approximant)
except (ValueError, NameError):
return False
return True
def _lal_approximant_from_string(approximant):
"""Return the LAL approximant number given an approximant string
Parameters
----------
approximant: str
approximant you wish to convert
"""
return lalsim.GetApproximantFromString(approximant)
def _gwsignal_generator_from_string(approximant, version="v1", **kwargs):
"""Return the GW signal generator given an approximant string
Parameters
----------
approximant: str
approximant you wish to convert
version: str, optional
version of gwsignal_get_waveform_generator to use. 'v1' ignores
all kwargs and simply passes the approximant name to
gwsignal_get_waveform_generator. 'v2' passes the approximant name
and python_module to gwsignal_get_waveform_generator. The python_module
can either be provided to _gwsignal_generator_from_string directly or if
not provided, PESummary will use the stored default values. Default 'v1'
kwargs: dict, optional
optional kwargs to pass to gwsignal_get_waveform_generator
"""
from lalsimulation.gwsignal.models import gwsignal_get_waveform_generator
if version == "v1":
return gwsignal_get_waveform_generator(approximant)
elif version == "v2":
fallback = [
value for key, value in _python_module_from_approximant_name.items()
if key in approximant
]
if len(fallback):
fallback = fallback[0]
else:
fallback = None
module = kwargs.get("python_module", fallback)
if module is None:
raise ValueError(
"Please provide a python_module defining the gwsignal "
"generator as pesummary does not have a default value"
)
return gwsignal_get_waveform_generator(
approximant, python_module=module
)
else:
raise ValueError(
"Unknown version {}. Please use either v1 or v2".format(version)
)
def _insert_flags(flags, LAL_parameters=None):
"""Insert waveform specific flags to a LAL dictionary
Parameters
----------
flags: dict
dictionary of flags you wish to add to a LAL dictionary
LAL_parameters: LALDict, optional
An existing LAL dictionary to add mode array to. If not provided, a new
LAL dictionary is created. Default None.
"""
if LAL_parameters is None:
import lal
LAL_parameters = lal.CreateDict()
for key, item in flags.items():
func = getattr(lalsim, "SimInspiralWaveformParamsInsert" + key, None)
if func is not None:
func(LAL_parameters, int(item))
else:
logger.warning(
"Unable to add flag {} to LAL dictionary as no function "
"SimInspiralWaveformParamsInsert{} found in "
"LALSimulation.".format(key, key)
)
return LAL_parameters
def _insert_mode_array(modes, LAL_parameters=None):
"""Add a mode array to a LAL dictionary
Parameters
----------
modes: 2d list
2d list of modes you wish to add to a LAL dictionary. Must be of the
form [[l1, m1], [l2, m2]]
LAL_parameters: LALDict, optional
An existing LAL dictionary to add mode array to. If not provided, a new
LAL dictionary is created. Default None.
"""
if LAL_parameters is None:
import lal
LAL_parameters = lal.CreateDict()
_mode_array = lalsim.SimInspiralCreateModeArray()
for l, m in modes:
lalsim.SimInspiralModeArrayActivateMode(_mode_array, l, m)
lalsim.SimInspiralWaveformParamsInsertModeArray(LAL_parameters, _mode_array)
return LAL_parameters
def _waveform_args(samples, f_ref=20., ind=0, longAscNodes=0., eccentricity=0.):
"""Arguments to be passed to waveform generation
Parameters
----------
f_ref: float, optional
reference frequency to use when converting spherical spins to
cartesian spins
ind: int, optional
index for the sample you wish to plot
longAscNodes: float, optional
longitude of ascending nodes, degenerate with the polarization
angle. Default 0.
eccentricity: float, optional
eccentricity at reference frequency. Default 0.
"""
from lal import MSUN_SI, PC_SI
key = list(samples.keys())[0]
if isinstance(samples[key], (list, np.ndarray)):
_samples = {key: value[ind] for key, value in samples.items()}
else:
_samples = samples.copy()
required = [
"mass_1", "mass_2", "luminosity_distance"
]
if not all(param in _samples.keys() for param in required):
raise ValueError(
"Unable to generate a waveform. Please add samples for "
+ ", ".join(required)
)
waveform_args = [
_samples["mass_1"] * MSUN_SI, _samples["mass_2"] * MSUN_SI
]
spin_angles = [
"theta_jn", "phi_jl", "tilt_1", "tilt_2", "phi_12", "a_1", "a_2",
"phase"
]
spin_angles_condition = all(
spin in _samples.keys() for spin in spin_angles
)
cartesian_spins = [
"spin_1x", "spin_1y", "spin_1z", "spin_2x", "spin_2y", "spin_2z"
]
cartesian_spins_condition = any(
spin in _samples.keys() for spin in cartesian_spins
)
if spin_angles_condition and not cartesian_spins_condition:
from pesummary.gw.conversions import component_spins
data = component_spins(
_samples["theta_jn"], _samples["phi_jl"], _samples["tilt_1"],
_samples["tilt_2"], _samples["phi_12"], _samples["a_1"],
_samples["a_2"], _samples["mass_1"], _samples["mass_2"],
f_ref, _samples["phase"]
)
iota, spin_1x, spin_1y, spin_1z, spin_2x, spin_2y, spin_2z = data.T
spins = [spin_1x, spin_1y, spin_1z, spin_2x, spin_2y, spin_2z]
else:
iota = _samples["iota"]
spins = [
_samples[param] if param in _samples.keys() else 0. for param in
["spin_1x", "spin_1y", "spin_1z", "spin_2x", "spin_2y", "spin_2z"]
]
_zero_spins = np.isclose(spins, 0.)
if sum(_zero_spins):
spins = np.array(spins)
spins[_zero_spins] = 0.
spins = list(spins)
waveform_args += spins
phase = _samples["phase"] if "phase" in _samples.keys() else 0.
waveform_args += [
_samples["luminosity_distance"] * PC_SI * 10**6, iota, phase
]
waveform_args += [longAscNodes, eccentricity, 0.]
return waveform_args, _samples
[docs]
def antenna_response(samples, ifo):
"""
"""
import importlib
mod = importlib.import_module("pesummary.gw.plots.plot")
func = getattr(mod, "__antenna_response")
antenna = func(
ifo, samples["ra"], samples["dec"], samples["psi"],
samples["geocent_time"]
)
return antenna
def _project_waveform(ifo, hp, hc, ra, dec, psi, time):
"""Project a waveform onto a given detector
Parameters
----------
ifo: str
name of the detector you wish to project the waveform onto
hp: np.ndarray
plus gravitational wave polarization
hc: np.ndarray
cross gravitational wave polarization
ra: float
right ascension to be passed to antenna response function
dec: float
declination to be passed to antenna response function
psi: float
polarization to be passed to antenna response function
time: float
time to be passed to antenna response function
"""
samples = {
"ra": ra, "dec": dec, "psi": psi, "geocent_time": time
}
antenna = antenna_response(samples, ifo)
ht = hp * antenna[0] + hc * antenna[1]
return ht
def _wrapper_for_td_waveform(args):
"""Wrapper function for td_waveform for a pool of workers
Parameters
----------
args: tuple
All args passed to td_waveform
"""
return td_waveform(*args[:-1], **args[-1])
def _td_waveform(
waveform_args, approximant, delta_t, f_low, f_ref, LAL_parameters, samples,
pycbc=False, project=None, debug=False, **kwargs
):
"""Generate a gravitational wave in the time domain
Parameters
----------
waveform_args: tuple
args to pass to lalsimulation.SimInspiralChooseTDWaveform
approximant: str
lalsimulation approximant number to use when generating a waveform
delta_t: float
spacing between time samples
f_low: float
frequency to start evaluating the waveform
f_ref: float, optional
reference frequency
LAL_parameters: LALDict
LAL dictionary containing accessory parameters. Default None
samples: dict
dictionary of posterior samples to use when projecting the waveform
onto a given detector
pycbc: Bool, optional
return a the waveform as a pycbc.timeseries.TimeSeries object
project: str, optional
name of the detector to project the waveform onto. If None,
the plus and cross polarizations are returned. Default None
debug: bool, optional
if True, return the model object used to generate the waveform. Only
used when the waveform approximant is not in LALSimulation
kwargs: dict, optional
all kwargs passed to _calculate_hp_hc_td
"""
from gwpy.timeseries import TimeSeries
from astropy.units import Quantity
(hp, hc), model = _calculate_hp_hc_td(
waveform_args, delta_t, f_low, f_ref, LAL_parameters, approximant,
debug=True, **kwargs
)
hp = TimeSeries(hp.data.data, dt=hp.deltaT, t0=hp.epoch)
hc = TimeSeries(hc.data.data, dt=hc.deltaT, t0=hc.epoch)
if pycbc:
hp, hc = hp.to_pycbc(), hc.to_pycbc()
if project is None and debug:
return {"h_plus": hp, "h_cross": hc}, model
elif project is None:
return {"h_plus": hp, "h_cross": hc}
ht = _project_waveform(
project, hp, hc, samples["ra"], samples["dec"], samples["psi"],
samples["geocent_time"]
)
if "{}_time".format(project) not in samples.keys():
from pesummary.gw.conversions import time_in_each_ifo
try:
_detector_time = time_in_each_ifo(
project, samples["ra"], samples["dec"], samples["geocent_time"]
)
except Exception:
logger.warning(
"Unable to calculate samples for '{}_time' using the provided "
"posterior samples. Unable to shift merger to merger time in "
"the detector".format(project)
)
if debug:
return ht, model
return ht
else:
_detector_time = samples["{}_time".format(project)]
ht.times = (
Quantity(ht.times, unit="s") + Quantity(_detector_time, unit="s")
)
if debug:
return ht, model
return ht
def _calculate_hp_hc_td(
waveform_args, delta_t, f_low, f_ref, LAL_parameters, approximant,
debug=False, **kwargs
):
"""Calculate the plus and cross polarizations in the time domain.
If the approximant is in LALSimulation, the
SimInspiralChooseTDWaveform function is used. If the approximant is
not in LALSimulation, gwsignal is used
Parameters
----------
waveform_args: tuple
args to pass to lalsimulation.SimInspiralChooseTDWaveform
delta_t: float
spacing between time samples
f_low: float
frequency to start evaluating the waveform
f_ref: float, optional
reference frequency
LAL_parameters: LALDict
LAL dictionary containing accessory parameters. Default None
approximant: str
lalsimulation approximant number to use when generating a waveform
debug: bool, optional
if True, return the model object used to generate the waveform. Only
used when the waveform approximant is not in LALSimulation
**kwargs: dict, optional
all kwargs passed to _setup_gwsignal
"""
if hasattr(lalsim, approximant):
approx = _lal_approximant_from_string(approximant)
if lalsim.SimInspiralImplementedFDApproximants(approx):
# if the waveform is defined in the frequency domain, use
# SimInspiralTD as it appropiately conditions the waveform so it
# is correct at f_low in the time domain
_func = lalsim.SimInspiralTD
else:
# if the waveform is defined in the time domain, use
# SimInspiralChooseTDWaveform as it doesnt taper the start of the
# waveform
_func = lalsim.SimInspiralChooseTDWaveform
result = _func(
*waveform_args, delta_t, f_low, f_ref, LAL_parameters, approx
)
if debug:
return result, None
return result
wfm_gen = _setup_gwsignal(
waveform_args, f_low, f_ref, approximant, delta_t=delta_t, **kwargs
)
hpc = wfm_gen.generate_td_polarizations()
if debug:
return hpc, wfm_gen
return hpc
def _calculate_hp_hc_fd(
waveform_args, delta_f, f_low, f_high, f_ref, LAL_parameters, approximant,
debug=False, **kwargs
):
"""Calculate the plus and cross polarizations in the frequency domain.
If the approximant is in LALSimulation, the
SimInspiralChooseFDWaveform function is used. If the approximant is
not in LALSimulation, gwsignal is used
Parameters
----------
waveform_args: tuple
args to pass to lalsimulation.SimInspiralChooseFDWaveform
delta_f: float
spacing between frequency samples
f_low: float
frequency to start evaluating the waveform
f_high: float, optional
frequency to stop evaluating the waveform
f_ref: float, optional
reference frequency
LAL_parameters: LALDict
LAL dictionary containing accessory parameters. Default None
approximant: str
lalsimulation approximant number to use when generating a waveform
debug: bool, optional
if True, return the model object used to generate the waveform. Only
used when the waveform approximant is not in LALSimulation
"""
if hasattr(lalsim, approximant):
approx = _lal_approximant_from_string(approximant)
for func in [lalsim.SimInspiralChooseFDWaveform, lalsim.SimInspiralFD]:
try:
result = func(
*waveform_args, delta_f, f_low, f_high, f_ref,
LAL_parameters, approx
)
if debug:
return result, None
return result
except Exception:
continue
break
wfm_gen = _setup_gwsignal(
waveform_args, f_low, f_ref, approximant, delta_f=delta_f,
f_high=f_high, **kwargs
)
hpc = wfm_gen.generate_fd_polarizations()
if debug:
return hpc, wfm_gen
return hpc
def _setup_gwsignal(
waveform_args, f_low, f_ref, approximant, delta_t=None, delta_f=None,
f_high=None, **kwargs
):
"""Setup a waveform generator with gwsignal
Parameters
----------
waveform_args: tuple
args to pass to lalsimulation.SimInspiralChoose*DWaveform
f_low: float
frequency to start evaluating the waveform
f_ref: float, optional
reference frequency
approximant: str
lalsimulation approximant number to use when generating a waveform
delta_t: float, optional
spacing between frequency samples
delta_f: float, optional
spacing between frequency samples
f_high: float, optional
frequency to stop evaluating the waveform
kwargs: dict, optional
all kwargs passed to _gwsignal_generator_from_string
"""
from lalsimulation.gwsignal.core.parameter_conventions import (
common_units_dictionary
)
from astropy import units
names = [
"mass1", "mass2", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y",
"spin2z", "distance", "inclination", "phi_ref", "longAscNodes",
"eccentricity", "meanPerAno"
]
params_dict = {name: waveform_args[ii] for ii, name in enumerate(names)}
if delta_t is not None:
params_dict["deltaT"] = delta_t
if delta_f is not None:
params_dict["deltaF"] = delta_f
params_dict["f22_start"] = f_low
params_dict["f22_ref"] = f_ref
if f_high is None:
params_dict["f_max"] = 0.5 / params_dict["deltaT"]
else:
params_dict["f_max"] = f_high
wf_gen = _gwsignal_generator_from_string(approximant, **kwargs)
for key, item in params_dict.items():
params_dict[key] = item * common_units_dictionary.get(key, 1)
params_dict["mass1"] *= units.kg
params_dict["mass2"] *= units.kg
params_dict["distance"] *= units.m
wfm_gen = wf_gen._generate_waveform_class(**params_dict)
return wfm_gen