# Source code for ligo.skymap.util.ilwd
#
# Copyright (C) 2020-2024 Leo Singer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
"""Tools for adapting LIGO-LW row ID formats."""
import re
from ligo.lw.ligolw import Param
from ligo.lw.lsctables import TableByName
from ligo.lw.table import Column, TableStream
from ligo.lw.types import FormatFunc, FromPyType
# Only the class decorator is part of the public API of this module.
__all__ = ('use_in',)
# ``Type`` attribute values that mark an element as holding an old-style
# (ilwdchar) row ID; elements with these types are converted by `use_in`.
IDTypes = {'ilwd:char', 'ilwd:char_u'}
# Python type that converted row IDs take.
ROWID_PYTYPE = int
# Entry of ligo.lw's ``FromPyType`` table for ROWID_PYTYPE; assigned as the
# new ``Type`` of converted <Param> and <Column> elements.
ROWID_TYPE = FromPyType[ROWID_PYTYPE]
# Entry of ligo.lw's ``FormatFunc`` table for ROWID_TYPE; used to turn a
# converted integer back into the text stored in a <Param>'s pcdata.
ROWID_FORMATFUNC = FormatFunc[ROWID_TYPE]
# Matches a whole string of the form ``<word>:<word>:<digits>`` with optional
# surrounding whitespace, capturing the trailing digits as group 1.
_ilwd_regex = re.compile(r'\s*\w+:\w+:(\d+)\s*')
def ilwd_to_int(ilwd):
    """Extract the integer suffix from an old-style ilwdchar row ID.

    Parameters
    ----------
    ilwd : str
        Text of the form ``<word>:<word>:<digits>``, optionally surrounded
        by whitespace (for example ``"table:column:123"``).

    Returns
    -------
    int
        The trailing run of digits, converted to an integer.

    Raises
    ------
    ValueError
        If the entire string does not match the expected layout.

    """
    parsed = _ilwd_regex.fullmatch(ilwd)
    if parsed is not None:
        # Group 1 is the digit run after the second colon.
        return int(parsed.group(1))
    raise ValueError(f'"{ilwd}" is not formatted like an ilwd')
[docs]
def use_in(ContentHandler):
    """Convert from old-style to new-style row IDs on the fly.

    This is loosely adapted from :func:`ligo.lw.utils.ilwd.strip_ilwdchar`.

    The ``endElementNS``, ``startColumn``, and ``startStream`` methods of
    `ContentHandler` are replaced in place by wrappers that first delegate to
    the original methods and then rewrite ``ilwd:char`` / ``ilwd:char_u``
    typed ``<Param>`` values, ``<Column>`` types, and table ``<Stream>``
    tokens to integers.

    Parameters
    ----------
    ContentHandler : type
        Content handler class to patch. It must already define
        ``endElementNS``, ``startColumn``, and ``startStream``.

    Returns
    -------
    type
        The same `ContentHandler` class, modified in place.

    Notes
    -----
    When building a ContentHandler, this must be the _outermost_ decorator,
    outside of :func:`ligo.lw.lsctables.use_in`, :func:`ligo.lw.param.use_in`,
    or :func:`ligo.lw.table.use_in`.

    Examples
    --------
    >>> from importlib.resources import as_file, files
    >>> from ligo.lw import array, ligolw, lsctables, param, table, utils
    >>> from ligo.skymap.util import ilwd
    >>> @ilwd.use_in
    ... @lsctables.use_in
    ... @param.use_in
    ... @table.use_in
    ... class ContentHandler(ligolw.LIGOLWContentHandler):
    ...     pass
    >>> with as_file(files('ligo.skymap.io.tests.data').joinpath(
    ...         'G197392_coinc.xml.gz')) as f:
    ...     xmldoc = utils.load_filename(f, contenthandler=ContentHandler)
    >>> table = lsctables.SnglInspiralTable.get_table(xmldoc)
    >>> table[0].process_id
    0

    """

    # The original methods are captured as default arguments at definition
    # time so that each wrapper keeps calling the implementation that was in
    # place before this decorator replaced it.

    def endElementNS(self, uri_localname, qname,
                     __orig_endElementNS=ContentHandler.endElementNS):
        """Convert values of <Param> elements from ilwdchar to int."""
        if isinstance(self.current, Param) and self.current.Type in IDTypes:
            new_value = ilwd_to_int(self.current.pcdata)
            self.current.Type = ROWID_TYPE
            self.current.pcdata = ROWID_FORMATFUNC(new_value)
        __orig_endElementNS(self, uri_localname, qname)

    # Pending per-column converters, keyed by (id(table element), column
    # name). Filled in by startColumn and consumed by startStream. The dict
    # lives in this closure, so it is shared by all instances of the handler.
    remapped = {}

    def startColumn(self, parent, attrs,
                    __orig_startColumn=ContentHandler.startColumn):
        """Convert types in <Column> elements from ilwdchar to int.

        Notes
        -----
        This method is adapted from
        :func:`ligo.lw.utils.ilwd.strip_ilwdchar`.

        """
        result = __orig_startColumn(self, parent, attrs)

        # If this is an ilwdchar column, then register a function to convert
        # its rows' values for use in the startStream method below.
        if result.Type in IDTypes:
            remapped[(id(parent), result.Name)] = ilwd_to_int
            result.Type = ROWID_TYPE

        # Normalize the column name to the spelling listed in the table's
        # validcolumns, when the table is a known one.
        # NOTE(review): the flattened source does not show whether this
        # branch was nested inside the ilwdchar branch above; it is treated
        # as a sibling here -- confirm against upstream.
        if parent.Name in TableByName:
            validcolumns = TableByName[parent.Name].validcolumns
            if result.Name not in validcolumns:
                stripped_column_to_valid_column = {
                    Column.ColumnName(name): name for name in validcolumns}
                if result.Name in stripped_column_to_valid_column:
                    result.setAttribute(
                        'Name', stripped_column_to_valid_column[result.Name])

        return result

    def startStream(self, parent, attrs,
                    __orig_startStream=ContentHandler.startStream):
        """Convert values in table <Stream> elements from ilwdchar to int.

        Notes
        -----
        This method is adapted from
        :meth:`ligo.lw.table.TableStream.config`.

        """
        result = __orig_startStream(self, parent, attrs)
        if isinstance(result, TableStream):
            loadcolumns = set(parent.columnnames)
            if parent.loadcolumns is not None:
                # FIXME: convert loadcolumns attributes to sets to
                # avoid the conversion.
                loadcolumns &= set(parent.loadcolumns)

            table_key = id(parent)
            types = []
            for pytype, colname in zip(
                    parent.columnpytypes, parent.columnnames):
                # Always consume the pending converter, even for columns
                # that are not being loaded. Leaving it behind would leak
                # the entry and, because id() values can be reused once the
                # table object is gone, could later convert an unrelated
                # column that happens to share the same key.
                converter = remapped.pop((table_key, colname), pytype)
                types.append(converter if colname in loadcolumns else None)
            result._tokenizer.set_types(types)
        return result

    ContentHandler.endElementNS = endElementNS
    ContentHandler.startColumn = startColumn
    ContentHandler.startStream = startStream

    return ContentHandler