Source code for cherenkov.rankingstat

# Copyright (C) 2021 Soichiro Kuwahara
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


#
# =============================================================================
#
#                                   Preamble
#
# ============================================================================
#


import itertools
import math
import multiprocessing
import numpy
import random
from scipy import stats
import sys
import time

from lal import rate
from lalburst import snglcoinc

from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import array as ligolw_array
from ligo.lw import param as ligolw_param
from ligo.lw import utils as ligolw_utils

from gstlal.stats import trigger_rate


#
# =============================================================================
#
#                             Likelihood Machinery
#
# ============================================================================
#


#
# Numerator
#

[docs]class LnSignalDensity(snglcoinc.LnLRDensity): ln_chisq_const = {"H1": -1.9, "L1": -2.0} threshold = {"H1": 120., "L1": 100.} width = {"H1": 1.0, "L1": 1.5} ln_matching = {"H1": -9.5, "L1": -9.5} def __init__(self, instruments, delta_t, min_instruments): self.instruments = instruments self.delta_t = delta_t self.min_instruments = min_instruments def __call__(self, t, snr, chisq): # check if they have same instrument keys assert set(snr) == set(chisq) # FIXME: this ignores correlation of SNRs # FIXME: snr^-4 is for 1/r^2 flux density, snr^-7 is for 1/r flux density if True: lnP = -7. * sum(math.log(s) for s in snr.values()) else: lnP = -4. * sum(math.log(s) for s in snr.values()) for instrument, s in snr.items(): if s >= self.threshold[instrument]: lnP += stats.norm.pdf(math.log(chisq[instrument] / s ** 2.), self.ln_matching[instrument], self.width[instrument]) else: lnP += stats.norm.pdf(math.log(chisq[instrument] / s ** 2.), self.ln_matching[instrument] + self.ln_chisq_const[instrument] * math.log(s / self.threshold[instrument]), 0.11 + (self.width[instrument] - 0.11) * math.log(s) / math.log(self.threshold[instrument])) return lnP def __iadd__(self, other): # compatibility checks assert self.ln_chisq_const == other.ln_chisq_const assert self.threshold == other.threshold assert self.width == other.width assert self.ln_matching == other.ln_matching # no-op: fixed function return self
[docs] def increment(self, event): # not used in this pipeline raise NotImplementedError
[docs] def finish(self): # no-op pass
[docs] def to_xml(self, name): xml = super(LnSignalDensity, self).to_xml(name) xml.appendChild(ligolw_param.Param.from_pyvalue("instruments", lsctables.instrumentsproperty.set(self.instruments))) xml.appendChild(ligolw_param.Param.from_pyvalue("delta_t", self.delta_t)) xml.appendChild(ligolw_param.Param.from_pyvalue("min_instruments", self.min_instruments)) # FIXME: the other PDF parameters aren't saved, so when # loaded it always gets whatever the class defines them to # be. is this OK? return xml
[docs] @classmethod def from_xml(cls, xml, name): xml = cls.get_xml_root(xml, name) self = cls( instruments = lsctables.instrumentsproperty.get(ligolw_param.get_pyvalue(xml, "instruments")), delta_t = ligolw_param.get_pyvalue(xml, "delta_t"), min_instruments = ligolw_param.get_pyvalue(xml, "min_instruments") ) return self
# # Denominator #
[docs]class LnNoiseDensity(snglcoinc.LnLRDensity): snr_chisq_binning = rate.NDBins((rate.ATanLogarithmicBins(2.6, 26., 300), rate.ATanLogarithmicBins(.001, 0.2, 280))) def __init__(self, instruments, delta_t, min_instruments): self.instruments = instruments self.delta_t = delta_t self.min_instruments = min_instruments self.lnpdfs = dict((instrument, rate.BinnedLnPDF(self.snr_chisq_binning)) for instrument in instruments) self.triggerrates = trigger_rate.triggerrates((instrument, trigger_rate.ratebinlist()) for instrument in self.instruments) @property def segmentlists(self): return self.triggerrates.segmentlistdict() def __call__(self, t, snr, chisq): lnP = sum(self.lnpdfs[instrument][s, chisq[instrument] / s**2.] for instrument, s in snr.items()) return lnP def __iadd__(self, other): # compatibility checks assert self.instruments == other.instruments assert self.delta_t == other.delta_t assert self.min_instruments == other.min_instruments # add snr \chi^2 PDFs for instrument in self.instruments: self.lnpdfs[instrument] += other.lnpdfs[instrument] # add trigger rate segment data self.triggerrates += other.triggerrates return self
[docs] def increment(self, event): self.lnpdfs[event.ifo].count[event.snr, event.chisq / event.snr**2.] += 1
[docs] def finish(self): for instrument in self.instruments: rate.filter_array(self.lnpdfs[instrument].array, rate.gaussian_window(4., 4.)) self.lnpdfs[instrument].normalize() # # never allow PDFs that have had the density estimation # transform applied to be written to disk: on-disk files # must only ever provide raw counts. also don't allow # density estimation to be applied twice # def to_xml(*args, **kwargs): raise NotImplementedError("writing .finish()'ed LnLRDensity object to disk is forbidden") self.to_xml = to_xml def finish(*args, **kwargs): raise NotImplementedError(".finish()ing a .finish()ed LnLRDensity object is forbidden") self.finish = finish
[docs] def random_params(self, snr_min): """ Generator that yields an endless sequence of randomly generated candidate parameters. NOTE: the parameters will be within the domain of the repsective binnings but are not drawn from the PDF stored in those binnings --- this is not an MCMC style sampler. Each value in the sequence is a three-element tuple. The first two elements of each tuple provide the *args and **kwargs values for calls to this PDF or the numerator PDF or the ranking statistic object. The final is the natural logarithm (up to an arbitrary constant) of the PDF from which the parameters have been drawn evaluated at the point described by the *args and **kwargs. The sequence is suitable for input to the .ln_lr_samples() log likelihood ratio generator. """ snr_slope = 0.8 / len(self.instruments)**3 snrchi2gens = dict((instrument, iter(self.lnpdfs[instrument].bins.randcoord(ns = (snr_slope, 1.), domain = (slice(snr_min, None), slice(1e-6, 1e1)))).__next__) for instrument in self.instruments) t_and_rate_gen = iter(self.triggerrates.random_uniform()).__next__ coinc_rates = snglcoinc.CoincRates(self.instruments, self.delta_t, self.min_instruments) t_offsets_gen = dict((instruments, coinc_rates.plausible_toas(instruments).__next__) for instruments in coinc_rates.all_instrument_combos) random_randint = random.randint random_sample = random.sample def nCk(n, k): return math.factorial(n) // math.factorial(k) // math.factorial(n - k) while 1: t, rates, lnP_t = t_and_rate_gen() instruments = tuple(instrument for instrument, rate in rates.items() if rate > 0) if len(instruments) < self.min_instruments: # FIXME: doing this invalidates lnP_t. I # think the error is merely an overall # normalization error, though, and nothing # cares about the normalization. if this # was fixed then the lnP reported by this # generator would be normalized continue # to pick instruments, we first pick an integer k # between m = min_instruments and n = # len(instruments) inclusively, then choose that # many unique names from among the available # instruments. the probability of the outcome is # # = P(k) * P(selection | k) # = 1 / (n - m + 1) * 1 / nCk # # where nCk = number of k choices without # replacement from a collection of n things. k = random_randint(self.min_instruments, len(instruments)) lnP_instruments = -math.log((len(instruments) - self.min_instruments + 1) * nCk(len(instruments), k)) instruments = frozenset(random_sample(instruments, k)) seq = sum((snrchi2gens[instrument]() for instrument in instruments), ()) kwargs = { "t": dict((instrument, t + float(dt)) for instrument, dt in t_offsets_gen[instruments]()), "snr": dict((instrument, snr) for instrument, (snr, chi2oversnr2) in zip(instruments, seq[0::2])), "chisq": dict((instrument, chi2oversnr2 * snr**2.) for instrument, (snr, chi2oversnr2) in zip(instruments, seq[0::2])) } yield (), kwargs, sum(seq[1::2], lnP_t + lnP_instruments)
[docs] def to_xml(self, name): xml = super(LnNoiseDensity, self).to_xml(name) xml.appendChild(ligolw_param.Param.from_pyvalue("instruments", lsctables.instrumentsproperty.set(self.instruments))) xml.appendChild(ligolw_param.Param.from_pyvalue("delta_t", self.delta_t)) xml.appendChild(ligolw_param.Param.from_pyvalue("min_instruments", self.min_instruments)) xml.appendChild(self.triggerrates.to_xml("triggerrates")) for instrument, lnpdf in self.lnpdfs.items(): lnpdf.normalize() xml.appendChild(lnpdf.to_xml("snr_chisq_%s" % instrument)) return xml
[docs] @classmethod def from_xml(cls, xml, name): xml = cls.get_xml_root(xml, name) self = cls( instruments = lsctables.instrumentsproperty.get(ligolw_param.get_pyvalue(xml, "instruments")), delta_t = ligolw_param.get_pyvalue(xml, "delta_t"), min_instruments = ligolw_param.get_pyvalue(xml, "min_instruments") ) self.triggerrates = self.triggerrates.from_xml(xml, "triggerrates") self.triggerrates.coalesce() # just in case for instrument in self.lnpdfs: self.lnpdfs[instrument] = self.lnpdfs[instrument].from_xml(xml, "snr_chisq_%s" % instrument) self.lnpdfs[instrument].normalize() return self
# # RankingStat #
[docs]class RankingStat(snglcoinc.LnLikelihoodRatioMixin): ligo_lw_name_suffix = "gstlal_cherenkov_rankingstat" def __init__(self, instruments, delta_t, min_instruments = 2): self.numerator = LnSignalDensity(instruments = instruments, delta_t = delta_t, min_instruments = min_instruments) self.denominator = LnNoiseDensity(instruments = instruments, delta_t = delta_t, min_instruments = min_instruments) @property def instruments(self): return self.denominator.instruments @property def delta_t(self): return self.denominator.delta_t @property def min_instruments(self): return self.denominator.min_instruments @property def segmentlists(self): return self.denominator.segmentlists def __iadd__(self, other): self.numerator += other.numerator self.denominator += other.denominator return self
[docs] def kwargs_from_triggers(self, events, offsetvector): return dict( t = dict((event.ifo, event.peak + offsetvector[event.ifo]) for event in events), snr = dict((event.ifo, event.snr) for event in events), chisq = dict((event.ifo, event.chisq) for event in events) )
[docs] def ln_lr_from_triggers(self, events, offsetvector): return self(**self.kwargs_from_triggers(events, offsetvector))
[docs] @classmethod def get_xml_root(cls, xml, name): name = "%s:%s" % (name, cls.ligo_lw_name_suffix) xml = [elem for elem in xml.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute("Name") and elem.Name == name] if len(xml) != 1: raise ValueError("XML tree must contain exactly one %s element named %s" % (ligolw.LIGO_LW.tagName, name)) return xml[0]
[docs] def finish(self): self.numerator.finish() self.denominator.finish()
[docs] def to_xml(self, name): xml = ligolw.LIGO_LW({"Name": "%s:%s" % (name, self.ligo_lw_name_suffix)}) xml.appendChild(self.numerator.to_xml("numerator")) xml.appendChild(self.denominator.to_xml("denominator")) return xml
[docs] @classmethod def from_xml(cls, xml, name): xml = cls.get_xml_root(xml, name) self = cls.__new__(cls) self.numerator = LnSignalDensity.from_xml(xml, "numerator") self.denominator = LnNoiseDensity.from_xml(xml, "denominator") return self
# # ============================================================================= # # Ranking Statistic PDF # # ============================================================================ #
[docs]def binned_log_likelihood_ratio_rates_from_samples(noise_lr_lnpdf, ln_lr_samples, nsamples): exp = math.exp isnan = math.isnan noise_lr_lnpdf_count = noise_lr_lnpdf.count for ln_lamb, lnP_signal, lnP_noise in itertools.islice(ln_lr_samples, nsamples): if isnan(ln_lamb): raise ValueError("encountered NaN likelihood ratio") if isnan(lnP_signal) or isnan(lnP_noise): raise ValueError("encountered NaN signal or noise model probability densities") noise_lr_lnpdf_count[ln_lamb,] += exp(lnP_noise)
[docs]class RankingStatPDF(object): ligo_lw_name_suffix = "gstlal_cherenkov_rankingstatpdf"
[docs] @staticmethod def density_estimate(lnpdf, name, kernel = rate.gaussian_window(4.)): """ For internal use only. """ assert not numpy.isnan(lnpdf.array).any(), "%s log likelihood ratio PDF contain NaNs" % name rate.filter_array(lnpdf.array, kernel)
[docs] @staticmethod def binned_log_likelihood_ratio_rates_from_samples_wrapper(queue, noise_lr_lnpdf, ln_lr_samples, nsamples): """ For internal use only. """ try: # want the forked processes to use different random # number sequences, so we re-seed Python and # numpy's random number generators here in the # wrapper in the hope that that takes care of it random.seed() numpy.random.seed() binned_log_likelihood_ratio_rates_from_samples(noise_lr_lnpdf, ln_lr_samples, nsamples) queue.put((noise_lr_lnpdf.array,)) except: queue.put((None,)) raise
def __init__(self, rankingstat, nsamples = 2**24, nthreads = 8, verbose = False): # # bailout out used by .from_xml() class method to get an # uninitialized instance # if rankingstat is None: return # # initialize binnings # self.noise_lr_lnpdf = rate.BinnedLnPDF(rate.NDBins((rate.ATanBins(0., 110., 6000),))) self.zl_lr_lnpdf = rate.BinnedLnPDF(rate.NDBins((rate.ATanBins(0., 110., 6000),))) # # bailout used by codes that want all-zeros histograms # if not nsamples: return # # run importance-weighted random sampling to populate # binnings. # nthreads = int(nthreads) assert nthreads >= 1 threads = [] for i in range(nthreads): assert nsamples // nthreads >= 1 q = multiprocessing.SimpleQueue() p = multiprocessing.Process(target = lambda: self.binned_log_likelihood_ratio_rates_from_samples_wrapper( q, self.noise_lr_lnpdf, rankingstat.ln_lr_samples(rankingstat.denominator.random_params(4.0)), nsamples = nsamples // nthreads )) p.start() print("a", file=sys.stderr) threads.append((p, q)) nsamples -= nsamples // nthreads nthreads -= 1 # sleep a bit to help random number seeds change time.sleep(1.5) while threads: p, q = threads.pop(0) noise_counts, = q.get() self.noise_lr_lnpdf.array += noise_counts p.join() if p.exitcode: raise Exception("sampling thread failed") if verbose: print("done computing ranking statistic PDFs", file=sys.stderr) # # apply density estimation kernel to noise model counts # self.density_estimate(self.noise_lr_lnpdf, "noise model") self.noise_lr_lnpdf.normalize()
[docs] def copy(self): new = self.__class__(None) new.noise_lr_lnpdf = self.noise_lr_lnpdf.copy() new.zl_lr_lnpdf = self.zl_lr_lnpdf.copy() return new
def __iadd__(self, other): self.noise_lr_lnpdf += other.noise_lr_lnpdf self.noise_lr_lnpdf.normalize() self.zl_lr_lnpdf += other.zl_lr_lnpdf self.zl_lr_lnpdf.normalize() return self
[docs] @classmethod def get_xml_root(cls, xml, name): """ Sub-classes can use this in their overrides of the .from_xml() method to find the root element of the XML serialization. """ name = "%s:%s" % (name, cls.ligo_lw_name_suffix) xml = [elem for elem in xml.getElementsByTagName(ligolw.LIGO_LW.tagName) if elem.hasAttribute("Name") and elem.Name == name] if len(xml) != 1: raise ValueError("XML tree must contain exactly one %s element named %s" % (ligolw.LIGO_LW.tagName, name)) return xml[0]
[docs] @classmethod def from_xml(cls, xml, name): # find the root of the XML tree containing the # serialization of this object xml = cls.get_xml_root(xml, name) # create a mostly uninitialized instance self = cls(None) # populate from XML self.noise_lr_lnpdf = rate.BinnedLnPDF.from_xml(xml, "noise_lr_lnpdf") self.zl_lr_lnpdf = rate.BinnedLnPDF.from_xml(xml, "zl_lr_lnpdf") # shouldn't be needed, but just in case self.noise_lr_lnpdf.normalize() self.zl_lr_lnpdf.normalize() return self
[docs] def to_xml(self, name): # do not allow ourselves to be written to disk without our # PDFs' internal normalization metadata being up to date self.noise_lr_lnpdf.normalize() self.zl_lr_lnpdf.normalize() xml = ligolw.LIGO_LW({"Name": "%s:%s" % (name, self.ligo_lw_name_suffix)}) xml.appendChild(self.noise_lr_lnpdf.to_xml("noise_lr_lnpdf")) xml.appendChild(self.zl_lr_lnpdf.to_xml("zl_lr_lnpdf")) return xml
# # ============================================================================= # # I/O # # ============================================================================ #
[docs]@lsctables.use_in @ligolw_array.use_in @ligolw_param.use_in class LIGOLWContentHandler(ligolw.LIGOLWContentHandler): pass
[docs]def marginalize_rankingstat_urls(urls, verbose = False): """ Implements marginalization of PDFs in ranking statistic data files. The marginalization is over the degree of freedom represented by the file collection. One or both of the candidate parameter PDFs and ranking statistic PDFs can be processed, with errors thrown if one or more files is missing the required component. """ name = "rankingstat" data = None for n, url in enumerate(urls, start = 1): if verbose: print("%d/%d:" % (n, len(urls)), file=sys.stderr) xmldoc = ligolw_utils.load_url(url, verbose = verbose, contenthandler = LIGOLWContentHandler) if data is None: data = RankingStat.from_xml(xmldoc, name) else: data += RankingStat.from_xml(xmldoc, name) xmldoc.unlink() return data