Source code for pipeparts.mux

"""Module for multiplexing (mux) and demultiplexing (demux) elements

"""

# Select the Gst 1.0 introspection API before anything is imported from
# gi.repository.
import gi

gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst

# Import-time side effects: threading support and GStreamer are both set up
# as soon as this module is loaded.
# NOTE(review): GObject.threads_init() is believed to be a no-op on current
# PyGObject releases -- confirm before removing.
GObject.threads_init()
# None: no command-line argument list is handed to GStreamer.
Gst.init(None)

# Project dependencies used by the handlers and factory functions below.
from ligo import segments
from lal import iterutils
from lal import LIGOTimeGPS
from gstlal.pipeparts import pipetools


class FrameCPPChannelDemuxSetUnitsHandler(object):
    """
    Set the "units" property on pads as an element creates them.

    The handler listens for the element's "pad-added" signal.  Every time
    a pad appears, its name is looked up in a dictionary; on a match the
    pad's "units" property is set to the associated string.  Pads whose
    names are not in the dictionary are left untouched.

    Example:

    >>> FrameCPPChannelDemuxSetUnitsHandler(elem, {"H1:LSC-STRAIN": "strain"})  # doctest: +SKIP

    NOTE:  this is a work-around.  Most (possibly all) frame files carry
    no units on their channel data, yet downstream consumers may need
    them.  The demuxer reports units as part of a tag event, and this
    handler lets the caller override absent or incorrect values.
    """

    def __init__(self, elem, units_dict):
        """
        Hook the handler up to ``elem``.

        Args:
            elem:
                the framecpp_channeldemux element whose "pad-added"
                signal is to be watched
            units_dict:
                dict, pad name -> units string.  The dictionary is
                handed to the signal connection as-is (not copied).
        """
        self.elem = elem
        # the dictionary rides along as user data and is delivered back
        # to pad_added() as its last argument
        self.pad_added_handler_id = elem.connect(
            "pad-added", self.pad_added, units_dict
        )
        assert self.pad_added_handler_id > 0

    @staticmethod
    def pad_added(element, pad, units_dict):
        """
        "pad-added" callback:  apply the configured units to ``pad`` if
        its name is a key of ``units_dict``, otherwise do nothing.
        """
        pad_name = pad.get_name()
        if pad_name not in units_dict:
            return
        pad.set_property("units", units_dict[pad_name])
class FrameCPPChannelDemuxCheckSegmentsHandler(object):
    """
    Watchdog that raises ValueError when required data never shows up.

    Pad probes record the time intervals spanned by the buffers flowing
    through a pad.  Those intervals are subtracted from a segment list
    describing the data the stream is required to contain.  If a buffer
    arrives while required data that should have preceded it is still
    outstanding, or if EOS arrives before the whole segment list has been
    seen, a ValueError is raised.

    The tool can be used in two ways:

    * call the .set_probe() class method to install a monitor directly
      on one pad;
    * for elements with dynamic pads, create an instance with the element
      and a dictionary mapping source pad name -> segment list.  Monitors
      are then installed automatically from the element's "pad-added"
      signal as pads become available.

    In both cases ``jitter`` is the largest skipped interval that is
    silently ignored (for example to accommodate round-off error in
    element timestamp computations).  The default is 1 ns.
    """

    # FIXME:  there are two conflicting mechanisms for removing probes
    # from pads.  One removes the probe when its pad reaches EOS;  the
    # other removes a probe whenever a pad with the same name appears on
    # the element a second or subsequent time (and then installs a fresh
    # probe on the new pad).  The two could try to remove the same probe
    # twice, which would cause a crash, although that should not happen
    # in current use cases.  The fix is to rework the probe bookkeeping
    # so that both code paths agree on which probes are installed.

    def __init__(self, elem, seglists, jitter=LIGOTimeGPS(0, 1)):
        """
        Args:
            elem:
                element whose "pad-added" signal is to be watched
            seglists:
                mapping of source pad name -> segment list of required
                data;  must provide .copy()
            jitter:
                largest missing interval to ignore, default 1 ns
        """
        self.jitter = jitter
        # pad name -> ID of the probe currently installed for that name
        self.probe_handler_ids = {}
        # hand the callback a private copy so that later modifications
        # by the calling code do not affect it
        private_seglists = seglists.copy()
        self.pad_added_handler_id = elem.connect(
            "pad-added", self.pad_added, private_seglists
        )
        assert self.pad_added_handler_id > 0

    def pad_added(self, element, pad, seglists):
        """
        "pad-added" callback.  Drops the probe previously recorded under
        this pad's name, if any, then installs a new probe when the name
        has an entry in ``seglists``.
        """
        pad_name = pad.get_name()
        if pad_name in self.probe_handler_ids:
            stale_id = self.probe_handler_ids.pop(pad_name)
            pad.remove_probe(stale_id)
        if pad_name not in seglists:
            return
        self.probe_handler_ids[pad_name] = self.set_probe(
            pad, seglists[pad_name], self.jitter
        )
        assert self.probe_handler_ids[pad_name] > 0

    @classmethod
    def set_probe(cls, pad, seglist, jitter=LIGOTimeGPS(0, 1)):
        """
        Install a segment-checking probe on ``pad`` and return its ID.

        Args:
            pad:
                the pad to monitor
            seglist:
                segment list of data required on this pad;  it is copied,
                the caller's object is not modified
            jitter:
                largest missing interval to ignore, default 1 ns
        """
        # mutable record shared with the probe:  a private copy of the
        # required segments (the probe consumes it), the jitter, and a
        # slot for the probe's own ID so it can detach itself at EOS
        state = [segments.segmentlist(seglist), jitter, None]
        state[2] = pad.add_probe(
            Gst.PadProbeType.DATA_DOWNSTREAM, cls.probe, state
        )
        return state[2]

    @staticmethod
    def probe(pad, probeinfo, seg_jitter_id):
        """
        Pad probe callback.

        Args:
            pad:
                the pad the probe is attached to
            probeinfo:
                probe info object describing the buffer or event
            seg_jitter_id:
                the [segment list, jitter, probe ID] record created by
                set_probe();  the segment list is modified in place

        Returns:
            True, whenever no exception is raised

        Raises:
            ValueError:
                if required data preceding the current buffer is still
                outstanding, or if required data is outstanding at EOS
        """
        remaining, jitter, probe_id = seg_jitter_id

        def exceeds_jitter(seg):
            # only gaps longer than the jitter count as missing data
            return abs(seg) > jitter

        if probeinfo.type & Gst.PadProbeType.BUFFER:
            buf = probeinfo.get_buffer()
            is_gap = buf.mini_object.flags & Gst.BufferFlags.GAP
            if not is_gap:
                # this buffer carries data:  strike its span off the
                # list of intervals still to be seen (in-place update of
                # the shared list), then discard residue too short to
                # matter
                span = segments.segment((
                    LIGOTimeGPS(0, buf.pts),
                    LIGOTimeGPS(0, buf.pts + buf.duration),
                ))
                remaining -= segments.segmentlist([span])
                iterutils.inplace_filter(exceeds_jitter, remaining)
            # anything still required from before the start of this
            # buffer has been skipped
            preceding = segments.segment(
                (segments.NegInfinity, LIGOTimeGPS(0, buf.pts))
            )
            if remaining.intersects_segment(preceding):
                raise ValueError("%s: detected missing data: %s" % (pad.get_name(), remaining & segments.segmentlist([preceding])))
        elif probeinfo.type & Gst.PadProbeType.EVENT_DOWNSTREAM and probeinfo.get_event().type == Gst.EventType.EOS:
            # end of stream:  detach the probe, then whatever is left
            # after discarding sub-jitter residue is missing data
            pad.remove_probe(probe_id)
            iterutils.inplace_filter(exceeds_jitter, remaining)
            if remaining:
                raise ValueError("%s: at EOS detected missing data: %s" % (pad.get_name(), remaining))
        return True
def framecpp_channel_demux(pipeline, src, **properties):
    """Create a framecpp_channeldemux element fed by src

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        **properties:
            dict, keyword arguments to be set as element properties

    References:
        [1] framecppdemux implementation: gstlal/gstlal-ugly/gst/framecpp/framecpp_channeldemux.cc

    Returns:
        Element, src demuxed using framecpp
    """
    demuxer = pipetools.make_element_with_src(
        pipeline,
        src,
        "framecpp_channeldemux",
        **properties
    )
    return demuxer
def framecpp_channel_mux(pipeline, channel_src_map, units=None, seglists=None, **properties):
    """Create a framecpp_channelmux element and link sources to it

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        channel_src_map:
            dict, mapping a channel name -> src element, or None to link
            nothing.  For each channel the source's pads are tried in
            turn until one links successfully;  if none does, the channel
            is skipped without an error.
        units:
            default None, if given it is passed to
            FrameCPPChannelDemuxSetUnitsHandler, which looks pad names
            up in it (i.e. it is used as a pad name -> units string
            mapping)
        seglists:
            default None, if given create a segments handler
            (FrameCPPChannelDemuxCheckSegmentsHandler) for these segments
        **properties:
            keyword arguments to be set as element properties

    Returns:
        Element, the muxed sources
    """
    mux = pipetools.make_element_with_src(pipeline, None, "framecpp_channelmux", **properties)

    if channel_src_map is not None:
        for channel, src in channel_src_map.items():
            for srcpad in src.srcpads:
                # FIXME FIXME FIXME.  This should use the pad template
                # from the element.
                # FIXME:  once a newer version of some library is
                # available it should be possible to switch to
                #
                #   if srcpad.link(elem.get_request_pad(channel)) == Gst.PadLinkReturn.OK
                #
                # instead.  Right now that fails because of the
                # underscores in channel names, and when it fails it does
                # so silently, returning None, which leads to a cryptic
                # error message.
                template = Gst.PadTemplate.new(
                    channel,
                    Gst.PadDirection.SINK,
                    Gst.PadPresence.REQUEST,
                    Gst.Caps("ANY"),
                )
                sinkpad = mux.request_pad(template, channel)
                # the first pad that links wins;  move on to the next
                # channel
                if srcpad.link(sinkpad) == Gst.PadLinkReturn.OK:
                    break

    # optional watchers on the new element
    if units is not None:
        FrameCPPChannelDemuxSetUnitsHandler(mux, units)
    if seglists is not None:
        FrameCPPChannelDemuxCheckSegmentsHandler(mux, seglists)

    return mux
def framecpp_channel_mux_from_list(pipeline, *srcs, channels=None, **properties):
    """Mux sources using framecpp, pairing channels with sources by position

    NOTE: This acts similarly to framecpp_channel_mux with a different
    function signature to map channels to sources.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        *srcs:
            Gst.Element, the source elements
        channels:
            Union[str, Iterable], default None, the channel names, paired
            with srcs in order.  A single string is treated as one
            channel name.  Pairing stops at the shorter of the two
            sequences.  May only be omitted when no sources are given.
        **properties:
            all remaining keyword arguments are forwarded to
            framecpp_channel_mux (this includes its units and seglists
            keywords)

    Returns:
        Element, the muxed sources

    Raises:
        TypeError:
            if source elements are given without channels
    """
    if channels is None:
        # previously this fell through to zip(None, ...) and died with an
        # opaque "not iterable" TypeError;  say what is actually wrong
        if srcs:
            raise TypeError(
                "framecpp_channel_mux_from_list() requires 'channels' "
                "when source elements are given"
            )
        channels = ()
    elif isinstance(channels, str):
        # a bare string is one channel name, not an iterable of characters
        channels = [channels]

    channel_src_map = dict(zip(channels, srcs))
    return framecpp_channel_mux(pipeline, channel_src_map, **properties)
def ogg_mux(pipeline, src):
    """Create an oggmux element, which merges streams (audio and video)
    into ogg files.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] oggmux docs: https://gstreamer.freedesktop.org/documentation/ogg/oggmux.html?gi-language=python

    Returns:
        Element, the source merged as ogg format
    """
    element_name = "oggmux"
    return pipetools.make_element_with_src(pipeline, src, element_name)
def avi_mux(pipeline, src):
    """Create an avimux element, which muxes raw or compressed audio
    and/or video streams into an AVI file.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] avimux docs: https://gstreamer.freedesktop.org/documentation/avi/avimux.html?gi-language=python

    Returns:
        Element, the source merged as avi
    """
    element_name = "avimux"
    return pipetools.make_element_with_src(pipeline, src, element_name)