Source code for pesummary.gw.file.psd

# Licensed under an MIT style license -- see LICENSE.md

import os
import numpy as np
from pesummary import conf
from pesummary.utils.utils import logger, check_file_exists_and_rename
from pesummary.utils.dict import Dict

__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]


[docs]class PSDDict(Dict): """Class to handle a dictionary of PSDs Parameters ---------- detectors: list list of detectors data: nd list list of psd samples for each detector. First column is frequencies, second column is strains Attributes ---------- detectors: list list of detectors stored in the dictionary Methods ------- plot: Generate a plot based on the psd samples stored to_pycbc: Convert dictionary of PSD objects to a dictionary of pycbc.frequencyseries objects objects Examples -------- >>> from pesummary.gw.file.psd import PSDDict >>> detectors = ["H1", "V1"] >>> psd_data = [ ... [[0.00000e+00, 2.50000e-01], ... [1.25000e-01, 2.50000e-01], ... [2.50000e-01, 2.50000e-01]], ... [[0.00000e+00, 2.50000e-01], ... [1.25000e-01, 2.50000e-01], ... [2.50000e-01, 2.50000e-01]] ... ] >>> psd_dict = PSDDict(detectors, psd_data) >>> psd_data = { ... "H1": [[0.00000e+00, 2.50000e-01], ... [1.25000e-01, 2.50000e-01], ... [2.50000e-01, 2.50000e-01]], ... "V1": [[0.00000e+00, 2.50000e-01], ... [1.25000e-01, 2.50000e-01], ... [2.50000e-01, 2.50000e-01]] ... } >>> psd_dict = PSDDict(psd_data) """ def __init__(self, *args): super(PSDDict, self).__init__( *args, value_class=PSD, value_columns=["frequencies", "strains"] ) @property def detectors(self): return list(self.keys())
[docs] @classmethod def read(cls, files=None, detectors=None, common_string=None): """Initiate PSDDict with a set of PSD files Parameters ---------- files: list/dict, optional Either a list of files or a dictionary of files to read. If a list of files are provided, a list of corresponding detectors must also be provided common_string: str, optional Common string for PSD files. The string must be formattable and take one argument which is the detector. For example common_string='./{}_psd.dat'. Used if files is not provided detectors: list, optional List of detectors to use when loading files. Used if files if not provided or if files is a list or if common_string is provided """ if files is not None: if isinstance(files, list) and detectors is not None: if len(detectors) != len(files): raise ValueError( "Please provide a detector for each file" ) files = {det: ff for det, ff in zip(detectors, files)} elif isinstance(files, dict): pass else: raise ValueError( "Please provide either a dictionary of files, or a list " "files and a list of detectors for which they correspond." ) elif common_string is not None and detectors is not None: files = {det: common_string.format(det) for det in detectors} else: raise ValueError( "Please provide either a list of files to read or " "a common string and a list of detectors to load." ) psd = {} for key, item in files.items(): psd[key] = PSD.read(item, IFO=key) return PSDDict(psd)
[docs] def plot(self, **kwargs): """Generate a plot to display the PSD data stored in PSDDict Parameters ---------- **kwargs: dict all additional kwargs are passed to pesummary.gw.plots.plot._psd_plot """ from pesummary.gw.plots.plot import _psd_plot _detectors = self.detectors frequencies = [self[IFO].frequencies for IFO in _detectors] strains = [self[IFO].strains for IFO in _detectors] return _psd_plot(frequencies, strains, labels=_detectors, **kwargs)
[docs] def to_pycbc(self, *args, **kwargs): """Transform dictionary to pycbc.frequencyseries objects Parameters ---------- *args: tuple all args passed to PSD.to_pycbc() **kwargs: dict, optional all kwargs passed to PSD.to_pycbc() """ psd = {} for key, item in self.items(): psd[key] = item.to_pycbc(*args, **kwargs) return PSDDict(psd)
[docs]class PSD(np.ndarray): """Class to handle PSD data """ def __new__(cls, input_array): obj = np.asarray(input_array).view(cls) if obj.shape[1] != 2: raise ValueError( "Invalid input data. See the docs for instructions" ) obj.delta_f = cls.delta_f(obj) obj.f_high = cls.f_high(obj) return obj @staticmethod def delta_f(array): return array.T[0][1] - array.T[0][0] @staticmethod def f_high(array): return array.T[0][-1]
[docs] @classmethod def read(cls, path_to_file, **kwargs): """Read in a file and initialize the PSD class Parameters ---------- path_to_file: str the path to the file you wish to load **kwargs: dict all kwargs are passed to the read methods """ from pesummary.core.file.formats.base_read import Read mapping = { "dat": PSD.read_from_dat, "txt": PSD.read_from_dat, "xml": PSD.read_from_xml, } if not os.path.isfile(path_to_file): raise FileNotFoundError( "The file '{}' does not exist".format(path_to_file) ) extension = Read.extension_from_path(path_to_file) if ".xml.gz" in path_to_file: return cls(mapping["xml"](path_to_file, **kwargs)) elif extension not in mapping.keys(): raise NotImplementedError( "Unable to read in a PSD with format '{}'. The allowed formats " "are: {}".format(extension, ", ".join(list(mapping.keys()))) ) return cls(mapping[extension](path_to_file, **kwargs))
[docs] @staticmethod def read_from_dat(path_to_file, IFO=None, **kwargs): """Read in a dat file and return a numpy array containing the data Parameters ---------- path_to_file: str the path to the file you wish to load **kwargs: dict all kwargs are passed to the numpy.genfromtxt method """ try: data = np.genfromtxt(path_to_file, **kwargs) return data except ValueError: data = np.genfromtxt(path_to_file, skip_footer=2, **kwargs) return data
[docs] @staticmethod def read_from_xml(path_to_file, IFO=None, **kwargs): """Read in an xml file and return a numpy array containing the data Parameters ---------- path_to_file: str the path to the file you wish to load IFO: str, optional name of the dataset that you wish to load **kwargs: dict all kwargs are passed to the gwpy.frequencyseries.FrequencySeries.read method """ from gwpy.frequencyseries import FrequencySeries data = FrequencySeries.read(path_to_file, name=IFO, **kwargs) frequencies = np.array(data.frequencies) strains = np.array(data) return np.vstack([frequencies, strains]).T
[docs] def save_to_file(self, file_name, comments="#", delimiter=conf.delimiter): """Save the calibration data to file Parameters ---------- file_name: str name of the file name that you wish to use comments: str, optional String that will be prepended to the header and footer strings, to mark them as comments. Default is '#'. delimiter: str, optional String or character separating columns. """ check_file_exists_and_rename(file_name) header = ["Frequency", "Strain"] np.savetxt( file_name, self, delimiter=delimiter, comments=comments, header=delimiter.join(header) )
def __array_finalize__(self, obj): if obj is None: return self.delta_f = getattr(obj, "delta_f", None) self.f_high = getattr(obj, "f_high", None)
[docs] def to_pycbc( self, low_freq_cutoff, f_high=None, length=None, delta_f=None, f_high_override=False ): """Convert the PSD object to an interpolated pycbc.types.FrequencySeries Parameters ---------- length : int, optional Length of the frequency series in samples. delta_f : float, optional Frequency resolution of the frequency series in Herz. low_freq_cutoff : float, optional Frequencies below this value are set to zero. f_high_override: Bool, optional Override the final frequency if it is above the maximum stored. Default False """ from pycbc.psd.read import from_numpy_arrays if delta_f is None: delta_f = self.delta_f if f_high is None: f_high = self.f_high elif f_high > self.f_high: msg = ( "Specified value of final frequency: {} is above the maximum " "frequency stored: {}. ".format(f_high, self.f_high) ) if f_high_override: msg += "Overwriting the final frequency" f_high = self.f_high else: msg += ( "This will result in an interpolation error. Either change " "the final frequency specified or set the 'f_high_override' " "kwarg to True" ) logger.warning(msg) if length is None: length = int(f_high / delta_f) + 1 pycbc_psd = from_numpy_arrays( self.T[0], self.T[1], length, delta_f, low_freq_cutoff ) return pycbc_psd