Source code for segments

# Copyright (C) 2010--2020  Kipp Cannon, Patrick Godwin, Chad Hanna, Ryan Magee
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import itertools
import math
import os
import ssl
import urllib.request
from typing import Iterable, Mapping, Union
import warnings

import dqsegdb2.query
from ligo import segments
from ligo.segments import utils as segutils
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from lal import LIGOTimeGPS


[docs]@lsctables.use_in class LIGOLWContentHandler(ligolw.LIGOLWContentHandler): pass
[docs]def query_dqsegdb_segments( instruments: Union[str, Iterable], start: Union[int, LIGOTimeGPS], end: Union[int, LIGOTimeGPS], flags: Union[str, Mapping], server: str = DEFAULT_DQSEGDB_SERVER, ) -> segments.segmentlistdict: """Query DQSegDB for science segments. Args: instruments: Union[str, Iterable], the instruments to query segments for start: Union[int, LIGOTimeGPS], the GPS start time end: Union[int, LIGOTimeGPS], the GPS end time flags: Union[str, Mapping], the name of the DQ flags used to query server: str, defaults to main DQSegDB server, the server URL Returns: segmentlistdict, the queried segments """ span = segments.segment(LIGOTimeGPS(start), LIGOTimeGPS(end)) if isinstance(flags, str): if not isinstance(instruments, str): raise ValueError("if flags is type str, then instruments must also be type str") flags = {instruments: flags} if isinstance(instruments, str): instruments = [instruments] segs = segments.segmentlistdict() for ifo, flag in flags.items(): active = dqsegdb2.query.query_segments(flag, start, end, host=server, coalesce=True)["active"] segs[ifo] = segments.segmentlist([span]) & active return segs
[docs]def query_dqsegdb_veto_segments( instruments: Union[str, Iterable], start: Union[int, LIGOTimeGPS], end: Union[int, LIGOTimeGPS], veto_definer_file: str, category: str, cumulative: bool = True, server: str = DEFAULT_DQSEGDB_SERVER, ) -> segments.segmentlistdict: """Query DQSegDB for veto segments. Args: instruments: Union[str, Iterable], the instruments to query segments for start: Union[int, LIGOTimeGPS], the GPS start time end: Union[int, LIGOTimeGPS], the GPS end time veto_definer_file: str, the veto definer file in which to query veto segments for category: the veto category to use for vetoes, one of CAT1, CAT2, CAT3 cumulative: whether veto categories are cumulative, e.g. choosing CAT2 also includes CAT1 vetoes server: str, defaults to main DQSegDB server, the server URL Returns: segmentlistdict, the queried veto segments """ if isinstance(instruments, str): instruments = [instruments] if category not in set(("CAT1", "CAT2", "CAT3")): raise ValueError("not valid category") # read in vetoes xmldoc = ligolw_utils.load_filename(veto_definer_file, contenthandler=LIGOLWContentHandler) vetoes = lsctables.VetoDefTable.get_table(xmldoc) # filter vetoes by instruments vetoes[:] = [v for v in vetoes if v.ifo in set(instruments)] # filter vetoes by category cat_level = int(category[-1]) if cumulative: vetoes[:] = [v for v in vetoes if v.category <= cat_level] else: vetoes[:] = [v for v in vetoes if v.category == cat_level] # retrieve segments corresponding to flags segs = segments.segmentlistdict() for instrument in instruments: segs[instrument] = segments.segmentlist() for veto in vetoes: flag = f"{veto.ifo}:{}:{veto.version}" segs[veto.ifo] |= dqsegdb2.query.query_segments(flag, start, end, host=server, coalesce=True)["active"] segs.coalesce() return segs
[docs]def query_gwosc_segments( instruments: Union[str, Iterable], start: Union[int, LIGOTimeGPS], end: Union[int, LIGOTimeGPS], verify_certs: bool = True, ) -> segments.segmentlistdict: """Query GWOSC for science segments. Args: instruments: Union[str, Iterable], the instruments to query segments for start: Union[int, LIGOTimeGPS], the GPS start time end: Union[int, LIGOTimeGPS], the GPS end time verify_certs: bool, default True, whether to verify SSL certificates when querying GWOSC. Returns: segmentlistdict, the queried segments """ if isinstance(instruments, str): instruments = [instruments] # Set up SSL context context = ssl.create_default_context() if not verify_certs: context.check_hostname = False context.verify_mode = ssl.CERT_NONE # Retrieve segments segs = segments.segmentlistdict() for instrument in instruments: url = _gwosc_segment_url(start, end, f"{instrument}_DATA") urldata = urllib.request.urlopen(url, context=context).read().decode('utf-8') with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) segs[instrument] = segutils.fromsegwizard( urldata.splitlines(), coltype=lsctables.LIGOTimeGPS, ) segs.coalesce() return segs
[docs]def query_gwosc_veto_segments( instruments: Union[str, Iterable], start: Union[int, LIGOTimeGPS], end: Union[int, LIGOTimeGPS], category: str, cumulative: bool = True, verify_certs: bool = True, ) -> segments.segmentlistdict: """Query GWOSC for veto segments. Args: instruments: Union[str, Iterable], the instruments to query segments for start: Union[int, LIGOTimeGPS], the GPS start time end: Union[int, LIGOTimeGPS], the GPS end time category: the veto category to use for vetoes, one of CAT1, CAT2, CAT3 cumulative: whether veto categories are cumulative, e.g. choosing CAT2 also includes CAT1 vetoes verify_certs: bool, default True, whether to verify SSL certificates when querying GWOSC. Returns: segmentlistdict, the queried veto segments """ span = segments.segment(LIGOTimeGPS(start), LIGOTimeGPS(end)) if isinstance(instruments, str): instruments = [instruments] if category not in set(("CAT1", "CAT2", "CAT3")): raise ValueError("not valid category") if cumulative: flags = [f"CBC_CAT{i}" for i in range(1, int(category[-1]) + 1)] else: flags = [f"CBC_{category}"] # hardware injections not in normal categories in GWOSC # so we treat them all as CAT1 (except CW) hw_inj_flags = [ "NO_BURST_HW_INJ", "NO_CBC_HW_INJ", "NO_DETCHAR_HW_INJ", "NO_STOCH_HW_INJ", ] flags += hw_inj_flags # set up SSL context context = ssl.create_default_context() if not verify_certs: context.check_hostname = False context.verify_mode = ssl.CERT_NONE # retrieve segments corresponding to flags segs = segments.segmentlistdict() for instrument in instruments: segs[instrument] = segments.segmentlist([span]) for flag in flags: url = _gwosc_segment_url(start, end, f"{instrument}_{flag}") urldata = urllib.request.urlopen(url, context=context).read().decode('utf-8') with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) segs[instrument] &= segutils.fromsegwizard( urldata.splitlines(), coltype=lsctables.LIGOTimeGPS, ) segs.coalesce() # invert segments to transform into vetoes for instrument in instruments: segs[instrument] = segments.segmentlist([span]) & ~segs[instrument] return segs
[docs]def analysis_segments( ifos: Iterable[str], allsegs: segments.segmentlistdict, boundary_seg: segments.segment, start_pad: float = 0., overlap: float = 0., min_instruments: int = 1, one_ifo_length: float = (3600 * 8.), ) -> segments.segmentlistdict: """Generate all disjoint detector combination segments for analysis job boundaries. """ ifos = set(ifos) segsdict = segments.segmentlistdict() # segment length dependent on the number of instruments # so that longest job runtimes are similar segment_length = lambda n_ifo: one_ifo_length / 2 ** (n_ifo - 1) # generate analysis segments for n in range(min_instruments, 1 + len(ifos)): for ifo_combos in itertools.combinations(list(ifos), n): ifo_key = frozenset(ifo_combos) segsdict[ifo_key] = allsegs.intersection(ifo_combos) - allsegs.union(ifos - set(ifo_combos)) segsdict[ifo_key] = segsdict[ifo_key].protract(overlap) segsdict[ifo_key] &= segments.segmentlist([boundary_seg]) segsdict[ifo_key] = split_segments(segsdict[ifo_key], segment_length(len(ifo_combos)), start_pad) if not segsdict[ifo_key]: del segsdict[ifo_key] return segsdict
[docs]def split_segments_by_lock( ifos: Iterable, seglistdicts: segments.segmentlistdict, boundary_seg: segments.segment, max_time: float = 10 * 24 * 3600., ) -> segments.segmentlist: """Split segments into segments with maximum time and boundaries outside of lock stretches. """ ifos = set(seglistdicts) # create set of segments for each ifo when it was # in coincidence with at least one other ifo doublesegs = segments.segmentlistdict() for ifo1 in ifos: for ifo2 in ifos - set([ifo1]): if ifo1 in doublesegs: doublesegs[ifo1] |= seglistdicts.intersection((ifo1, ifo2)) else: doublesegs[ifo1] = seglistdicts.intersection((ifo1, ifo2)) # This is the set of segments when at least two ifos were on doublesegsunion = doublesegs.union(doublesegs.keys()) # This is the set of segments when at least one ifo was on segs = seglistdicts.union(seglistdicts.keys()) # define when "enough time" has passed def enoughtime(seglist, start, end): return abs(seglist & segments.segmentlist([segments.segment(start, end)])) > 0.7 * max_time # iterate through all the segment where at least one ifo was on and extract # chunks where each ifo satisfies our coincidence requirement. A consequence is # that we only define boundaries when one ifos is on chunks = segments.segmentlist([boundary_seg]) # This places boundaries when only one ifo or less was on for start, end in doublesegsunion: if all([enoughtime(s, chunks[-1][0], end) for s in doublesegs.values()]): chunks[-1] = segments.segment(chunks[-1][0], end) chunks.append(segments.segment(end, boundary_seg[1])) # check that last segment has enough livetime # if not, merge it with the previous segment if len(chunks) > 1 and abs(chunks[-1]) < 0.3 * max_time: last_chunk = chunks.pop() chunks[-1] = segments.segmentlist([chunks[-1], last_chunk]).coalesce().extent() return chunks
[docs]def split_segments( seglist: segments.segmentlist, maxextent: float, overlap: float ) -> segments.segmentlist: """Split a segmentlist into segments of maximum extent. """ newseglist = segments.segmentlist() for bigseg in seglist: newseglist.extend(split_segment(bigseg, maxextent, overlap)) return newseglist
[docs]def split_segment(seg: segments.segment, maxextent: float, overlap: float) -> segments.segmentlist: """Split a segment into segments of maximum extent. """ if maxextent <= 0: raise ValueError("maxextent must be positive, not %s" % repr(maxextent)) # Simple case of only one segment if abs(seg) < maxextent: return segments.segmentlist([seg]) # adjust maxextent so that segments are divided roughly equally maxextent = max(int(abs(seg) / (int(abs(seg)) // int(maxextent) + 1)), overlap) maxextent = int(math.ceil(abs(seg) / math.ceil(abs(seg) / maxextent))) end = seg[1] seglist = segments.segmentlist() while abs(seg): if (seg[0] + maxextent + overlap) < end: seglist.append(segments.segment(seg[0], seg[0] + maxextent + overlap)) seg = segments.segment(seglist[-1][1] - overlap, seg[1]) else: seglist.append(segments.segment(seg[0], end)) break return seglist
def _gwosc_segment_url(start, end, flag): """Returns the GWOSC URL associated with segments. """ span = segments.segment(LIGOTimeGPS(start), LIGOTimeGPS(end)) # determine GWOSC URL to query from urlbase = "" if start in segments.segment(1126051217, 1137254417): query_url = f"{urlbase}/O1" elif start in segments.segment(1164556817, 1187733618): query_url = f"{urlbase}/O2_16KHZ_R1" elif start in segments.segment(1238166018, 1253977218): query_url = f"{urlbase}/O3a_16KHZ_R1" elif start in segments.segment(1256655618, 1269363618): query_url = f"{urlbase}/O3b_16KHZ_R1" else: raise ValueError("GPS times requested not in GWOSC") return f"{query_url}/{flag}/{span[0]}/{abs(span)}"