# Copyright (C) 2021 Soichiro Kuwahara
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# =============================================================================
#
# Preamble
#
# ============================================================================
#
import itertools
import math
import multiprocessing
import numpy
import random
from scipy import stats
import sys
import time
from lal import rate
from lalburst import snglcoinc
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import array as ligolw_array
from ligo.lw import param as ligolw_param
from ligo.lw import utils as ligolw_utils
from gstlal.stats import trigger_rate
#
# =============================================================================
#
# Likelihood Machinery
#
# ============================================================================
#
#
# Numerator
#
[docs]class LnSignalDensity(snglcoinc.LnLRDensity):
ln_chisq_const = {"H1": -1.9, "L1": -2.0}
threshold = {"H1": 120., "L1": 100.}
width = {"H1": 1.0, "L1": 1.5}
ln_matching = {"H1": -9.5, "L1": -9.5}
def __init__(self, instruments, delta_t, min_instruments):
self.instruments = instruments
self.delta_t = delta_t
self.min_instruments = min_instruments
def __call__(self, t, snr, chisq):
# check if they have same instrument keys
assert set(snr) == set(chisq)
# FIXME: this ignores correlation of SNRs
# FIXME: snr^-4 is for 1/r^2 flux density, snr^-7 is for 1/r flux density
if True:
lnP = -7. * sum(math.log(s) for s in snr.values())
else:
lnP = -4. * sum(math.log(s) for s in snr.values())
for instrument, s in snr.items():
if s >= self.threshold[instrument]:
lnP += stats.norm.pdf(math.log(chisq[instrument] / s ** 2.), self.ln_matching[instrument], self.width[instrument])
else:
lnP += stats.norm.pdf(math.log(chisq[instrument] / s ** 2.), self.ln_matching[instrument] + self.ln_chisq_const[instrument] * math.log(s / self.threshold[instrument]), 0.11 + (self.width[instrument] - 0.11) * math.log(s) / math.log(self.threshold[instrument]))
return lnP
def __iadd__(self, other):
# compatibility checks
assert self.ln_chisq_const == other.ln_chisq_const
assert self.threshold == other.threshold
assert self.width == other.width
assert self.ln_matching == other.ln_matching
# no-op: fixed function
return self
[docs] def increment(self, event):
# not used in this pipeline
raise NotImplementedError
[docs] def finish(self):
# no-op
pass
[docs] def to_xml(self, name):
xml = super(LnSignalDensity, self).to_xml(name)
xml.appendChild(ligolw_param.Param.from_pyvalue("instruments", lsctables.instrumentsproperty.set(self.instruments)))
xml.appendChild(ligolw_param.Param.from_pyvalue("delta_t", self.delta_t))
xml.appendChild(ligolw_param.Param.from_pyvalue("min_instruments", self.min_instruments))
# FIXME: the other PDF parameters aren't saved, so when
# loaded it always gets whatever the class defines them to
# be. is this OK?
return xml
[docs] @classmethod
def from_xml(cls, xml, name):
xml = cls.get_xml_root(xml, name)
self = cls(
instruments = lsctables.instrumentsproperty.get(ligolw_param.get_pyvalue(xml, "instruments")),
delta_t = ligolw_param.get_pyvalue(xml, "delta_t"),
min_instruments = ligolw_param.get_pyvalue(xml, "min_instruments")
)
return self
#
# Denominator
#
[docs]class LnNoiseDensity(snglcoinc.LnLRDensity):
snr_chisq_binning = rate.NDBins((rate.ATanLogarithmicBins(2.6, 26., 300), rate.ATanLogarithmicBins(.001, 0.2, 280)))
def __init__(self, instruments, delta_t, min_instruments):
self.instruments = instruments
self.delta_t = delta_t
self.min_instruments = min_instruments
self.lnpdfs = dict((instrument, rate.BinnedLnPDF(self.snr_chisq_binning)) for instrument in instruments)
self.triggerrates = trigger_rate.triggerrates((instrument, trigger_rate.ratebinlist()) for instrument in self.instruments)
@property
def segmentlists(self):
return self.triggerrates.segmentlistdict()
def __call__(self, t, snr, chisq):
lnP = sum(self.lnpdfs[instrument][s, chisq[instrument] / s**2.] for instrument, s in snr.items())
return lnP
def __iadd__(self, other):
# compatibility checks
assert self.instruments == other.instruments
assert self.delta_t == other.delta_t
assert self.min_instruments == other.min_instruments
# add snr \chi^2 PDFs
for instrument in self.instruments:
self.lnpdfs[instrument] += other.lnpdfs[instrument]
# add trigger rate segment data
self.triggerrates += other.triggerrates
return self
[docs] def increment(self, event):
self.lnpdfs[event.ifo].count[event.snr, event.chisq / event.snr**2.] += 1
[docs] def finish(self):
for instrument in self.instruments:
rate.filter_array(self.lnpdfs[instrument].array, rate.gaussian_window(4., 4.))
self.lnpdfs[instrument].normalize()
#
# never allow PDFs that have had the density estimation
# transform applied to be written to disk: on-disk files
# must only ever provide raw counts. also don't allow
# density estimation to be applied twice
#
def to_xml(*args, **kwargs):
raise NotImplementedError("writing .finish()'ed LnLRDensity object to disk is forbidden")
self.to_xml = to_xml
def finish(*args, **kwargs):
raise NotImplementedError(".finish()ing a .finish()ed LnLRDensity object is forbidden")
self.finish = finish
[docs] def random_params(self, snr_min):
"""
Generator that yields an endless sequence of randomly
generated candidate parameters. NOTE: the parameters will
be within the domain of the repsective binnings but are not
drawn from the PDF stored in those binnings --- this is not
an MCMC style sampler. Each value in the sequence is a
three-element tuple. The first two elements of each tuple
provide the *args and **kwargs values for calls to this PDF
or the numerator PDF or the ranking statistic object. The
final is the natural logarithm (up to an arbitrary
constant) of the PDF from which the parameters have been
drawn evaluated at the point described by the *args and
**kwargs.
The sequence is suitable for input to the .ln_lr_samples()
log likelihood ratio generator.
"""
snr_slope = 0.8 / len(self.instruments)**3
snrchi2gens = dict((instrument, iter(self.lnpdfs[instrument].bins.randcoord(ns = (snr_slope, 1.), domain = (slice(snr_min, None), slice(1e-6, 1e1)))).__next__) for instrument in self.instruments)
t_and_rate_gen = iter(self.triggerrates.random_uniform()).__next__
coinc_rates = snglcoinc.CoincRates(self.instruments, self.delta_t, self.min_instruments)
t_offsets_gen = dict((instruments, coinc_rates.plausible_toas(instruments).__next__) for instruments in coinc_rates.all_instrument_combos)
random_randint = random.randint
random_sample = random.sample
def nCk(n, k):
return math.factorial(n) // math.factorial(k) // math.factorial(n - k)
while 1:
t, rates, lnP_t = t_and_rate_gen()
instruments = tuple(instrument for instrument, rate in rates.items() if rate > 0)
if len(instruments) < self.min_instruments:
# FIXME: doing this invalidates lnP_t. I
# think the error is merely an overall
# normalization error, though, and nothing
# cares about the normalization. if this
# was fixed then the lnP reported by this
# generator would be normalized
continue
# to pick instruments, we first pick an integer k
# between m = min_instruments and n =
# len(instruments) inclusively, then choose that
# many unique names from among the available
# instruments. the probability of the outcome is
#
# = P(k) * P(selection | k)
# = 1 / (n - m + 1) * 1 / nCk
#
# where nCk = number of k choices without
# replacement from a collection of n things.
k = random_randint(self.min_instruments, len(instruments))
lnP_instruments = -math.log((len(instruments) - self.min_instruments + 1) * nCk(len(instruments), k))
instruments = frozenset(random_sample(instruments, k))
seq = sum((snrchi2gens[instrument]() for instrument in instruments), ())
kwargs = {
"t": dict((instrument, t + float(dt)) for instrument, dt in t_offsets_gen[instruments]()),
"snr": dict((instrument, snr) for instrument, (snr, chi2oversnr2) in zip(instruments, seq[0::2])),
"chisq": dict((instrument, chi2oversnr2 * snr**2.) for instrument, (snr, chi2oversnr2) in zip(instruments, seq[0::2]))
}
yield (), kwargs, sum(seq[1::2], lnP_t + lnP_instruments)
[docs] def to_xml(self, name):
xml = super(LnNoiseDensity, self).to_xml(name)
xml.appendChild(ligolw_param.Param.from_pyvalue("instruments", lsctables.instrumentsproperty.set(self.instruments)))
xml.appendChild(ligolw_param.Param.from_pyvalue("delta_t", self.delta_t))
xml.appendChild(ligolw_param.Param.from_pyvalue("min_instruments", self.min_instruments))
xml.appendChild(self.triggerrates.to_xml("triggerrates"))
for instrument, lnpdf in self.lnpdfs.items():
lnpdf.normalize()
xml.appendChild(lnpdf.to_xml("snr_chisq_%s" % instrument))
return xml
[docs] @classmethod
def from_xml(cls, xml, name):
xml = cls.get_xml_root(xml, name)
self = cls(
instruments = lsctables.instrumentsproperty.get(ligolw_param.get_pyvalue(xml, "instruments")),
delta_t = ligolw_param.get_pyvalue(xml, "delta_t"),
min_instruments = ligolw_param.get_pyvalue(xml, "min_instruments")
)
self.triggerrates = self.triggerrates.from_xml(xml, "triggerrates")
self.triggerrates.coalesce() # just in case
for instrument in self.lnpdfs:
self.lnpdfs[instrument] = self.lnpdfs[instrument].from_xml(xml, "snr_chisq_%s" % instrument)
self.lnpdfs[instrument].normalize()
return self
#
# RankingStat
#
[docs]class RankingStat(snglcoinc.LnLikelihoodRatioMixin):
ligo_lw_name_suffix = "gstlal_cherenkov_rankingstat"
def __init__(self, instruments, delta_t, min_instruments = 2):
self.numerator = LnSignalDensity(instruments = instruments, delta_t = delta_t, min_instruments = min_instruments)
self.denominator = LnNoiseDensity(instruments = instruments, delta_t = delta_t, min_instruments = min_instruments)
@property
def instruments(self):
return self.denominator.instruments
@property
def delta_t(self):
return self.denominator.delta_t
@property
def min_instruments(self):
return self.denominator.min_instruments
@property
def segmentlists(self):
return self.denominator.segmentlists
def __iadd__(self, other):
self.numerator += other.numerator
self.denominator += other.denominator
return self
[docs] def kwargs_from_triggers(self, events, offsetvector):
return dict(
t = dict((event.ifo, event.peak + offsetvector[event.ifo]) for event in events),
snr = dict((event.ifo, event.snr) for event in events),
chisq = dict((event.ifo, event.chisq) for event in events)
)
[docs] def ln_lr_from_triggers(self, events, offsetvector):
return self(**self.kwargs_from_triggers(events, offsetvector))
[docs] @classmethod
def get_xml_root(cls, xml, name):
name = "%s:%s" % (name, cls.ligo_lw_name_suffix)
xml = [elem for elem in xml.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute("Name") and elem.Name == name]
if len(xml) != 1:
raise ValueError("XML tree must contain exactly one %s element named %s" % (ligolw.LIGO_LW.tagName, name))
return xml[0]
[docs] def finish(self):
self.numerator.finish()
self.denominator.finish()
[docs] def to_xml(self, name):
xml = ligolw.LIGO_LW({"Name": "%s:%s" % (name, self.ligo_lw_name_suffix)})
xml.appendChild(self.numerator.to_xml("numerator"))
xml.appendChild(self.denominator.to_xml("denominator"))
return xml
[docs] @classmethod
def from_xml(cls, xml, name):
xml = cls.get_xml_root(xml, name)
self = cls.__new__(cls)
self.numerator = LnSignalDensity.from_xml(xml, "numerator")
self.denominator = LnNoiseDensity.from_xml(xml, "denominator")
return self
#
# =============================================================================
#
# Ranking Statistic PDF
#
# ============================================================================
#
[docs]def binned_log_likelihood_ratio_rates_from_samples(noise_lr_lnpdf, ln_lr_samples, nsamples):
exp = math.exp
isnan = math.isnan
noise_lr_lnpdf_count = noise_lr_lnpdf.count
for ln_lamb, lnP_signal, lnP_noise in itertools.islice(ln_lr_samples, nsamples):
if isnan(ln_lamb):
raise ValueError("encountered NaN likelihood ratio")
if isnan(lnP_signal) or isnan(lnP_noise):
raise ValueError("encountered NaN signal or noise model probability densities")
noise_lr_lnpdf_count[ln_lamb,] += exp(lnP_noise)
[docs]class RankingStatPDF(object):
ligo_lw_name_suffix = "gstlal_cherenkov_rankingstatpdf"
[docs] @staticmethod
def density_estimate(lnpdf, name, kernel = rate.gaussian_window(4.)):
"""
For internal use only.
"""
assert not numpy.isnan(lnpdf.array).any(), "%s log likelihood ratio PDF contain NaNs" % name
rate.filter_array(lnpdf.array, kernel)
[docs] @staticmethod
def binned_log_likelihood_ratio_rates_from_samples_wrapper(queue, noise_lr_lnpdf, ln_lr_samples, nsamples):
"""
For internal use only.
"""
try:
# want the forked processes to use different random
# number sequences, so we re-seed Python and
# numpy's random number generators here in the
# wrapper in the hope that that takes care of it
random.seed()
numpy.random.seed()
binned_log_likelihood_ratio_rates_from_samples(noise_lr_lnpdf, ln_lr_samples, nsamples)
queue.put((noise_lr_lnpdf.array,))
except:
queue.put((None,))
raise
def __init__(self, rankingstat, nsamples = 2**24, nthreads = 8, verbose = False):
#
# bailout out used by .from_xml() class method to get an
# uninitialized instance
#
if rankingstat is None:
return
#
# initialize binnings
#
self.noise_lr_lnpdf = rate.BinnedLnPDF(rate.NDBins((rate.ATanBins(0., 110., 6000),)))
self.zl_lr_lnpdf = rate.BinnedLnPDF(rate.NDBins((rate.ATanBins(0., 110., 6000),)))
#
# bailout used by codes that want all-zeros histograms
#
if not nsamples:
return
#
# run importance-weighted random sampling to populate
# binnings.
#
nthreads = int(nthreads)
assert nthreads >= 1
threads = []
for i in range(nthreads):
assert nsamples // nthreads >= 1
q = multiprocessing.SimpleQueue()
p = multiprocessing.Process(target = lambda: self.binned_log_likelihood_ratio_rates_from_samples_wrapper(
q,
self.noise_lr_lnpdf,
rankingstat.ln_lr_samples(rankingstat.denominator.random_params(4.0)),
nsamples = nsamples // nthreads
))
p.start()
print("a", file=sys.stderr)
threads.append((p, q))
nsamples -= nsamples // nthreads
nthreads -= 1
# sleep a bit to help random number seeds change
time.sleep(1.5)
while threads:
p, q = threads.pop(0)
noise_counts, = q.get()
self.noise_lr_lnpdf.array += noise_counts
p.join()
if p.exitcode:
raise Exception("sampling thread failed")
if verbose:
print("done computing ranking statistic PDFs", file=sys.stderr)
#
# apply density estimation kernel to noise model counts
#
self.density_estimate(self.noise_lr_lnpdf, "noise model")
self.noise_lr_lnpdf.normalize()
[docs] def copy(self):
new = self.__class__(None)
new.noise_lr_lnpdf = self.noise_lr_lnpdf.copy()
new.zl_lr_lnpdf = self.zl_lr_lnpdf.copy()
return new
def __iadd__(self, other):
self.noise_lr_lnpdf += other.noise_lr_lnpdf
self.noise_lr_lnpdf.normalize()
self.zl_lr_lnpdf += other.zl_lr_lnpdf
self.zl_lr_lnpdf.normalize()
return self
[docs] @classmethod
def get_xml_root(cls, xml, name):
"""
Sub-classes can use this in their overrides of the
.from_xml() method to find the root element of the XML
serialization.
"""
name = "%s:%s" % (name, cls.ligo_lw_name_suffix)
xml = [elem for elem in xml.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute("Name") and elem.Name == name]
if len(xml) != 1:
raise ValueError("XML tree must contain exactly one %s element named %s" % (ligolw.LIGO_LW.tagName, name))
return xml[0]
[docs] @classmethod
def from_xml(cls, xml, name):
# find the root of the XML tree containing the
# serialization of this object
xml = cls.get_xml_root(xml, name)
# create a mostly uninitialized instance
self = cls(None)
# populate from XML
self.noise_lr_lnpdf = rate.BinnedLnPDF.from_xml(xml, "noise_lr_lnpdf")
self.zl_lr_lnpdf = rate.BinnedLnPDF.from_xml(xml, "zl_lr_lnpdf")
# shouldn't be needed, but just in case
self.noise_lr_lnpdf.normalize()
self.zl_lr_lnpdf.normalize()
return self
[docs] def to_xml(self, name):
# do not allow ourselves to be written to disk without our
# PDFs' internal normalization metadata being up to date
self.noise_lr_lnpdf.normalize()
self.zl_lr_lnpdf.normalize()
xml = ligolw.LIGO_LW({"Name": "%s:%s" % (name, self.ligo_lw_name_suffix)})
xml.appendChild(self.noise_lr_lnpdf.to_xml("noise_lr_lnpdf"))
xml.appendChild(self.zl_lr_lnpdf.to_xml("zl_lr_lnpdf"))
return xml
#
# =============================================================================
#
# I/O
#
# ============================================================================
#
[docs]@lsctables.use_in
@ligolw_array.use_in
@ligolw_param.use_in
class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
pass
[docs]def marginalize_rankingstat_urls(urls, verbose = False):
"""
Implements marginalization of PDFs in ranking statistic data files.
The marginalization is over the degree of freedom represented by
the file collection. One or both of the candidate parameter PDFs
and ranking statistic PDFs can be processed, with errors thrown if
one or more files is missing the required component.
"""
name = "rankingstat"
data = None
for n, url in enumerate(urls, start = 1):
if verbose:
print("%d/%d:" % (n, len(urls)), file=sys.stderr)
xmldoc = ligolw_utils.load_url(url, verbose = verbose, contenthandler = LIGOLWContentHandler)
if data is None:
data = RankingStat.from_xml(xmldoc, name)
else:
data += RankingStat.from_xml(xmldoc, name)
xmldoc.unlink()
return data