"""Module for multiplexing (mux) and demultiplexing (demux) elements
"""
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from ligo import segments
from lal import iterutils
from lal import LIGOTimeGPS
from gstlal.pipeparts import pipetools
class FrameCPPChannelDemuxSetUnitsHandler(object):
    """Set the "units" property on pads as an element adds them."""

    def __init__(self, elem, units_dict):
        """
        Hook the pad-added signal of the framecpp_channeldemux
        element elem.  Each time a pad appears, its name is looked
        up in units_dict;  if the name is a key of that dictionary,
        the pad's units property is set to the associated string.

        Example:

        >>> FrameCPPChannelDemuxSetUnitsHandler(elem, {"H1:LSC-STRAIN": "strain"}) # doctest: +SKIP

        NOTE:  this exists as a work-around.  Most (all?) frame
        files carry no units for their channel data, yet elements
        downstream may need to know the units.  The demuxer
        reports the units in a tag event, and this handler lets
        the caller override what is reported, filling in absent or
        incorrect units information.

        Args:
            elem:
                the element whose pad-added signal is connected
            units_dict:
                dict, mapping pad name -> units string
        """
        self.elem = elem
        # units_dict travels to the callback as the signal's user data
        handler_id = elem.connect("pad-added", self.pad_added, units_dict)
        self.pad_added_handler_id = handler_id
        assert handler_id > 0

    @staticmethod
    def pad_added(element, pad, units_dict):
        """
        pad-added callback:  set the units property of pad when its
        name is a key of units_dict, otherwise leave it untouched.
        """
        pad_name = pad.get_name()
        if pad_name not in units_dict:
            return
        pad.set_property("units", units_dict[pad_name])
class FrameCPPChannelDemuxCheckSegmentsHandler(object):
    """
    Utility for detecting missing data.  Pad probes record the time
    intervals covered by the buffers that pass, and those intervals are
    checked against a segment list describing the data the stream is
    required to contain.  A ValueError is raised if an interval of
    required data turns out to have been skipped, or if EOS arrives
    before the segment list has been exhausted.

    The tool can be used in two ways.  To monitor one specific pad,
    install a segment list monitor on it directly with the
    .set_probe() class method.  For elements with dynamic pads, create
    an instance of this class from the element and a dictionary
    mapping source pad name to the segment list that pad's output is
    to be checked against;  monitors are then attached automatically,
    via the element's pad-added signal, as the pads become available.

    Either way, a jitter parameter gives the largest skipped segment
    that is tolerated (for example, to accommodate round-off error in
    element timestamp computations).  The default is 1 ns.
    """
    # FIXME:  there are now two conflicting mechanisms in this code
    # for removing probes from pads:  one code path removes a probe
    # when its pad reaches EOS, while the other removes a probe each
    # time the pad for the probe appears a second or subsequent time
    # on an element (and then re-installs the probe on the new pad).
    # the two could conceivably try to remove the same probe twice,
    # which will cause a crash, although that should not happen in
    # current use cases.  the fix is to rework the probe tracking
    # mechanism so that both code paths agree on what probes are
    # installed

    def __init__(self, elem, seglists, jitter=LIGOTimeGPS(0, 1)):
        """
        Connect to the pad-added signal of elem so that pads named
        in seglists are monitored as they appear.
        """
        self.jitter = jitter
        self.probe_handler_ids = {}
        # the handler receives its own copy of the segmentlistdict
        # so later modifications by the calling code are not seen
        seglists_snapshot = seglists.copy()
        self.pad_added_handler_id = elem.connect("pad-added", self.pad_added, seglists_snapshot)
        assert self.pad_added_handler_id > 0

    def pad_added(self, element, pad, seglists):
        """
        pad-added callback:  drop any probe previously recorded
        under this pad's name, then install a fresh probe if the
        name has an entry in seglists.
        """
        name = pad.get_name()
        try:
            stale_id = self.probe_handler_ids.pop(name)
        except KeyError:
            # first time a pad with this name has been seen
            pass
        else:
            pad.remove_probe(stale_id)
        if name not in seglists:
            return
        self.probe_handler_ids[name] = self.set_probe(pad, seglists[name], self.jitter)
        assert self.probe_handler_ids[name] > 0

    @classmethod
    def set_probe(cls, pad, seglist, jitter=LIGOTimeGPS(0, 1)):
        """
        Install the monitoring probe on pad and return the probe ID
        reported by pad.add_probe().
        """
        # mutable state shared with the probe:  a private copy of
        # the segmentlist (the probe modifies it), the jitter, and
        # a slot for the probe ID, filled in once it is known
        probe_state = [segments.segmentlist(seglist), jitter, None]
        probe_state[2] = pad.add_probe(Gst.PadProbeType.DATA_DOWNSTREAM, cls.probe, probe_state)
        return probe_state[2]

    @staticmethod
    def probe(pad, probeinfo, seg_jitter_id):
        """
        Pad probe.  seg_jitter_id is the [seglist, jitter, probe_id]
        list built by set_probe().  Raises ValueError when required
        data is found to be missing;  otherwise returns True.
        """
        seglist, jitter, probe_id = seg_jitter_id

        def exceeds_jitter(seg):
            # only gaps longer than the jitter count as missing
            return abs(seg) > jitter

        if probeinfo.type & Gst.PadProbeType.BUFFER:
            buf = probeinfo.get_buffer()
            if not buf.mini_object.flags & Gst.BufferFlags.GAP:
                # the interval spanned by this buffer has now been
                # seen:  take it out of what is still expected
                # (in-place, so the shared state is updated)
                start = LIGOTimeGPS(0, buf.pts)
                stop = LIGOTimeGPS(0, buf.pts + buf.duration)
                seglist -= segments.segmentlist([segments.segment((start, stop))])
                # forget left-over intervals no bigger than the
                # jitter
                iterutils.inplace_filter(exceeds_jitter, seglist)
            # anything still expected from before the start of
            # this buffer has been skipped
            preceding = segments.segment((segments.NegInfinity, LIGOTimeGPS(0, buf.pts)))
            if seglist.intersects_segment(preceding):
                missing = seglist & segments.segmentlist([preceding])
                raise ValueError("%s: detected missing data: %s" % (pad.get_name(), missing))
        elif probeinfo.type & Gst.PadProbeType.EVENT_DOWNSTREAM and probeinfo.get_event().type == Gst.EventType.EOS:
            # the stream is over:  detach the probe
            pad.remove_probe(probe_id)
            # forget left-over intervals no bigger than the jitter
            iterutils.inplace_filter(exceeds_jitter, seglist)
            if seglist:
                raise ValueError("%s: at EOS detected missing data: %s" % (pad.get_name(), seglist))
        return True
def framecpp_channel_demux(pipeline, src, **properties):
    """Create a framecpp_channeldemux element fed by src

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        **properties:
            dict, keyword arguments to be set as element properties

    References:
        [1] framecppdemux implementation: gstlal/gstlal-ugly/gst/framecpp/framecpp_channeldemux.cc

    Returns:
        Element, src demuxed using framecpp
    """
    demuxer = pipetools.make_element_with_src(
        pipeline,
        src,
        "framecpp_channeldemux",
        **properties
    )
    return demuxer
def framecpp_channel_mux(pipeline, channel_src_map, units=None, seglists=None, **properties):
    """Create a framecpp_channelmux element and link sources to it

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        channel_src_map:
            dict, mapping a channel -> src element, or None to link nothing
        units:
            default None, if given it is handed to
            FrameCPPChannelDemuxSetUnitsHandler as its units_dict
        seglists:
            default None, if given it is handed to
            FrameCPPChannelDemuxCheckSegmentsHandler as its seglists
        **properties:
            dict, keyword arguments passed on when the element is made

    Returns:
        Element, the muxed sources
    """
    mux = pipetools.make_element_with_src(pipeline, None, "framecpp_channelmux", **properties)

    channel_sources = channel_src_map.items() if channel_src_map is not None else ()
    for channel, src in channel_sources:
        # try the source pads in turn, stopping at the first one
        # that links to a newly requested sink pad for the channel
        for srcpad in src.srcpads:
            # FIXME FIXME FIXME.  This should use the pad template from the element.
            # FIXME once a newer version of some library is available, then we should be able to switch to this
            # if srcpad.link(elem.get_request_pad(channel)) == Gst.PadLinkReturn.OK
            # Instead.  Right now it fails due to the
            # underscore in channel names.  When it fails
            # it fails silently and returns None, which
            # gives a cryptic error message
            template = Gst.PadTemplate.new(channel, Gst.PadDirection.SINK, Gst.PadPresence.REQUEST, Gst.Caps("ANY"))
            sinkpad = mux.request_pad(template, channel)
            if srcpad.link(sinkpad) == Gst.PadLinkReturn.OK:
                break

    if units is not None:
        FrameCPPChannelDemuxSetUnitsHandler(mux, units)
    if seglists is not None:
        FrameCPPChannelDemuxCheckSegmentsHandler(mux, seglists)
    return mux
def framecpp_channel_mux_from_list(pipeline, *srcs, channels = None, **properties):
    """Mux a source using framecpp

    NOTE: This acts similarly to framecpp_channel_mux with a different function
        signature to map channels to sources.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        *srcs:
            Gst.Element, the source elements
        channels:
            Union[str, Iterable], default None, the channels mapping to
            sources.  A single string names the channel of the first
            source.  Channels are paired with sources in order;  as with
            zip(), pairing stops at the shorter of the two.  May only be
            left as None when no sources are given.
        **properties:
            forwarded unchanged to framecpp_channel_mux, so its units and
            seglists keyword arguments can be supplied here as well

    Raises:
        TypeError:
            if sources are given but channels is None

    Returns:
        Element, the muxed sources
    """
    if channels is None:
        # without names there is no way to map the sources to channels;
        # say so, rather than failing inside zip() with a cryptic
        # "'NoneType' object is not iterable"
        if srcs:
            raise TypeError("framecpp_channel_mux_from_list() requires channels when source elements are given")
        channels = ()
    elif isinstance(channels, str):
        # a bare string is one channel name, not an iterable of
        # single-character names
        channels = [channels]
    channel_src_map = dict(zip(channels, srcs))
    return framecpp_channel_mux(pipeline, channel_src_map, **properties)
def ogg_mux(pipeline, src):
    """Create an oggmux element fed by src;  oggmux merges streams
    (audio and video) into ogg files.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] oggmux docs: https://gstreamer.freedesktop.org/documentation/ogg/oggmux.html?gi-language=python

    Returns:
        Element, the source merged as ogg format
    """
    muxer = pipetools.make_element_with_src(
        pipeline,
        src,
        "oggmux"
    )
    return muxer
def avi_mux(pipeline, src):
    """Create an avimux element fed by src;  avimux muxes raw or
    compressed audio and/or video streams into an AVI file.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] avimux docs: https://gstreamer.freedesktop.org/documentation/avi/avimux.html?gi-language=python

    Returns:
        Element, the source merged as avi
    """
    muxer = pipetools.make_element_with_src(
        pipeline,
        src,
        "avimux"
    )
    return muxer