Coverage for bilby/gw/detector/networks.py: 96%

147 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1import os 

2 

3import numpy as np 

4import math 

5 

6from ...core import utils 

7from ...core.utils import logger, safe_file_dump 

8from .interferometer import Interferometer 

9from .psd import PowerSpectralDensity 

10 

11 

12class InterferometerList(list): 

13 """A list of Interferometer objects""" 

14 

15 def __init__(self, interferometers): 

16 """Instantiate a InterferometerList 

17 

18 The InterferometerList is a list of Interferometer objects, each 

19 object has the data used in evaluating the likelihood 

20 

21 Parameters 

22 ========== 

23 interferometers: iterable 

24 The list of interferometers 

25 """ 

26 

27 super(InterferometerList, self).__init__() 

28 if isinstance(interferometers, str): 

29 raise TypeError("Input must not be a string") 

30 for ifo in interferometers: 

31 if isinstance(ifo, str): 

32 ifo = get_empty_interferometer(ifo) 

33 if not isinstance(ifo, (Interferometer, TriangularInterferometer)): 

34 raise TypeError( 

35 "Input list of interferometers are not all Interferometer objects" 

36 ) 

37 else: 

38 self.append(ifo) 

39 self._check_interferometers() 

40 

41 def _check_interferometers(self): 

42 """Verify IFOs 'duration', 'start_time', 'sampling_frequency' are the same. 

43 

44 If the above attributes are not the same, then the attributes are checked to 

45 see if they are the same up to 5 decimal places. 

46 

47 If both checks fail, then a ValueError is raised. 

48 """ 

49 consistent_attributes = ["duration", "start_time", "sampling_frequency"] 

50 for attribute in consistent_attributes: 

51 x = [ 

52 getattr(interferometer.strain_data, attribute) 

53 for interferometer in self 

54 ] 

55 try: 

56 if not all(y == x[0] for y in x): 

57 ifo_strs = [ 

58 "{ifo}[{attribute}]={value}".format( 

59 ifo=ifo.name, 

60 attribute=attribute, 

61 value=getattr(ifo.strain_data, attribute), 

62 ) 

63 for ifo in self 

64 ] 

65 raise ValueError( 

66 "The {} of all interferometers are not the same: {}".format( 

67 attribute, ", ".join(ifo_strs) 

68 ) 

69 ) 

70 except ValueError as e: 

71 if not all(math.isclose(y, x[0], abs_tol=1e-5) for y in x): 

72 raise ValueError(e) 

73 else: 

74 logger.warning(e) 

75 

76 def set_strain_data_from_power_spectral_densities( 

77 self, sampling_frequency, duration, start_time=0 

78 ): 

79 """Set the `Interferometer.strain_data` from the power spectral densities of the detectors 

80 

81 This uses the `interferometer.power_spectral_density` object to set 

82 the `strain_data` to a noise realization. See 

83 `bilby.gw.detector.InterferometerStrainData` for further information. 

84 

85 Parameters 

86 ========== 

87 sampling_frequency: float 

88 The sampling frequency (in Hz) 

89 duration: float 

90 The data duration (in s) 

91 start_time: float 

92 The GPS start-time of the data 

93 

94 """ 

95 for interferometer in self: 

96 interferometer.set_strain_data_from_power_spectral_density( 

97 sampling_frequency=sampling_frequency, 

98 duration=duration, 

99 start_time=start_time, 

100 ) 

101 

102 def set_strain_data_from_zero_noise( 

103 self, sampling_frequency, duration, start_time=0 

104 ): 

105 """Set the `Interferometer.strain_data` from the power spectral densities of the detectors 

106 

107 This uses the `interferometer.power_spectral_density` object to set 

108 the `strain_data` to zero noise. See 

109 `bilby.gw.detector.InterferometerStrainData` for further information. 

110 

111 Parameters 

112 ========== 

113 sampling_frequency: float 

114 The sampling frequency (in Hz) 

115 duration: float 

116 The data duration (in s) 

117 start_time: float 

118 The GPS start-time of the data 

119 

120 """ 

121 for interferometer in self: 

122 interferometer.set_strain_data_from_zero_noise( 

123 sampling_frequency=sampling_frequency, 

124 duration=duration, 

125 start_time=start_time, 

126 ) 

127 

128 def inject_signal( 

129 self, 

130 parameters=None, 

131 injection_polarizations=None, 

132 waveform_generator=None, 

133 raise_error=True, 

134 ): 

135 """ Inject a signal into noise in each of the three detectors. 

136 

137 Parameters 

138 ========== 

139 parameters: dict 

140 Parameters of the injection. 

141 injection_polarizations: dict 

142 Polarizations of waveform to inject, output of 

143 `waveform_generator.frequency_domain_strain()`. If 

144 `waveform_generator` is also given, the injection_polarizations will 

145 be calculated directly and this argument can be ignored. 

146 waveform_generator: bilby.gw.waveform_generator.WaveformGenerator 

147 A WaveformGenerator instance using the source model to inject. If 

148 `injection_polarizations` is given, this will be ignored. 

149 raise_error: bool 

150 Whether to raise an error if the injected signal does not fit in 

151 the segment. 

152 

153 Notes 

154 ===== 

155 if your signal takes a substantial amount of time to generate, or 

156 you experience buggy behaviour. It is preferable to provide the 

157 injection_polarizations directly. 

158 

159 Returns 

160 ======= 

161 injection_polarizations: dict 

162 

163 """ 

164 if injection_polarizations is None: 

165 if waveform_generator is not None: 

166 injection_polarizations = waveform_generator.frequency_domain_strain( 

167 parameters 

168 ) 

169 else: 

170 raise ValueError( 

171 "inject_signal needs one of waveform_generator or " 

172 "injection_polarizations." 

173 ) 

174 

175 all_injection_polarizations = list() 

176 for interferometer in self: 

177 all_injection_polarizations.append( 

178 interferometer.inject_signal( 

179 parameters=parameters, 

180 injection_polarizations=injection_polarizations, 

181 raise_error=raise_error, 

182 ) 

183 ) 

184 

185 return all_injection_polarizations 

186 

187 def save_data(self, outdir, label=None): 

188 """Creates a save file for the data in plain text format 

189 

190 Parameters 

191 ========== 

192 outdir: str 

193 The output directory in which the data is supposed to be saved 

194 label: str 

195 The string labelling the data 

196 """ 

197 for interferometer in self: 

198 interferometer.save_data(outdir=outdir, label=label) 

199 

200 def plot_data(self, signal=None, outdir=".", label=None): 

201 if utils.command_line_args.bilby_test_mode: 

202 return 

203 

204 for interferometer in self: 

205 interferometer.plot_data(signal=signal, outdir=outdir, label=label) 

206 

207 @property 

208 def number_of_interferometers(self): 

209 return len(self) 

210 

211 @property 

212 def duration(self): 

213 return self[0].strain_data.duration 

214 

215 @property 

216 def start_time(self): 

217 return self[0].strain_data.start_time 

218 

219 @property 

220 def sampling_frequency(self): 

221 return self[0].strain_data.sampling_frequency 

222 

223 @property 

224 def frequency_array(self): 

225 return self[0].strain_data.frequency_array 

226 

227 def append(self, interferometer): 

228 if isinstance(interferometer, InterferometerList): 

229 super(InterferometerList, self).extend(interferometer) 

230 else: 

231 super(InterferometerList, self).append(interferometer) 

232 self._check_interferometers() 

233 

234 def extend(self, interferometers): 

235 super(InterferometerList, self).extend(interferometers) 

236 self._check_interferometers() 

237 

238 def insert(self, index, interferometer): 

239 super(InterferometerList, self).insert(index, interferometer) 

240 self._check_interferometers() 

241 

242 @property 

243 def meta_data(self): 

244 """Dictionary of the per-interferometer meta_data""" 

245 return { 

246 interferometer.name: interferometer.meta_data for interferometer in self 

247 } 

248 

249 @staticmethod 

250 def _filename_from_outdir_label_extension(outdir, label, extension="h5"): 

251 return os.path.join(outdir, label + f".{extension}") 

252 

253 _save_docstring = """ Saves the object to a {format} file 

254 

255 {extra} 

256 

257 Parameters 

258 ========== 

259 outdir: str, optional 

260 Output directory name of the file 

261 label: str, optional 

262 Output file name, is 'ifo_list' if not given otherwise. A list of 

263 the included interferometers will be appended. 

264 """ 

265 

266 _load_docstring = """ Loads in an InterferometerList object from a {format} file 

267 

268 Parameters 

269 ========== 

270 filename: str 

271 If given, try to load from this filename 

272 

273 """ 

274 

275 def to_pickle(self, outdir="outdir", label="ifo_list"): 

276 utils.check_directory_exists_and_if_not_mkdir(outdir) 

277 label = label + "_" + "".join(ifo.name for ifo in self) 

278 filename = self._filename_from_outdir_label_extension( 

279 outdir, label, extension="pkl" 

280 ) 

281 safe_file_dump(self, filename, "dill") 

282 

283 @classmethod 

284 def from_pickle(cls, filename=None): 

285 import dill 

286 

287 with open(filename, "rb") as ff: 

288 res = dill.load(ff) 

289 if res.__class__ != cls: 

290 raise TypeError("The loaded object is not an InterferometerList") 

291 return res 

292 

293 to_pickle.__doc__ = _save_docstring.format( 

294 format="pickle", extra=".. versionadded:: 1.1.0" 

295 ) 

296 from_pickle.__doc__ = _load_docstring.format(format="pickle") 

297 

298 

299class TriangularInterferometer(InterferometerList): 

300 def __init__( 

301 self, 

302 name, 

303 power_spectral_density, 

304 minimum_frequency, 

305 maximum_frequency, 

306 length, 

307 latitude, 

308 longitude, 

309 elevation, 

310 xarm_azimuth, 

311 yarm_azimuth, 

312 xarm_tilt=0.0, 

313 yarm_tilt=0.0, 

314 ): 

315 super(TriangularInterferometer, self).__init__([]) 

316 self.name = name 

317 # for attr in ['power_spectral_density', 'minimum_frequency', 'maximum_frequency']: 

318 if isinstance(power_spectral_density, PowerSpectralDensity): 

319 power_spectral_density = [power_spectral_density] * 3 

320 if isinstance(minimum_frequency, float) or isinstance(minimum_frequency, int): 

321 minimum_frequency = [minimum_frequency] * 3 

322 if isinstance(maximum_frequency, float) or isinstance(maximum_frequency, int): 

323 maximum_frequency = [maximum_frequency] * 3 

324 

325 for ii in range(3): 

326 self.append( 

327 Interferometer( 

328 "{}{}".format(name, ii + 1), 

329 power_spectral_density[ii], 

330 minimum_frequency[ii], 

331 maximum_frequency[ii], 

332 length, 

333 latitude, 

334 longitude, 

335 elevation, 

336 xarm_azimuth, 

337 yarm_azimuth, 

338 xarm_tilt, 

339 yarm_tilt, 

340 ) 

341 ) 

342 

343 xarm_azimuth += 240 

344 yarm_azimuth += 240 

345 

346 latitude += ( 

347 np.arctan( 

348 length 

349 * np.sin(xarm_azimuth * np.pi / 180) 

350 * 1e3 

351 / utils.radius_of_earth 

352 ) 

353 * 180 

354 / np.pi 

355 ) 

356 longitude += ( 

357 np.arctan( 

358 length 

359 * np.cos(xarm_azimuth * np.pi / 180) 

360 * 1e3 

361 / utils.radius_of_earth 

362 ) 

363 * 180 

364 / np.pi 

365 ) 

366 

367 

368def get_empty_interferometer(name): 

369 """ 

370 Get an interferometer with standard parameters for known detectors. 

371 

372 These objects do not have any noise instantiated. 

373 

374 The available instruments are: 

375 H1, L1, V1, GEO600, CE 

376 

377 Detector positions taken from: 

378 L1/H1: LIGO-T980044-10 

379 V1/GEO600: arXiv:gr-qc/0008066 [45] 

380 CE: located at the site of H1 

381 

382 Detector sensitivities: 

383 H1/L1/V1: https://dcc.ligo.org/LIGO-P1200087-v42/public 

384 GEO600: http://www.geo600.org/1032083/GEO600_Sensitivity_Curves 

385 CE: https://dcc.ligo.org/LIGO-P1600143/public 

386 

387 

388 Parameters 

389 ========== 

390 name: str 

391 Interferometer identifier. 

392 

393 Returns 

394 ======= 

395 interferometer: Interferometer 

396 Interferometer instance 

397 """ 

398 filename = os.path.join( 

399 os.path.dirname(__file__), "detectors", "{}.interferometer".format(name) 

400 ) 

401 try: 

402 return load_interferometer(filename) 

403 except OSError: 

404 raise ValueError("Interferometer {} not implemented".format(name)) 

405 

406 

407def load_interferometer(filename): 

408 """Load an interferometer from a file.""" 

409 parameters = dict() 

410 with open(filename, "r") as parameter_file: 

411 lines = parameter_file.readlines() 

412 for line in lines: 

413 if line[0] == "#" or line[0] == "\n": 

414 continue 

415 split_line = line.split("=") 

416 key = split_line[0].strip() 

417 value = eval("=".join(split_line[1:])) 

418 parameters[key] = value 

419 if "shape" not in parameters.keys(): 

420 ifo = Interferometer(**parameters) 

421 logger.debug("Assuming L shape for {}".format("name")) 

422 elif parameters["shape"].lower() in ["l", "ligo"]: 

423 parameters.pop("shape") 

424 ifo = Interferometer(**parameters) 

425 elif parameters["shape"].lower() in ["triangular", "triangle"]: 

426 parameters.pop("shape") 

427 ifo = TriangularInterferometer(**parameters) 

428 else: 

429 raise IOError( 

430 "{} could not be loaded. Invalid parameter 'shape'.".format(filename) 

431 ) 

432 return ifo