Coverage for bilby/gw/detector/psd.py: 96%
148 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
1import os
3import numpy as np
4from scipy.interpolate import interp1d
6from ...core import utils
7from ...core.utils import logger
8from .strain_data import InterferometerStrainData
11class PowerSpectralDensity(object):
13 def __init__(self, frequency_array=None, psd_array=None, asd_array=None,
14 psd_file=None, asd_file=None):
15 """
16 Instantiate a new PowerSpectralDensity object.
18 Examples
19 ========
20 Using the `from` method directly (here `psd_file` is a string
21 containing the path to the file to load):
23 .. code-block:: python
25 >>> power_spectral_density = PowerSpectralDensity.from_power_spectral_density_file(psd_file)
27 Alternatively (and equivalently) setting the psd_file directly:
29 .. code-block:: python
31 >>> power_spectral_density = PowerSpectralDensity(psd_file=psd_file)
33 Attributes
34 ==========
35 asd_array: array_like
36 Array representation of the ASD
37 asd_file: str
38 Name of the ASD file
39 frequency_array: array_like
40 Array containing the frequencies of the ASD/PSD values
41 psd_array: array_like
42 Array representation of the PSD
43 psd_file: str
44 Name of the PSD file
45 power_spectral_density_interpolated: scipy.interpolated.interp1d
46 Interpolated function of the PSD
48 """
49 self._cache = dict(
50 frequency_array=np.array([]), psd_array=None, asd_array=None)
51 self.frequency_array = np.array(frequency_array)
52 if psd_array is not None:
53 self.psd_array = psd_array
54 if asd_array is not None:
55 self.asd_array = asd_array
56 self.psd_file = psd_file
57 self.asd_file = asd_file
59 def _update_cache(self, frequency_array):
60 psd_array = self.power_spectral_density_interpolated(frequency_array)
61 self._cache['psd_array'] = psd_array
62 self._cache['asd_array'] = psd_array**0.5
63 self._cache['frequency_array'] = frequency_array
65 def __eq__(self, other):
66 if self.psd_file == other.psd_file \
67 and self.asd_file == other.asd_file \
68 and np.array_equal(self.frequency_array, other.frequency_array) \
69 and np.array_equal(self.psd_array, other.psd_array) \
70 and np.array_equal(self.asd_array, other.asd_array):
71 return True
72 return False
74 def __repr__(self):
75 if self.asd_file is not None or self.psd_file is not None:
76 return self.__class__.__name__ + '(psd_file=\'{}\', asd_file=\'{}\')' \
77 .format(self.psd_file, self.asd_file)
78 else:
79 return self.__class__.__name__ + '(frequency_array={}, psd_array={}, asd_array={})' \
80 .format(self.frequency_array, self.psd_array, self.asd_array)
82 @staticmethod
83 def from_amplitude_spectral_density_file(asd_file):
84 """ Set the amplitude spectral density from a given file
86 Parameters
87 ==========
88 asd_file: str
89 File containing amplitude spectral density, format 'f h_f'
91 """
92 return PowerSpectralDensity(asd_file=asd_file)
94 @staticmethod
95 def from_power_spectral_density_file(psd_file):
96 """ Set the power spectral density from a given file
98 Parameters
99 ==========
100 psd_file: str, optional
101 File containing power spectral density, format 'f h_f'
103 """
104 return PowerSpectralDensity(psd_file=psd_file)
106 @staticmethod
107 def from_frame_file(frame_file, psd_start_time, psd_duration,
108 fft_length=4, sampling_frequency=4096, roll_off=0.2,
109 overlap=0, channel=None, name=None, outdir=None,
110 analysis_segment_start_time=None):
111 """ Generate power spectral density from a frame file
113 Parameters
114 ==========
115 frame_file: str, optional
116 Frame file to read data from.
117 psd_start_time: float
118 Beginning of segment to analyse.
119 psd_duration: float, optional
120 Duration of data (in seconds) to generate PSD from.
121 fft_length: float, optional
122 Number of seconds in a single fft.
123 sampling_frequency: float, optional
124 Sampling frequency for time series.
125 This is twice the maximum frequency.
126 roll_off: float, optional
127 Rise time in seconds of tukey window.
128 overlap: float,
129 Number of seconds of overlap between FFTs.
130 channel: str, optional
131 Name of channel to use to generate PSD.
132 name, outdir: str, optional
133 Name (and outdir) of the detector for which a PSD is to be
134 generated.
135 analysis_segment_start_time: float, optional
136 The start time of the analysis segment, if given, this data will
137 be removed before creating the PSD.
139 """
140 strain = InterferometerStrainData(roll_off=roll_off)
141 strain.set_from_frame_file(
142 frame_file, start_time=psd_start_time, duration=psd_duration,
143 channel=channel, sampling_frequency=sampling_frequency)
144 frequency_array, psd_array = strain.create_power_spectral_density(
145 fft_length=fft_length, name=name, outdir=outdir, overlap=overlap,
146 analysis_segment_start_time=analysis_segment_start_time)
147 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array)
149 @staticmethod
150 def from_channel_name(channel, psd_start_time, psd_duration,
151 fft_length=4, sampling_frequency=4096, roll_off=0.2,
152 overlap=0, name=None, outdir=None,
153 analysis_segment_start_time=None):
154 """ Generate power spectral density from a given channel name
155 by loading data using `strain_data.set_from_channel_name`
157 Parameters
158 ==========
159 psd_start_time: float
160 Beginning of segment to analyse.
161 psd_duration: float, optional
162 Duration of data (in seconds) to generate PSD from.
163 fft_length: float, optional
164 Number of seconds in a single fft.
165 sampling_frequency: float, optional
166 Sampling frequency for time series.
167 This is twice the maximum frequency.
168 roll_off: float, optional
169 Rise time in seconds of tukey window.
170 overlap: float,
171 Number of seconds of overlap between FFTs.
172 channel: str
173 Name of channel to use to generate PSD in the format
174 `IFO:Channel`
175 name, outdir: str, optional
176 Name (and outdir) of the detector for which a PSD is to be
177 generated.
178 analysis_segment_start_time: float, optional
179 The start time of the analysis segment, if given, this data will
180 be removed before creating the PSD.
182 """
183 strain = InterferometerStrainData(roll_off=roll_off)
184 strain.set_from_channel_name(
185 channel, duration=psd_duration, start_time=psd_start_time,
186 sampling_frequency=sampling_frequency)
187 frequency_array, psd_array = strain.create_power_spectral_density(
188 fft_length=fft_length, name=name, outdir=outdir, overlap=overlap,
189 analysis_segment_start_time=analysis_segment_start_time)
190 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array)
192 @staticmethod
193 def from_amplitude_spectral_density_array(frequency_array, asd_array):
194 return PowerSpectralDensity(frequency_array=frequency_array, asd_array=asd_array)
196 @staticmethod
197 def from_power_spectral_density_array(frequency_array, psd_array):
198 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array)
200 @staticmethod
201 def from_aligo():
202 logger.info("No power spectral density provided, using aLIGO,"
203 "zero detuning, high power.")
204 return PowerSpectralDensity.from_power_spectral_density_file(psd_file='aLIGO_ZERO_DET_high_P_psd.txt')
206 @property
207 def psd_array(self):
208 return self.__psd_array
210 @psd_array.setter
211 def psd_array(self, psd_array):
212 self.__check_frequency_array_matches_density_array(psd_array)
213 self.__psd_array = np.array(psd_array)
214 self.__asd_array = psd_array ** 0.5
215 self.__interpolate_power_spectral_density()
217 @property
218 def asd_array(self):
219 return self.__asd_array
221 @asd_array.setter
222 def asd_array(self, asd_array):
223 self.__check_frequency_array_matches_density_array(asd_array)
224 self.__asd_array = np.array(asd_array)
225 self.__psd_array = asd_array ** 2
226 self.__interpolate_power_spectral_density()
228 def __check_frequency_array_matches_density_array(self, density_array):
229 if len(self.frequency_array) != len(density_array):
230 raise ValueError('Provided spectral density does not match frequency array. Not updating.\n'
231 'Length spectral density {}\n Length frequency array {}\n'
232 .format(density_array, self.frequency_array))
234 def __interpolate_power_spectral_density(self):
235 """Interpolate the loaded power spectral density so it can be resampled
236 for arbitrary frequency arrays.
237 """
238 self.__power_spectral_density_interpolated = interp1d(self.frequency_array,
239 self.psd_array,
240 bounds_error=False,
241 fill_value=np.inf)
242 self._update_cache(self.frequency_array)
244 def get_power_spectral_density_array(self, frequency_array):
245 if not np.array_equal(frequency_array, self._cache['frequency_array']):
246 self._update_cache(frequency_array=frequency_array)
247 return self._cache['psd_array']
249 def get_amplitude_spectral_density_array(self, frequency_array):
250 if not np.array_equal(frequency_array, self._cache['frequency_array']):
251 self._update_cache(frequency_array=frequency_array)
252 return self._cache['asd_array']
254 @property
255 def power_spectral_density_interpolated(self):
256 return self.__power_spectral_density_interpolated
258 @property
259 def asd_file(self):
260 return self._asd_file
262 @asd_file.setter
263 def asd_file(self, asd_file):
264 asd_file = self.__validate_file_name(file=asd_file)
265 self._asd_file = asd_file
266 if asd_file is not None:
267 self.__import_amplitude_spectral_density()
268 self.__check_file_was_asd_file()
270 def __check_file_was_asd_file(self):
271 if min(self.asd_array) < 1e-30:
272 logger.warning("You specified an amplitude spectral density file.")
273 logger.warning("{} WARNING {}".format("*" * 30, "*" * 30))
274 logger.warning("The minimum of the provided curve is {:.2e}.".format(min(self.asd_array)))
275 logger.warning("You may have intended to provide this as a power spectral density.")
277 @property
278 def psd_file(self):
279 return self._psd_file
281 @psd_file.setter
282 def psd_file(self, psd_file):
283 psd_file = self.__validate_file_name(file=psd_file)
284 self._psd_file = psd_file
285 if psd_file is not None:
286 self.__import_power_spectral_density()
287 self.__check_file_was_psd_file()
289 def __check_file_was_psd_file(self):
290 if min(self.psd_array) > 1e-30:
291 logger.warning("You specified a power spectral density file.")
292 logger.warning("{} WARNING {}".format("*" * 30, "*" * 30))
293 logger.warning("The minimum of the provided curve is {:.2e}.".format(min(self.psd_array)))
294 logger.warning("You may have intended to provide this as an amplitude spectral density.")
296 @staticmethod
297 def __validate_file_name(file):
298 """
299 Test if the file exists or is available in the default directory.
301 Parameters
302 ==========
303 file: str, None
304 A string pointing either to a PSD file, or the name of a psd file
305 in the default directory. If none, no check is performed.
307 Returns
308 =======
309 file: str
310 The path to the PSD file to use
312 Raises
313 ======
314 ValueError:
315 If the PSD file cannot be located
317 """
318 if file is None:
319 logger.debug("PSD file set to None")
320 return None
321 elif os.path.isfile(file):
322 logger.debug("PSD file {} exists".format(file))
323 return file
324 else:
325 file_in_default_directory = (
326 os.path.join(os.path.dirname(__file__), 'noise_curves', file))
327 if os.path.isfile(file_in_default_directory):
328 logger.debug("PSD file {} exists in default dir.".format(file))
329 return file_in_default_directory
330 else:
331 raise ValueError(
332 "Unable to locate PSD file {} locally or in the default dir"
333 .format(file))
334 return file
336 def __import_amplitude_spectral_density(self):
337 """ Automagically load an amplitude spectral density curve """
338 self.frequency_array, self.asd_array = np.genfromtxt(self.asd_file).T
340 def __import_power_spectral_density(self):
341 """ Automagically load a power spectral density curve """
342 self.frequency_array, self.psd_array = np.genfromtxt(self.psd_file).T
344 def get_noise_realisation(self, sampling_frequency, duration):
345 """
346 Generate frequency Gaussian noise scaled to the power spectral density.
348 Parameters
349 ==========
350 sampling_frequency: float
351 sampling frequency of noise
352 duration: float
353 duration of noise
355 Returns
356 =======
357 array_like: frequency domain strain of this noise realisation
358 array_like: frequencies related to the frequency domain strain
360 """
361 white_noise, frequencies = utils.create_white_noise(sampling_frequency, duration)
362 with np.errstate(invalid="ignore"):
363 frequency_domain_strain = self.__power_spectral_density_interpolated(frequencies) ** 0.5 * white_noise
364 out_of_bounds = (frequencies < min(self.frequency_array)) | (frequencies > max(self.frequency_array))
365 frequency_domain_strain[out_of_bounds] = 0 * (1 + 1j)
366 return frequency_domain_strain, frequencies