Coverage for parallel_bilby/analysis/read_write.py: 88%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

91 statements  

1import logging 

2import os 

3import shutil 

4import time 

5 

6import bilby 

7import dill 

8import dynesty 

9import numpy as np 

10from bilby.core.utils import logger 

11from pandas import DataFrame 

12 

13from ..utils import safe_file_dump, stopwatch 

14from .likelihood import reorder_loglikelihoods 

15 

16 

17def _get_time_since_file_last_modified(file_path): 

18 """Returns the time since the file was last modified in human-readable format 

19 

20 Parameters 

21 ---------- 

22 file_path: str 

23 Path to the file 

24 

25 Returns 

26 ------- 

27 time_since_last_modified: str 

28 """ 

29 if not os.path.exists(file_path): 

30 return "" 

31 seconds = time.time() - os.path.getmtime(file_path) 

32 d, remainder = divmod(seconds, 86400) 

33 hr, remainder = divmod(remainder, 3600) 

34 m, s = divmod(remainder, 60) 

35 strtime = f"{m:02.0f}m {s:02.0f}s" 

36 strtime = f"{hr:02.0f}h {strtime}" if hr > 0 else strtime 

37 strtime = f"{d:02.0f} days, {strtime}" if d > 0 else strtime 

38 return strtime 

39 

40 

@stopwatch(log_level=logging.INFO)
def write_current_state(sampler, resume_file, sampling_time, rotate=False):
    """Writes a checkpoint file

    Parameters
    ----------
    sampler: dynesty.NestedSampler
        The sampler object itself
    resume_file: str
        The name of the resume/checkpoint file to use
    sampling_time: float
        The total sampling time in seconds
    rotate: bool
        If resume_file already exists, first make a backup file (ending in '.bk').
    """
    # Blank line so the log message is not glued to dynesty's progress output
    print("")

    time_since_save = _get_time_since_file_last_modified(resume_file)
    # BUGFIX: the conditional expression previously bound to the entire
    # concatenation ("Start..." + suffix if cond else ""), so when no
    # previous checkpoint existed an empty message was logged instead of
    # "Start checkpoint writing". Parenthesize the optional suffix only.
    logger.info(
        "Start checkpoint writing"
        + (f" (last checkpoint {time_since_save} ago)" if time_since_save else "")
    )
    if rotate and os.path.isfile(resume_file):
        resume_file_bk = resume_file + ".bk"
        logger.info(f"Backing up existing checkpoint file to {resume_file_bk}")
        shutil.copyfile(resume_file, resume_file_bk)
    sampler.kwargs["sampling_time"] = sampling_time

    # Get random state and package it into the resume object
    sampler.kwargs["random_state"] = sampler.rstate.bit_generator.state

    if dill.pickles(sampler):
        safe_file_dump(sampler, resume_file, dill)
        logger.info(f"Written checkpoint file {resume_file}")
    else:
        logger.warning("Cannot write pickle resume file!")

    # Delete the random state so that the object is unchanged
    del sampler.kwargs["random_state"]

81 

82 

def write_sample_dump(sampler, samples_file, search_parameter_keys):
    """Dump the current posterior samples to a space-separated ASCII table.

    Rejection-samples the sampler's current nested samples; if at least
    100 posterior samples survive, they are written to ``samples_file``.

    Parameters
    ----------
    sampler: dynesty.NestedSampler
        The sampler object itself
    samples_file: str
        Path the sample dump is written to
    search_parameter_keys: list
        Column names for the output table
    """

    run_data = sampler.saved_run.D
    ln_weights = run_data["logwt"] - run_data["logz"][-1]
    posterior_samples = bilby.core.result.rejection_sample(
        np.array(run_data["v"]), np.exp(ln_weights)
    )
    nsamples = len(posterior_samples)

    # Too few surviving samples make the dump uninformative; skip it.
    if nsamples < 100:
        return

    logger.info(f"Writing {nsamples} current samples to {samples_file}")
    DataFrame(posterior_samples, columns=search_parameter_keys).to_csv(
        samples_file, index=False, header=True, sep=" "
    )

106 

107 

@stopwatch
def read_saved_state(resume_file, continuing=True):
    """
    Read a saved state of the sampler from disk.

    The information needed to reconstruct the state of the run is read
    back from a pickle file written by ``write_current_state``.

    Parameters
    ----------
    resume_file: str
        The path to the resume file to read

    Returns
    -------
    sampler: dynesty.NestedSampler
        If a resume file exists and was successfully read, the nested sampler
        instance updated with the values stored to disk. If unavailable,
        returns False
    sampling_time: int
        The sampling time from previous runs
    """

    # Guard clause: nothing to restore when no checkpoint exists.
    if not os.path.isfile(resume_file):
        logger.info(f"Resume file {resume_file} does not exist.")
        return False, 0

    logger.info(f"Reading resume file {resume_file}")
    with open(resume_file, "rb") as handle:
        sampler = dill.load(handle)

    if sampler.added_live and continuing:
        sampler._remove_live_points()

    # Recreate the random number generator and restore its state from the
    # checkpoint. The stored state is popped from kwargs because it is of
    # no further use once the generator has been seeded.
    sampler.rstate = np.random.Generator(np.random.PCG64())
    sampler.rstate.bit_generator.state = sampler.kwargs.pop("random_state")

    sampling_time = sampler.kwargs.pop("sampling_time")
    return sampler, sampling_time

149 

150 

def format_result(
    run,
    data_dump,
    out,
    weights,
    nested_samples,
    sampler_kwargs,
    sampling_time,
    rejection_sample_posterior=True,
):
    """
    Packs the variables from the run into a bilby result object

    Parameters
    ----------
    run: AnalysisRun
        Parallel Bilby run object
    data_dump: str
        Path to the *_data_dump.pickle file
    out: dynesty.results.Results
        Results from the dynesty sampler
    weights: numpy.ndarray
        Array of weights for the points
    nested_samples: pandas.core.frame.DataFrame
        DataFrame of the weights and likelihoods
    sampler_kwargs: dict
        Dictionary of keyword arguments for the sampler
    sampling_time: float
        Time in seconds spent sampling
    rejection_sample_posterior: bool
        Whether to generate the posterior samples by rejection sampling the
        nested samples or resampling with replacement

    Returns
    -------
    result: bilby.core.result.Result
        result object with values written into its attributes
    """

    result = bilby.core.result.Result(
        label=run.label, outdir=run.outdir, search_parameter_keys=run.sampling_keys
    )
    result.priors = run.priors
    result.nested_samples = nested_samples
    result.meta_data = run.data_dump["meta_data"]
    result.meta_data["command_line_args"]["sampler"] = "parallel_bilby"
    result.meta_data["data_dump"] = data_dump
    result.meta_data["likelihood"] = run.likelihood.meta_data
    result.meta_data["sampler_kwargs"] = run.init_sampler_kwargs
    result.meta_data["run_sampler_kwargs"] = sampler_kwargs
    result.meta_data["injection_parameters"] = run.injection_parameters
    result.injection_parameters = run.injection_parameters

    if rejection_sample_posterior:
        # Keep each nested sample with probability weight / max(weight)
        keep = weights > np.random.uniform(0, max(weights), len(weights))
        result.samples = out.samples[keep]
        result.log_likelihood_evaluations = out.logl[keep]
        logger.info(
            f"Rejection sampling nested samples to obtain {sum(keep)} posterior samples"
        )
    else:
        result.samples = dynesty.utils.resample_equal(out.samples, weights)
        result.log_likelihood_evaluations = reorder_loglikelihoods(
            unsorted_loglikelihoods=out.logl,
            unsorted_samples=out.samples,
            sorted_samples=result.samples,
        )
        logger.info("Resampling nested samples to posterior samples in place.")

    # IMPROVEMENT: the noise evidence is needed twice (evidence offset and
    # Bayes factor); evaluate the likelihood call only once instead of twice.
    log_noise_evidence = run.likelihood.noise_log_likelihood()
    result.log_noise_evidence = log_noise_evidence
    result.log_evidence = out.logz[-1] + log_noise_evidence
    result.log_evidence_err = out.logzerr[-1]
    result.log_bayes_factor = result.log_evidence - result.log_noise_evidence
    result.sampling_time = sampling_time
    result.num_likelihood_evaluations = np.sum(out.ncall)

    result.samples_to_posterior(likelihood=run.likelihood, priors=result.priors)
    return result

228 return result