Coverage for bilby/gw/detector/__init__.py: 11%
104 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
1from ..conversion import convert_to_lal_binary_black_hole_parameters
2from .calibration import *
3from .interferometer import *
4from .networks import *
5from .psd import *
6from .strain_data import *
8def get_safe_signal_duration(mass_1, mass_2, a_1, a_2, tilt_1, tilt_2, flow=10):
9 """ Calculate the safe signal duration, given the parameters
11 Parameters
12 ==========
13 mass_1, mass_2, a_1, a_2, tilt_1, tilt_2: float
14 The signal parameters
15 flow: float
16 The lower frequency cutoff from which to calculate the signal duration
18 Returns
19 =======
20 safe_signal_duration: float
21 The shortest safe signal duration (i.e., the signal duration rounded up
22 to the nearest power of 2)
24 """
25 from lal import MSUN_SI
26 from lalsimulation import SimInspiralChirpTimeBound
27 chirp_time = SimInspiralChirpTimeBound(
28 flow, mass_1 * MSUN_SI, mass_2 * MSUN_SI,
29 a_1 * np.cos(tilt_1), a_2 * np.cos(tilt_2))
30 return max(2**(int(np.log2(chirp_time)) + 1), 4)
33def inject_signal_into_gwpy_timeseries(
34 data, waveform_generator, parameters, det, power_spectral_density=None, outdir=None, label=None):
35 """ Inject a signal into a gwpy timeseries
37 Parameters
38 ==========
39 data: gwpy.timeseries.TimeSeries
40 The time-series data into which we want to inject the signal
41 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
42 An initialised waveform_generator
43 parameters: dict
44 A dictionary of the signal-parameters to inject
45 det: bilby.gw.detector.Interferometer
46 The interferometer for which the data refers too
47 power_spectral_density: bilby.gw.detector.PowerSpectralDensity
48 Power spectral density determining the sensitivity of the detector.
49 outdir, label: str
50 If given, the outdir and label used to generate a plot
52 Returns
53 =======
54 data_and_signal: gwpy.timeseries.TimeSeries
55 The data with the time-domain signal added
56 meta_data: dict
57 A dictionary of meta data about the injection
59 """
60 from gwpy.timeseries import TimeSeries
61 from gwpy.plot import Plot
63 ifo = get_empty_interferometer(det)
65 if isinstance(power_spectral_density, PowerSpectralDensity):
66 ifo.power_spectral_density = power_spectral_density
67 elif power_spectral_density is not None:
68 raise TypeError(
69 "Input power_spectral_density should be bilby.gw.detector.psd.PowerSpectralDensity "
70 "object or None, received {}.".format(type(power_spectral_density))
71 )
73 ifo.strain_data.set_from_gwpy_timeseries(data)
75 parameters_check, _ = convert_to_lal_binary_black_hole_parameters(parameters)
76 parameters_check = {key: parameters_check[key] for key in
77 ['mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2']}
78 safe_time = get_safe_signal_duration(**parameters_check)
79 if data.duration.value < safe_time:
80 ValueError(
81 "Injecting a signal with safe-duration {} longer than the data {}"
82 .format(safe_time, data.duration.value))
84 waveform_polarizations = waveform_generator.time_domain_strain(parameters)
86 signal = np.zeros(len(data))
88 for mode in waveform_polarizations.keys():
89 det_response = ifo.antenna_response(
90 parameters['ra'], parameters['dec'], parameters['geocent_time'],
91 parameters['psi'], mode)
92 signal += waveform_polarizations[mode] * det_response
93 time_shift = ifo.time_delay_from_geocenter(
94 parameters['ra'], parameters['dec'], parameters['geocent_time'])
96 dt = parameters['geocent_time'] + time_shift - data.times[0].value
97 n_roll = dt * data.sample_rate.value
98 n_roll = int(np.round(n_roll))
99 signal_shifted = TimeSeries(
100 data=np.roll(signal, n_roll), times=data.times, unit=data.unit)
102 signal_and_data = data.inject(signal_shifted)
104 if outdir is not None and label is not None:
105 fig = Plot(signal_shifted)
106 fig.savefig('{}/{}_{}_time_domain_injected_signal'.format(
107 outdir, ifo.name, label))
109 meta_data = dict()
110 frequency_domain_signal, _ = utils.nfft(signal, waveform_generator.sampling_frequency)
111 meta_data['optimal_SNR'] = (
112 np.sqrt(ifo.optimal_snr_squared(signal=frequency_domain_signal)).real)
113 meta_data['matched_filter_SNR'] = (
114 ifo.matched_filter_snr(signal=frequency_domain_signal))
115 meta_data['parameters'] = parameters
117 logger.info("Injected signal in {}:".format(ifo.name))
118 logger.info(" optimal SNR = {:.2f}".format(meta_data['optimal_SNR']))
119 logger.info(" matched filter SNR = {:.2f}".format(meta_data['matched_filter_SNR']))
120 for key in parameters:
121 logger.info(' {} = {}'.format(key, parameters[key]))
123 return signal_and_data, meta_data
126def get_interferometer_with_fake_noise_and_injection(
127 name, injection_parameters, injection_polarizations=None,
128 waveform_generator=None, sampling_frequency=4096, duration=4,
129 start_time=None, outdir='outdir', label=None, plot=True, save=True,
130 zero_noise=False, raise_error=True):
131 """
132 Helper function to obtain an Interferometer instance with appropriate
133 power spectral density and data, given an center_time.
135 Note: by default this generates an Interferometer with a power spectral
136 density based on advanced LIGO.
138 Parameters
139 ==========
140 name: str
141 Detector name, e.g., 'H1'.
142 injection_parameters: dict
143 injection parameters, needed for sky position and timing
144 injection_polarizations: dict
145 Polarizations of waveform to inject, output of
146 `waveform_generator.frequency_domain_strain()`. If
147 `waveform_generator` is also given, the injection_polarizations will
148 be calculated directly and this argument can be ignored.
149 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator
150 A WaveformGenerator instance using the source model to inject. If
151 `injection_polarizations` is given, this will be ignored.
152 sampling_frequency: float
153 sampling frequency for data, should match injection signal
154 duration: float
155 length of data, should be the same as used for signal generation
156 start_time: float
157 Beginning of data segment, if None, injection is placed 2s before
158 end of segment.
159 outdir: str
160 directory in which to store output
161 label: str
162 If given, an identifying label used in generating file names.
163 plot: bool
164 If true, create an ASD + strain plot
165 save: bool
166 If true, save frequency domain data and PSD to file
167 zero_noise: bool
168 If true, set noise to zero.
170 Returns
171 =======
172 bilby.gw.detector.Interferometer: An Interferometer instance with a PSD and frequency-domain strain data.
174 """
176 utils.check_directory_exists_and_if_not_mkdir(outdir)
178 if start_time is None:
179 start_time = injection_parameters['geocent_time'] + 2 - duration
181 interferometer = get_empty_interferometer(name)
182 interferometer.power_spectral_density = PowerSpectralDensity.from_aligo()
183 if zero_noise:
184 interferometer.set_strain_data_from_zero_noise(
185 sampling_frequency=sampling_frequency, duration=duration,
186 start_time=start_time)
187 else:
188 interferometer.set_strain_data_from_power_spectral_density(
189 sampling_frequency=sampling_frequency, duration=duration,
190 start_time=start_time)
192 injection_polarizations = interferometer.inject_signal(
193 parameters=injection_parameters,
194 injection_polarizations=injection_polarizations,
195 waveform_generator=waveform_generator,
196 raise_error=raise_error)
198 signal = interferometer.get_detector_response(
199 injection_polarizations, injection_parameters)
201 if plot:
202 interferometer.plot_data(signal=signal, outdir=outdir, label=label)
204 if save:
205 interferometer.save_data(outdir, label=label)
207 return interferometer
210def load_data_from_cache_file(
211 cache_file, start_time, segment_duration, psd_duration, psd_start_time,
212 channel_name=None, sampling_frequency=4096, roll_off=0.2,
213 overlap=0, outdir=None):
214 """ Helper routine to generate an interferometer from a cache file
216 Parameters
217 ==========
218 cache_file: str
219 Path to the location of the cache file
220 start_time, psd_start_time: float
221 GPS start time of the segment and data stretch used for the PSD
222 segment_duration, psd_duration: float
223 Segment duration and duration of data to use to generate the PSD (in
224 seconds).
225 roll_off: float, optional
226 Rise time in seconds of tukey window.
227 overlap: float,
228 Number of seconds of overlap between FFTs.
229 channel_name: str
230 Channel name
231 sampling_frequency: int
232 Sampling frequency
233 outdir: str, optional
234 The output directory in which the data is saved
236 Returns
237 =======
238 ifo: bilby.gw.detector.Interferometer
239 An initialised interferometer object with strain data set to the
240 appropriate data in the cache file and a PSD.
241 """
242 import lal
243 data_set = False
244 psd_set = False
246 with open(cache_file, 'r') as ff:
247 lines = ff.readlines()
248 if len(lines)>1:
249 raise ValueError('This method cannot handle cache files with'
250 ' multiple frames. Use `load_data_by_channel_name'
251 ' instead.')
252 else:
253 line = lines[0]
254 cache = lal.utils.cache.CacheEntry(line)
255 data_in_cache = (
256 (cache.segment[0].gpsSeconds < start_time) &
257 (cache.segment[1].gpsSeconds > start_time + segment_duration))
258 psd_in_cache = (
259 (cache.segment[0].gpsSeconds < psd_start_time) &
260 (cache.segment[1].gpsSeconds > psd_start_time + psd_duration))
261 ifo = get_empty_interferometer(
262 "{}1".format(cache.observatory))
263 if not data_in_cache:
264 raise ValueError('The specified data segment does not exist in'
265 ' this frame.')
266 if not psd_in_cache:
267 raise ValueError('The specified PSD data segment does not exist'
268 ' in this frame.')
269 if (not data_set) & data_in_cache:
270 ifo.set_strain_data_from_frame_file(
271 frame_file=cache.path,
272 sampling_frequency=sampling_frequency,
273 duration=segment_duration,
274 start_time=start_time,
275 channel=channel_name, buffer_time=0)
276 data_set = True
277 if (not psd_set) & psd_in_cache:
278 ifo.power_spectral_density = \
279 PowerSpectralDensity.from_frame_file(
280 cache.path,
281 psd_start_time=psd_start_time,
282 psd_duration=psd_duration,
283 fft_length=segment_duration,
284 sampling_frequency=sampling_frequency,
285 roll_off=roll_off,
286 overlap=overlap,
287 channel=channel_name,
288 name=cache.observatory,
289 outdir=outdir,
290 analysis_segment_start_time=start_time)
291 psd_set = True
292 if data_set and psd_set:
293 return ifo
294 elif not data_set:
295 raise ValueError('Data not loaded for {}'.format(ifo.name))
296 elif not psd_set:
297 raise ValueError('PSD not created for {}'.format(ifo.name))
300def load_data_by_channel_name(
301 channel_name, start_time, segment_duration, psd_duration, psd_start_time,
302 sampling_frequency=4096, roll_off=0.2,
303 overlap=0, outdir=None):
304 """ Helper routine to generate an interferometer from a channel name
305 This function creates an empty interferometer specified in the name
306 of the channel. It calls `ifo.set_strain_data_from_channel_name` to
307 set the data and PSD in the interferometer using data retrieved from
308 the specified channel using gwpy.TimeSeries.get()
310 Parameters
311 ==========
312 channel_name: str
313 Channel name with the format `IFO:Channel`
314 start_time, psd_start_time: float
315 GPS start time of the segment and data stretch used for the PSD
316 segment_duration, psd_duration: float
317 Segment duration and duration of data to use to generate the PSD (in
318 seconds).
319 roll_off: float, optional
320 Rise time in seconds of tukey window.
321 overlap: float,
322 Number of seconds of overlap between FFTs.
323 sampling_frequency: int
324 Sampling frequency
325 outdir: str, optional
326 The output directory in which the data is saved
328 Returns
329 =======
330 ifo: bilby.gw.detector.Interferometer
331 An initialised interferometer object with strain data set to the
332 appropriate data fetched from the specified channel and a PSD.
333 """
334 try:
335 det = channel_name.split(':')[-2]
336 except IndexError:
337 raise IndexError("Channel name must be of the format `IFO:Channel`")
338 ifo = get_empty_interferometer(det)
340 ifo.set_strain_data_from_channel_name(
341 channel=channel_name,
342 sampling_frequency=sampling_frequency,
343 duration=segment_duration,
344 start_time=start_time)
346 ifo.power_spectral_density = \
347 PowerSpectralDensity.from_channel_name(
348 channel=channel_name,
349 psd_start_time=psd_start_time,
350 psd_duration=psd_duration,
351 fft_length=segment_duration,
352 sampling_frequency=sampling_frequency,
353 roll_off=roll_off,
354 overlap=overlap,
355 name=det,
356 outdir=outdir,
357 analysis_segment_start_time=start_time)
358 return ifo