#!/usr/bin/env python
# encoding: utf-8
"""
Defines various nose unit tests
"""
import numpy as np
from .mh import MHSampler
from .ensemble import EnsembleSampler
from .ptsampler import PTSampler
logprecision = -4
[docs]def lnprob_gaussian(x, icov):
return -np.dot(x, np.dot(icov, x)) / 2.0
[docs]def lnprob_gaussian_nan(x, icov):
#if walker's parameters are zeros => return NaN
if not (np.array(x)).any():
result = np.nan
else:
result = -np.dot(x, np.dot(icov, x)) / 2.0
return result
[docs]def log_unit_sphere_volume(ndim):
if ndim % 2 == 0:
logfactorial = 0.0
for i in range(1, ndim / 2 + 1):
logfactorial += np.log(i)
return ndim / 2.0 * np.log(np.pi) - logfactorial
else:
logfactorial = 0.0
for i in range(1, ndim + 1, 2):
logfactorial += np.log(i)
return (ndim + 1) / 2.0 * np.log(2.0) \
+ (ndim - 1) / 2.0 * np.log(np.pi) - logfactorial
[docs]class LnprobGaussian(object):
def __init__(self, icov, cutoff=None):
"""Initialize a gaussian PDF with the given inverse covariance
matrix. If not ``None``, ``cutoff`` truncates the PDF at the
given number of sigma from the origin (i.e. the PDF is
non-zero only on an ellipse aligned with the principal axes of
the distribution). Without this cutoff, thermodynamic
integration with a flat prior is logarithmically divergent."""
self.icov = icov
self.cutoff = cutoff
def __call__(self, x):
dist2 = lnprob_gaussian(x, self.icov)
if self.cutoff is not None:
if -dist2 > self.cutoff * self.cutoff / 2.0:
return float('-inf')
else:
return dist2
else:
return dist2
[docs]def ln_flat(x):
return 0.0
[docs]class Tests:
[docs] def setUp(self):
self.nwalkers = 100
self.ndim = 5
self.ntemp = 20
self.N = 1000
self.mean = np.zeros(self.ndim)
self.cov = 0.5 - np.random.rand(self.ndim ** 2) \
.reshape((self.ndim, self.ndim))
self.cov = np.triu(self.cov)
self.cov += self.cov.T - np.diag(self.cov.diagonal())
self.cov = np.dot(self.cov, self.cov)
self.icov = np.linalg.inv(self.cov)
self.p0 = [0.1 * np.random.randn(self.ndim)
for i in range(self.nwalkers)]
self.truth = np.random.multivariate_normal(self.mean, self.cov, 100000)
[docs] def check_sampler(self, N=None, p0=None):
if N is None:
N = self.N
if p0 is None:
p0 = self.p0
for i in self.sampler.sample(p0, iterations=N):
pass
assert np.mean(self.sampler.acceptance_fraction) > 0.25
assert np.all(self.sampler.acceptance_fraction > 0)
chain = self.sampler.flatchain
maxdiff = 10. ** (logprecision)
assert np.all((np.mean(chain, axis=0) - self.mean) ** 2 / self.N ** 2
< maxdiff)
assert np.all((np.cov(chain, rowvar=0) - self.cov) ** 2 / self.N ** 2
< maxdiff)
[docs] def check_pt_sampler(self, cutoff, N=None, p0=None):
if N is None:
N = self.N
if p0 is None:
p0 = self.p0
for i in self.sampler.sample(p0, iterations=N):
pass
# Weaker assertions on acceptance fraction
assert np.mean(self.sampler.acceptance_fraction) > 0.1
assert np.mean(self.sampler.tswap_acceptance_fraction) > 0.1
maxdiff = 10.0 ** logprecision
chain = np.reshape(self.sampler.chain[0, ...],
(-1, self.sampler.chain.shape[-1]))
log_volume = self.ndim * np.log(cutoff) \
+ log_unit_sphere_volume(self.ndim) \
+ 0.5 * np.log(np.linalg.det(self.cov))
gaussian_integral = self.ndim / 2.0 * np.log(2.0 * np.pi) \
+ 0.5 * np.log(np.linalg.det(self.cov))
lnZ, dlnZ = self.sampler.thermodynamic_integration_log_evidence()
assert np.abs(lnZ - (gaussian_integral - log_volume)) < 3 * dlnZ
assert np.all((np.mean(chain, axis=0) - self.mean) ** 2.0 / N ** 2.0
< maxdiff)
assert np.all((np.cov(chain, rowvar=0) - self.cov) ** 2.0 / N ** 2.0
< maxdiff)
[docs] def test_mh(self):
self.sampler = MHSampler(self.cov, self.ndim, lnprob_gaussian,
args=[self.icov])
self.check_sampler(N=self.N * self.nwalkers, p0=self.p0[0])
[docs] def test_ensemble(self):
self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
lnprob_gaussian, args=[self.icov])
self.check_sampler()
[docs] def test_nan_lnprob(self):
self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
lnprob_gaussian_nan,
args=[self.icov])
# If a walker is right at zero, ``lnprobfn`` returns ``np.nan``.
p0 = self.p0
p0[0] = 0.0
try:
self.check_sampler(p0=p0)
except ValueError:
# This should fail *immediately* with a ``ValueError``.
return
assert False, "We should never get here."
[docs] def test_inf_nan_params(self):
self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
lnprob_gaussian, args=[self.icov])
# Set one of the walkers to have a ``np.nan`` value.
p0 = self.p0
p0[0][0] = np.nan
try:
self.check_sampler(p0=p0)
except ValueError:
# This should fail *immediately* with a ``ValueError``.
pass
else:
assert False, "The sampler should have failed by now."
# Set one of the walkers to have a ``np.inf`` value.
p0[0][0] = np.inf
try:
self.check_sampler(p0=p0)
except ValueError:
# This should fail *immediately* with a ``ValueError``.
pass
else:
assert False, "The sampler should have failed by now."
# Set one of the walkers to have a ``np.inf`` value.
p0[0][0] = -np.inf
try:
self.check_sampler(p0=p0)
except ValueError:
# This should fail *immediately* with a ``ValueError``.
pass
else:
assert False, "The sampler should have failed by now."
# def test_parallel(self):
# self.sampler = EnsembleSampler(self.nwalkers, self.ndim,
# lnprob_gaussian, args=[self.icov],
# threads=2)
# self.check_sampler()
[docs] def test_pt_sampler(self):
cutoff = 10.0
self.sampler = PTSampler(self.ntemp, self.nwalkers, self.ndim,
LnprobGaussian(self.icov, cutoff=cutoff),
ln_flat)
p0 = np.random.multivariate_normal(mean=self.mean, cov=self.cov,
size=(self.ntemp, self.nwalkers))
self.check_pt_sampler(cutoff, p0=p0, N=1000)
[docs] def test_blobs(self):
lnprobfn = lambda p: (-0.5 * np.sum(p ** 2), np.random.rand())
self.sampler = EnsembleSampler(self.nwalkers, self.ndim, lnprobfn)
self.check_sampler()
# Make sure that the shapes of everything are as expected.
assert (self.sampler.chain.shape == (self.nwalkers, self.N, self.ndim)
and len(self.sampler.blobs) == self.N
and len(self.sampler.blobs[0]) == self.nwalkers), \
"The blob dimensions are wrong."
# Make sure that the blobs aren't all the same.
blobs = self.sampler.blobs
assert np.any([blobs[-1] != blobs[i] for i in range(len(blobs) - 1)])