Coverage for bilby/gw/likelihood/multiband.py: 96%
359 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
2import math
3import numbers
5import numpy as np
7from .base import GravitationalWaveTransient
8from ...core.utils import (
9 logger, speed_of_light, solar_mass, radius_of_earth,
10 gravitational_constant, round_up_to_power_of_two,
11 recursively_load_dict_contents_from_group,
12 recursively_save_dict_contents_to_group
13)
14from ..prior import CBCPriorDict
17class MBGravitationalWaveTransient(GravitationalWaveTransient):
18 """A multi-banded likelihood object
20 This uses the method described in S. Morisaki, 2021, arXiv: 2104.07813.
22 Parameters
23 ----------
24 interferometers: list, bilby.gw.detector.InterferometerList
25 A list of `bilby.detector.Interferometer` instances - contains the detector data and power spectral densities
26 waveform_generator: `bilby.waveform_generator.WaveformGenerator`
27 An object which computes the frequency-domain strain of the signal, given some set of parameters
28 reference_chirp_mass: float, optional
29 A reference chirp mass for determining the frequency banding. This is set to prior minimum of chirp mass if
30 not specified. Hence a CBCPriorDict object needs to be passed to priors when this parameter is not specified.
31 highest_mode: int, optional
32 The maximum magnetic number of gravitational-wave moments. Default is 2
33 linear_interpolation: bool, optional
34 If True, the linear-interpolation method is used for the computation of (h, h). If False, the IFFT-FFT method
35 is used. Default is True.
36 accuracy_factor: float, optional
37 A parameter to determine the accuracy of multi-banding. The larger this factor is, the more accurate the
38 approximation is. This corresponds to L in the paper. Default is 5.
39 time_offset: float, optional
40 (end time of data) - (maximum arrival time). If None, it is inferred from the prior of geocent time.
41 delta_f_end: float, optional
42 The frequency scale with which waveforms at the high-frequency end are smoothed. If None, it is determined from
43 the prior of geocent time.
44 maximum_banding_frequency: float, optional
45 A maximum frequency for multi-banding. If specified, the low-frequency limit of a band does not exceed it.
46 minimum_banding_duration: float, optional
47 A minimum duration for multi-banding. If specified, the duration of a band is not smaller than it.
48 weights: str or dict, optional
49 Pre-computed multiband weights for calculating inner products.
50 distance_marginalization: bool, optional
51 If true, marginalize over distance in the likelihood. This uses a look up table calculated at run time. The
52 distance prior is set to be a delta function at the minimum distance allowed in the prior being marginalised
53 over.
54 phase_marginalization: bool, optional
55 If true, marginalize over phase in the likelihood. This is done analytically using a Bessel function. The phase
56 prior is set to be a delta function at phase=0.
57 priors: dict, bilby.prior.PriorDict
58 A dictionary of priors containing at least the geocent_time prior
59 distance_marginalization_lookup_table: (dict, str), optional
60 If a dict, dictionary containing the lookup_table, distance_array, (distance) prior_array, and
61 reference_distance used to construct the table. If a string the name of a file containing these quantities. The
62 lookup table is stored after construction in either the provided string or a default location:
63 '.distance_marginalization_lookup_dmin{}_dmax{}_n{}.npz'
64 reference_frame: (str, bilby.gw.detector.InterferometerList, list), optional
65 Definition of the reference frame for the sky location.
66 - "sky": sample in RA/dec, this is the default
67 - e.g., "H1L1", ["H1", "L1"], InterferometerList(["H1", "L1"]):
68 sample in azimuth and zenith, `azimuth` and `zenith` defined in the frame where the z-axis is aligned the the
69 vector connecting H1 and L1.
70 time_reference: str, optional
71 Name of the reference for the sampled time parameter.
72 - "geocent"/"geocenter": sample in the time at the Earth's center, this is the default
73 - e.g., "H1": sample in the time of arrival at H1
75 Returns
76 -------
77 Likelihood: `bilby.core.likelihood.Likelihood`
78 A likelihood object, able to compute the likelihood of the data given some model parameters
80 """
81 def __init__(
82 self, interferometers, waveform_generator, reference_chirp_mass=None, highest_mode=2,
83 linear_interpolation=True, accuracy_factor=5, time_offset=None, delta_f_end=None,
84 maximum_banding_frequency=None, minimum_banding_duration=0., weights=None,
85 distance_marginalization=False, phase_marginalization=False, priors=None,
86 distance_marginalization_lookup_table=None, reference_frame="sky", time_reference="geocenter"
87 ):
88 super(MBGravitationalWaveTransient, self).__init__(
89 interferometers=interferometers, waveform_generator=waveform_generator, priors=priors,
90 distance_marginalization=distance_marginalization, phase_marginalization=phase_marginalization,
91 time_marginalization=False, distance_marginalization_lookup_table=distance_marginalization_lookup_table,
92 jitter_time=False, reference_frame=reference_frame, time_reference=time_reference
93 )
94 if weights is None:
95 self.reference_chirp_mass = reference_chirp_mass
96 self.highest_mode = highest_mode
97 self.linear_interpolation = linear_interpolation
98 self.accuracy_factor = accuracy_factor
99 self.time_offset = time_offset
100 self.delta_f_end = delta_f_end
101 self.maximum_banding_frequency = maximum_banding_frequency
102 self.minimum_banding_duration = minimum_banding_duration
103 self.setup_multibanding()
104 else:
105 if isinstance(weights, str):
106 import h5py
107 logger.info(f"Loading multiband weights from {weights}.")
108 with h5py.File(weights, 'r') as f:
109 weights = recursively_load_dict_contents_from_group(f, '/')
110 self.setup_multibanding_from_weights(weights)
112 @property
113 def reference_chirp_mass(self):
114 return self._reference_chirp_mass
116 @property
117 def reference_chirp_mass_in_second(self):
118 return gravitational_constant * self._reference_chirp_mass * solar_mass / speed_of_light**3.
120 @reference_chirp_mass.setter
121 def reference_chirp_mass(self, reference_chirp_mass):
122 if isinstance(reference_chirp_mass, numbers.Number):
123 self._reference_chirp_mass = reference_chirp_mass
124 else:
125 logger.info(
126 "No int or float number has been passed to reference_chirp_mass. "
127 "Checking prior minimum of chirp mass ..."
128 )
129 if not isinstance(self.priors, CBCPriorDict):
130 raise TypeError(
131 f"priors: {self.priors} is not CBCPriorDict. Prior minimum of chirp mass can not be obtained."
132 )
133 self._reference_chirp_mass = self.priors.minimum_chirp_mass
134 if self._reference_chirp_mass is None:
135 raise Exception(
136 "Prior minimum of chirp mass can not be determined as priors does not contain necessary mass "
137 "parameters."
138 )
139 logger.info(
140 "reference_chirp_mass is automatically set to prior minimum of chirp mass: "
141 f"{self._reference_chirp_mass}."
142 )
144 @property
145 def highest_mode(self):
146 return self._highest_mode
148 @highest_mode.setter
149 def highest_mode(self, highest_mode):
150 if isinstance(highest_mode, numbers.Number):
151 self._highest_mode = highest_mode
152 else:
153 raise TypeError("highest_mode must be a number")
155 @property
156 def linear_interpolation(self):
157 return self._linear_interpolation
159 @linear_interpolation.setter
160 def linear_interpolation(self, linear_interpolation):
161 if isinstance(linear_interpolation, bool) or isinstance(linear_interpolation, np.bool_):
162 self._linear_interpolation = linear_interpolation
163 else:
164 raise TypeError("linear_interpolation must be a bool")
166 @property
167 def accuracy_factor(self):
168 return self._accuracy_factor
170 @accuracy_factor.setter
171 def accuracy_factor(self, accuracy_factor):
172 if isinstance(accuracy_factor, numbers.Number):
173 self._accuracy_factor = accuracy_factor
174 else:
175 raise TypeError("accuracy_factor must be a number")
177 @property
178 def time_offset(self):
179 return self._time_offset
181 @time_offset.setter
182 def time_offset(self, time_offset):
183 """
184 This sets the time offset assumed when frequency bands are constructed. The default value is (the
185 maximum offset of geocent time in the prior range) + (light-traveling time of the Earth). If the
186 prior does not contain 'geocent_time', 2.12 seconds is used. It is calculated assuming that the
187 maximum offset of geocent time is 2.1 seconds, which is the value for the standard prior used by
188 LIGO-Virgo-KAGRA.
189 """
190 time_parameter = self.time_reference + "_time"
191 if time_parameter == "geocent_time":
192 safety = radius_of_earth / speed_of_light
193 else:
194 safety = 2 * radius_of_earth / speed_of_light
195 if time_offset is not None:
196 if isinstance(time_offset, numbers.Number):
197 self._time_offset = time_offset
198 else:
199 raise TypeError("time_offset must be a number")
200 elif self.priors is not None and time_parameter in self.priors:
201 self._time_offset = (
202 self.interferometers.start_time + self.interferometers.duration
203 - self.priors[time_parameter].minimum + safety
204 )
205 else:
206 self._time_offset = 2.12
207 logger.warning("time offset can not be inferred. Use the standard time offset of {} seconds.".format(
208 self._time_offset))
210 @property
211 def delta_f_end(self):
212 return self._delta_f_end
214 @delta_f_end.setter
215 def delta_f_end(self, delta_f_end):
216 """
217 This sets the frequency scale of tapering the high-frequency end of waveform, to avoid the issues of
218 abrupt termination of waveform described in Sec. 2. F of arXiv: 2104.07813. This needs to be much
219 larger than the inverse of the minimum time offset, and the default value is 100 times of that. If
220 the prior does not contain 'geocent_time' and the minimum time offset can not be computed, 53Hz is
221 used. It is computed assuming that the minimum offset of geocent time is 1.9 seconds, which is the
222 value for the standard prior used by LIGO-Virgo-KAGRA.
223 """
224 time_parameter = self.time_reference + "_time"
225 if time_parameter == "geocent_time":
226 safety = radius_of_earth / speed_of_light
227 else:
228 safety = 2 * radius_of_earth / speed_of_light
229 if delta_f_end is not None:
230 if isinstance(delta_f_end, numbers.Number):
231 self._delta_f_end = delta_f_end
232 else:
233 raise TypeError("delta_f_end must be a number")
234 elif self.priors is not None and time_parameter in self.priors:
235 self._delta_f_end = 100 / (
236 self.interferometers.start_time + self.interferometers.duration
237 - self.priors[time_parameter].maximum - safety
238 )
239 else:
240 self._delta_f_end = 53.
241 logger.warning("delta_f_end can not be inferred. Use the standard delta_f_end of {} Hz.".format(
242 self._delta_f_end))
244 @property
245 def maximum_banding_frequency(self):
246 return self._maximum_banding_frequency
248 @maximum_banding_frequency.setter
249 def maximum_banding_frequency(self, maximum_banding_frequency):
250 r"""
251 This sets the upper limit on a starting frequency of a band. The default value is the frequency at
252 which f - 1 / \sqrt(- d\tau / df) starts to decrease, because the bisection search of the starting
253 frequency does not work from that frequency. The stationary phase approximation is not valid at such
254 a high frequency, which can break down the approximation. It is calculated from the 0PN formula of
255 time-to-merger \tau(f). The user-specified frequency is used if it is lower than that frequency.
256 """
257 fmax_tmp = (
258 (15 / 968)**(3 / 5) * (self.highest_mode / (2 * np.pi))**(8 / 5)
259 / self.reference_chirp_mass_in_second
260 )
261 if maximum_banding_frequency is not None:
262 if isinstance(maximum_banding_frequency, numbers.Number):
263 if maximum_banding_frequency < fmax_tmp:
264 fmax_tmp = maximum_banding_frequency
265 else:
266 logger.warning("The input maximum_banding_frequency is too large."
267 "It is set to be {} Hz.".format(fmax_tmp))
268 else:
269 raise TypeError("maximum_banding_frequency must be a number")
270 self._maximum_banding_frequency = fmax_tmp
272 @property
273 def minimum_banding_duration(self):
274 return self._minimum_banding_duration
276 @minimum_banding_duration.setter
277 def minimum_banding_duration(self, minimum_banding_duration):
278 if isinstance(minimum_banding_duration, numbers.Number):
279 self._minimum_banding_duration = minimum_banding_duration
280 else:
281 raise TypeError("minimum_banding_duration must be a number")
283 @property
284 def minimum_frequency(self):
285 return np.min([i.minimum_frequency for i in self.interferometers])
287 @property
288 def maximum_frequency(self):
289 return np.max([i.maximum_frequency for i in self.interferometers])
291 @property
292 def number_of_bands(self):
293 return len(self.durations)
295 def setup_multibanding(self):
296 """Set up frequency bands and coefficients needed for likelihood evaluations"""
297 self._setup_frequency_bands()
298 self._setup_integers()
299 self._setup_waveform_frequency_points()
300 self._setup_linear_coefficients()
301 if self.linear_interpolation:
302 self._setup_quadratic_coefficients_linear_interp()
303 else:
304 self._setup_quadratic_coefficients_ifft_fft()
306 def _tau(self, f):
307 """Compute time-to-merger from the input frequency. This uses the 0PN formula.
309 Parameters
310 ----------
311 f: float
312 input frequency
314 Returns
315 -------
316 tau: float
317 time-to-merger
319 """
320 f_22 = 2 * f / self.highest_mode
321 return (
322 5 / 256 * self.reference_chirp_mass_in_second
323 * (np.pi * self.reference_chirp_mass_in_second * f_22) ** (-8 / 3)
324 )
326 def _dtaudf(self, f):
327 """Compute the derivative of time-to-merger with respect to a starting frequency. This uses the 0PN formula.
329 Parameters
330 ----------
331 f: float
332 input frequency
334 Returns
335 -------
336 dtaudf: float
337 derivative of time-to-merger
339 """
340 f_22 = 2 * f / self.highest_mode
341 return (
342 -5 / 96 * self.reference_chirp_mass_in_second
343 * (np.pi * self.reference_chirp_mass_in_second * f_22) ** (-8. / 3.) / f
344 )
346 def _find_starting_frequency(self, duration, fnow):
347 """Find the starting frequency of the next band satisfying (10) and
348 (51) of arXiv: 2104.07813.
350 Parameters
351 ----------
352 duration: float
353 duration of the next band
354 fnow: float
355 starting frequency of the current band
357 Returns
358 -------
359 fnext: float or None
360 starting frequency of the next band. None if a frequency satisfying the conditions does not exist.
361 dfnext: float or None
362 frequency scale with which waveforms are smoothed. None if a frequency satisfying the conditions does not
363 exist.
365 """
366 def _is_above_fnext(f):
367 """This function returns True if f > fnext"""
368 cond1 = (
369 duration - self.time_offset - self._tau(f)
370 - self.accuracy_factor * np.sqrt(-self._dtaudf(f))
371 ) > 0
372 cond2 = f - 1. / np.sqrt(-self._dtaudf(f)) - fnow > 0
373 return cond1 and cond2
374 # Bisection search for fnext
375 fmin, fmax = fnow, self.maximum_banding_frequency
376 if not _is_above_fnext(fmax):
377 return None, None
378 while fmax - fmin > 1e-2 / duration:
379 f = (fmin + fmax) / 2.
380 if _is_above_fnext(f):
381 fmax = f
382 else:
383 fmin = f
384 return f, 1. / np.sqrt(-self._dtaudf(f))
386 def _setup_frequency_bands(self):
387 r"""Set up frequency bands. The durations of bands geometrically decrease T, T/2. T/4, ..., where T is the
388 original duration. This sets the following instance variables.
390 durations: durations of bands (T^(b) in the paper)
391 fb_dfb: 2-dimensional ndarray, which contain starting frequencies (f^(b) in the paper) and frequency scales for
392 smoothing waveforms (\Delta f^(b) in the paper) of bands
394 """
395 self.durations = np.array([self.interferometers.duration])
396 self.fb_dfb = [[self.minimum_frequency, 0.]]
397 dnext = self.interferometers.duration / 2
398 while dnext > max(self.time_offset, self.minimum_banding_duration):
399 fnow, _ = self.fb_dfb[-1]
400 fnext, dfnext = self._find_starting_frequency(dnext, fnow)
401 if fnext is not None and fnext < min(self.maximum_frequency, self.maximum_banding_frequency):
402 self.durations = np.append(self.durations, dnext)
403 self.fb_dfb.append([fnext, dfnext])
404 dnext /= 2
405 else:
406 break
407 self.fb_dfb.append([self.maximum_frequency + self.delta_f_end, self.delta_f_end])
408 self.fb_dfb = np.array(self.fb_dfb)
409 logger.info("The total frequency range is divided into {} bands with frequency intervals of {}.".format(
410 self.number_of_bands, ", ".join(["1/{} Hz".format(d) for d in self.durations])))
412 def _setup_integers(self):
413 """Set up integers needed for likelihood evaluations. This sets the following instance variables.
415 Nbs: the numbers of samples of downsampled data (N^(b) in the paper)
416 Mbs: the numbers of samples of shortened data (M^(b) in the paper)
417 Ks_Ke: start and end frequency indices of bands (K^(b)_s and K^(b)_e in the paper)
419 """
420 self.Nbs = np.array([], dtype=int)
421 self.Mbs = np.array([], dtype=int)
422 self.Ks_Ke = []
423 for b in range(self.number_of_bands):
424 dnow = self.durations[b]
425 fnow, dfnow = self.fb_dfb[b]
426 fnext, _ = self.fb_dfb[b + 1]
427 Nb = max(round_up_to_power_of_two(2. * (fnext * self.interferometers.duration + 1.)), 2**b)
428 self.Nbs = np.append(self.Nbs, Nb)
429 self.Mbs = np.append(self.Mbs, Nb // 2**b)
430 self.Ks_Ke.append([math.ceil((fnow - dfnow) * dnow), math.floor(fnext * dnow)])
431 self.Ks_Ke = np.array(self.Ks_Ke)
433 def _setup_waveform_frequency_points(self):
434 """Set up frequency points where waveforms are evaluated. Frequency points are reordered because some waveform
435 models raise an error if the input frequencies are not increasing. This adds frequency_points into the
436 waveform_arguments of waveform_generator. This sets the following instance variables.
438 banded_frequency_points: ndarray of total banded frequency points
439 start_end_idxs: list of tuples containing start and end indices of each band
440 unique_to_original_frequencies: indices converting unique frequency
441 points into the original duplicated banded frequencies
443 """
444 self.banded_frequency_points = np.array([])
445 self.start_end_idxs = []
446 start_idx = 0
447 for i in range(self.number_of_bands):
448 d = self.durations[i]
449 Ks, Ke = self.Ks_Ke[i]
450 self.banded_frequency_points = np.append(self.banded_frequency_points, np.arange(Ks, Ke + 1) / d)
451 end_idx = start_idx + Ke - Ks
452 self.start_end_idxs.append([start_idx, end_idx])
453 start_idx = end_idx + 1
454 self.start_end_idxs = np.array(self.start_end_idxs)
455 unique_frequencies, idxs = np.unique(self.banded_frequency_points, return_inverse=True)
456 self.waveform_generator.waveform_arguments['frequencies'] = unique_frequencies
457 self.unique_to_original_frequencies = idxs
458 logger.info("The number of frequency points where waveforms are evaluated is {}.".format(
459 len(unique_frequencies)))
460 logger.info("The speed-up gain of multi-banding is {}.".format(
461 (self.maximum_frequency - self.minimum_frequency) * self.interferometers.duration /
462 len(unique_frequencies)))
464 def _get_window_sequence(self, delta_f, start_idx, length, b):
465 """Compute window function on frequencies with a fixed frequency interval
467 Parameters
468 ----------
469 delta_f: float
470 frequency interval
471 start_idx: int
472 starting frequency per delta_f
473 length: int
474 number of frequencies
475 b: int
476 band number
478 Returns
479 -------
480 window_sequence: array
482 """
483 fnow, dfnow = self.fb_dfb[b]
484 fnext, dfnext = self.fb_dfb[b + 1]
486 window_sequence = np.zeros(length)
487 increase_start = np.clip(
488 math.floor((fnow - dfnow) / delta_f) - start_idx + 1, 0, length
489 )
490 unity_start = np.clip(math.ceil(fnow / delta_f) - start_idx, 0, length)
491 decrease_start = np.clip(
492 math.floor((fnext - dfnext) / delta_f) - start_idx + 1, 0, length
493 )
494 decrease_stop = np.clip(math.ceil(fnext / delta_f) - start_idx, 0, length)
496 window_sequence[unity_start:decrease_start] = 1.
498 # this if statement avoids overflow caused by vanishing dfnow
499 if increase_start < unity_start:
500 frequencies = (np.arange(increase_start, unity_start) + start_idx) * delta_f
501 window_sequence[increase_start:unity_start] = (
502 1. + np.cos(np.pi * (frequencies - fnow) / dfnow)
503 ) / 2.
505 if decrease_start < decrease_stop:
506 frequencies = (np.arange(decrease_start, decrease_stop) + start_idx) * delta_f
507 window_sequence[decrease_start:decrease_stop] = (
508 1. - np.cos(np.pi * (frequencies - fnext) / dfnext)
509 ) / 2.
511 return window_sequence
513 def _setup_linear_coefficients(self):
514 """Set up coefficients by which waveforms are multiplied to compute (d, h)"""
515 self.linear_coeffs = dict((ifo.name, np.array([])) for ifo in self.interferometers)
516 N = self.Nbs[-1]
517 for ifo in self.interferometers:
518 logger.info("Pre-computing linear coefficients for {}".format(ifo.name))
519 fddata = np.zeros(N // 2 + 1, dtype=complex)
520 fddata[:len(ifo.frequency_domain_strain)][ifo.frequency_mask[:len(fddata)]] += \
521 ifo.frequency_domain_strain[ifo.frequency_mask] / ifo.power_spectral_density_array[ifo.frequency_mask]
522 for b in range(self.number_of_bands):
523 Ks, Ke = self.Ks_Ke[b]
524 windows = self._get_window_sequence(1. / self.durations[b], Ks, Ke - Ks + 1, b)
525 fddata_in_ith_band = np.copy(fddata[:int(self.Nbs[b] / 2 + 1)])
526 fddata_in_ith_band[-1] = 0. # zeroing data at the Nyquist frequency
527 tddata = np.fft.irfft(fddata_in_ith_band)[-self.Mbs[b]:]
528 Ks, Ke = self.Ks_Ke[b]
529 fddata_in_ith_band = np.fft.rfft(tddata)[Ks:Ke + 1]
530 self.linear_coeffs[ifo.name] = np.append(
531 self.linear_coeffs[ifo.name], (4. / self.durations[b]) * windows * np.conj(fddata_in_ith_band))
533 def _setup_quadratic_coefficients_linear_interp(self):
534 """Set up coefficients by which the squares of waveforms are multiplied to compute (h, h) for the
535 linear-interpolation algorithm"""
536 logger.info("Linear-interpolation algorithm is used for (h, h).")
537 self.quadratic_coeffs = dict((ifo.name, np.array([])) for ifo in self.interferometers)
538 original_duration = self.interferometers.duration
540 for b in range(self.number_of_bands):
541 logger.info(f"Pre-computing quadratic coefficients for the {b}-th band")
542 _start, _end = self.start_end_idxs[b]
543 banded_frequencies = self.banded_frequency_points[_start:_end + 1]
544 prefactor = 4 * self.durations[b] / original_duration
546 # precompute window values
547 _fnow, _dfnow = self.fb_dfb[b]
548 _fnext, _ = self.fb_dfb[b + 1]
549 start_idx_in_band = math.ceil((_fnow - _dfnow) * original_duration)
550 window_sequence = self._get_window_sequence(
551 1 / original_duration,
552 start_idx_in_band,
553 math.floor(_fnext * original_duration) - start_idx_in_band + 1,
554 b
555 )
557 for ifo in self.interferometers:
558 end_idx_in_band = min(
559 start_idx_in_band + len(window_sequence) - 1,
560 len(ifo.power_spectral_density_array) - 1
561 )
562 _frequency_mask = ifo.frequency_mask[start_idx_in_band:end_idx_in_band + 1]
563 window_over_psd = np.zeros(end_idx_in_band + 1 - start_idx_in_band)
564 window_over_psd[_frequency_mask] = \
565 1. / ifo.power_spectral_density_array[start_idx_in_band:end_idx_in_band + 1][_frequency_mask]
566 window_over_psd *= window_sequence[:len(window_over_psd)]
568 coeffs = np.zeros(len(banded_frequencies))
569 for k in range(len(coeffs) - 1):
570 if k == 0:
571 start_idx_in_sum = start_idx_in_band
572 else:
573 start_idx_in_sum = max(
574 start_idx_in_band,
575 math.ceil(original_duration * banded_frequencies[k])
576 )
577 if k == len(coeffs) - 2:
578 end_idx_in_sum = end_idx_in_band
579 else:
580 end_idx_in_sum = min(
581 end_idx_in_band,
582 math.ceil(original_duration * banded_frequencies[k + 1]) - 1
583 )
584 frequencies_in_sum = np.arange(start_idx_in_sum, end_idx_in_sum + 1) / original_duration
585 coeffs[k] += prefactor * np.sum(
586 (banded_frequencies[k + 1] - frequencies_in_sum) *
587 window_over_psd[start_idx_in_sum - start_idx_in_band:end_idx_in_sum - start_idx_in_band + 1]
588 )
589 coeffs[k + 1] += prefactor * np.sum(
590 (frequencies_in_sum - banded_frequencies[k]) *
591 window_over_psd[start_idx_in_sum - start_idx_in_band:end_idx_in_sum - start_idx_in_band + 1]
592 )
593 self.quadratic_coeffs[ifo.name] = np.append(self.quadratic_coeffs[ifo.name], coeffs)
595 def _setup_quadratic_coefficients_ifft_fft(self):
596 """Set up coefficients needed for the IFFT-FFT algorithm to compute (h, h)"""
597 logger.info("IFFT-FFT algorithm is used for (h, h).")
598 N = self.Nbs[-1]
599 # variables defined below correspond to \hat{N}^(b), \hat{T}^(b), \tilde{I}^(b)_{c, k}, h^(b)_{c, m} and
600 # \sqrt{w^(b)(f^(b)_k)} \tilde{h}(f^(b)_k) in the paper
601 Nhatbs = [min(2 * Mb, Nb) for Mb, Nb in zip(self.Mbs, self.Nbs)]
602 self.Tbhats = [self.interferometers.duration * Nbhat / Nb for Nb, Nbhat in zip(self.Nbs, Nhatbs)]
603 self.Ibcs = dict((ifo.name, []) for ifo in self.interferometers)
604 self.hbcs = dict((ifo.name, []) for ifo in self.interferometers)
605 self.wths = dict((ifo.name, []) for ifo in self.interferometers)
606 for ifo in self.interferometers:
607 logger.info("Pre-computing quadratic coefficients for {}".format(ifo.name))
608 full_inv_psds = np.zeros(N // 2 + 1)
609 full_inv_psds[:len(ifo.power_spectral_density_array)][ifo.frequency_mask[:len(full_inv_psds)]] = (
610 1 / ifo.power_spectral_density_array[ifo.frequency_mask]
611 )
612 for b in range(self.number_of_bands):
613 Imb = np.fft.irfft(full_inv_psds[:self.Nbs[b] // 2 + 1])
614 half_length = Nhatbs[b] // 2
615 Imbc = np.append(Imb[:half_length + 1], Imb[-(Nhatbs[b] - half_length - 1):])
616 self.Ibcs[ifo.name].append(np.fft.rfft(Imbc))
617 # Allocate arrays for IFFT-FFT operations
618 self.hbcs[ifo.name].append(np.zeros(Nhatbs[b]))
619 self.wths[ifo.name].append(np.zeros(self.Mbs[b] // 2 + 1, dtype=complex))
620 # precompute windows and their squares
621 self.windows = np.array([])
622 self.square_root_windows = np.array([])
623 for b in range(self.number_of_bands):
624 Ks, Ke = self.Ks_Ke[b]
625 ws = self._get_window_sequence(1. / self.durations[b], Ks, Ke - Ks + 1, b)
626 self.windows = np.append(self.windows, ws)
627 self.square_root_windows = np.append(self.square_root_windows, np.sqrt(ws))
629 @property
630 def weights(self):
631 _weights = {}
632 for key in [
633 "reference_chirp_mass", "highest_mode", "linear_interpolation",
634 "accuracy_factor", "time_offset", "delta_f_end",
635 "maximum_banding_frequency", "minimum_banding_duration",
636 "durations", "fb_dfb", "Nbs", "Mbs", "Ks_Ke",
637 "banded_frequency_points", "start_end_idxs",
638 "unique_to_original_frequencies", "linear_coeffs"
639 ]:
640 _weights[key] = getattr(self, key)
641 _weights["waveform_frequencies"] = \
642 self.waveform_generator.waveform_arguments['frequencies']
643 if self.linear_interpolation:
644 _weights["quadratic_coeffs"] = self.quadratic_coeffs
645 else:
646 for key in ["Tbhats", "windows", "square_root_windows"]:
647 _weights[key] = getattr(self, key)
648 for key in ["wths", "hbcs", "Ibcs"]:
649 _weights[key] = {}
650 value = getattr(self, key)
651 for ifo_name, data in value.items():
652 _weights[key][ifo_name] = dict((str(b), v) for b, v in enumerate(data))
653 return _weights
655 def save_weights(self, filename):
656 """
657 Save multiband weights into a .hdf5 file.
659 Parameters
660 ==========
661 filename : str
663 """
664 import h5py
665 if not filename.endswith(".hdf5"):
666 filename += ".hdf5"
667 logger.info(f"Saving multiband weights to {filename}")
668 with h5py.File(filename, 'w') as f:
669 recursively_save_dict_contents_to_group(f, '/', self.weights)
671 def setup_multibanding_from_weights(self, weights):
672 """
673 Set multiband weights from dictionary-like weights
675 Parameters
676 ==========
677 weights : dict
679 """
680 keys = list(weights.keys())
681 # reference_chirp_mass needs to be set first as it is required for the setter of maximum_banding_frequency
682 self.reference_chirp_mass = weights["reference_chirp_mass"]
683 keys.remove("reference_chirp_mass")
684 for key in keys:
685 value = weights[key]
686 if key in ["wths", "hbcs", "Ibcs"]:
687 to_set = {}
688 for ifo_name, data in value.items():
689 to_set[ifo_name] = [data[str(b)] for b in range(len(data.keys()))]
690 setattr(self, key, to_set)
691 elif key == "waveform_frequencies":
692 self.waveform_generator.waveform_arguments['frequencies'] = weights["waveform_frequencies"]
693 else:
694 setattr(self, key, value)
696 def calculate_snrs(self, waveform_polarizations, interferometer, return_array=False):
697 """
698 Compute the snrs
700 Parameters
701 ----------
702 waveform_polarizations: dict
703 A dictionary of waveform polarizations and the corresponding array
704 interferometer: bilby.gw.detector.Interferometer
705 The bilby interferometer object
706 return_array: bool
707 If true, calculate and return internal array objects
708 (d_inner_h_array and optimal_snr_squared_array), otherwise
709 these are returned as None. This parameter is ignored for the multiband
710 model as these arrays are never calculated.
712 Returns
713 -------
714 calculated_snrs: _CalculatedSNRs
715 An object containing the SNR quantities.
717 """
718 strain = np.zeros(len(self.banded_frequency_points), dtype=complex)
719 for mode in waveform_polarizations:
720 response = interferometer.antenna_response(
721 self.parameters['ra'], self.parameters['dec'],
722 self.parameters['geocent_time'], self.parameters['psi'],
723 mode
724 )
725 strain += waveform_polarizations[mode][self.unique_to_original_frequencies] * response
727 dt = interferometer.time_delay_from_geocenter(
728 self.parameters['ra'], self.parameters['dec'],
729 self.parameters['geocent_time'])
730 dt_geocent = self.parameters['geocent_time'] - interferometer.strain_data.start_time
731 ifo_time = dt_geocent + dt
733 calib_factor = interferometer.calibration_model.get_calibration_factor(
734 self.banded_frequency_points, prefix='recalib_{}_'.format(interferometer.name), **self.parameters)
736 strain *= np.exp(-1j * 2. * np.pi * self.banded_frequency_points * ifo_time)
737 strain *= calib_factor
739 d_inner_h = np.conj(np.dot(strain, self.linear_coeffs[interferometer.name]))
741 if self.linear_interpolation:
742 optimal_snr_squared = np.vdot(
743 np.real(strain * np.conjugate(strain)),
744 self.quadratic_coeffs[interferometer.name]
745 )
746 else:
747 optimal_snr_squared = 0.
748 for b in range(self.number_of_bands):
749 Ks, Ke = self.Ks_Ke[b]
750 start_idx, end_idx = self.start_end_idxs[b]
751 Mb = self.Mbs[b]
752 if b == 0:
753 optimal_snr_squared += (4. / self.interferometers.duration) * np.vdot(
754 np.real(strain[start_idx:end_idx + 1] * np.conjugate(strain[start_idx:end_idx + 1])),
755 interferometer.frequency_mask[Ks:Ke + 1] * self.windows[start_idx:end_idx + 1]
756 / interferometer.power_spectral_density_array[Ks:Ke + 1])
757 else:
758 self.wths[interferometer.name][b][Ks:Ke + 1] = (
759 self.square_root_windows[start_idx:end_idx + 1] * strain[start_idx:end_idx + 1]
760 )
761 self.hbcs[interferometer.name][b][-Mb:] = np.fft.irfft(self.wths[interferometer.name][b])
762 thbc = np.fft.rfft(self.hbcs[interferometer.name][b])
763 optimal_snr_squared += (4. / self.Tbhats[b]) * np.vdot(
764 np.real(thbc * np.conjugate(thbc)), self.Ibcs[interferometer.name][b])
766 complex_matched_filter_snr = d_inner_h / (optimal_snr_squared**0.5)
768 return self._CalculatedSNRs(
769 d_inner_h=d_inner_h,
770 optimal_snr_squared=optimal_snr_squared.real,
771 complex_matched_filter_snr=complex_matched_filter_snr,
772 )
774 def _rescale_signal(self, signal, new_distance):
775 for mode in signal:
776 signal[mode] *= self._ref_dist / new_distance