Coverage for parallel_bilby/analysis/read_write.py: 88%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2import os
3import shutil
4import time
6import bilby
7import dill
8import dynesty
9import numpy as np
10from bilby.core.utils import logger
11from pandas import DataFrame
13from ..utils import safe_file_dump, stopwatch
14from .likelihood import reorder_loglikelihoods
17def _get_time_since_file_last_modified(file_path):
18 """Returns the time since the file was last modified in human-readable format
20 Parameters
21 ----------
22 file_path: str
23 Path to the file
25 Returns
26 -------
27 time_since_last_modified: str
28 """
29 if not os.path.exists(file_path):
30 return ""
31 seconds = time.time() - os.path.getmtime(file_path)
32 d, remainder = divmod(seconds, 86400)
33 hr, remainder = divmod(remainder, 3600)
34 m, s = divmod(remainder, 60)
35 strtime = f"{m:02.0f}m {s:02.0f}s"
36 strtime = f"{hr:02.0f}h {strtime}" if hr > 0 else strtime
37 strtime = f"{d:02.0f} days, {strtime}" if d > 0 else strtime
38 return strtime
@stopwatch(log_level=logging.INFO)
def write_current_state(sampler, resume_file, sampling_time, rotate=False):
    """Writes a checkpoint file

    Parameters
    ----------
    sampler: dynesty.NestedSampler
        The sampler object itself
    resume_file: str
        The name of the resume/checkpoint file to use
    sampling_time: float
        The total sampling time in seconds
    rotate: bool
        If resume_file already exists, first make a backup file (ending in '.bk').
    """
    print("")
    time_since_save = _get_time_since_file_last_modified(resume_file)
    # BUG FIX: previously the conditional expression wrapped the *entire*
    # concatenation (`"..." + f"..." if time_since_save else ""`), so on the
    # very first checkpoint (no existing file -> empty time_since_save) an
    # empty string was logged instead of "Start checkpoint writing".
    suffix = f" (last checkpoint {time_since_save} ago)" if time_since_save else ""
    logger.info(f"Start checkpoint writing{suffix}")

    if rotate and os.path.isfile(resume_file):
        resume_file_bk = resume_file + ".bk"
        logger.info(f"Backing up existing checkpoint file to {resume_file_bk}")
        shutil.copyfile(resume_file, resume_file_bk)

    # Stash the elapsed sampling time and the RNG state on the sampler so
    # they are pickled into the resume file alongside the sampler itself.
    sampler.kwargs["sampling_time"] = sampling_time
    sampler.kwargs["random_state"] = sampler.rstate.bit_generator.state

    if dill.pickles(sampler):
        safe_file_dump(sampler, resume_file, dill)
        logger.info(f"Written checkpoint file {resume_file}")
    else:
        logger.warning("Cannot write pickle resume file!")

    # Delete the random state so that the sampler object is unchanged
    del sampler.kwargs["random_state"]
def write_sample_dump(sampler, samples_file, search_parameter_keys):
    """Writes a checkpoint file

    Parameters
    ----------
    sampler: dynesty.NestedSampler
        The sampler object itself
    samples_file: str
        Path the space-separated sample dump is written to
    search_parameter_keys: list
        Column names for the dumped samples
    """
    # Importance weights of the nested samples, normalised by the final evidence
    log_weights = sampler.saved_run.D["logwt"] - sampler.saved_run.D["logz"][-1]
    posterior_samples = bilby.core.result.rejection_sample(
        np.array(sampler.saved_run.D["v"]), np.exp(log_weights)
    )
    n_samples = len(posterior_samples)

    # If we don't have enough samples, don't dump them
    if n_samples < 100:
        return

    logger.info(f"Writing {n_samples} current samples to {samples_file}")
    frame = DataFrame(posterior_samples, columns=search_parameter_keys)
    frame.to_csv(samples_file, index=False, header=True, sep=" ")
@stopwatch
def read_saved_state(resume_file, continuing=True):
    """
    Read a saved state of the sampler to disk.

    The required information to reconstruct the state of the run is read from a
    pickle file.

    Parameters
    ----------
    resume_file: str
        The path to the resume file to read
    continuing: bool
        If True and live points were already added to the saved sampler,
        remove them so sampling can continue from the checkpointed state.

    Returns
    -------
    sampler: dynesty.NestedSampler
        If a resume file exists and was successfully read, the nested sampler
        instance updated with the values stored to disk. If unavailable,
        returns False
    sampling_time: int
        The sampling time from previous runs
    """
    if not os.path.isfile(resume_file):
        logger.info(f"Resume file {resume_file} does not exist.")
        return False, 0

    logger.info(f"Reading resume file {resume_file}")
    with open(resume_file, "rb") as file:
        sampler = dill.load(file)

    if continuing and sampler.added_live:
        sampler._remove_live_points()

    # Create random number generator and restore state
    # from file, then remove it from kwargs because it
    # is not useful after the generator has been cycled
    sampler.rstate = np.random.Generator(np.random.PCG64())
    sampler.rstate.bit_generator.state = sampler.kwargs.pop("random_state")

    sampling_time = sampler.kwargs.pop("sampling_time")
    return sampler, sampling_time
def format_result(
    run,
    data_dump,
    out,
    weights,
    nested_samples,
    sampler_kwargs,
    sampling_time,
    rejection_sample_posterior=True,
):
    """
    Packs the variables from the run into a bilby result object

    Parameters
    ----------
    run: AnalysisRun
        Parallel Bilby run object
    data_dump: str
        Path to the *_data_dump.pickle file
    out: dynesty.results.Results
        Results from the dynesty sampler
    weights: numpy.ndarray
        Array of weights for the points
    nested_samples: pandas.core.frame.DataFrame
        DataFrame of the weights and likelihoods
    sampler_kwargs: dict
        Dictionary of keyword arguments for the sampler
    sampling_time: float
        Time in seconds spent sampling
    rejection_sample_posterior: bool
        Whether to generate the posterior samples by rejection sampling the
        nested samples or resampling with replacement

    Returns
    -------
    result: bilby.core.result.Result
        result object with values written into its attributes
    """
    result = bilby.core.result.Result(
        label=run.label, outdir=run.outdir, search_parameter_keys=run.sampling_keys
    )
    result.priors = run.priors
    result.nested_samples = nested_samples

    # Attach provenance metadata so the run can be reconstructed from the result
    meta = run.data_dump["meta_data"]
    result.meta_data = meta
    meta["command_line_args"]["sampler"] = "parallel_bilby"
    meta["data_dump"] = data_dump
    meta["likelihood"] = run.likelihood.meta_data
    meta["sampler_kwargs"] = run.init_sampler_kwargs
    meta["run_sampler_kwargs"] = sampler_kwargs
    meta["injection_parameters"] = run.injection_parameters
    result.injection_parameters = run.injection_parameters

    if rejection_sample_posterior:
        # Keep each nested sample with probability weight/max(weight)
        keep = weights > np.random.uniform(0, max(weights), len(weights))
        result.samples = out.samples[keep]
        result.log_likelihood_evaluations = out.logl[keep]
        logger.info(
            f"Rejection sampling nested samples to obtain {sum(keep)} posterior samples"
        )
    else:
        result.samples = dynesty.utils.resample_equal(out.samples, weights)
        result.log_likelihood_evaluations = reorder_loglikelihoods(
            unsorted_loglikelihoods=out.logl,
            unsorted_samples=out.samples,
            sorted_samples=result.samples,
        )
        logger.info("Resampling nested samples to posterior samples in place.")

    # Evidence bookkeeping: dynesty reports logz relative to the noise evidence
    result.log_evidence = out.logz[-1] + run.likelihood.noise_log_likelihood()
    result.log_evidence_err = out.logzerr[-1]
    result.log_noise_evidence = run.likelihood.noise_log_likelihood()
    result.log_bayes_factor = result.log_evidence - result.log_noise_evidence
    result.sampling_time = sampling_time
    result.num_likelihood_evaluations = np.sum(out.ncall)

    result.samples_to_posterior(likelihood=run.likelihood, priors=result.priors)
    return result