Coverage for bilby/gw/detector/calibration.py: 92%

185 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1""" Functions for adding calibration factors to waveform templates. 

2""" 

3import copy 

4import os 

5 

6import numpy as np 

7import pandas as pd 

8from scipy.interpolate import interp1d 

9 

10from ...core.utils.log import logger 

11from ...core.prior.dict import PriorDict 

12from ..prior import CalibrationPriorDict 

13 

14 

15def read_calibration_file(filename, frequency_array, number_of_response_curves, starting_index=0): 

16 """ 

17 Function to read the hdf5 files from the calibration group containing the physical calibration response curves. 

18 

19 Parameters 

20 ---------- 

21 filename: str 

22 Location of the HDF5 file that contains the curves 

23 frequency_array: array-like 

24 The frequency values to calculate the calibration response curves at 

25 number_of_response_curves: int 

26 Number of random draws to use from the calibration file 

27 starting_index: int 

28 Index of the first curve to use within the array. This allows for segmenting the calibration curve array 

29 into smaller pieces. 

30 

31 Returns 

32 ------- 

33 calibration_draws: array-like 

34 Array which contains the calibration responses as a function of the frequency array specified. 

35 Shape is (number_of_response_curves x len(frequency_array)) 

36 

37 """ 

38 import tables 

39 

40 logger.info(f"Reading calibration draws from {filename}") 

41 calibration_file = tables.open_file(filename, 'r') 

42 calibration_amplitude = \ 

43 calibration_file.root.deltaR.draws_amp_rel[starting_index:number_of_response_curves + starting_index] 

44 calibration_phase = \ 

45 calibration_file.root.deltaR.draws_phase[starting_index:number_of_response_curves + starting_index] 

46 

47 calibration_frequencies = calibration_file.root.deltaR.freq[:] 

48 

49 calibration_file.close() 

50 

51 if len(calibration_amplitude.dtype) != 0: # handling if this is a calibration group hdf5 file 

52 calibration_amplitude = calibration_amplitude.view(np.float64).reshape(calibration_amplitude.shape + (-1,)) 

53 calibration_phase = calibration_phase.view(np.float64).reshape(calibration_phase.shape + (-1,)) 

54 calibration_frequencies = calibration_frequencies.view(np.float64) 

55 

56 # interpolate to the frequency array (where if outside the range of the calibration uncertainty its fixed to 1) 

57 calibration_draws = calibration_amplitude * np.exp(1j * calibration_phase) 

58 calibration_draws = interp1d( 

59 calibration_frequencies, calibration_draws, kind='cubic', 

60 bounds_error=False, fill_value=1)(frequency_array) 

61 

62 try: 

63 parameter_draws = pd.read_hdf(filename, key="CalParams") 

64 except KeyError: 

65 parameter_draws = None 

66 

67 return calibration_draws, parameter_draws 

68 

69 

70def write_calibration_file(filename, frequency_array, calibration_draws, calibration_parameter_draws=None): 

71 """ 

72 Function to write the generated response curves to file 

73 

74 Parameters 

75 ---------- 

76 filename: str 

77 Location and filename to save the file 

78 frequency_array: array-like 

79 The frequency values where the calibration response was calculated 

80 calibration_draws: array-like 

81 Array which contains the calibration responses as a function of the frequency array specified. 

82 Shape is (number_of_response_curves x len(frequency_array)) 

83 calibration_parameter_draws: data_frame 

84 Parameters used to generate the random draws of the calibration response curves 

85 

86 """ 

87 import tables 

88 

89 logger.info(f"Writing calibration draws to {filename}") 

90 calibration_file = tables.open_file(filename, 'w') 

91 deltaR_group = calibration_file.create_group(calibration_file.root, 'deltaR') 

92 

93 # Save output 

94 calibration_file.create_carray(deltaR_group, 'draws_amp_rel', obj=np.abs(calibration_draws)) 

95 calibration_file.create_carray(deltaR_group, 'draws_phase', obj=np.angle(calibration_draws)) 

96 calibration_file.create_carray(deltaR_group, 'freq', obj=frequency_array) 

97 

98 calibration_file.close() 

99 

100 # Save calibration parameter draws 

101 if calibration_parameter_draws is not None: 

102 calibration_parameter_draws.to_hdf(filename, key='CalParams', data_columns=True, format='table') 

103 

104 

105class Recalibrate(object): 

106 

107 name = 'none' 

108 

109 def __init__(self, prefix='recalib_'): 

110 """ 

111 Base calibration object. This applies no transformation 

112 

113 Parameters 

114 ========== 

115 prefix: str 

116 Prefix on parameters relating to the calibration. 

117 """ 

118 self.params = dict() 

119 self.prefix = prefix 

120 

121 def __repr__(self): 

122 return self.__class__.__name__ + '(prefix=\'{}\')'.format(self.prefix) 

123 

124 def get_calibration_factor(self, frequency_array, **params): 

125 """Apply calibration model 

126 

127 This method should be overwritten by subclasses 

128 

129 Parameters 

130 ========== 

131 frequency_array: array-like 

132 The frequency values to calculate the calibration factor for. 

133 params : dict 

134 Dictionary of sampling parameters which includes 

135 calibration parameters. 

136 

137 Returns 

138 ======= 

139 calibration_factor : array-like 

140 The factor to multiply the strain by. 

141 """ 

142 return np.ones_like(frequency_array) 

143 

144 def set_calibration_parameters(self, **params): 

145 self.params.update({key[len(self.prefix):]: params[key] for key in params 

146 if self.prefix in key}) 

147 

148 def __eq__(self, other): 

149 return self.__dict__ == other.__dict__ 

150 

151 

152class CubicSpline(Recalibrate): 

153 

154 name = 'cubic_spline' 

155 

156 def __init__(self, prefix, minimum_frequency, maximum_frequency, n_points): 

157 """ 

158 Cubic spline recalibration 

159 

160 see https://dcc.ligo.org/DocDB/0116/T1400682/001/calnote.pdf 

161 

162 This assumes the spline points follow 

163 np.logspace(np.log(minimum_frequency), np.log(maximum_frequency), n_points) 

164 

165 Parameters 

166 ========== 

167 prefix: str 

168 Prefix on parameters relating to the calibration. 

169 minimum_frequency: float 

170 minimum frequency of spline points 

171 maximum_frequency: float 

172 maximum frequency of spline points 

173 n_points: int 

174 number of spline points 

175 """ 

176 super(CubicSpline, self).__init__(prefix=prefix) 

177 if n_points < 4: 

178 raise ValueError('Cubic spline calibration requires at least 4 spline nodes.') 

179 self.n_points = n_points 

180 self.minimum_frequency = minimum_frequency 

181 self.maximum_frequency = maximum_frequency 

182 self._log_spline_points = np.linspace( 

183 np.log10(minimum_frequency), np.log10(maximum_frequency), n_points) 

184 

185 @property 

186 def delta_log_spline_points(self): 

187 if not hasattr(self, "_delta_log_spline_points"): 

188 self._delta_log_spline_points = self._log_spline_points[1] - self._log_spline_points[0] 

189 return self._delta_log_spline_points 

190 

191 @property 

192 def nodes_to_spline_coefficients(self): 

193 if not hasattr(self, "_nodes_to_spline_coefficients"): 

194 self._setup_spline_coefficients() 

195 return self._nodes_to_spline_coefficients 

196 

197 def _setup_spline_coefficients(self): 

198 """ 

199 Precompute matrix converting values at nodes to spline coefficients. 

200 The algorithm for interpolation is described in 

201 https://dcc.ligo.org/LIGO-T2300140, and the matrix calculated here is 

202 to solve Eq. (9) in the note. 

203 """ 

204 tmp1 = np.zeros(shape=(self.n_points, self.n_points)) 

205 tmp1[0, 0] = -1 

206 tmp1[0, 1] = 2 

207 tmp1[0, 2] = -1 

208 tmp1[-1, -3] = -1 

209 tmp1[-1, -2] = 2 

210 tmp1[-1, -1] = -1 

211 for i in range(1, self.n_points - 1): 

212 tmp1[i, i - 1] = 1 / 6 

213 tmp1[i, i] = 2 / 3 

214 tmp1[i, i + 1] = 1 / 6 

215 tmp2 = np.zeros(shape=(self.n_points, self.n_points)) 

216 for i in range(1, self.n_points - 1): 

217 tmp2[i, i - 1] = 1 

218 tmp2[i, i] = -2 

219 tmp2[i, i + 1] = 1 

220 self._nodes_to_spline_coefficients = np.linalg.solve(tmp1, tmp2) 

221 

222 @property 

223 def log_spline_points(self): 

224 return self._log_spline_points 

225 

226 def __repr__(self): 

227 return self.__class__.__name__ + '(prefix=\'{}\', minimum_frequency={}, maximum_frequency={}, n_points={})'\ 

228 .format(self.prefix, self.minimum_frequency, self.maximum_frequency, self.n_points) 

229 

230 def _evaluate_spline(self, kind, a, b, c, d, previous_nodes): 

231 """Evaluate Eq. (1) in https://dcc.ligo.org/LIGO-T2300140""" 

232 parameters = np.array([self.params[f"{kind}_{ii}"] for ii in range(self.n_points)]) 

233 next_nodes = previous_nodes + 1 

234 spline_coefficients = self.nodes_to_spline_coefficients.dot(parameters) 

235 return ( 

236 a * parameters[previous_nodes] 

237 + b * parameters[next_nodes] 

238 + c * spline_coefficients[previous_nodes] 

239 + d * spline_coefficients[next_nodes] 

240 ) 

241 

242 def get_calibration_factor(self, frequency_array, **params): 

243 """Apply calibration model 

244 

245 Parameters 

246 ========== 

247 frequency_array: array-like 

248 The frequency values to calculate the calibration factor for. 

249 prefix: str 

250 Prefix for calibration parameter names 

251 params : dict 

252 Dictionary of sampling parameters which includes 

253 calibration parameters. 

254 

255 Returns 

256 ======= 

257 calibration_factor : array-like 

258 The factor to multiply the strain by. 

259 """ 

260 log10f_per_deltalog10f = ( 

261 np.log10(frequency_array) - self.log_spline_points[0] 

262 ) / self.delta_log_spline_points 

263 previous_nodes = np.clip(np.floor(log10f_per_deltalog10f).astype(int), a_min=0, a_max=self.n_points - 2) 

264 b = log10f_per_deltalog10f - previous_nodes 

265 a = 1 - b 

266 c = (a**3 - a) / 6 

267 d = (b**3 - b) / 6 

268 

269 self.set_calibration_parameters(**params) 

270 

271 delta_amplitude = self._evaluate_spline("amplitude", a, b, c, d, previous_nodes) 

272 delta_phase = self._evaluate_spline("phase", a, b, c, d, previous_nodes) 

273 calibration_factor = (1 + delta_amplitude) * (2 + 1j * delta_phase) / (2 - 1j * delta_phase) 

274 

275 return calibration_factor 

276 

277 

278class Precomputed(Recalibrate): 

279 

280 name = "precomputed" 

281 

282 def __init__(self, label, curves, frequency_array, parameters=None): 

283 """ 

284 A class for accessing an array of precomputed recalibration curves. 

285 

286 Parameters 

287 ========== 

288 label: str 

289 The label for the interferometer, e.g., H1. The corresponding 

290 parameter is :code:`recalib_index_{label}`. 

291 curves: array-like 

292 Array with shape (n_curves, n_frequencies) with the recalibration 

293 curves. 

294 frequency_array: array-like 

295 Array of frequencies at which the curves are evaluated. 

296 """ 

297 self.label = label 

298 self.curves = curves 

299 self.frequency_array = frequency_array 

300 self.parameters = parameters 

301 super(Precomputed, self).__init__(prefix=f"recalib_index_{self.label}") 

302 

303 def get_calibration_factor(self, frequency_array, **params): 

304 idx = int(params.get(self.prefix, None)) 

305 if idx is None: 

306 raise KeyError(f"Calibration index for {self.label} not found.") 

307 if not np.array_equal(frequency_array, self.frequency_array): 

308 raise ValueError("Frequency grid passed to calibrator doesn't match.") 

309 return self.curves[idx] 

310 

311 @classmethod 

312 def constant_uncertainty_spline( 

313 cls, amplitude_sigma, phase_sigma, frequency_array, n_nodes, label, n_curves 

314 ): 

315 priors = CalibrationPriorDict.constant_uncertainty_spline( 

316 amplitude_sigma=amplitude_sigma, 

317 phase_sigma=phase_sigma, 

318 minimum_frequency=frequency_array[0], 

319 maximum_frequency=frequency_array[-1], 

320 n_nodes=n_nodes, 

321 label=label, 

322 ) 

323 parameters = pd.DataFrame(priors.sample(n_curves)) 

324 curves = curves_from_spline_and_prior( 

325 label=label, 

326 frequency_array=frequency_array, 

327 n_points=n_nodes, 

328 parameters=parameters, 

329 n_curves=n_curves 

330 ) 

331 return cls( 

332 label=label, 

333 curves=np.array(curves), 

334 frequency_array=frequency_array, 

335 parameters=parameters, 

336 ) 

337 

338 @classmethod 

339 def from_envelope_file( 

340 cls, envelope, frequency_array, n_nodes, label, n_curves 

341 ): 

342 priors = CalibrationPriorDict.from_envelope_file( 

343 envelope_file=envelope, 

344 minimum_frequency=frequency_array[0], 

345 maximum_frequency=frequency_array[-1], 

346 n_nodes=n_nodes, 

347 label=label, 

348 ) 

349 parameters = pd.DataFrame(priors.sample(n_curves)) 

350 curves = curves_from_spline_and_prior( 

351 label=label, 

352 frequency_array=frequency_array, 

353 n_points=n_nodes, 

354 parameters=parameters, 

355 n_curves=n_curves, 

356 ) 

357 return cls( 

358 label=label, 

359 curves=np.array(curves), 

360 frequency_array=frequency_array, 

361 parameters=parameters, 

362 ) 

363 

364 @classmethod 

365 def from_calibration_file(cls, label, filename, frequency_array, n_curves, starting_index=0): 

366 curves, parameters = read_calibration_file( 

367 filename=filename, 

368 frequency_array=frequency_array, 

369 number_of_response_curves=n_curves, 

370 starting_index=starting_index, 

371 ) 

372 return cls( 

373 label=label, 

374 curves=np.array(curves), 

375 frequency_array=frequency_array, 

376 parameters=parameters, 

377 ) 

378 

379 

380def build_calibration_lookup( 

381 interferometers, 

382 lookup_files=None, 

383 priors=None, 

384 number_of_response_curves=1000, 

385 starting_index=0, 

386): 

387 if lookup_files is None and priors is None: 

388 raise ValueError( 

389 "One of calibration_lookup_table or priors must be specified for " 

390 "building calibration marginalization lookup table." 

391 ) 

392 elif lookup_files is None: 

393 lookup_files = dict() 

394 

395 draws = dict() 

396 parameters = dict() 

397 for interferometer in interferometers: 

398 name = interferometer.name 

399 frequencies = interferometer.frequency_array 

400 frequencies = frequencies[interferometer.frequency_mask] 

401 filename = lookup_files.get(name, f"{name}_calibration_file.h5") 

402 

403 if os.path.exists(filename): 

404 draws[name], parameters[name] = read_calibration_file( 

405 filename, 

406 frequencies, 

407 number_of_response_curves, 

408 starting_index, 

409 ) 

410 elif isinstance(interferometer.calibration_model, Precomputed): 

411 model = interferometer.calibration_model 

412 idxs = np.arange(number_of_response_curves, dtype=int) + starting_index 

413 draws[name] = model.curves[idxs] 

414 parameters[name] = pd.DataFrame(model.parameters.iloc[idxs]) 

415 parameters[name][model.prefix] = idxs 

416 else: 

417 if priors is None: 

418 raise ValueError( 

419 "Priors must be passed to generate calibration response curves " 

420 "for cubic spline." 

421 ) 

422 draws[name], parameters[name] = _generate_calibration_draws( 

423 interferometer=interferometer, 

424 priors=priors, 

425 n_curves=number_of_response_curves, 

426 ) 

427 write_calibration_file(filename, frequencies, draws[name], parameters[name]) 

428 

429 interferometer.calibration_model = Recalibrate() 

430 

431 return draws, parameters 

432 

433 

434def _generate_calibration_draws(interferometer, priors, n_curves): 

435 name = interferometer.name 

436 frequencies = interferometer.frequency_array 

437 frequencies = frequencies[interferometer.frequency_mask] 

438 calibration_priors = PriorDict() 

439 for key in priors.keys(): 

440 if "recalib" in key and name in key: 

441 calibration_priors[key] = copy.copy(priors[key]) 

442 

443 parameters = pd.DataFrame(calibration_priors.sample(n_curves)) 

444 

445 draws = np.array(curves_from_spline_and_prior( 

446 parameters=parameters, 

447 label=name, 

448 n_points=interferometer.calibration_model.n_points, 

449 frequency_array=frequencies, 

450 n_curves=n_curves, 

451 )) 

452 return draws, parameters 

453 

454 

455def curves_from_spline_and_prior(parameters, label, n_points, frequency_array, n_curves): 

456 spline = CubicSpline( 

457 prefix=f"recalib_{label}_", 

458 minimum_frequency=frequency_array[0], 

459 maximum_frequency=frequency_array[-1], 

460 n_points=n_points, 

461 ) 

462 curves = list() 

463 for ii in range(n_curves): 

464 curves.append(spline.get_calibration_factor( 

465 frequency_array=frequency_array, 

466 **parameters.iloc[ii] 

467 )) 

468 return curves