Coverage for bilby/gw/detector/calibration.py: 92%
185 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
1""" Functions for adding calibration factors to waveform templates.
2"""
3import copy
4import os
6import numpy as np
7import pandas as pd
8from scipy.interpolate import interp1d
10from ...core.utils.log import logger
11from ...core.prior.dict import PriorDict
12from ..prior import CalibrationPriorDict
15def read_calibration_file(filename, frequency_array, number_of_response_curves, starting_index=0):
16 """
17 Function to read the hdf5 files from the calibration group containing the physical calibration response curves.
19 Parameters
20 ----------
21 filename: str
22 Location of the HDF5 file that contains the curves
23 frequency_array: array-like
24 The frequency values to calculate the calibration response curves at
25 number_of_response_curves: int
26 Number of random draws to use from the calibration file
27 starting_index: int
28 Index of the first curve to use within the array. This allows for segmenting the calibration curve array
29 into smaller pieces.
31 Returns
32 -------
33 calibration_draws: array-like
34 Array which contains the calibration responses as a function of the frequency array specified.
35 Shape is (number_of_response_curves x len(frequency_array))
37 """
38 import tables
40 logger.info(f"Reading calibration draws from {filename}")
41 calibration_file = tables.open_file(filename, 'r')
42 calibration_amplitude = \
43 calibration_file.root.deltaR.draws_amp_rel[starting_index:number_of_response_curves + starting_index]
44 calibration_phase = \
45 calibration_file.root.deltaR.draws_phase[starting_index:number_of_response_curves + starting_index]
47 calibration_frequencies = calibration_file.root.deltaR.freq[:]
49 calibration_file.close()
51 if len(calibration_amplitude.dtype) != 0: # handling if this is a calibration group hdf5 file
52 calibration_amplitude = calibration_amplitude.view(np.float64).reshape(calibration_amplitude.shape + (-1,))
53 calibration_phase = calibration_phase.view(np.float64).reshape(calibration_phase.shape + (-1,))
54 calibration_frequencies = calibration_frequencies.view(np.float64)
56 # interpolate to the frequency array (where if outside the range of the calibration uncertainty its fixed to 1)
57 calibration_draws = calibration_amplitude * np.exp(1j * calibration_phase)
58 calibration_draws = interp1d(
59 calibration_frequencies, calibration_draws, kind='cubic',
60 bounds_error=False, fill_value=1)(frequency_array)
62 try:
63 parameter_draws = pd.read_hdf(filename, key="CalParams")
64 except KeyError:
65 parameter_draws = None
67 return calibration_draws, parameter_draws
70def write_calibration_file(filename, frequency_array, calibration_draws, calibration_parameter_draws=None):
71 """
72 Function to write the generated response curves to file
74 Parameters
75 ----------
76 filename: str
77 Location and filename to save the file
78 frequency_array: array-like
79 The frequency values where the calibration response was calculated
80 calibration_draws: array-like
81 Array which contains the calibration responses as a function of the frequency array specified.
82 Shape is (number_of_response_curves x len(frequency_array))
83 calibration_parameter_draws: data_frame
84 Parameters used to generate the random draws of the calibration response curves
86 """
87 import tables
89 logger.info(f"Writing calibration draws to {filename}")
90 calibration_file = tables.open_file(filename, 'w')
91 deltaR_group = calibration_file.create_group(calibration_file.root, 'deltaR')
93 # Save output
94 calibration_file.create_carray(deltaR_group, 'draws_amp_rel', obj=np.abs(calibration_draws))
95 calibration_file.create_carray(deltaR_group, 'draws_phase', obj=np.angle(calibration_draws))
96 calibration_file.create_carray(deltaR_group, 'freq', obj=frequency_array)
98 calibration_file.close()
100 # Save calibration parameter draws
101 if calibration_parameter_draws is not None:
102 calibration_parameter_draws.to_hdf(filename, key='CalParams', data_columns=True, format='table')
105class Recalibrate(object):
107 name = 'none'
109 def __init__(self, prefix='recalib_'):
110 """
111 Base calibration object. This applies no transformation
113 Parameters
114 ==========
115 prefix: str
116 Prefix on parameters relating to the calibration.
117 """
118 self.params = dict()
119 self.prefix = prefix
121 def __repr__(self):
122 return self.__class__.__name__ + '(prefix=\'{}\')'.format(self.prefix)
124 def get_calibration_factor(self, frequency_array, **params):
125 """Apply calibration model
127 This method should be overwritten by subclasses
129 Parameters
130 ==========
131 frequency_array: array-like
132 The frequency values to calculate the calibration factor for.
133 params : dict
134 Dictionary of sampling parameters which includes
135 calibration parameters.
137 Returns
138 =======
139 calibration_factor : array-like
140 The factor to multiply the strain by.
141 """
142 return np.ones_like(frequency_array)
144 def set_calibration_parameters(self, **params):
145 self.params.update({key[len(self.prefix):]: params[key] for key in params
146 if self.prefix in key})
148 def __eq__(self, other):
149 return self.__dict__ == other.__dict__
152class CubicSpline(Recalibrate):
154 name = 'cubic_spline'
156 def __init__(self, prefix, minimum_frequency, maximum_frequency, n_points):
157 """
158 Cubic spline recalibration
160 see https://dcc.ligo.org/DocDB/0116/T1400682/001/calnote.pdf
162 This assumes the spline points follow
163 np.logspace(np.log(minimum_frequency), np.log(maximum_frequency), n_points)
165 Parameters
166 ==========
167 prefix: str
168 Prefix on parameters relating to the calibration.
169 minimum_frequency: float
170 minimum frequency of spline points
171 maximum_frequency: float
172 maximum frequency of spline points
173 n_points: int
174 number of spline points
175 """
176 super(CubicSpline, self).__init__(prefix=prefix)
177 if n_points < 4:
178 raise ValueError('Cubic spline calibration requires at least 4 spline nodes.')
179 self.n_points = n_points
180 self.minimum_frequency = minimum_frequency
181 self.maximum_frequency = maximum_frequency
182 self._log_spline_points = np.linspace(
183 np.log10(minimum_frequency), np.log10(maximum_frequency), n_points)
185 @property
186 def delta_log_spline_points(self):
187 if not hasattr(self, "_delta_log_spline_points"):
188 self._delta_log_spline_points = self._log_spline_points[1] - self._log_spline_points[0]
189 return self._delta_log_spline_points
191 @property
192 def nodes_to_spline_coefficients(self):
193 if not hasattr(self, "_nodes_to_spline_coefficients"):
194 self._setup_spline_coefficients()
195 return self._nodes_to_spline_coefficients
197 def _setup_spline_coefficients(self):
198 """
199 Precompute matrix converting values at nodes to spline coefficients.
200 The algorithm for interpolation is described in
201 https://dcc.ligo.org/LIGO-T2300140, and the matrix calculated here is
202 to solve Eq. (9) in the note.
203 """
204 tmp1 = np.zeros(shape=(self.n_points, self.n_points))
205 tmp1[0, 0] = -1
206 tmp1[0, 1] = 2
207 tmp1[0, 2] = -1
208 tmp1[-1, -3] = -1
209 tmp1[-1, -2] = 2
210 tmp1[-1, -1] = -1
211 for i in range(1, self.n_points - 1):
212 tmp1[i, i - 1] = 1 / 6
213 tmp1[i, i] = 2 / 3
214 tmp1[i, i + 1] = 1 / 6
215 tmp2 = np.zeros(shape=(self.n_points, self.n_points))
216 for i in range(1, self.n_points - 1):
217 tmp2[i, i - 1] = 1
218 tmp2[i, i] = -2
219 tmp2[i, i + 1] = 1
220 self._nodes_to_spline_coefficients = np.linalg.solve(tmp1, tmp2)
222 @property
223 def log_spline_points(self):
224 return self._log_spline_points
226 def __repr__(self):
227 return self.__class__.__name__ + '(prefix=\'{}\', minimum_frequency={}, maximum_frequency={}, n_points={})'\
228 .format(self.prefix, self.minimum_frequency, self.maximum_frequency, self.n_points)
230 def _evaluate_spline(self, kind, a, b, c, d, previous_nodes):
231 """Evaluate Eq. (1) in https://dcc.ligo.org/LIGO-T2300140"""
232 parameters = np.array([self.params[f"{kind}_{ii}"] for ii in range(self.n_points)])
233 next_nodes = previous_nodes + 1
234 spline_coefficients = self.nodes_to_spline_coefficients.dot(parameters)
235 return (
236 a * parameters[previous_nodes]
237 + b * parameters[next_nodes]
238 + c * spline_coefficients[previous_nodes]
239 + d * spline_coefficients[next_nodes]
240 )
242 def get_calibration_factor(self, frequency_array, **params):
243 """Apply calibration model
245 Parameters
246 ==========
247 frequency_array: array-like
248 The frequency values to calculate the calibration factor for.
249 prefix: str
250 Prefix for calibration parameter names
251 params : dict
252 Dictionary of sampling parameters which includes
253 calibration parameters.
255 Returns
256 =======
257 calibration_factor : array-like
258 The factor to multiply the strain by.
259 """
260 log10f_per_deltalog10f = (
261 np.log10(frequency_array) - self.log_spline_points[0]
262 ) / self.delta_log_spline_points
263 previous_nodes = np.clip(np.floor(log10f_per_deltalog10f).astype(int), a_min=0, a_max=self.n_points - 2)
264 b = log10f_per_deltalog10f - previous_nodes
265 a = 1 - b
266 c = (a**3 - a) / 6
267 d = (b**3 - b) / 6
269 self.set_calibration_parameters(**params)
271 delta_amplitude = self._evaluate_spline("amplitude", a, b, c, d, previous_nodes)
272 delta_phase = self._evaluate_spline("phase", a, b, c, d, previous_nodes)
273 calibration_factor = (1 + delta_amplitude) * (2 + 1j * delta_phase) / (2 - 1j * delta_phase)
275 return calibration_factor
278class Precomputed(Recalibrate):
280 name = "precomputed"
282 def __init__(self, label, curves, frequency_array, parameters=None):
283 """
284 A class for accessing an array of precomputed recalibration curves.
286 Parameters
287 ==========
288 label: str
289 The label for the interferometer, e.g., H1. The corresponding
290 parameter is :code:`recalib_index_{label}`.
291 curves: array-like
292 Array with shape (n_curves, n_frequencies) with the recalibration
293 curves.
294 frequency_array: array-like
295 Array of frequencies at which the curves are evaluated.
296 """
297 self.label = label
298 self.curves = curves
299 self.frequency_array = frequency_array
300 self.parameters = parameters
301 super(Precomputed, self).__init__(prefix=f"recalib_index_{self.label}")
303 def get_calibration_factor(self, frequency_array, **params):
304 idx = int(params.get(self.prefix, None))
305 if idx is None:
306 raise KeyError(f"Calibration index for {self.label} not found.")
307 if not np.array_equal(frequency_array, self.frequency_array):
308 raise ValueError("Frequency grid passed to calibrator doesn't match.")
309 return self.curves[idx]
311 @classmethod
312 def constant_uncertainty_spline(
313 cls, amplitude_sigma, phase_sigma, frequency_array, n_nodes, label, n_curves
314 ):
315 priors = CalibrationPriorDict.constant_uncertainty_spline(
316 amplitude_sigma=amplitude_sigma,
317 phase_sigma=phase_sigma,
318 minimum_frequency=frequency_array[0],
319 maximum_frequency=frequency_array[-1],
320 n_nodes=n_nodes,
321 label=label,
322 )
323 parameters = pd.DataFrame(priors.sample(n_curves))
324 curves = curves_from_spline_and_prior(
325 label=label,
326 frequency_array=frequency_array,
327 n_points=n_nodes,
328 parameters=parameters,
329 n_curves=n_curves
330 )
331 return cls(
332 label=label,
333 curves=np.array(curves),
334 frequency_array=frequency_array,
335 parameters=parameters,
336 )
338 @classmethod
339 def from_envelope_file(
340 cls, envelope, frequency_array, n_nodes, label, n_curves
341 ):
342 priors = CalibrationPriorDict.from_envelope_file(
343 envelope_file=envelope,
344 minimum_frequency=frequency_array[0],
345 maximum_frequency=frequency_array[-1],
346 n_nodes=n_nodes,
347 label=label,
348 )
349 parameters = pd.DataFrame(priors.sample(n_curves))
350 curves = curves_from_spline_and_prior(
351 label=label,
352 frequency_array=frequency_array,
353 n_points=n_nodes,
354 parameters=parameters,
355 n_curves=n_curves,
356 )
357 return cls(
358 label=label,
359 curves=np.array(curves),
360 frequency_array=frequency_array,
361 parameters=parameters,
362 )
364 @classmethod
365 def from_calibration_file(cls, label, filename, frequency_array, n_curves, starting_index=0):
366 curves, parameters = read_calibration_file(
367 filename=filename,
368 frequency_array=frequency_array,
369 number_of_response_curves=n_curves,
370 starting_index=starting_index,
371 )
372 return cls(
373 label=label,
374 curves=np.array(curves),
375 frequency_array=frequency_array,
376 parameters=parameters,
377 )
380def build_calibration_lookup(
381 interferometers,
382 lookup_files=None,
383 priors=None,
384 number_of_response_curves=1000,
385 starting_index=0,
386):
387 if lookup_files is None and priors is None:
388 raise ValueError(
389 "One of calibration_lookup_table or priors must be specified for "
390 "building calibration marginalization lookup table."
391 )
392 elif lookup_files is None:
393 lookup_files = dict()
395 draws = dict()
396 parameters = dict()
397 for interferometer in interferometers:
398 name = interferometer.name
399 frequencies = interferometer.frequency_array
400 frequencies = frequencies[interferometer.frequency_mask]
401 filename = lookup_files.get(name, f"{name}_calibration_file.h5")
403 if os.path.exists(filename):
404 draws[name], parameters[name] = read_calibration_file(
405 filename,
406 frequencies,
407 number_of_response_curves,
408 starting_index,
409 )
410 elif isinstance(interferometer.calibration_model, Precomputed):
411 model = interferometer.calibration_model
412 idxs = np.arange(number_of_response_curves, dtype=int) + starting_index
413 draws[name] = model.curves[idxs]
414 parameters[name] = pd.DataFrame(model.parameters.iloc[idxs])
415 parameters[name][model.prefix] = idxs
416 else:
417 if priors is None:
418 raise ValueError(
419 "Priors must be passed to generate calibration response curves "
420 "for cubic spline."
421 )
422 draws[name], parameters[name] = _generate_calibration_draws(
423 interferometer=interferometer,
424 priors=priors,
425 n_curves=number_of_response_curves,
426 )
427 write_calibration_file(filename, frequencies, draws[name], parameters[name])
429 interferometer.calibration_model = Recalibrate()
431 return draws, parameters
434def _generate_calibration_draws(interferometer, priors, n_curves):
435 name = interferometer.name
436 frequencies = interferometer.frequency_array
437 frequencies = frequencies[interferometer.frequency_mask]
438 calibration_priors = PriorDict()
439 for key in priors.keys():
440 if "recalib" in key and name in key:
441 calibration_priors[key] = copy.copy(priors[key])
443 parameters = pd.DataFrame(calibration_priors.sample(n_curves))
445 draws = np.array(curves_from_spline_and_prior(
446 parameters=parameters,
447 label=name,
448 n_points=interferometer.calibration_model.n_points,
449 frequency_array=frequencies,
450 n_curves=n_curves,
451 ))
452 return draws, parameters
455def curves_from_spline_and_prior(parameters, label, n_points, frequency_array, n_curves):
456 spline = CubicSpline(
457 prefix=f"recalib_{label}_",
458 minimum_frequency=frequency_array[0],
459 maximum_frequency=frequency_array[-1],
460 n_points=n_points,
461 )
462 curves = list()
463 for ii in range(n_curves):
464 curves.append(spline.get_calibration_factor(
465 frequency_array=frequency_array,
466 **parameters.iloc[ii]
467 ))
468 return curves