Coverage for bilby/gw/detector/interferometer.py: 75%
242 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
1import os
3import numpy as np
4from bilby_cython.geometry import (
5 get_polarization_tensor,
6 three_by_three_matrix_contraction,
7 time_delay_from_geocenter,
8)
10from ...core import utils
11from ...core.utils import docstring, logger, PropertyAccessor, safe_file_dump
12from .. import utils as gwutils
13from .calibration import Recalibrate
14from .geometry import InterferometerGeometry
15from .strain_data import InterferometerStrainData
16from ..conversion import generate_all_bbh_parameters
19class Interferometer(object):
20 """Class for the Interferometer """
22 length = PropertyAccessor('geometry', 'length')
23 latitude = PropertyAccessor('geometry', 'latitude')
24 latitude_radians = PropertyAccessor('geometry', 'latitude_radians')
25 longitude = PropertyAccessor('geometry', 'longitude')
26 longitude_radians = PropertyAccessor('geometry', 'longitude_radians')
27 elevation = PropertyAccessor('geometry', 'elevation')
28 x = PropertyAccessor('geometry', 'x')
29 y = PropertyAccessor('geometry', 'y')
30 xarm_azimuth = PropertyAccessor('geometry', 'xarm_azimuth')
31 yarm_azimuth = PropertyAccessor('geometry', 'yarm_azimuth')
32 xarm_tilt = PropertyAccessor('geometry', 'xarm_tilt')
33 yarm_tilt = PropertyAccessor('geometry', 'yarm_tilt')
34 vertex = PropertyAccessor('geometry', 'vertex')
35 detector_tensor = PropertyAccessor('geometry', 'detector_tensor')
37 duration = PropertyAccessor('strain_data', 'duration')
38 sampling_frequency = PropertyAccessor('strain_data', 'sampling_frequency')
39 start_time = PropertyAccessor('strain_data', 'start_time')
40 frequency_array = PropertyAccessor('strain_data', 'frequency_array')
41 time_array = PropertyAccessor('strain_data', 'time_array')
42 minimum_frequency = PropertyAccessor('strain_data', 'minimum_frequency')
43 maximum_frequency = PropertyAccessor('strain_data', 'maximum_frequency')
44 frequency_mask = PropertyAccessor('strain_data', 'frequency_mask')
45 frequency_domain_strain = PropertyAccessor('strain_data', 'frequency_domain_strain')
46 time_domain_strain = PropertyAccessor('strain_data', 'time_domain_strain')
48 def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency, length, latitude, longitude,
49 elevation, xarm_azimuth, yarm_azimuth, xarm_tilt=0., yarm_tilt=0., calibration_model=Recalibrate()):
50 """
51 Instantiate an Interferometer object.
53 Parameters
54 ==========
55 name: str
56 Interferometer name, e.g., H1.
57 power_spectral_density: bilby.gw.detector.PowerSpectralDensity
58 Power spectral density determining the sensitivity of the detector.
59 minimum_frequency: float
60 Minimum frequency to analyse for detector.
61 maximum_frequency: float
62 Maximum frequency to analyse for detector.
63 length: float
64 Length of the interferometer in km.
65 latitude: float
66 Latitude North in degrees (South is negative).
67 longitude: float
68 Longitude East in degrees (West is negative).
69 elevation: float
70 Height above surface in metres.
71 xarm_azimuth: float
72 Orientation of the x arm in degrees North of East.
73 yarm_azimuth: float
74 Orientation of the y arm in degrees North of East.
75 xarm_tilt: float, optional
76 Tilt of the x arm in radians above the horizontal defined by
77 ellipsoid earth model in LIGO-T980044-08.
78 yarm_tilt: float, optional
79 Tilt of the y arm in radians above the horizontal.
80 calibration_model: Recalibration
81 Calibration model, this applies the calibration correction to the
82 template, the default model applies no correction.
83 """
84 self.geometry = InterferometerGeometry(length, latitude, longitude, elevation,
85 xarm_azimuth, yarm_azimuth, xarm_tilt, yarm_tilt)
87 self.name = name
88 self.power_spectral_density = power_spectral_density
89 self.calibration_model = calibration_model
90 self.strain_data = InterferometerStrainData(
91 minimum_frequency=minimum_frequency,
92 maximum_frequency=maximum_frequency)
93 self.meta_data = dict(name=name)
95 def __eq__(self, other):
96 if self.name == other.name and \
97 self.geometry == other.geometry and \
98 self.power_spectral_density.__eq__(other.power_spectral_density) and \
99 self.calibration_model == other.calibration_model and \
100 self.strain_data == other.strain_data:
101 return True
102 return False
104 def __repr__(self):
105 return self.__class__.__name__ + '(name=\'{}\', power_spectral_density={}, minimum_frequency={}, ' \
106 'maximum_frequency={}, length={}, latitude={}, longitude={}, elevation={}, ' \
107 'xarm_azimuth={}, yarm_azimuth={}, xarm_tilt={}, yarm_tilt={})' \
108 .format(self.name, self.power_spectral_density, float(self.strain_data.minimum_frequency),
109 float(self.strain_data.maximum_frequency), float(self.geometry.length),
110 float(self.geometry.latitude), float(self.geometry.longitude),
111 float(self.geometry.elevation), float(self.geometry.xarm_azimuth),
112 float(self.geometry.yarm_azimuth), float(self.geometry.xarm_tilt),
113 float(self.geometry.yarm_tilt))
115 def set_strain_data_from_gwpy_timeseries(self, time_series):
116 """ Set the `Interferometer.strain_data` from a gwpy TimeSeries
118 Parameters
119 ==========
120 time_series: gwpy.timeseries.timeseries.TimeSeries
121 The data to set.
123 """
124 self.strain_data.set_from_gwpy_timeseries(time_series=time_series)
126 def set_strain_data_from_frequency_domain_strain(
127 self, frequency_domain_strain, sampling_frequency=None,
128 duration=None, start_time=0, frequency_array=None):
129 """ Set the `Interferometer.strain_data` from a numpy array
131 Parameters
132 ==========
133 frequency_domain_strain: array_like
134 The data to set.
135 sampling_frequency: float
136 The sampling frequency (in Hz).
137 duration: float
138 The data duration (in s).
139 start_time: float
140 The GPS start-time of the data.
141 frequency_array: array_like
142 The array of frequencies, if sampling_frequency and duration not
143 given.
145 """
146 self.strain_data.set_from_frequency_domain_strain(
147 frequency_domain_strain=frequency_domain_strain,
148 sampling_frequency=sampling_frequency, duration=duration,
149 start_time=start_time, frequency_array=frequency_array)
151 def set_strain_data_from_power_spectral_density(
152 self, sampling_frequency, duration, start_time=0):
153 """ Set the `Interferometer.strain_data` from a power spectal density
155 This uses the `interferometer.power_spectral_density` object to set
156 the `strain_data` to a noise realization. See
157 `bilby.gw.detector.InterferometerStrainData` for further information.
159 Parameters
160 ==========
161 sampling_frequency: float
162 The sampling frequency (in Hz)
163 duration: float
164 The data duration (in s)
165 start_time: float
166 The GPS start-time of the data
168 """
169 self.strain_data.set_from_power_spectral_density(
170 self.power_spectral_density, sampling_frequency=sampling_frequency,
171 duration=duration, start_time=start_time)
173 def set_strain_data_from_frame_file(
174 self, frame_file, sampling_frequency, duration, start_time=0,
175 channel=None, buffer_time=1):
176 """ Set the `Interferometer.strain_data` from a frame file
178 Parameters
179 ==========
180 frame_file: str
181 File from which to load data.
182 channel: str
183 Channel to read from frame.
184 sampling_frequency: float
185 The sampling frequency (in Hz)
186 duration: float
187 The data duration (in s)
188 start_time: float
189 The GPS start-time of the data
190 buffer_time: float
191 Read in data with `start_time-buffer_time` and
192 `start_time+duration+buffer_time`
194 """
195 self.strain_data.set_from_frame_file(
196 frame_file=frame_file, sampling_frequency=sampling_frequency,
197 duration=duration, start_time=start_time,
198 channel=channel, buffer_time=buffer_time)
200 def set_strain_data_from_channel_name(
201 self, channel, sampling_frequency, duration, start_time=0):
202 """
203 Set the `Interferometer.strain_data` by fetching from given channel
204 using strain_data.set_from_channel_name()
206 Parameters
207 ==========
208 channel: str
209 Channel to look for using gwpy in the format `IFO:Channel`
210 sampling_frequency: float
211 The sampling frequency (in Hz)
212 duration: float
213 The data duration (in s)
214 start_time: float
215 The GPS start-time of the data
217 """
218 self.strain_data.set_from_channel_name(
219 channel=channel, sampling_frequency=sampling_frequency,
220 duration=duration, start_time=start_time)
222 def set_strain_data_from_csv(self, filename):
223 """ Set the `Interferometer.strain_data` from a csv file
225 Parameters
226 ==========
227 filename: str
228 The path to the file to read in
230 """
231 self.strain_data.set_from_csv(filename)
233 def set_strain_data_from_zero_noise(
234 self, sampling_frequency, duration, start_time=0):
235 """ Set the `Interferometer.strain_data` to zero noise
237 Parameters
238 ==========
239 sampling_frequency: float
240 The sampling frequency (in Hz)
241 duration: float
242 The data duration (in s)
243 start_time: float
244 The GPS start-time of the data
246 """
248 self.strain_data.set_from_zero_noise(
249 sampling_frequency=sampling_frequency, duration=duration,
250 start_time=start_time)
252 def antenna_response(self, ra, dec, time, psi, mode):
253 """
254 Calculate the antenna response function for a given sky location
256 See Nishizawa et al. (2009) arXiv:0903.0528 for definitions of the polarisation tensors.
257 [u, v, w] represent the Earth-frame
258 [m, n, omega] represent the wave-frame
259 Note: there is a typo in the definition of the wave-frame in Nishizawa et al.
261 Parameters
262 ==========
263 ra: float
264 right ascension in radians
265 dec: float
266 declination in radians
267 time: float
268 geocentric GPS time
269 psi: float
270 binary polarisation angle counter-clockwise about the direction of propagation
271 mode: str
272 polarisation mode (e.g. 'plus', 'cross') or the name of a specific detector.
273 If mode == self.name, return 1
275 Returns
276 =======
277 float: The antenna response for the specified mode and time/location
279 """
280 if mode in ["plus", "cross", "x", "y", "breathing", "longitudinal"]:
281 polarization_tensor = get_polarization_tensor(ra, dec, time, psi, mode)
282 return three_by_three_matrix_contraction(self.geometry.detector_tensor, polarization_tensor)
283 elif mode == self.name:
284 return 1
285 else:
286 return 0
288 def get_detector_response(self, waveform_polarizations, parameters, frequencies=None):
289 """ Get the detector response for a particular waveform
291 Parameters
292 ==========
293 waveform_polarizations: dict
294 polarizations of the waveform
295 parameters: dict
296 parameters describing position and time of arrival of the signal
297 frequencies: array-like, optional
298 The frequency values to evaluate the response at. If
299 not provided, the response is computed using
300 :code:`self.frequency_array`. If the frequencies are
301 specified, no frequency masking is performed.
302 Returns
303 =======
304 array_like: A 3x3 array representation of the detector response (signal observed in the interferometer)
305 """
306 if frequencies is None:
307 frequencies = self.frequency_array[self.frequency_mask]
308 mask = self.frequency_mask
309 else:
310 mask = np.ones(len(frequencies), dtype=bool)
312 signal = {}
313 for mode in waveform_polarizations.keys():
314 det_response = self.antenna_response(
315 parameters['ra'],
316 parameters['dec'],
317 parameters['geocent_time'],
318 parameters['psi'], mode)
320 signal[mode] = waveform_polarizations[mode] * det_response
321 signal_ifo = sum(signal.values()) * mask
323 time_shift = self.time_delay_from_geocenter(
324 parameters['ra'], parameters['dec'], parameters['geocent_time'])
326 # Be careful to first subtract the two GPS times which are ~1e9 sec.
327 # And then add the time_shift which varies at ~1e-5 sec
328 dt_geocent = parameters['geocent_time'] - self.strain_data.start_time
329 dt = dt_geocent + time_shift
331 signal_ifo[mask] = signal_ifo[mask] * np.exp(-1j * 2 * np.pi * dt * frequencies)
333 signal_ifo[mask] *= self.calibration_model.get_calibration_factor(
334 frequencies, prefix='recalib_{}_'.format(self.name), **parameters
335 )
337 return signal_ifo
339 def check_signal_duration(self, parameters, raise_error=True):
340 """ Check that the signal with the given parameters fits in the data
342 Parameters
343 ==========
344 parameters: dict
345 A dictionary of the injection parameters
346 raise_error: bool
347 If True, raise an error in the signal does not fit. Otherwise, print
348 a warning message.
349 """
350 try:
351 parameters = generate_all_bbh_parameters(parameters)
352 except AttributeError:
353 logger.debug(
354 "generate_all_bbh_parameters parameters failed during check_signal_duration"
355 )
356 return
358 if ("mass_1" not in parameters) and ("mass_2" not in parameters):
359 if raise_error:
360 raise AttributeError("Unable to check signal duration as mass not given")
361 else:
362 return
364 # Calculate the time to merger
365 deltaT = gwutils.calculate_time_to_merger(
366 frequency=self.minimum_frequency,
367 mass_1=parameters["mass_1"],
368 mass_2=parameters["mass_2"],
369 )
370 deltaT = np.round(deltaT, 1)
371 if deltaT > self.duration:
372 msg = (
373 f"The injected signal has a duration in-band of {deltaT}s, but "
374 f"the data for detector {self.name} has a duration of {self.duration}s"
375 )
376 if raise_error:
377 raise ValueError(msg)
378 else:
379 logger.warning(msg)
381 def inject_signal(self, parameters, injection_polarizations=None,
382 waveform_generator=None, raise_error=True):
383 """ General signal injection method.
384 Provide the injection parameters and either the injection polarizations
385 or the waveform generator to inject a signal into the detector.
386 Defaults to the injection polarizations is both are given.
388 Parameters
389 ==========
390 parameters: dict
391 Parameters of the injection.
392 injection_polarizations: dict, optional
393 Polarizations of waveform to inject, output of
394 `waveform_generator.frequency_domain_strain()`. If
395 `waveform_generator` is also given, the injection_polarizations will
396 be calculated directly and this argument can be ignored.
397 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator, optional
398 A WaveformGenerator instance using the source model to inject. If
399 `injection_polarizations` is given, this will be ignored.
400 raise_error: bool
401 If true, raise an error if the injected signal has a duration
402 longer than the data duration. If False, a warning will be printed
403 instead.
405 Notes
406 =====
407 if your signal takes a substantial amount of time to generate, or
408 you experience buggy behaviour. It is preferable to provide the
409 injection_polarizations directly.
411 Returns
412 =======
413 injection_polarizations: dict
414 The injected polarizations. This is the same as the injection_polarizations parameters
415 if it was passed in. Otherwise it is the return value of waveform_generator.frequency_domain_strain().
417 """
418 self.check_signal_duration(parameters, raise_error)
420 if injection_polarizations is None and waveform_generator is None:
421 raise ValueError(
422 "inject_signal needs one of waveform_generator or "
423 "injection_polarizations.")
424 elif injection_polarizations is not None:
425 self.inject_signal_from_waveform_polarizations(parameters=parameters,
426 injection_polarizations=injection_polarizations)
427 elif waveform_generator is not None:
428 injection_polarizations = self.inject_signal_from_waveform_generator(parameters=parameters,
429 waveform_generator=waveform_generator)
430 return injection_polarizations
432 def inject_signal_from_waveform_generator(self, parameters, waveform_generator):
433 """ Inject a signal using a waveform generator and a set of parameters.
434 Alternative to `inject_signal` and `inject_signal_from_waveform_polarizations`
436 Parameters
437 ==========
438 parameters: dict
439 Parameters of the injection.
440 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
441 A WaveformGenerator instance using the source model to inject.
443 Notes
444 =====
445 if your signal takes a substantial amount of time to generate, or
446 you experience buggy behaviour. It is preferable to use the
447 inject_signal_from_waveform_polarizations() method.
449 Returns
450 =======
451 injection_polarizations: dict
452 The internally generated injection parameters
454 """
455 injection_polarizations = \
456 waveform_generator.frequency_domain_strain(parameters)
457 self.inject_signal_from_waveform_polarizations(parameters=parameters,
458 injection_polarizations=injection_polarizations)
459 return injection_polarizations
461 def inject_signal_from_waveform_polarizations(self, parameters, injection_polarizations):
462 """ Inject a signal into the detector from a dict of waveform polarizations.
463 Alternative to `inject_signal` and `inject_signal_from_waveform_generator`.
465 Parameters
466 ==========
467 parameters: dict
468 Parameters of the injection.
469 injection_polarizations: dict
470 Polarizations of waveform to inject, output of
471 `waveform_generator.frequency_domain_strain()`.
473 """
474 if not self.strain_data.time_within_data(parameters['geocent_time']):
475 logger.warning(
476 'Injecting signal outside segment, start_time={}, merger time={}.'
477 .format(self.strain_data.start_time, parameters['geocent_time']))
479 signal_ifo = self.get_detector_response(injection_polarizations, parameters)
480 self.strain_data.frequency_domain_strain += signal_ifo
482 self.meta_data['optimal_SNR'] = (
483 np.sqrt(self.optimal_snr_squared(signal=signal_ifo)).real)
484 self.meta_data['matched_filter_SNR'] = (
485 self.matched_filter_snr(signal=signal_ifo))
486 self.meta_data['parameters'] = parameters
488 logger.info("Injected signal in {}:".format(self.name))
489 logger.info(" optimal SNR = {:.2f}".format(self.meta_data['optimal_SNR']))
490 logger.info(" matched filter SNR = {:.2f}".format(self.meta_data['matched_filter_SNR']))
491 for key in parameters:
492 logger.info(' {} = {}'.format(key, parameters[key]))
494 @property
495 def amplitude_spectral_density_array(self):
496 """ Returns the amplitude spectral density (ASD) given we know a power spectral density (PSD)
498 Returns
499 =======
500 array_like: An array representation of the ASD
502 """
503 return (
504 self.power_spectral_density.get_amplitude_spectral_density_array(
505 frequency_array=self.strain_data.frequency_array) *
506 self.strain_data.window_factor**0.5)
508 @property
509 def power_spectral_density_array(self):
510 """ Returns the power spectral density (PSD)
512 This accounts for whether the data in the interferometer has been windowed.
514 Returns
515 =======
516 array_like: An array representation of the PSD
518 """
519 return (
520 self.power_spectral_density.get_power_spectral_density_array(
521 frequency_array=self.strain_data.frequency_array) *
522 self.strain_data.window_factor)
524 def unit_vector_along_arm(self, arm):
525 logger.warning("This method has been moved and will be removed in the future."
526 "Use Interferometer.geometry.unit_vector_along_arm instead.")
527 return self.geometry.unit_vector_along_arm(arm)
529 def time_delay_from_geocenter(self, ra, dec, time):
530 """
531 Calculate the time delay from the geocenter for the interferometer.
533 Use the time delay function from utils.
535 Parameters
536 ==========
537 ra: float
538 right ascension of source in radians
539 dec: float
540 declination of source in radians
541 time: float
542 GPS time
544 Returns
545 =======
546 float: The time delay from geocenter in seconds
547 """
548 return time_delay_from_geocenter(self.geometry.vertex, ra, dec, time)
550 def vertex_position_geocentric(self):
551 """
552 Calculate the position of the IFO vertex in geocentric coordinates in meters.
554 Based on arXiv:gr-qc/0008066 Eqs. B11-B13 except for the typo in the definition of the local radius.
555 See Section 2.1 of LIGO-T980044-10 for the correct expression
557 Returns
558 =======
559 array_like: A 3D array representation of the vertex
560 """
561 return gwutils.get_vertex_position_geocentric(self.geometry.latitude_radians,
562 self.geometry.longitude_radians,
563 self.geometry.elevation)
565 def optimal_snr_squared(self, signal):
566 """
568 Parameters
569 ==========
570 signal: array_like
571 Array containing the signal
573 Returns
574 =======
575 float: The optimal signal to noise ratio possible squared
576 """
577 return gwutils.optimal_snr_squared(
578 signal=signal[self.strain_data.frequency_mask],
579 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask],
580 duration=self.strain_data.duration)
582 def inner_product(self, signal):
583 """
585 Parameters
586 ==========
587 signal: array_like
588 Array containing the signal
590 Returns
591 =======
592 float: The optimal signal to noise ratio possible squared
593 """
594 return gwutils.noise_weighted_inner_product(
595 aa=signal[self.strain_data.frequency_mask],
596 bb=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask],
597 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask],
598 duration=self.strain_data.duration)
600 def template_template_inner_product(self, signal_1, signal_2):
601 """A noise weighted inner product between two templates, using this ifo's PSD.
603 Parameters
604 ==========
605 signal_1 : array_like
606 An array containing the first signal
607 signal_2 : array_like
608 an array containing the second signal
610 Returns
611 =======
612 float: The noise weighted inner product of the two templates
613 """
614 return gwutils.noise_weighted_inner_product(
615 aa=signal_1[self.strain_data.frequency_mask],
616 bb=signal_2[self.strain_data.frequency_mask],
617 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask],
618 duration=self.strain_data.duration)
620 def matched_filter_snr(self, signal):
621 """
623 Parameters
624 ==========
625 signal: array_like
626 Array containing the signal
628 Returns
629 =======
630 complex: The matched filter signal to noise ratio
632 """
633 return gwutils.matched_filter_snr(
634 signal=signal[self.strain_data.frequency_mask],
635 frequency_domain_strain=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask],
636 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask],
637 duration=self.strain_data.duration)
639 def whiten_frequency_series(self, frequency_series : np.array) -> np.array:
640 """Whitens a frequency series with the noise properties of the detector
642 .. math::
643 \\tilde{a}_w(f) = \\tilde{a}(f) \\sqrt{\\frac{4}{T S_n(f)}}
645 Such that
647 .. math::
648 Var(n) = \\frac{1}{N} \\sum_{k=0}^N n_W(f_k)n_W^*(f_k) = 2
650 Where the factor of two is due to the independent real and imaginary
651 components.
653 Parameters
654 ==========
655 frequency_series : np.array
656 The frequency series, whitened by the ASD
657 """
658 return frequency_series / (self.amplitude_spectral_density_array * np.sqrt(self.duration / 4))
660 def get_whitened_time_series_from_whitened_frequency_series(
661 self,
662 whitened_frequency_series : np.array
663 ) -> np.array:
664 """Gets the whitened time series from a whitened frequency series.
666 This ifft's and also applies a windowing factor,
667 since when f_min and f_max are set bilby applies a mask to the series.
669 Per 6.2a-b in https://arxiv.org/pdf/gr-qc/0509116 since our window
670 is just a band pass,
671 this coefficient is :math:`w/W` where
673 .. math::
675 W = \\frac{1}{N} \\sum_{k=0}^N w^2[j]
677 Since our window :math:`w` is simply 1 or 0, depending on the mask, we get
679 .. math::
681 W = \\frac{1}{N} \\sum_{k=0}^N \\Theta(f_{max} - f_k)\\Theta(f_k - f_{min})
683 and accordingly the termwise window factor is
685 .. math::
686 w = \\sqrt{N W} = \\sqrt{\\sum_{k=0}^N \\Theta(f_{max} - f_k)\\Theta(f_k - f_{min})}
688 """
689 frequency_window_factor = (
690 np.sum(self.frequency_mask)
691 / len(self.frequency_mask)
692 )
694 whitened_time_series = (
695 np.fft.irfft(whitened_frequency_series)
696 * np.sqrt(np.sum(self.frequency_mask)) / frequency_window_factor
697 )
699 return whitened_time_series
701 @property
702 def whitened_frequency_domain_strain(self):
703 r"""Whitens the frequency domain data by dividing through by ASD,
704 with appropriate normalization.
706 See `whiten_frequency_series()` for details.
708 Returns
709 =======
710 array_like: The whitened data
711 """
712 return self.whiten_frequency_series(self.strain_data.frequency_domain_strain)
714 @property
715 def whitened_time_domain_strain(self) -> np.array:
716 """Calculates the whitened time domain strain
717 by iffting the whitened frequency domain strain,
718 with the appropriate normalization.
720 See `get_whitened_time_series_from_whitened_frequency_series()` for details
722 Returns
723 =======
724 array_like
725 The whitened data in the time domain
726 """
727 return self.get_whitened_time_series_from_whitened_frequency_series(self.whitened_frequency_domain_strain)
729 def save_data(self, outdir, label=None):
730 """ Creates save files for interferometer data in plain text format.
732 Saves two files: the frequency domain strain data with three columns [f, real part of h(f),
733 imaginary part of h(f)], and the amplitude spectral density with two columns [f, ASD(f)].
735 Note that in v1.3.0 and below, the ASD was saved in a file called *_psd.dat.
737 Parameters
738 ==========
739 outdir: str
740 The output directory in which the data is supposed to be saved
741 label: str
742 The name of the output files
743 """
745 if label is None:
746 filename_asd = '{}/{}_asd.dat'.format(outdir, self.name)
747 filename_data = '{}/{}_frequency_domain_data.dat'.format(outdir, self.name)
748 else:
749 filename_asd = '{}/{}_{}_asd.dat'.format(outdir, self.name, label)
750 filename_data = '{}/{}_{}_frequency_domain_data.dat'.format(outdir, self.name, label)
751 np.savetxt(filename_data,
752 np.array(
753 [self.strain_data.frequency_array,
754 self.strain_data.frequency_domain_strain.real,
755 self.strain_data.frequency_domain_strain.imag]).T,
756 header='f real_h(f) imag_h(f)')
757 np.savetxt(filename_asd,
758 np.array(
759 [self.strain_data.frequency_array,
760 self.amplitude_spectral_density_array]).T,
761 header='f h(f)')
763 def plot_data(self, signal=None, outdir='.', label=None):
764 import matplotlib.pyplot as plt
765 if utils.command_line_args.bilby_test_mode:
766 return
768 fig, ax = plt.subplots()
769 df = self.strain_data.frequency_array[1] - self.strain_data.frequency_array[0]
770 asd = gwutils.asd_from_freq_series(
771 freq_data=self.strain_data.frequency_domain_strain, df=df)
773 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask],
774 asd[self.strain_data.frequency_mask],
775 color='C0', label=self.name)
776 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask],
777 self.amplitude_spectral_density_array[self.strain_data.frequency_mask],
778 color='C1', lw=1.0, label=self.name + ' ASD')
779 if signal is not None:
780 signal_asd = gwutils.asd_from_freq_series(
781 freq_data=signal, df=df)
783 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask],
784 signal_asd[self.strain_data.frequency_mask],
785 color='C2',
786 label='Signal')
787 ax.grid(True)
788 ax.set_ylabel(r'Strain [strain/$\sqrt{\rm Hz}$]')
789 ax.set_xlabel(r'Frequency [Hz]')
790 ax.legend(loc='best')
791 fig.tight_layout()
792 if label is None:
793 fig.savefig(
794 '{}/{}_frequency_domain_data.png'.format(outdir, self.name))
795 else:
796 fig.savefig(
797 '{}/{}_{}_frequency_domain_data.png'.format(
798 outdir, self.name, label))
799 plt.close(fig)
801 def plot_time_domain_data(
802 self, outdir='.', label=None, bandpass_frequencies=(50, 250),
803 notches=None, start_end=None, t0=None):
804 """ Plots the strain data in the time domain
806 Parameters
807 ==========
808 outdir, label: str
809 Used in setting the saved filename.
810 bandpass: tuple, optional
811 A tuple of the (low, high) frequencies to use when bandpassing the
812 data, if None no bandpass is applied.
813 notches: list, optional
814 A list of frequencies specifying any lines to notch
815 start_end: tuple
816 A tuple of the (start, end) range of GPS times to plot
817 t0: float
818 If given, the reference time to subtract from the time series before
819 plotting.
821 """
822 import matplotlib.pyplot as plt
823 from gwpy.timeseries import TimeSeries
824 from gwpy.signal.filter_design import bandpass, concatenate_zpks, notch
826 # We use the gwpy timeseries to perform bandpass and notching
827 if notches is None:
828 notches = list()
829 timeseries = TimeSeries(
830 data=self.strain_data.time_domain_strain, times=self.strain_data.time_array)
831 zpks = []
832 if bandpass_frequencies is not None:
833 zpks.append(bandpass(
834 bandpass_frequencies[0], bandpass_frequencies[1],
835 self.strain_data.sampling_frequency))
836 if notches is not None:
837 for line in notches:
838 zpks.append(notch(
839 line, self.strain_data.sampling_frequency))
840 if len(zpks) > 0:
841 zpk = concatenate_zpks(*zpks)
842 strain = timeseries.filter(zpk, filtfilt=False)
843 else:
844 strain = timeseries
846 fig, ax = plt.subplots()
848 if t0:
849 x = self.strain_data.time_array - t0
850 xlabel = 'GPS time [s] - {}'.format(t0)
851 else:
852 x = self.strain_data.time_array
853 xlabel = 'GPS time [s]'
855 ax.plot(x, strain)
856 ax.set_xlabel(xlabel)
857 ax.set_ylabel('Strain')
859 if start_end is not None:
860 ax.set_xlim(*start_end)
862 fig.tight_layout()
864 if label is None:
865 fig.savefig(
866 '{}/{}_time_domain_data.png'.format(outdir, self.name))
867 else:
868 fig.savefig(
869 '{}/{}_{}_time_domain_data.png'.format(outdir, self.name, label))
870 plt.close(fig)
872 @staticmethod
873 def _filename_from_outdir_label_extension(outdir, label, extension="h5"):
874 return os.path.join(outdir, label + f'.{extension}')
876 _save_ifo_docstring = """ Save the object to a {format} file
878 {extra}
880 Attributes
881 ==========
882 outdir: str, optional
883 Output directory name of the file, defaults to 'outdir'.
884 label: str, optional
885 Output file name, is self.name if not given otherwise.
886 """
888 _load_docstring = """ Loads in an Interferometer object from a {format} file
890 Parameters
891 ==========
892 filename: str
893 If given, try to load from this filename
895 """
897 @docstring(_save_ifo_docstring.format(
898 format="pickle", extra=".. versionadded:: 1.1.0"
899 ))
900 def to_pickle(self, outdir="outdir", label=None):
901 utils.check_directory_exists_and_if_not_mkdir('outdir')
902 filename = self._filename_from_outdir_label_extension(outdir, label, extension="pkl")
903 safe_file_dump(self, filename, "dill")
905 @classmethod
906 @docstring(_load_docstring.format(format="pickle"))
907 def from_pickle(cls, filename=None):
908 import dill
909 with open(filename, "rb") as ff:
910 res = dill.load(ff)
911 if res.__class__ != cls:
912 raise TypeError('The loaded object is not an Interferometer')
913 return res