Coverage for bilby/gw/detector/__init__.py: 11%

104 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1from ..conversion import convert_to_lal_binary_black_hole_parameters 

2from .calibration import * 

3from .interferometer import * 

4from .networks import * 

5from .psd import * 

6from .strain_data import * 

7 

8def get_safe_signal_duration(mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, flow=10): 

9 """ Calculate the safe signal duration, given the parameters 

10 

11 Parameters 

12 ========== 

13 mass_1, mass_2, a_1, a_2, tilt_1, tilt_2: float 

14 The signal parameters 

15 flow: float 

16 The lower frequency cutoff from which to calculate the signal duration 

17 

18 Returns 

19 ======= 

20 safe_signal_duration: float 

21 The shortest safe signal duration (i.e., the signal duration rounded up 

22 to the nearest power of 2) 

23 

24 """ 

25 from lal import MSUN_SI 

26 from lalsimulation import SimInspiralChirpTimeBound 

27 chirp_time = SimInspiralChirpTimeBound( 

28 flow, mass_1 * MSUN_SI, mass_2 * MSUN_SI, 

29 a_1 * np.cos(tilt_1), a_2 * np.cos(tilt_2)) 

30 return max(2**(int(np.log2(chirp_time)) + 1), 4) 

31 

32 

33def inject_signal_into_gwpy_timeseries( 

34 data, waveform_generator, parameters, det, power_spectral_density=None, outdir=None, label=None): 

35 """ Inject a signal into a gwpy timeseries 

36 

37 Parameters 

38 ========== 

39 data: gwpy.timeseries.TimeSeries 

40 The time-series data into which we want to inject the signal 

41 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator 

42 An initialised waveform_generator 

43 parameters: dict 

44 A dictionary of the signal-parameters to inject 

45 det: bilby.gw.detector.Interferometer 

46 The interferometer for which the data refers too 

47 power_spectral_density: bilby.gw.detector.PowerSpectralDensity 

48 Power spectral density determining the sensitivity of the detector. 

49 outdir, label: str 

50 If given, the outdir and label used to generate a plot 

51 

52 Returns 

53 ======= 

54 data_and_signal: gwpy.timeseries.TimeSeries 

55 The data with the time-domain signal added 

56 meta_data: dict 

57 A dictionary of meta data about the injection 

58 

59 """ 

60 from gwpy.timeseries import TimeSeries 

61 from gwpy.plot import Plot 

62 

63 ifo = get_empty_interferometer(det) 

64 

65 if isinstance(power_spectral_density, PowerSpectralDensity): 

66 ifo.power_spectral_density = power_spectral_density 

67 elif power_spectral_density is not None: 

68 raise TypeError( 

69 "Input power_spectral_density should be bilby.gw.detector.psd.PowerSpectralDensity " 

70 "object or None, received {}.".format(type(power_spectral_density)) 

71 ) 

72 

73 ifo.strain_data.set_from_gwpy_timeseries(data) 

74 

75 parameters_check, _ = convert_to_lal_binary_black_hole_parameters(parameters) 

76 parameters_check = {key: parameters_check[key] for key in 

77 ['mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2']} 

78 safe_time = get_safe_signal_duration(**parameters_check) 

79 if data.duration.value < safe_time: 

80 ValueError( 

81 "Injecting a signal with safe-duration {} longer than the data {}" 

82 .format(safe_time, data.duration.value)) 

83 

84 waveform_polarizations = waveform_generator.time_domain_strain(parameters) 

85 

86 signal = np.zeros(len(data)) 

87 

88 for mode in waveform_polarizations.keys(): 

89 det_response = ifo.antenna_response( 

90 parameters['ra'], parameters['dec'], parameters['geocent_time'], 

91 parameters['psi'], mode) 

92 signal += waveform_polarizations[mode] * det_response 

93 time_shift = ifo.time_delay_from_geocenter( 

94 parameters['ra'], parameters['dec'], parameters['geocent_time']) 

95 

96 dt = parameters['geocent_time'] + time_shift - data.times[0].value 

97 n_roll = dt * data.sample_rate.value 

98 n_roll = int(np.round(n_roll)) 

99 signal_shifted = TimeSeries( 

100 data=np.roll(signal, n_roll), times=data.times, unit=data.unit) 

101 

102 signal_and_data = data.inject(signal_shifted) 

103 

104 if outdir is not None and label is not None: 

105 fig = Plot(signal_shifted) 

106 fig.savefig('{}/{}_{}_time_domain_injected_signal'.format( 

107 outdir, ifo.name, label)) 

108 

109 meta_data = dict() 

110 frequency_domain_signal, _ = utils.nfft(signal, waveform_generator.sampling_frequency) 

111 meta_data['optimal_SNR'] = ( 

112 np.sqrt(ifo.optimal_snr_squared(signal=frequency_domain_signal)).real) 

113 meta_data['matched_filter_SNR'] = ( 

114 ifo.matched_filter_snr(signal=frequency_domain_signal)) 

115 meta_data['parameters'] = parameters 

116 

117 logger.info("Injected signal in {}:".format(ifo.name)) 

118 logger.info(" optimal SNR = {:.2f}".format(meta_data['optimal_SNR'])) 

119 logger.info(" matched filter SNR = {:.2f}".format(meta_data['matched_filter_SNR'])) 

120 for key in parameters: 

121 logger.info(' {} = {}'.format(key, parameters[key])) 

122 

123 return signal_and_data, meta_data 

124 

125 

126def get_interferometer_with_fake_noise_and_injection( 

127 name, injection_parameters, injection_polarizations=None, 

128 waveform_generator=None, sampling_frequency=4096, duration=4, 

129 start_time=None, outdir='outdir', label=None, plot=True, save=True, 

130 zero_noise=False, raise_error=True): 

131 """ 

132 Helper function to obtain an Interferometer instance with appropriate 

133 power spectral density and data, given an center_time. 

134 

135 Note: by default this generates an Interferometer with a power spectral 

136 density based on advanced LIGO. 

137 

138 Parameters 

139 ========== 

140 name: str 

141 Detector name, e.g., 'H1'. 

142 injection_parameters: dict 

143 injection parameters, needed for sky position and timing 

144 injection_polarizations: dict 

145 Polarizations of waveform to inject, output of 

146 `waveform_generator.frequency_domain_strain()`. If 

147 `waveform_generator` is also given, the injection_polarizations will 

148 be calculated directly and this argument can be ignored. 

149 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator 

150 A WaveformGenerator instance using the source model to inject. If 

151 `injection_polarizations` is given, this will be ignored. 

152 sampling_frequency: float 

153 sampling frequency for data, should match injection signal 

154 duration: float 

155 length of data, should be the same as used for signal generation 

156 start_time: float 

157 Beginning of data segment, if None, injection is placed 2s before 

158 end of segment. 

159 outdir: str 

160 directory in which to store output 

161 label: str 

162 If given, an identifying label used in generating file names. 

163 plot: bool 

164 If true, create an ASD + strain plot 

165 save: bool 

166 If true, save frequency domain data and PSD to file 

167 zero_noise: bool 

168 If true, set noise to zero. 

169 

170 Returns 

171 ======= 

172 bilby.gw.detector.Interferometer: An Interferometer instance with a PSD and frequency-domain strain data. 

173 

174 """ 

175 

176 utils.check_directory_exists_and_if_not_mkdir(outdir) 

177 

178 if start_time is None: 

179 start_time = injection_parameters['geocent_time'] + 2 - duration 

180 

181 interferometer = get_empty_interferometer(name) 

182 interferometer.power_spectral_density = PowerSpectralDensity.from_aligo() 

183 if zero_noise: 

184 interferometer.set_strain_data_from_zero_noise( 

185 sampling_frequency=sampling_frequency, duration=duration, 

186 start_time=start_time) 

187 else: 

188 interferometer.set_strain_data_from_power_spectral_density( 

189 sampling_frequency=sampling_frequency, duration=duration, 

190 start_time=start_time) 

191 

192 injection_polarizations = interferometer.inject_signal( 

193 parameters=injection_parameters, 

194 injection_polarizations=injection_polarizations, 

195 waveform_generator=waveform_generator, 

196 raise_error=raise_error) 

197 

198 signal = interferometer.get_detector_response( 

199 injection_polarizations, injection_parameters) 

200 

201 if plot: 

202 interferometer.plot_data(signal=signal, outdir=outdir, label=label) 

203 

204 if save: 

205 interferometer.save_data(outdir, label=label) 

206 

207 return interferometer 

208 

209 

210def load_data_from_cache_file( 

211 cache_file, start_time, segment_duration, psd_duration, psd_start_time, 

212 channel_name=None, sampling_frequency=4096, roll_off=0.2, 

213 overlap=0, outdir=None): 

214 """ Helper routine to generate an interferometer from a cache file 

215 

216 Parameters 

217 ========== 

218 cache_file: str 

219 Path to the location of the cache file 

220 start_time, psd_start_time: float 

221 GPS start time of the segment and data stretch used for the PSD 

222 segment_duration, psd_duration: float 

223 Segment duration and duration of data to use to generate the PSD (in 

224 seconds). 

225 roll_off: float, optional 

226 Rise time in seconds of tukey window. 

227 overlap: float, 

228 Number of seconds of overlap between FFTs. 

229 channel_name: str 

230 Channel name 

231 sampling_frequency: int 

232 Sampling frequency 

233 outdir: str, optional 

234 The output directory in which the data is saved 

235 

236 Returns 

237 ======= 

238 ifo: bilby.gw.detector.Interferometer 

239 An initialised interferometer object with strain data set to the 

240 appropriate data in the cache file and a PSD. 

241 """ 

242 import lal 

243 data_set = False 

244 psd_set = False 

245 

246 with open(cache_file, 'r') as ff: 

247 lines = ff.readlines() 

248 if len(lines)>1: 

249 raise ValueError('This method cannot handle cache files with' 

250 ' multiple frames. Use `load_data_by_channel_name' 

251 ' instead.') 

252 else: 

253 line = lines[0] 

254 cache = lal.utils.cache.CacheEntry(line) 

255 data_in_cache = ( 

256 (cache.segment[0].gpsSeconds < start_time) & 

257 (cache.segment[1].gpsSeconds > start_time + segment_duration)) 

258 psd_in_cache = ( 

259 (cache.segment[0].gpsSeconds < psd_start_time) & 

260 (cache.segment[1].gpsSeconds > psd_start_time + psd_duration)) 

261 ifo = get_empty_interferometer( 

262 "{}1".format(cache.observatory)) 

263 if not data_in_cache: 

264 raise ValueError('The specified data segment does not exist in' 

265 ' this frame.') 

266 if not psd_in_cache: 

267 raise ValueError('The specified PSD data segment does not exist' 

268 ' in this frame.') 

269 if (not data_set) & data_in_cache: 

270 ifo.set_strain_data_from_frame_file( 

271 frame_file=cache.path, 

272 sampling_frequency=sampling_frequency, 

273 duration=segment_duration, 

274 start_time=start_time, 

275 channel=channel_name, buffer_time=0) 

276 data_set = True 

277 if (not psd_set) & psd_in_cache: 

278 ifo.power_spectral_density = \ 

279 PowerSpectralDensity.from_frame_file( 

280 cache.path, 

281 psd_start_time=psd_start_time, 

282 psd_duration=psd_duration, 

283 fft_length=segment_duration, 

284 sampling_frequency=sampling_frequency, 

285 roll_off=roll_off, 

286 overlap=overlap, 

287 channel=channel_name, 

288 name=cache.observatory, 

289 outdir=outdir, 

290 analysis_segment_start_time=start_time) 

291 psd_set = True 

292 if data_set and psd_set: 

293 return ifo 

294 elif not data_set: 

295 raise ValueError('Data not loaded for {}'.format(ifo.name)) 

296 elif not psd_set: 

297 raise ValueError('PSD not created for {}'.format(ifo.name)) 

298 

299 

300def load_data_by_channel_name( 

301 channel_name, start_time, segment_duration, psd_duration, psd_start_time, 

302 sampling_frequency=4096, roll_off=0.2, 

303 overlap=0, outdir=None): 

304 """ Helper routine to generate an interferometer from a channel name 

305 This function creates an empty interferometer specified in the name  

306 of the channel. It calls `ifo.set_strain_data_from_channel_name` to  

307 set the data and PSD in the interferometer using data retrieved from  

308 the specified channel using gwpy.TimeSeries.get() 

309 

310 Parameters 

311 ========== 

312 channel_name: str 

313 Channel name with the format `IFO:Channel` 

314 start_time, psd_start_time: float 

315 GPS start time of the segment and data stretch used for the PSD 

316 segment_duration, psd_duration: float 

317 Segment duration and duration of data to use to generate the PSD (in 

318 seconds). 

319 roll_off: float, optional 

320 Rise time in seconds of tukey window. 

321 overlap: float, 

322 Number of seconds of overlap between FFTs. 

323 sampling_frequency: int 

324 Sampling frequency 

325 outdir: str, optional 

326 The output directory in which the data is saved 

327 

328 Returns 

329 ======= 

330 ifo: bilby.gw.detector.Interferometer 

331 An initialised interferometer object with strain data set to the 

332 appropriate data fetched from the specified channel and a PSD. 

333 """ 

334 try: 

335 det = channel_name.split(':')[-2] 

336 except IndexError: 

337 raise IndexError("Channel name must be of the format `IFO:Channel`") 

338 ifo = get_empty_interferometer(det) 

339 

340 ifo.set_strain_data_from_channel_name( 

341 channel=channel_name, 

342 sampling_frequency=sampling_frequency, 

343 duration=segment_duration, 

344 start_time=start_time) 

345 

346 ifo.power_spectral_density = \ 

347 PowerSpectralDensity.from_channel_name( 

348 channel=channel_name, 

349 psd_start_time=psd_start_time, 

350 psd_duration=psd_duration, 

351 fft_length=segment_duration, 

352 sampling_frequency=sampling_frequency, 

353 roll_off=roll_off, 

354 overlap=overlap, 

355 name=det, 

356 outdir=outdir, 

357 analysis_segment_start_time=start_time) 

358 return ifo