Coverage for bilby/gw/detector/psd.py: 96%

148 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1import os 

2 

3import numpy as np 

4from scipy.interpolate import interp1d 

5 

6from ...core import utils 

7from ...core.utils import logger 

8from .strain_data import InterferometerStrainData 

9 

10 

11class PowerSpectralDensity(object): 

12 

13 def __init__(self, frequency_array=None, psd_array=None, asd_array=None, 

14 psd_file=None, asd_file=None): 

15 """ 

16 Instantiate a new PowerSpectralDensity object. 

17 

18 Examples 

19 ======== 

20 Using the `from` method directly (here `psd_file` is a string 

21 containing the path to the file to load): 

22 

23 .. code-block:: python 

24 

25 >>> power_spectral_density = PowerSpectralDensity.from_power_spectral_density_file(psd_file) 

26 

27 Alternatively (and equivalently) setting the psd_file directly: 

28 

29 .. code-block:: python 

30 

31 >>> power_spectral_density = PowerSpectralDensity(psd_file=psd_file) 

32 

33 Attributes 

34 ========== 

35 asd_array: array_like 

36 Array representation of the ASD 

37 asd_file: str 

38 Name of the ASD file 

39 frequency_array: array_like 

40 Array containing the frequencies of the ASD/PSD values 

41 psd_array: array_like 

42 Array representation of the PSD 

43 psd_file: str 

44 Name of the PSD file 

45 power_spectral_density_interpolated: scipy.interpolated.interp1d 

46 Interpolated function of the PSD 

47 

48 """ 

49 self._cache = dict( 

50 frequency_array=np.array([]), psd_array=None, asd_array=None) 

51 self.frequency_array = np.array(frequency_array) 

52 if psd_array is not None: 

53 self.psd_array = psd_array 

54 if asd_array is not None: 

55 self.asd_array = asd_array 

56 self.psd_file = psd_file 

57 self.asd_file = asd_file 

58 

59 def _update_cache(self, frequency_array): 

60 psd_array = self.power_spectral_density_interpolated(frequency_array) 

61 self._cache['psd_array'] = psd_array 

62 self._cache['asd_array'] = psd_array**0.5 

63 self._cache['frequency_array'] = frequency_array 

64 

65 def __eq__(self, other): 

66 if self.psd_file == other.psd_file \ 

67 and self.asd_file == other.asd_file \ 

68 and np.array_equal(self.frequency_array, other.frequency_array) \ 

69 and np.array_equal(self.psd_array, other.psd_array) \ 

70 and np.array_equal(self.asd_array, other.asd_array): 

71 return True 

72 return False 

73 

74 def __repr__(self): 

75 if self.asd_file is not None or self.psd_file is not None: 

76 return self.__class__.__name__ + '(psd_file=\'{}\', asd_file=\'{}\')' \ 

77 .format(self.psd_file, self.asd_file) 

78 else: 

79 return self.__class__.__name__ + '(frequency_array={}, psd_array={}, asd_array={})' \ 

80 .format(self.frequency_array, self.psd_array, self.asd_array) 

81 

82 @staticmethod 

83 def from_amplitude_spectral_density_file(asd_file): 

84 """ Set the amplitude spectral density from a given file 

85 

86 Parameters 

87 ========== 

88 asd_file: str 

89 File containing amplitude spectral density, format 'f h_f' 

90 

91 """ 

92 return PowerSpectralDensity(asd_file=asd_file) 

93 

94 @staticmethod 

95 def from_power_spectral_density_file(psd_file): 

96 """ Set the power spectral density from a given file 

97 

98 Parameters 

99 ========== 

100 psd_file: str, optional 

101 File containing power spectral density, format 'f h_f' 

102 

103 """ 

104 return PowerSpectralDensity(psd_file=psd_file) 

105 

106 @staticmethod 

107 def from_frame_file(frame_file, psd_start_time, psd_duration, 

108 fft_length=4, sampling_frequency=4096, roll_off=0.2, 

109 overlap=0, channel=None, name=None, outdir=None, 

110 analysis_segment_start_time=None): 

111 """ Generate power spectral density from a frame file 

112 

113 Parameters 

114 ========== 

115 frame_file: str, optional 

116 Frame file to read data from. 

117 psd_start_time: float 

118 Beginning of segment to analyse. 

119 psd_duration: float, optional 

120 Duration of data (in seconds) to generate PSD from. 

121 fft_length: float, optional 

122 Number of seconds in a single fft. 

123 sampling_frequency: float, optional 

124 Sampling frequency for time series. 

125 This is twice the maximum frequency. 

126 roll_off: float, optional 

127 Rise time in seconds of tukey window. 

128 overlap: float, 

129 Number of seconds of overlap between FFTs. 

130 channel: str, optional 

131 Name of channel to use to generate PSD. 

132 name, outdir: str, optional 

133 Name (and outdir) of the detector for which a PSD is to be 

134 generated. 

135 analysis_segment_start_time: float, optional 

136 The start time of the analysis segment, if given, this data will 

137 be removed before creating the PSD. 

138 

139 """ 

140 strain = InterferometerStrainData(roll_off=roll_off) 

141 strain.set_from_frame_file( 

142 frame_file, start_time=psd_start_time, duration=psd_duration, 

143 channel=channel, sampling_frequency=sampling_frequency) 

144 frequency_array, psd_array = strain.create_power_spectral_density( 

145 fft_length=fft_length, name=name, outdir=outdir, overlap=overlap, 

146 analysis_segment_start_time=analysis_segment_start_time) 

147 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array) 

148 

149 @staticmethod 

150 def from_channel_name(channel, psd_start_time, psd_duration, 

151 fft_length=4, sampling_frequency=4096, roll_off=0.2, 

152 overlap=0, name=None, outdir=None, 

153 analysis_segment_start_time=None): 

154 """ Generate power spectral density from a given channel name 

155 by loading data using `strain_data.set_from_channel_name` 

156 

157 Parameters 

158 ========== 

159 psd_start_time: float 

160 Beginning of segment to analyse. 

161 psd_duration: float, optional 

162 Duration of data (in seconds) to generate PSD from. 

163 fft_length: float, optional 

164 Number of seconds in a single fft. 

165 sampling_frequency: float, optional 

166 Sampling frequency for time series. 

167 This is twice the maximum frequency. 

168 roll_off: float, optional 

169 Rise time in seconds of tukey window. 

170 overlap: float, 

171 Number of seconds of overlap between FFTs. 

172 channel: str 

173 Name of channel to use to generate PSD in the format 

174 `IFO:Channel` 

175 name, outdir: str, optional 

176 Name (and outdir) of the detector for which a PSD is to be 

177 generated. 

178 analysis_segment_start_time: float, optional 

179 The start time of the analysis segment, if given, this data will 

180 be removed before creating the PSD. 

181 

182 """ 

183 strain = InterferometerStrainData(roll_off=roll_off) 

184 strain.set_from_channel_name( 

185 channel, duration=psd_duration, start_time=psd_start_time, 

186 sampling_frequency=sampling_frequency) 

187 frequency_array, psd_array = strain.create_power_spectral_density( 

188 fft_length=fft_length, name=name, outdir=outdir, overlap=overlap, 

189 analysis_segment_start_time=analysis_segment_start_time) 

190 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array) 

191 

192 @staticmethod 

193 def from_amplitude_spectral_density_array(frequency_array, asd_array): 

194 return PowerSpectralDensity(frequency_array=frequency_array, asd_array=asd_array) 

195 

196 @staticmethod 

197 def from_power_spectral_density_array(frequency_array, psd_array): 

198 return PowerSpectralDensity(frequency_array=frequency_array, psd_array=psd_array) 

199 

200 @staticmethod 

201 def from_aligo(): 

202 logger.info("No power spectral density provided, using aLIGO," 

203 "zero detuning, high power.") 

204 return PowerSpectralDensity.from_power_spectral_density_file(psd_file='aLIGO_ZERO_DET_high_P_psd.txt') 

205 

206 @property 

207 def psd_array(self): 

208 return self.__psd_array 

209 

210 @psd_array.setter 

211 def psd_array(self, psd_array): 

212 self.__check_frequency_array_matches_density_array(psd_array) 

213 self.__psd_array = np.array(psd_array) 

214 self.__asd_array = psd_array ** 0.5 

215 self.__interpolate_power_spectral_density() 

216 

217 @property 

218 def asd_array(self): 

219 return self.__asd_array 

220 

221 @asd_array.setter 

222 def asd_array(self, asd_array): 

223 self.__check_frequency_array_matches_density_array(asd_array) 

224 self.__asd_array = np.array(asd_array) 

225 self.__psd_array = asd_array ** 2 

226 self.__interpolate_power_spectral_density() 

227 

228 def __check_frequency_array_matches_density_array(self, density_array): 

229 if len(self.frequency_array) != len(density_array): 

230 raise ValueError('Provided spectral density does not match frequency array. Not updating.\n' 

231 'Length spectral density {}\n Length frequency array {}\n' 

232 .format(density_array, self.frequency_array)) 

233 

234 def __interpolate_power_spectral_density(self): 

235 """Interpolate the loaded power spectral density so it can be resampled 

236 for arbitrary frequency arrays. 

237 """ 

238 self.__power_spectral_density_interpolated = interp1d(self.frequency_array, 

239 self.psd_array, 

240 bounds_error=False, 

241 fill_value=np.inf) 

242 self._update_cache(self.frequency_array) 

243 

244 def get_power_spectral_density_array(self, frequency_array): 

245 if not np.array_equal(frequency_array, self._cache['frequency_array']): 

246 self._update_cache(frequency_array=frequency_array) 

247 return self._cache['psd_array'] 

248 

249 def get_amplitude_spectral_density_array(self, frequency_array): 

250 if not np.array_equal(frequency_array, self._cache['frequency_array']): 

251 self._update_cache(frequency_array=frequency_array) 

252 return self._cache['asd_array'] 

253 

254 @property 

255 def power_spectral_density_interpolated(self): 

256 return self.__power_spectral_density_interpolated 

257 

258 @property 

259 def asd_file(self): 

260 return self._asd_file 

261 

262 @asd_file.setter 

263 def asd_file(self, asd_file): 

264 asd_file = self.__validate_file_name(file=asd_file) 

265 self._asd_file = asd_file 

266 if asd_file is not None: 

267 self.__import_amplitude_spectral_density() 

268 self.__check_file_was_asd_file() 

269 

270 def __check_file_was_asd_file(self): 

271 if min(self.asd_array) < 1e-30: 

272 logger.warning("You specified an amplitude spectral density file.") 

273 logger.warning("{} WARNING {}".format("*" * 30, "*" * 30)) 

274 logger.warning("The minimum of the provided curve is {:.2e}.".format(min(self.asd_array))) 

275 logger.warning("You may have intended to provide this as a power spectral density.") 

276 

277 @property 

278 def psd_file(self): 

279 return self._psd_file 

280 

281 @psd_file.setter 

282 def psd_file(self, psd_file): 

283 psd_file = self.__validate_file_name(file=psd_file) 

284 self._psd_file = psd_file 

285 if psd_file is not None: 

286 self.__import_power_spectral_density() 

287 self.__check_file_was_psd_file() 

288 

289 def __check_file_was_psd_file(self): 

290 if min(self.psd_array) > 1e-30: 

291 logger.warning("You specified a power spectral density file.") 

292 logger.warning("{} WARNING {}".format("*" * 30, "*" * 30)) 

293 logger.warning("The minimum of the provided curve is {:.2e}.".format(min(self.psd_array))) 

294 logger.warning("You may have intended to provide this as an amplitude spectral density.") 

295 

296 @staticmethod 

297 def __validate_file_name(file): 

298 """ 

299 Test if the file exists or is available in the default directory. 

300 

301 Parameters 

302 ========== 

303 file: str, None 

304 A string pointing either to a PSD file, or the name of a psd file 

305 in the default directory. If none, no check is performed. 

306 

307 Returns 

308 ======= 

309 file: str 

310 The path to the PSD file to use 

311 

312 Raises 

313 ====== 

314 ValueError: 

315 If the PSD file cannot be located 

316 

317 """ 

318 if file is None: 

319 logger.debug("PSD file set to None") 

320 return None 

321 elif os.path.isfile(file): 

322 logger.debug("PSD file {} exists".format(file)) 

323 return file 

324 else: 

325 file_in_default_directory = ( 

326 os.path.join(os.path.dirname(__file__), 'noise_curves', file)) 

327 if os.path.isfile(file_in_default_directory): 

328 logger.debug("PSD file {} exists in default dir.".format(file)) 

329 return file_in_default_directory 

330 else: 

331 raise ValueError( 

332 "Unable to locate PSD file {} locally or in the default dir" 

333 .format(file)) 

334 return file 

335 

336 def __import_amplitude_spectral_density(self): 

337 """ Automagically load an amplitude spectral density curve """ 

338 self.frequency_array, self.asd_array = np.genfromtxt(self.asd_file).T 

339 

340 def __import_power_spectral_density(self): 

341 """ Automagically load a power spectral density curve """ 

342 self.frequency_array, self.psd_array = np.genfromtxt(self.psd_file).T 

343 

344 def get_noise_realisation(self, sampling_frequency, duration): 

345 """ 

346 Generate frequency Gaussian noise scaled to the power spectral density. 

347 

348 Parameters 

349 ========== 

350 sampling_frequency: float 

351 sampling frequency of noise 

352 duration: float 

353 duration of noise 

354 

355 Returns 

356 ======= 

357 array_like: frequency domain strain of this noise realisation 

358 array_like: frequencies related to the frequency domain strain 

359 

360 """ 

361 white_noise, frequencies = utils.create_white_noise(sampling_frequency, duration) 

362 with np.errstate(invalid="ignore"): 

363 frequency_domain_strain = self.__power_spectral_density_interpolated(frequencies) ** 0.5 * white_noise 

364 out_of_bounds = (frequencies < min(self.frequency_array)) | (frequencies > max(self.frequency_array)) 

365 frequency_domain_strain[out_of_bounds] = 0 * (1 + 1j) 

366 return frequency_domain_strain, frequencies