Coverage for bilby/core/sampler/proposal.py: 86%

180 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2025-05-06 04:57 +0000

1from inspect import isclass 

2 

3import numpy as np 

4 

5from ..prior import Uniform 

6from ..utils import random 

7 

8 

class Sample(dict):
    """A dictionary of parameter values supporting elementwise arithmetic.

    Addition and subtraction combine two samples key by key (keys are taken
    from the left operand); multiplication scales every value by a scalar.
    All three return a new ``Sample``.
    """

    def __init__(self, dictionary=None):
        super(Sample, self).__init__(dict() if dictionary is None else dictionary)

    def __add__(self, other):
        # `other` must contain at least the keys of `self`.
        return Sample({key: value + other[key] for key, value in self.items()})

    def __sub__(self, other):
        return Sample({key: value - other[key] for key, value in self.items()})

    def __mul__(self, other):
        # `other` is a scalar applied to every entry.
        return Sample({key: value * other for key, value in self.items()})

    @classmethod
    def from_cpnest_live_point(cls, cpnest_live_point):
        """Build a ``Sample`` from a cpnest live point's parallel
        ``names``/``values`` sequences."""
        converted = cls()
        for position, name in enumerate(cpnest_live_point.names):
            converted[name] = cpnest_live_point.values[position]
        return converted

    @classmethod
    def from_external_type(cls, external_sample, sampler_name):
        """Convert a sampler-specific sample to a ``Sample``.

        Only cpnest live points need conversion; any other sampler's sample
        is returned unchanged.
        """
        if sampler_name == "cpnest":
            return cls.from_cpnest_live_point(external_sample)
        return external_sample

36 

37 

class JumpProposal(object):
    def __init__(self, priors=None):
        """A generic class for jump proposals

        Parameters
        ==========
        priors: bilby.core.prior.PriorDict
            Dictionary of priors used in this sampling run

        Attributes
        ==========
        log_j: float
            Log Jacobian of the proposal. Characterises whether or not detailed balance
            is preserved. If not, log_j needs to be adjusted accordingly.
        """
        self.priors = priors
        self.log_j = 0.0

    def __call__(self, sample, **kwargs):
        """A generic wrapper for the jump proposal function

        Parameters
        ==========
        sample: dict
            The current sample to be moved
        kwargs: Keyword arguments that are going to be passed into the proposal function

        Returns
        =======
        dict: A dictionary with the new samples. Boundary conditions are applied.

        """
        return self._apply_boundaries(sample)

    def _move_reflecting_keys(self, sample):
        # Fold out-of-bounds values for "reflective" parameters back into
        # [minimum, maximum] by bouncing off the boundaries.
        keys = [
            key for key in sample.keys() if self.priors[key].boundary == "reflective"
        ]
        for key in keys:
            if (
                sample[key] > self.priors[key].maximum
                or sample[key] < self.priors[key].minimum
            ):
                r = self.priors[key].maximum - self.priors[key].minimum
                delta = (sample[key] - self.priors[key].minimum) % (2 * r)
                if delta > r:
                    sample[key] = (
                        2 * self.priors[key].maximum - self.priors[key].minimum - delta
                    )
                else:
                    # Bug fix: this branch was previously `elif delta < r`, which
                    # left the delta == r case (a point reflecting exactly onto
                    # the maximum) untouched and hence out of bounds.
                    sample[key] = self.priors[key].minimum + delta
        return sample

    def _move_periodic_keys(self, sample):
        # Wrap out-of-bounds values for "periodic" parameters back into
        # [minimum, maximum] using modular arithmetic.
        keys = [key for key in sample.keys() if self.priors[key].boundary == "periodic"]
        for key in keys:
            if (
                sample[key] > self.priors[key].maximum
                or sample[key] < self.priors[key].minimum
            ):
                sample[key] = self.priors[key].minimum + (
                    (sample[key] - self.priors[key].minimum)
                    % (self.priors[key].maximum - self.priors[key].minimum)
                )
        return sample

    def _apply_boundaries(self, sample):
        # Periodic wrapping first, then reflection; parameters with any other
        # boundary (including None) are left untouched.
        sample = self._move_periodic_keys(sample)
        sample = self._move_reflecting_keys(sample)
        return sample

107 

108 

class JumpProposalCycle(object):
    def __init__(self, proposal_functions, weights, cycle_length=100):
        """A generic wrapper class for proposal cycles

        Parameters
        ==========
        proposal_functions: list
            A list of callable proposal functions/objects
        weights: list
            A list of integer weights for the respective proposal functions
        cycle_length: int, optional
            Length of the proposal cycle
        """
        self.proposal_functions = proposal_functions
        self.weights = weights
        self.cycle_length = cycle_length
        self._index = 0
        self._cycle = np.array([])
        self.update_cycle()

    def __call__(self, **kwargs):
        # Dispatch to the proposal at the current cycle position, then advance
        # the position (wrapping at the end of the cycle).
        chosen = self._cycle[self.index]
        self._index = (self.index + 1) % self.cycle_length
        return chosen(**kwargs)

    def __len__(self):
        return len(self.proposal_functions)

    def update_cycle(self):
        """Redraw the cycle: a weighted random sequence of the proposals."""
        self._cycle = random.rng.choice(
            self.proposal_functions,
            size=self.cycle_length,
            p=self.weights,
            replace=True,
        )

    @property
    def proposal_functions(self):
        return self._proposal_functions

    @proposal_functions.setter
    def proposal_functions(self, proposal_functions):
        # Instantiate (in place) any proposal that was passed as a class
        # rather than as an instance.
        for position, proposal in enumerate(proposal_functions):
            if isclass(proposal):
                proposal_functions[position] = proposal()
        self._proposal_functions = proposal_functions

    @property
    def index(self):
        # Current position within the cycle.
        return self._index

    @property
    def weights(self):
        """

        Returns
        =======
        Normalised proposal weights

        """
        raw = np.array(self._weights)
        return raw / np.sum(raw)

    @weights.setter
    def weights(self, weights):
        # One weight per proposal function is required.
        assert len(weights) == len(self.proposal_functions)
        self._weights = weights

    @property
    def unnormalised_weights(self):
        return self._weights

179 

180 

class NormJump(JumpProposal):
    def __init__(self, step_size, priors=None):
        """
        A normal distributed step centered around the old sample

        Parameters
        ==========
        step_size: float
            The scalable step size
        priors:
            See superclass
        """
        super(NormJump, self).__init__(priors)
        self.step_size = step_size

    def __call__(self, sample, **kwargs):
        # Perturb every parameter independently with a Gaussian of width
        # `step_size` centered on its current value.
        for parameter in sample:
            sample[parameter] = random.rng.normal(sample[parameter], self.step_size)
        return super(NormJump, self).__call__(sample)

200 

201 

class EnsembleWalk(JumpProposal):
    def __init__(
        self,
        random_number_generator=random.rng.uniform,
        n_points=3,
        priors=None,
        **random_number_generator_args
    ):
        """
        An ensemble walk

        Parameters
        ==========
        random_number_generator: func, optional
            A random number generator. Default is random.random
        n_points: int, optional
            Number of points in the ensemble to average over. Default is 3.
        priors:
            See superclass
        random_number_generator_args:
            Additional keyword arguments for the random number generator
        """
        super(EnsembleWalk, self).__init__(priors)
        self.random_number_generator = random_number_generator
        self.n_points = n_points
        self.random_number_generator_args = random_number_generator_args

    def __call__(self, sample, **kwargs):
        # Pick a random subset of the ensemble and step along each member's
        # offset from the subset's centre of mass by a random amount.
        sampler_name = kwargs.get("sampler_name", None)
        subset = [
            Sample.from_external_type(point, sampler_name)
            for point in random.rng.choice(
                kwargs["coordinates"], self.n_points, replace=False
            )
        ]
        center_of_mass = self.get_center_of_mass(subset)
        for point in subset:
            sample += (point - center_of_mass) * self.random_number_generator(
                **self.random_number_generator_args
            )
        return super(EnsembleWalk, self).__call__(sample)

    @staticmethod
    def get_center_of_mass(subset):
        """Per-key mean of the given collection of samples."""
        keys = subset[0].keys()
        return {key: np.mean([point[key] for point in subset]) for key in keys}

245 

246 

class EnsembleStretch(JumpProposal):
    def __init__(self, scale=2.0, priors=None):
        """
        Stretch move. Calculates the log Jacobian which can be used in cpnest to bias future moves.

        Parameters
        ==========
        scale: float, optional
            Stretching scale. Default is 2.0.
        """
        super(EnsembleStretch, self).__init__(priors)
        self.scale = scale

    def __call__(self, sample, **kwargs):
        # Stretch towards/away from a randomly chosen ensemble member by a
        # log-uniform factor in [1/scale, scale].
        companion = Sample.from_external_type(
            random.rng.choice(kwargs["coordinates"]),
            kwargs.get("sampler_name", None),
        )
        step = np.log(self.scale) * random.rng.uniform(-1, 1)
        stretched = companion + (sample - companion) * np.exp(step)
        # Log Jacobian of the move, needed to maintain detailed balance.
        self.log_j = step * len(stretched)
        return super(EnsembleStretch, self).__call__(stretched)

269 

270 

class DifferentialEvolution(JumpProposal):
    def __init__(self, sigma=1e-4, mu=1.0, priors=None):
        """
        Differential evolution step. Takes two elements from the existing coordinates and differentially evolves the
        old sample based on them using some Gaussian randomisation in the step.

        Parameters
        ==========
        sigma: float, optional
            Random spread in the evolution step. Default is 1e-4
        mu: float, optional
            Scale of the randomization. Default is 1.0
        """
        super(DifferentialEvolution, self).__init__(priors)
        self.sigma = sigma
        self.mu = mu

    def __call__(self, sample, **kwargs):
        # Step along the difference vector of two distinct ensemble members,
        # scaled by a Gaussian random factor of mean `mu` and spread `sigma`.
        first, second = random.rng.choice(kwargs["coordinates"], 2, replace=False)
        scale = random.rng.normal(self.mu, self.sigma)
        return super(DifferentialEvolution, self).__call__(
            sample + (second - first) * scale
        )

292 

293 

class EnsembleEigenVector(JumpProposal):
    def __init__(self, priors=None):
        """
        Ensemble step based on the ensemble eigenvectors.

        Parameters
        ==========
        priors:
            See superclass
        """
        super(EnsembleEigenVector, self).__init__(priors)
        # Populated lazily on the first call from the ensemble coordinates.
        self.eigen_values = None
        self.eigen_vectors = None
        self.covariance = None

    def update_eigenvectors(self, coordinates):
        """Recompute the covariance eigendecomposition from the ensemble."""
        if coordinates is None:
            return
        if len(coordinates[0]) == 1:
            self._set_1_d_eigenvectors(coordinates)
        else:
            self._set_n_d_eigenvectors(coordinates)

    def _set_1_d_eigenvectors(self, coordinates):
        # One-dimensional case: the "covariance" is just the sample variance
        # and the single eigenvector is trivially the identity.
        (key,) = coordinates[0].keys()
        variance = np.var([point[key] for point in coordinates])
        self.eigen_values = np.atleast_1d(variance)
        self.covariance = self.eigen_values
        self.eigen_vectors = np.eye(1)

    def _set_n_d_eigenvectors(self, coordinates):
        # Stack the ensemble into a (dim, n_samples) array, one row per key.
        samples_by_dim = np.array(
            [[point[key] for point in coordinates] for key in coordinates[0].keys()]
        )
        self.covariance = np.cov(samples_by_dim)
        self.eigen_values, self.eigen_vectors = np.linalg.eigh(self.covariance)

    def __call__(self, sample, **kwargs):
        # Jump along one randomly chosen eigenvector, scaled by the square
        # root of its eigenvalue and a standard-normal draw.
        self.update_eigenvectors(kwargs["coordinates"])
        direction = random.rng.integers(len(sample))
        jump_size = np.sqrt(np.fabs(self.eigen_values[direction])) * random.rng.normal(
            0, 1
        )
        for axis, key in enumerate(sample.keys()):
            sample[key] += jump_size * self.eigen_vectors[axis, direction]
        return super(EnsembleEigenVector, self).__call__(sample)

342 

343 

class DrawFlatPrior(JumpProposal):
    """
    Draws a proposal from the flattened prior distribution.
    """

    def __call__(self, sample, **kwargs):
        # Ignore the current values entirely; redraw each parameter uniformly
        # from its prior's support.
        return super(DrawFlatPrior, self).__call__(
            _draw_from_flat_priors(sample, self.priors)
        )

352 

353 

def _draw_from_flat_priors(sample, priors):
    """Overwrite each entry of ``sample`` with a uniform draw between the
    corresponding prior's minimum and maximum.

    Mutates ``sample`` in place and returns it.
    """
    for name in sample:
        prior = priors[name]
        sample[name] = Uniform(prior.minimum, prior.maximum, prior.name).sample()
    return sample