Coverage for bilby/gw/detector/interferometer.py: 75%

242 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1import os 

2 

3import numpy as np 

4from bilby_cython.geometry import ( 

5 get_polarization_tensor, 

6 three_by_three_matrix_contraction, 

7 time_delay_from_geocenter, 

8) 

9 

10from ...core import utils 

11from ...core.utils import docstring, logger, PropertyAccessor, safe_file_dump 

12from .. import utils as gwutils 

13from .calibration import Recalibrate 

14from .geometry import InterferometerGeometry 

15from .strain_data import InterferometerStrainData 

16from ..conversion import generate_all_bbh_parameters 

17 

18 

19class Interferometer(object): 

20 """Class for the Interferometer """ 

21 

22 length = PropertyAccessor('geometry', 'length') 

23 latitude = PropertyAccessor('geometry', 'latitude') 

24 latitude_radians = PropertyAccessor('geometry', 'latitude_radians') 

25 longitude = PropertyAccessor('geometry', 'longitude') 

26 longitude_radians = PropertyAccessor('geometry', 'longitude_radians') 

27 elevation = PropertyAccessor('geometry', 'elevation') 

28 x = PropertyAccessor('geometry', 'x') 

29 y = PropertyAccessor('geometry', 'y') 

30 xarm_azimuth = PropertyAccessor('geometry', 'xarm_azimuth') 

31 yarm_azimuth = PropertyAccessor('geometry', 'yarm_azimuth') 

32 xarm_tilt = PropertyAccessor('geometry', 'xarm_tilt') 

33 yarm_tilt = PropertyAccessor('geometry', 'yarm_tilt') 

34 vertex = PropertyAccessor('geometry', 'vertex') 

35 detector_tensor = PropertyAccessor('geometry', 'detector_tensor') 

36 

37 duration = PropertyAccessor('strain_data', 'duration') 

38 sampling_frequency = PropertyAccessor('strain_data', 'sampling_frequency') 

39 start_time = PropertyAccessor('strain_data', 'start_time') 

40 frequency_array = PropertyAccessor('strain_data', 'frequency_array') 

41 time_array = PropertyAccessor('strain_data', 'time_array') 

42 minimum_frequency = PropertyAccessor('strain_data', 'minimum_frequency') 

43 maximum_frequency = PropertyAccessor('strain_data', 'maximum_frequency') 

44 frequency_mask = PropertyAccessor('strain_data', 'frequency_mask') 

45 frequency_domain_strain = PropertyAccessor('strain_data', 'frequency_domain_strain') 

46 time_domain_strain = PropertyAccessor('strain_data', 'time_domain_strain') 

47 

48 def __init__(self, name, power_spectral_density, minimum_frequency, maximum_frequency, length, latitude, longitude, 

49 elevation, xarm_azimuth, yarm_azimuth, xarm_tilt=0., yarm_tilt=0., calibration_model=Recalibrate()): 

50 """ 

51 Instantiate an Interferometer object. 

52 

53 Parameters 

54 ========== 

55 name: str 

56 Interferometer name, e.g., H1. 

57 power_spectral_density: bilby.gw.detector.PowerSpectralDensity 

58 Power spectral density determining the sensitivity of the detector. 

59 minimum_frequency: float 

60 Minimum frequency to analyse for detector. 

61 maximum_frequency: float 

62 Maximum frequency to analyse for detector. 

63 length: float 

64 Length of the interferometer in km. 

65 latitude: float 

66 Latitude North in degrees (South is negative). 

67 longitude: float 

68 Longitude East in degrees (West is negative). 

69 elevation: float 

70 Height above surface in metres. 

71 xarm_azimuth: float 

72 Orientation of the x arm in degrees North of East. 

73 yarm_azimuth: float 

74 Orientation of the y arm in degrees North of East. 

75 xarm_tilt: float, optional 

76 Tilt of the x arm in radians above the horizontal defined by 

77 ellipsoid earth model in LIGO-T980044-08. 

78 yarm_tilt: float, optional 

79 Tilt of the y arm in radians above the horizontal. 

80 calibration_model: Recalibration 

81 Calibration model, this applies the calibration correction to the 

82 template, the default model applies no correction. 

83 """ 

84 self.geometry = InterferometerGeometry(length, latitude, longitude, elevation, 

85 xarm_azimuth, yarm_azimuth, xarm_tilt, yarm_tilt) 

86 

87 self.name = name 

88 self.power_spectral_density = power_spectral_density 

89 self.calibration_model = calibration_model 

90 self.strain_data = InterferometerStrainData( 

91 minimum_frequency=minimum_frequency, 

92 maximum_frequency=maximum_frequency) 

93 self.meta_data = dict(name=name) 

94 

95 def __eq__(self, other): 

96 if self.name == other.name and \ 

97 self.geometry == other.geometry and \ 

98 self.power_spectral_density.__eq__(other.power_spectral_density) and \ 

99 self.calibration_model == other.calibration_model and \ 

100 self.strain_data == other.strain_data: 

101 return True 

102 return False 

103 

104 def __repr__(self): 

105 return self.__class__.__name__ + '(name=\'{}\', power_spectral_density={}, minimum_frequency={}, ' \ 

106 'maximum_frequency={}, length={}, latitude={}, longitude={}, elevation={}, ' \ 

107 'xarm_azimuth={}, yarm_azimuth={}, xarm_tilt={}, yarm_tilt={})' \ 

108 .format(self.name, self.power_spectral_density, float(self.strain_data.minimum_frequency), 

109 float(self.strain_data.maximum_frequency), float(self.geometry.length), 

110 float(self.geometry.latitude), float(self.geometry.longitude), 

111 float(self.geometry.elevation), float(self.geometry.xarm_azimuth), 

112 float(self.geometry.yarm_azimuth), float(self.geometry.xarm_tilt), 

113 float(self.geometry.yarm_tilt)) 

114 

115 def set_strain_data_from_gwpy_timeseries(self, time_series): 

116 """ Set the `Interferometer.strain_data` from a gwpy TimeSeries 

117 

118 Parameters 

119 ========== 

120 time_series: gwpy.timeseries.timeseries.TimeSeries 

121 The data to set. 

122 

123 """ 

124 self.strain_data.set_from_gwpy_timeseries(time_series=time_series) 

125 

126 def set_strain_data_from_frequency_domain_strain( 

127 self, frequency_domain_strain, sampling_frequency=None, 

128 duration=None, start_time=0, frequency_array=None): 

129 """ Set the `Interferometer.strain_data` from a numpy array 

130 

131 Parameters 

132 ========== 

133 frequency_domain_strain: array_like 

134 The data to set. 

135 sampling_frequency: float 

136 The sampling frequency (in Hz). 

137 duration: float 

138 The data duration (in s). 

139 start_time: float 

140 The GPS start-time of the data. 

141 frequency_array: array_like 

142 The array of frequencies, if sampling_frequency and duration not 

143 given. 

144 

145 """ 

146 self.strain_data.set_from_frequency_domain_strain( 

147 frequency_domain_strain=frequency_domain_strain, 

148 sampling_frequency=sampling_frequency, duration=duration, 

149 start_time=start_time, frequency_array=frequency_array) 

150 

151 def set_strain_data_from_power_spectral_density( 

152 self, sampling_frequency, duration, start_time=0): 

153 """ Set the `Interferometer.strain_data` from a power spectal density 

154 

155 This uses the `interferometer.power_spectral_density` object to set 

156 the `strain_data` to a noise realization. See 

157 `bilby.gw.detector.InterferometerStrainData` for further information. 

158 

159 Parameters 

160 ========== 

161 sampling_frequency: float 

162 The sampling frequency (in Hz) 

163 duration: float 

164 The data duration (in s) 

165 start_time: float 

166 The GPS start-time of the data 

167 

168 """ 

169 self.strain_data.set_from_power_spectral_density( 

170 self.power_spectral_density, sampling_frequency=sampling_frequency, 

171 duration=duration, start_time=start_time) 

172 

173 def set_strain_data_from_frame_file( 

174 self, frame_file, sampling_frequency, duration, start_time=0, 

175 channel=None, buffer_time=1): 

176 """ Set the `Interferometer.strain_data` from a frame file 

177 

178 Parameters 

179 ========== 

180 frame_file: str 

181 File from which to load data. 

182 channel: str 

183 Channel to read from frame. 

184 sampling_frequency: float 

185 The sampling frequency (in Hz) 

186 duration: float 

187 The data duration (in s) 

188 start_time: float 

189 The GPS start-time of the data 

190 buffer_time: float 

191 Read in data with `start_time-buffer_time` and 

192 `start_time+duration+buffer_time` 

193 

194 """ 

195 self.strain_data.set_from_frame_file( 

196 frame_file=frame_file, sampling_frequency=sampling_frequency, 

197 duration=duration, start_time=start_time, 

198 channel=channel, buffer_time=buffer_time) 

199 

200 def set_strain_data_from_channel_name( 

201 self, channel, sampling_frequency, duration, start_time=0): 

202 """ 

203 Set the `Interferometer.strain_data` by fetching from given channel 

204 using strain_data.set_from_channel_name() 

205 

206 Parameters 

207 ========== 

208 channel: str 

209 Channel to look for using gwpy in the format `IFO:Channel` 

210 sampling_frequency: float 

211 The sampling frequency (in Hz) 

212 duration: float 

213 The data duration (in s) 

214 start_time: float 

215 The GPS start-time of the data 

216 

217 """ 

218 self.strain_data.set_from_channel_name( 

219 channel=channel, sampling_frequency=sampling_frequency, 

220 duration=duration, start_time=start_time) 

221 

222 def set_strain_data_from_csv(self, filename): 

223 """ Set the `Interferometer.strain_data` from a csv file 

224 

225 Parameters 

226 ========== 

227 filename: str 

228 The path to the file to read in 

229 

230 """ 

231 self.strain_data.set_from_csv(filename) 

232 

233 def set_strain_data_from_zero_noise( 

234 self, sampling_frequency, duration, start_time=0): 

235 """ Set the `Interferometer.strain_data` to zero noise 

236 

237 Parameters 

238 ========== 

239 sampling_frequency: float 

240 The sampling frequency (in Hz) 

241 duration: float 

242 The data duration (in s) 

243 start_time: float 

244 The GPS start-time of the data 

245 

246 """ 

247 

248 self.strain_data.set_from_zero_noise( 

249 sampling_frequency=sampling_frequency, duration=duration, 

250 start_time=start_time) 

251 

252 def antenna_response(self, ra, dec, time, psi, mode): 

253 """ 

254 Calculate the antenna response function for a given sky location 

255 

256 See Nishizawa et al. (2009) arXiv:0903.0528 for definitions of the polarisation tensors. 

257 [u, v, w] represent the Earth-frame 

258 [m, n, omega] represent the wave-frame 

259 Note: there is a typo in the definition of the wave-frame in Nishizawa et al. 

260 

261 Parameters 

262 ========== 

263 ra: float 

264 right ascension in radians 

265 dec: float 

266 declination in radians 

267 time: float 

268 geocentric GPS time 

269 psi: float 

270 binary polarisation angle counter-clockwise about the direction of propagation 

271 mode: str 

272 polarisation mode (e.g. 'plus', 'cross') or the name of a specific detector. 

273 If mode == self.name, return 1 

274 

275 Returns 

276 ======= 

277 float: The antenna response for the specified mode and time/location 

278 

279 """ 

280 if mode in ["plus", "cross", "x", "y", "breathing", "longitudinal"]: 

281 polarization_tensor = get_polarization_tensor(ra, dec, time, psi, mode) 

282 return three_by_three_matrix_contraction(self.geometry.detector_tensor, polarization_tensor) 

283 elif mode == self.name: 

284 return 1 

285 else: 

286 return 0 

287 

288 def get_detector_response(self, waveform_polarizations, parameters, frequencies=None): 

289 """ Get the detector response for a particular waveform 

290 

291 Parameters 

292 ========== 

293 waveform_polarizations: dict 

294 polarizations of the waveform 

295 parameters: dict 

296 parameters describing position and time of arrival of the signal 

297 frequencies: array-like, optional 

298 The frequency values to evaluate the response at. If 

299 not provided, the response is computed using 

300 :code:`self.frequency_array`. If the frequencies are 

301 specified, no frequency masking is performed. 

302 Returns 

303 ======= 

304 array_like: A 3x3 array representation of the detector response (signal observed in the interferometer) 

305 """ 

306 if frequencies is None: 

307 frequencies = self.frequency_array[self.frequency_mask] 

308 mask = self.frequency_mask 

309 else: 

310 mask = np.ones(len(frequencies), dtype=bool) 

311 

312 signal = {} 

313 for mode in waveform_polarizations.keys(): 

314 det_response = self.antenna_response( 

315 parameters['ra'], 

316 parameters['dec'], 

317 parameters['geocent_time'], 

318 parameters['psi'], mode) 

319 

320 signal[mode] = waveform_polarizations[mode] * det_response 

321 signal_ifo = sum(signal.values()) * mask 

322 

323 time_shift = self.time_delay_from_geocenter( 

324 parameters['ra'], parameters['dec'], parameters['geocent_time']) 

325 

326 # Be careful to first subtract the two GPS times which are ~1e9 sec. 

327 # And then add the time_shift which varies at ~1e-5 sec 

328 dt_geocent = parameters['geocent_time'] - self.strain_data.start_time 

329 dt = dt_geocent + time_shift 

330 

331 signal_ifo[mask] = signal_ifo[mask] * np.exp(-1j * 2 * np.pi * dt * frequencies) 

332 

333 signal_ifo[mask] *= self.calibration_model.get_calibration_factor( 

334 frequencies, prefix='recalib_{}_'.format(self.name), **parameters 

335 ) 

336 

337 return signal_ifo 

338 

339 def check_signal_duration(self, parameters, raise_error=True): 

340 """ Check that the signal with the given parameters fits in the data 

341 

342 Parameters 

343 ========== 

344 parameters: dict 

345 A dictionary of the injection parameters 

346 raise_error: bool 

347 If True, raise an error in the signal does not fit. Otherwise, print 

348 a warning message. 

349 """ 

350 try: 

351 parameters = generate_all_bbh_parameters(parameters) 

352 except AttributeError: 

353 logger.debug( 

354 "generate_all_bbh_parameters parameters failed during check_signal_duration" 

355 ) 

356 return 

357 

358 if ("mass_1" not in parameters) and ("mass_2" not in parameters): 

359 if raise_error: 

360 raise AttributeError("Unable to check signal duration as mass not given") 

361 else: 

362 return 

363 

364 # Calculate the time to merger 

365 deltaT = gwutils.calculate_time_to_merger( 

366 frequency=self.minimum_frequency, 

367 mass_1=parameters["mass_1"], 

368 mass_2=parameters["mass_2"], 

369 ) 

370 deltaT = np.round(deltaT, 1) 

371 if deltaT > self.duration: 

372 msg = ( 

373 f"The injected signal has a duration in-band of {deltaT}s, but " 

374 f"the data for detector {self.name} has a duration of {self.duration}s" 

375 ) 

376 if raise_error: 

377 raise ValueError(msg) 

378 else: 

379 logger.warning(msg) 

380 

381 def inject_signal(self, parameters, injection_polarizations=None, 

382 waveform_generator=None, raise_error=True): 

383 """ General signal injection method. 

384 Provide the injection parameters and either the injection polarizations 

385 or the waveform generator to inject a signal into the detector. 

386 Defaults to the injection polarizations is both are given. 

387 

388 Parameters 

389 ========== 

390 parameters: dict 

391 Parameters of the injection. 

392 injection_polarizations: dict, optional 

393 Polarizations of waveform to inject, output of 

394 `waveform_generator.frequency_domain_strain()`. If 

395 `waveform_generator` is also given, the injection_polarizations will 

396 be calculated directly and this argument can be ignored. 

397 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator, optional 

398 A WaveformGenerator instance using the source model to inject. If 

399 `injection_polarizations` is given, this will be ignored. 

400 raise_error: bool 

401 If true, raise an error if the injected signal has a duration 

402 longer than the data duration. If False, a warning will be printed 

403 instead. 

404 

405 Notes 

406 ===== 

407 if your signal takes a substantial amount of time to generate, or 

408 you experience buggy behaviour. It is preferable to provide the 

409 injection_polarizations directly. 

410 

411 Returns 

412 ======= 

413 injection_polarizations: dict 

414 The injected polarizations. This is the same as the injection_polarizations parameters 

415 if it was passed in. Otherwise it is the return value of waveform_generator.frequency_domain_strain(). 

416 

417 """ 

418 self.check_signal_duration(parameters, raise_error) 

419 

420 if injection_polarizations is None and waveform_generator is None: 

421 raise ValueError( 

422 "inject_signal needs one of waveform_generator or " 

423 "injection_polarizations.") 

424 elif injection_polarizations is not None: 

425 self.inject_signal_from_waveform_polarizations(parameters=parameters, 

426 injection_polarizations=injection_polarizations) 

427 elif waveform_generator is not None: 

428 injection_polarizations = self.inject_signal_from_waveform_generator(parameters=parameters, 

429 waveform_generator=waveform_generator) 

430 return injection_polarizations 

431 

432 def inject_signal_from_waveform_generator(self, parameters, waveform_generator): 

433 """ Inject a signal using a waveform generator and a set of parameters. 

434 Alternative to `inject_signal` and `inject_signal_from_waveform_polarizations` 

435 

436 Parameters 

437 ========== 

438 parameters: dict 

439 Parameters of the injection. 

440 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator 

441 A WaveformGenerator instance using the source model to inject. 

442 

443 Notes 

444 ===== 

445 if your signal takes a substantial amount of time to generate, or 

446 you experience buggy behaviour. It is preferable to use the 

447 inject_signal_from_waveform_polarizations() method. 

448 

449 Returns 

450 ======= 

451 injection_polarizations: dict 

452 The internally generated injection parameters 

453 

454 """ 

455 injection_polarizations = \ 

456 waveform_generator.frequency_domain_strain(parameters) 

457 self.inject_signal_from_waveform_polarizations(parameters=parameters, 

458 injection_polarizations=injection_polarizations) 

459 return injection_polarizations 

460 

461 def inject_signal_from_waveform_polarizations(self, parameters, injection_polarizations): 

462 """ Inject a signal into the detector from a dict of waveform polarizations. 

463 Alternative to `inject_signal` and `inject_signal_from_waveform_generator`. 

464 

465 Parameters 

466 ========== 

467 parameters: dict 

468 Parameters of the injection. 

469 injection_polarizations: dict 

470 Polarizations of waveform to inject, output of 

471 `waveform_generator.frequency_domain_strain()`. 

472 

473 """ 

474 if not self.strain_data.time_within_data(parameters['geocent_time']): 

475 logger.warning( 

476 'Injecting signal outside segment, start_time={}, merger time={}.' 

477 .format(self.strain_data.start_time, parameters['geocent_time'])) 

478 

479 signal_ifo = self.get_detector_response(injection_polarizations, parameters) 

480 self.strain_data.frequency_domain_strain += signal_ifo 

481 

482 self.meta_data['optimal_SNR'] = ( 

483 np.sqrt(self.optimal_snr_squared(signal=signal_ifo)).real) 

484 self.meta_data['matched_filter_SNR'] = ( 

485 self.matched_filter_snr(signal=signal_ifo)) 

486 self.meta_data['parameters'] = parameters 

487 

488 logger.info("Injected signal in {}:".format(self.name)) 

489 logger.info(" optimal SNR = {:.2f}".format(self.meta_data['optimal_SNR'])) 

490 logger.info(" matched filter SNR = {:.2f}".format(self.meta_data['matched_filter_SNR'])) 

491 for key in parameters: 

492 logger.info(' {} = {}'.format(key, parameters[key])) 

493 

494 @property 

495 def amplitude_spectral_density_array(self): 

496 """ Returns the amplitude spectral density (ASD) given we know a power spectral density (PSD) 

497 

498 Returns 

499 ======= 

500 array_like: An array representation of the ASD 

501 

502 """ 

503 return ( 

504 self.power_spectral_density.get_amplitude_spectral_density_array( 

505 frequency_array=self.strain_data.frequency_array) * 

506 self.strain_data.window_factor**0.5) 

507 

508 @property 

509 def power_spectral_density_array(self): 

510 """ Returns the power spectral density (PSD) 

511 

512 This accounts for whether the data in the interferometer has been windowed. 

513 

514 Returns 

515 ======= 

516 array_like: An array representation of the PSD 

517 

518 """ 

519 return ( 

520 self.power_spectral_density.get_power_spectral_density_array( 

521 frequency_array=self.strain_data.frequency_array) * 

522 self.strain_data.window_factor) 

523 

524 def unit_vector_along_arm(self, arm): 

525 logger.warning("This method has been moved and will be removed in the future." 

526 "Use Interferometer.geometry.unit_vector_along_arm instead.") 

527 return self.geometry.unit_vector_along_arm(arm) 

528 

529 def time_delay_from_geocenter(self, ra, dec, time): 

530 """ 

531 Calculate the time delay from the geocenter for the interferometer. 

532 

533 Use the time delay function from utils. 

534 

535 Parameters 

536 ========== 

537 ra: float 

538 right ascension of source in radians 

539 dec: float 

540 declination of source in radians 

541 time: float 

542 GPS time 

543 

544 Returns 

545 ======= 

546 float: The time delay from geocenter in seconds 

547 """ 

548 return time_delay_from_geocenter(self.geometry.vertex, ra, dec, time) 

549 

550 def vertex_position_geocentric(self): 

551 """ 

552 Calculate the position of the IFO vertex in geocentric coordinates in meters. 

553 

554 Based on arXiv:gr-qc/0008066 Eqs. B11-B13 except for the typo in the definition of the local radius. 

555 See Section 2.1 of LIGO-T980044-10 for the correct expression 

556 

557 Returns 

558 ======= 

559 array_like: A 3D array representation of the vertex 

560 """ 

561 return gwutils.get_vertex_position_geocentric(self.geometry.latitude_radians, 

562 self.geometry.longitude_radians, 

563 self.geometry.elevation) 

564 

565 def optimal_snr_squared(self, signal): 

566 """ 

567 

568 Parameters 

569 ========== 

570 signal: array_like 

571 Array containing the signal 

572 

573 Returns 

574 ======= 

575 float: The optimal signal to noise ratio possible squared 

576 """ 

577 return gwutils.optimal_snr_squared( 

578 signal=signal[self.strain_data.frequency_mask], 

579 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask], 

580 duration=self.strain_data.duration) 

581 

582 def inner_product(self, signal): 

583 """ 

584 

585 Parameters 

586 ========== 

587 signal: array_like 

588 Array containing the signal 

589 

590 Returns 

591 ======= 

592 float: The optimal signal to noise ratio possible squared 

593 """ 

594 return gwutils.noise_weighted_inner_product( 

595 aa=signal[self.strain_data.frequency_mask], 

596 bb=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask], 

597 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask], 

598 duration=self.strain_data.duration) 

599 

600 def template_template_inner_product(self, signal_1, signal_2): 

601 """A noise weighted inner product between two templates, using this ifo's PSD. 

602 

603 Parameters 

604 ========== 

605 signal_1 : array_like 

606 An array containing the first signal 

607 signal_2 : array_like 

608 an array containing the second signal 

609 

610 Returns 

611 ======= 

612 float: The noise weighted inner product of the two templates 

613 """ 

614 return gwutils.noise_weighted_inner_product( 

615 aa=signal_1[self.strain_data.frequency_mask], 

616 bb=signal_2[self.strain_data.frequency_mask], 

617 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask], 

618 duration=self.strain_data.duration) 

619 

620 def matched_filter_snr(self, signal): 

621 """ 

622 

623 Parameters 

624 ========== 

625 signal: array_like 

626 Array containing the signal 

627 

628 Returns 

629 ======= 

630 complex: The matched filter signal to noise ratio 

631 

632 """ 

633 return gwutils.matched_filter_snr( 

634 signal=signal[self.strain_data.frequency_mask], 

635 frequency_domain_strain=self.strain_data.frequency_domain_strain[self.strain_data.frequency_mask], 

636 power_spectral_density=self.power_spectral_density_array[self.strain_data.frequency_mask], 

637 duration=self.strain_data.duration) 

638 

639 def whiten_frequency_series(self, frequency_series : np.array) -> np.array: 

640 """Whitens a frequency series with the noise properties of the detector 

641 

642 .. math:: 

643 \\tilde{a}_w(f) = \\tilde{a}(f) \\sqrt{\\frac{4}{T S_n(f)}} 

644 

645 Such that 

646 

647 .. math:: 

648 Var(n) = \\frac{1}{N} \\sum_{k=0}^N n_W(f_k)n_W^*(f_k) = 2 

649 

650 Where the factor of two is due to the independent real and imaginary 

651 components. 

652 

653 Parameters 

654 ========== 

655 frequency_series : np.array 

656 The frequency series, whitened by the ASD 

657 """ 

658 return frequency_series / (self.amplitude_spectral_density_array * np.sqrt(self.duration / 4)) 

659 

660 def get_whitened_time_series_from_whitened_frequency_series( 

661 self, 

662 whitened_frequency_series : np.array 

663 ) -> np.array: 

664 """Gets the whitened time series from a whitened frequency series. 

665 

666 This ifft's and also applies a windowing factor, 

667 since when f_min and f_max are set bilby applies a mask to the series. 

668 

669 Per 6.2a-b in https://arxiv.org/pdf/gr-qc/0509116 since our window 

670 is just a band pass, 

671 this coefficient is :math:`w/W` where 

672 

673 .. math:: 

674 

675 W = \\frac{1}{N} \\sum_{k=0}^N w^2[j] 

676 

677 Since our window :math:`w` is simply 1 or 0, depending on the mask, we get 

678 

679 .. math:: 

680 

681 W = \\frac{1}{N} \\sum_{k=0}^N \\Theta(f_{max} - f_k)\\Theta(f_k - f_{min}) 

682 

683 and accordingly the termwise window factor is 

684 

685 .. math:: 

686 w = \\sqrt{N W} = \\sqrt{\\sum_{k=0}^N \\Theta(f_{max} - f_k)\\Theta(f_k - f_{min})} 

687 

688 """ 

689 frequency_window_factor = ( 

690 np.sum(self.frequency_mask) 

691 / len(self.frequency_mask) 

692 ) 

693 

694 whitened_time_series = ( 

695 np.fft.irfft(whitened_frequency_series) 

696 * np.sqrt(np.sum(self.frequency_mask)) / frequency_window_factor 

697 ) 

698 

699 return whitened_time_series 

700 

701 @property 

702 def whitened_frequency_domain_strain(self): 

703 r"""Whitens the frequency domain data by dividing through by ASD, 

704 with appropriate normalization. 

705 

706 See `whiten_frequency_series()` for details. 

707 

708 Returns 

709 ======= 

710 array_like: The whitened data 

711 """ 

712 return self.whiten_frequency_series(self.strain_data.frequency_domain_strain) 

713 

714 @property 

715 def whitened_time_domain_strain(self) -> np.array: 

716 """Calculates the whitened time domain strain 

717 by iffting the whitened frequency domain strain, 

718 with the appropriate normalization. 

719 

720 See `get_whitened_time_series_from_whitened_frequency_series()` for details 

721 

722 Returns 

723 ======= 

724 array_like 

725 The whitened data in the time domain 

726 """ 

727 return self.get_whitened_time_series_from_whitened_frequency_series(self.whitened_frequency_domain_strain) 

728 

729 def save_data(self, outdir, label=None): 

730 """ Creates save files for interferometer data in plain text format. 

731 

732 Saves two files: the frequency domain strain data with three columns [f, real part of h(f), 

733 imaginary part of h(f)], and the amplitude spectral density with two columns [f, ASD(f)]. 

734 

735 Note that in v1.3.0 and below, the ASD was saved in a file called *_psd.dat. 

736 

737 Parameters 

738 ========== 

739 outdir: str 

740 The output directory in which the data is supposed to be saved 

741 label: str 

742 The name of the output files 

743 """ 

744 

745 if label is None: 

746 filename_asd = '{}/{}_asd.dat'.format(outdir, self.name) 

747 filename_data = '{}/{}_frequency_domain_data.dat'.format(outdir, self.name) 

748 else: 

749 filename_asd = '{}/{}_{}_asd.dat'.format(outdir, self.name, label) 

750 filename_data = '{}/{}_{}_frequency_domain_data.dat'.format(outdir, self.name, label) 

751 np.savetxt(filename_data, 

752 np.array( 

753 [self.strain_data.frequency_array, 

754 self.strain_data.frequency_domain_strain.real, 

755 self.strain_data.frequency_domain_strain.imag]).T, 

756 header='f real_h(f) imag_h(f)') 

757 np.savetxt(filename_asd, 

758 np.array( 

759 [self.strain_data.frequency_array, 

760 self.amplitude_spectral_density_array]).T, 

761 header='f h(f)') 

762 

763 def plot_data(self, signal=None, outdir='.', label=None): 

764 import matplotlib.pyplot as plt 

765 if utils.command_line_args.bilby_test_mode: 

766 return 

767 

768 fig, ax = plt.subplots() 

769 df = self.strain_data.frequency_array[1] - self.strain_data.frequency_array[0] 

770 asd = gwutils.asd_from_freq_series( 

771 freq_data=self.strain_data.frequency_domain_strain, df=df) 

772 

773 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask], 

774 asd[self.strain_data.frequency_mask], 

775 color='C0', label=self.name) 

776 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask], 

777 self.amplitude_spectral_density_array[self.strain_data.frequency_mask], 

778 color='C1', lw=1.0, label=self.name + ' ASD') 

779 if signal is not None: 

780 signal_asd = gwutils.asd_from_freq_series( 

781 freq_data=signal, df=df) 

782 

783 ax.loglog(self.strain_data.frequency_array[self.strain_data.frequency_mask], 

784 signal_asd[self.strain_data.frequency_mask], 

785 color='C2', 

786 label='Signal') 

787 ax.grid(True) 

788 ax.set_ylabel(r'Strain [strain/$\sqrt{\rm Hz}$]') 

789 ax.set_xlabel(r'Frequency [Hz]') 

790 ax.legend(loc='best') 

791 fig.tight_layout() 

792 if label is None: 

793 fig.savefig( 

794 '{}/{}_frequency_domain_data.png'.format(outdir, self.name)) 

795 else: 

796 fig.savefig( 

797 '{}/{}_{}_frequency_domain_data.png'.format( 

798 outdir, self.name, label)) 

799 plt.close(fig) 

800 

801 def plot_time_domain_data( 

802 self, outdir='.', label=None, bandpass_frequencies=(50, 250), 

803 notches=None, start_end=None, t0=None): 

804 """ Plots the strain data in the time domain 

805 

806 Parameters 

807 ========== 

808 outdir, label: str 

809 Used in setting the saved filename. 

810 bandpass: tuple, optional 

811 A tuple of the (low, high) frequencies to use when bandpassing the 

812 data, if None no bandpass is applied. 

813 notches: list, optional 

814 A list of frequencies specifying any lines to notch 

815 start_end: tuple 

816 A tuple of the (start, end) range of GPS times to plot 

817 t0: float 

818 If given, the reference time to subtract from the time series before 

819 plotting. 

820 

821 """ 

822 import matplotlib.pyplot as plt 

823 from gwpy.timeseries import TimeSeries 

824 from gwpy.signal.filter_design import bandpass, concatenate_zpks, notch 

825 

826 # We use the gwpy timeseries to perform bandpass and notching 

827 if notches is None: 

828 notches = list() 

829 timeseries = TimeSeries( 

830 data=self.strain_data.time_domain_strain, times=self.strain_data.time_array) 

831 zpks = [] 

832 if bandpass_frequencies is not None: 

833 zpks.append(bandpass( 

834 bandpass_frequencies[0], bandpass_frequencies[1], 

835 self.strain_data.sampling_frequency)) 

836 if notches is not None: 

837 for line in notches: 

838 zpks.append(notch( 

839 line, self.strain_data.sampling_frequency)) 

840 if len(zpks) > 0: 

841 zpk = concatenate_zpks(*zpks) 

842 strain = timeseries.filter(zpk, filtfilt=False) 

843 else: 

844 strain = timeseries 

845 

846 fig, ax = plt.subplots() 

847 

848 if t0: 

849 x = self.strain_data.time_array - t0 

850 xlabel = 'GPS time [s] - {}'.format(t0) 

851 else: 

852 x = self.strain_data.time_array 

853 xlabel = 'GPS time [s]' 

854 

855 ax.plot(x, strain) 

856 ax.set_xlabel(xlabel) 

857 ax.set_ylabel('Strain') 

858 

859 if start_end is not None: 

860 ax.set_xlim(*start_end) 

861 

862 fig.tight_layout() 

863 

864 if label is None: 

865 fig.savefig( 

866 '{}/{}_time_domain_data.png'.format(outdir, self.name)) 

867 else: 

868 fig.savefig( 

869 '{}/{}_{}_time_domain_data.png'.format(outdir, self.name, label)) 

870 plt.close(fig) 

871 

872 @staticmethod 

873 def _filename_from_outdir_label_extension(outdir, label, extension="h5"): 

874 return os.path.join(outdir, label + f'.{extension}') 

875 

876 _save_ifo_docstring = """ Save the object to a {format} file 

877 

878 {extra} 

879 

880 Attributes 

881 ========== 

882 outdir: str, optional 

883 Output directory name of the file, defaults to 'outdir'. 

884 label: str, optional 

885 Output file name, is self.name if not given otherwise. 

886 """ 

887 

888 _load_docstring = """ Loads in an Interferometer object from a {format} file 

889 

890 Parameters 

891 ========== 

892 filename: str 

893 If given, try to load from this filename 

894 

895 """ 

896 

897 @docstring(_save_ifo_docstring.format( 

898 format="pickle", extra=".. versionadded:: 1.1.0" 

899 )) 

900 def to_pickle(self, outdir="outdir", label=None): 

901 utils.check_directory_exists_and_if_not_mkdir('outdir') 

902 filename = self._filename_from_outdir_label_extension(outdir, label, extension="pkl") 

903 safe_file_dump(self, filename, "dill") 

904 

905 @classmethod 

906 @docstring(_load_docstring.format(format="pickle")) 

907 def from_pickle(cls, filename=None): 

908 import dill 

909 with open(filename, "rb") as ff: 

910 res = dill.load(ff) 

911 if res.__class__ != cls: 

912 raise TypeError('The loaded object is not an Interferometer') 

913 return res