Coverage for bilby/core/sampler/proposal.py: 86%
180 statements
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2025-05-06 04:57 +0000
1from inspect import isclass
3import numpy as np
5from ..prior import Uniform
6from ..utils import random
9class Sample(dict):
10 def __init__(self, dictionary=None):
11 if dictionary is None:
12 dictionary = dict()
13 super(Sample, self).__init__(dictionary)
15 def __add__(self, other):
16 return Sample({key: self[key] + other[key] for key in self.keys()})
18 def __sub__(self, other):
19 return Sample({key: self[key] - other[key] for key in self.keys()})
21 def __mul__(self, other):
22 return Sample({key: self[key] * other for key in self.keys()})
24 @classmethod
25 def from_cpnest_live_point(cls, cpnest_live_point):
26 res = cls(dict())
27 for i, key in enumerate(cpnest_live_point.names):
28 res[key] = cpnest_live_point.values[i]
29 return res
31 @classmethod
32 def from_external_type(cls, external_sample, sampler_name):
33 if sampler_name == "cpnest":
34 return cls.from_cpnest_live_point(external_sample)
35 return external_sample
38class JumpProposal(object):
39 def __init__(self, priors=None):
40 """A generic class for jump proposals
42 Parameters
43 ==========
44 priors: bilby.core.prior.PriorDict
45 Dictionary of priors used in this sampling run
47 Attributes
48 ==========
49 log_j: float
50 Log Jacobian of the proposal. Characterises whether or not detailed balance
51 is preserved. If not, log_j needs to be adjusted accordingly.
52 """
53 self.priors = priors
54 self.log_j = 0.0
56 def __call__(self, sample, **kwargs):
57 """A generic wrapper for the jump proposal function
59 Parameters
60 ==========
61 args: Arguments that are going to be passed into the proposal function
62 kwargs: Keyword arguments that are going to be passed into the proposal function
64 Returns
65 =======
66 dict: A dictionary with the new samples. Boundary conditions are applied.
68 """
69 return self._apply_boundaries(sample)
71 def _move_reflecting_keys(self, sample):
72 keys = [
73 key for key in sample.keys() if self.priors[key].boundary == "reflective"
74 ]
75 for key in keys:
76 if (
77 sample[key] > self.priors[key].maximum
78 or sample[key] < self.priors[key].minimum
79 ):
80 r = self.priors[key].maximum - self.priors[key].minimum
81 delta = (sample[key] - self.priors[key].minimum) % (2 * r)
82 if delta > r:
83 sample[key] = (
84 2 * self.priors[key].maximum - self.priors[key].minimum - delta
85 )
86 elif delta < r:
87 sample[key] = self.priors[key].minimum + delta
88 return sample
90 def _move_periodic_keys(self, sample):
91 keys = [key for key in sample.keys() if self.priors[key].boundary == "periodic"]
92 for key in keys:
93 if (
94 sample[key] > self.priors[key].maximum
95 or sample[key] < self.priors[key].minimum
96 ):
97 sample[key] = self.priors[key].minimum + (
98 (sample[key] - self.priors[key].minimum)
99 % (self.priors[key].maximum - self.priors[key].minimum)
100 )
101 return sample
103 def _apply_boundaries(self, sample):
104 sample = self._move_periodic_keys(sample)
105 sample = self._move_reflecting_keys(sample)
106 return sample
109class JumpProposalCycle(object):
110 def __init__(self, proposal_functions, weights, cycle_length=100):
111 """A generic wrapper class for proposal cycles
113 Parameters
114 ==========
115 proposal_functions: list
116 A list of callable proposal functions/objects
117 weights: list
118 A list of integer weights for the respective proposal functions
119 cycle_length: int, optional
120 Length of the proposal cycle
121 """
122 self.proposal_functions = proposal_functions
123 self.weights = weights
124 self.cycle_length = cycle_length
125 self._index = 0
126 self._cycle = np.array([])
127 self.update_cycle()
129 def __call__(self, **kwargs):
130 proposal = self._cycle[self.index]
131 self._index = (self.index + 1) % self.cycle_length
132 return proposal(**kwargs)
134 def __len__(self):
135 return len(self.proposal_functions)
137 def update_cycle(self):
138 self._cycle = random.rng.choice(
139 self.proposal_functions,
140 size=self.cycle_length,
141 p=self.weights,
142 replace=True,
143 )
145 @property
146 def proposal_functions(self):
147 return self._proposal_functions
149 @proposal_functions.setter
150 def proposal_functions(self, proposal_functions):
151 for i, proposal in enumerate(proposal_functions):
152 if isclass(proposal):
153 proposal_functions[i] = proposal()
154 self._proposal_functions = proposal_functions
156 @property
157 def index(self):
158 return self._index
160 @property
161 def weights(self):
162 """
164 Returns
165 =======
166 Normalised proposal weights
168 """
169 return np.array(self._weights) / np.sum(np.array(self._weights))
171 @weights.setter
172 def weights(self, weights):
173 assert len(weights) == len(self.proposal_functions)
174 self._weights = weights
176 @property
177 def unnormalised_weights(self):
178 return self._weights
181class NormJump(JumpProposal):
182 def __init__(self, step_size, priors=None):
183 """
184 A normal distributed step centered around the old sample
186 Parameters
187 ==========
188 step_size: float
189 The scalable step size
190 priors:
191 See superclass
192 """
193 super(NormJump, self).__init__(priors)
194 self.step_size = step_size
196 def __call__(self, sample, **kwargs):
197 for key in sample.keys():
198 sample[key] = random.rng.normal(sample[key], self.step_size)
199 return super(NormJump, self).__call__(sample)
202class EnsembleWalk(JumpProposal):
203 def __init__(
204 self,
205 random_number_generator=random.rng.uniform,
206 n_points=3,
207 priors=None,
208 **random_number_generator_args
209 ):
210 """
211 An ensemble walk
213 Parameters
214 ==========
215 random_number_generator: func, optional
216 A random number generator. Default is random.random
217 n_points: int, optional
218 Number of points in the ensemble to average over. Default is 3.
219 priors:
220 See superclass
221 random_number_generator_args:
222 Additional keyword arguments for the random number generator
223 """
224 super(EnsembleWalk, self).__init__(priors)
225 self.random_number_generator = random_number_generator
226 self.n_points = n_points
227 self.random_number_generator_args = random_number_generator_args
229 def __call__(self, sample, **kwargs):
230 subset = random.rng.choice(kwargs["coordinates"], self.n_points, replace=False)
231 for i in range(len(subset)):
232 subset[i] = Sample.from_external_type(
233 subset[i], kwargs.get("sampler_name", None)
234 )
235 center_of_mass = self.get_center_of_mass(subset)
236 for x in subset:
237 sample += (x - center_of_mass) * self.random_number_generator(
238 **self.random_number_generator_args
239 )
240 return super(EnsembleWalk, self).__call__(sample)
242 @staticmethod
243 def get_center_of_mass(subset):
244 return {key: np.mean([c[key] for c in subset]) for key in subset[0].keys()}
247class EnsembleStretch(JumpProposal):
248 def __init__(self, scale=2.0, priors=None):
249 """
250 Stretch move. Calculates the log Jacobian which can be used in cpnest to bias future moves.
252 Parameters
253 ==========
254 scale: float, optional
255 Stretching scale. Default is 2.0.
256 """
257 super(EnsembleStretch, self).__init__(priors)
258 self.scale = scale
260 def __call__(self, sample, **kwargs):
261 second_sample = random.rng.choice(kwargs["coordinates"])
262 second_sample = Sample.from_external_type(
263 second_sample, kwargs.get("sampler_name", None)
264 )
265 step = random.rng.uniform(-1, 1) * np.log(self.scale)
266 sample = second_sample + (sample - second_sample) * np.exp(step)
267 self.log_j = len(sample) * step
268 return super(EnsembleStretch, self).__call__(sample)
271class DifferentialEvolution(JumpProposal):
272 def __init__(self, sigma=1e-4, mu=1.0, priors=None):
273 """
274 Differential evolution step. Takes two elements from the existing coordinates and differentially evolves the
275 old sample based on them using some Gaussian randomisation in the step.
277 Parameters
278 ==========
279 sigma: float, optional
280 Random spread in the evolution step. Default is 1e-4
281 mu: float, optional
282 Scale of the randomization. Default is 1.0
283 """
284 super(DifferentialEvolution, self).__init__(priors)
285 self.sigma = sigma
286 self.mu = mu
288 def __call__(self, sample, **kwargs):
289 a, b = random.rng.choice(kwargs["coordinates"], 2, replace=False)
290 sample = sample + (b - a) * random.rng.normal(self.mu, self.sigma)
291 return super(DifferentialEvolution, self).__call__(sample)
294class EnsembleEigenVector(JumpProposal):
295 def __init__(self, priors=None):
296 """
297 Ensemble step based on the ensemble eigenvectors.
299 Parameters
300 ==========
301 priors:
302 See superclass
303 """
304 super(EnsembleEigenVector, self).__init__(priors)
305 self.eigen_values = None
306 self.eigen_vectors = None
307 self.covariance = None
309 def update_eigenvectors(self, coordinates):
310 if coordinates is None:
311 return
312 elif len(coordinates[0]) == 1:
313 self._set_1_d_eigenvectors(coordinates)
314 else:
315 self._set_n_d_eigenvectors(coordinates)
317 def _set_1_d_eigenvectors(self, coordinates):
318 n_samples = len(coordinates)
319 key = list(coordinates[0].keys())[0]
320 variance = np.var([coordinates[j][key] for j in range(n_samples)])
321 self.eigen_values = np.atleast_1d(variance)
322 self.covariance = self.eigen_values
323 self.eigen_vectors = np.eye(1)
325 def _set_n_d_eigenvectors(self, coordinates):
326 n_samples = len(coordinates)
327 dim = len(coordinates[0])
328 cov_array = np.zeros((dim, n_samples))
329 for i, key in enumerate(coordinates[0].keys()):
330 for j in range(n_samples):
331 cov_array[i, j] = coordinates[j][key]
332 self.covariance = np.cov(cov_array)
333 self.eigen_values, self.eigen_vectors = np.linalg.eigh(self.covariance)
335 def __call__(self, sample, **kwargs):
336 self.update_eigenvectors(kwargs["coordinates"])
337 i = random.rng.integers(len(sample))
338 jump_size = np.sqrt(np.fabs(self.eigen_values[i])) * random.rng.normal(0, 1)
339 for j, key in enumerate(sample.keys()):
340 sample[key] += jump_size * self.eigen_vectors[j, i]
341 return super(EnsembleEigenVector, self).__call__(sample)
344class DrawFlatPrior(JumpProposal):
345 """
346 Draws a proposal from the flattened prior distribution.
347 """
349 def __call__(self, sample, **kwargs):
350 sample = _draw_from_flat_priors(sample, self.priors)
351 return super(DrawFlatPrior, self).__call__(sample)
354def _draw_from_flat_priors(sample, priors):
355 for key in sample.keys():
356 flat_prior = Uniform(priors[key].minimum, priors[key].maximum, priors[key].name)
357 sample[key] = flat_prior.sample()
358 return sample