# Source code for gstpipetools

"""Miscellaneous utilities for working with Gstreamer Pipelines

References:
	[1] 1.0 API: https://lazka.github.io/pgi-docs/Gst-1.0/index.html
"""
from typing import Any, Union, Tuple, Iterable, Optional, Callable

import gi

gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst

# Initialise GObject threading support and the GStreamer library at import
# time, so both calls have run before any function in this module is used.
# NOTE(review): threads_init() is believed to be a deprecated no-op on recent
# PyGObject releases -- confirm before relying on or removing it.
GObject.threads_init()
Gst.init(None)

from lal import LIGOTimeGPS
from gstlal import simplehandler
from gstlal import pipeio

# Default playback segment for run_pipeline: both the start and the end bound
# are None, which seek_args translates into Gst.SeekType.NONE (no bound set).
LIVE_SEGMENT = (None, None)
# Types accepted wherever this module takes a time value.
Time = Union[int, float, LIGOTimeGPS]


def is_element(x: Any) -> bool:
    """Report whether an object is a Gst Element.

    TODO: do these belong somewhere more general?

    Args:
        x:
            Any, the candidate object

    Returns:
        bool, True when x is an instance of Gst.Element, False otherwise
    """
    element_type = Gst.Element
    return isinstance(x, element_type)
def is_pad(x: Any) -> bool:
    """Report whether an object is a Gst Pad.

    TODO: do these belong somewhere more general?

    Args:
        x:
            Any, the candidate object

    Returns:
        bool, True when x is an instance of Gst.Pad, False otherwise
    """
    pad_type = Gst.Pad
    return isinstance(x, pad_type)
def to_caps(x: Union[str, Gst.Caps]) -> Gst.Caps:
    """Coerce a value to a Caps object.

    Args:
        x:
            str or Caps. A Caps instance is returned unchanged; a string is
            parsed with Gst.Caps.from_string.

    Raises:
        ValueError:
            if x is neither a str nor a Gst.Caps

    Returns:
        Caps
    """
    # Already the right type: hand it straight back
    if isinstance(x, Gst.Caps):
        return x

    # Strings are parsed into a new Caps instance
    if isinstance(x, str):
        return Gst.Caps.from_string(x)

    raise ValueError('Cannot coerce type {} to Caps: {}'.format(type(x), str(x)))
def make_element(type_name: str, name: Optional[str] = None, **properties: Any) -> Gst.Element:
    """Create a new element of the type defined by the given element factory.

    If name is None, then the element will receive a guaranteed unique name,
    consisting of the element factory name and a number. If name is given, it
    will be given the name supplied.

    Args:
        type_name:
            str, the name of the type of element (the element factory name)
        name:
            str, default None, the name of the element instance
        **properties:
            keyword arguments to set as properties of the element. Each value
            is passed through pipeio.format_property before being set.

    References:
        [1] ElementFactory.make: https://lazka.github.io/pgi-docs/Gst-1.0/classes/ElementFactory.html#Gst.ElementFactory.make

    Raises:
        RuntimeError:
            if the factory returns None instead of an element

    Returns:
        Gst.Element
    """
    elem = Gst.ElementFactory.make(type_name, name)
    # The factory signals failure by returning None rather than raising
    if elem is None:
        raise RuntimeError("Unknown failure creating {} element: confirm that the correct plugins are being loaded".format(type_name))

    # Set element properties
    for key, value in properties.items():
        elem.set_property(key, pipeio.format_property(value))

    return elem
def make_pipeline(name: str) -> Gst.Pipeline:
    """Construct a new, named Pipeline.

    The Pipeline is the macroscopic container for Gstreamer elements and is
    necessary prior to creating any elements.

    Args:
        name:
            str, the name of the pipeline

    References:
        [1] https://gstreamer.freedesktop.org/documentation/application-development/introduction/basics.html?gi-language=c#bins-and-pipelines

    Returns:
        Pipeline
    """
    pipeline = Gst.Pipeline(name)
    return pipeline
def run_pipeline(pipeline: Gst.Pipeline, segment: Tuple[Time, Time] = LIVE_SEGMENT, handlers: Optional[Iterable[Callable]] = None):
    """Drive a pipeline to completion inside a main event loop.

    The pipeline is moved to READY, seeked to the requested segment, moved to
    PLAYING, and then the main loop is run.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to run
        segment:
            Tuple[Time, Time], a playback segment, defaults to LIVE_SEGMENT
        handlers:
            Iterable[Callable], default None, an optional list of functions
            that return Handlers. Each must accept only a "mainloop" and
            "pipeline" argument. When None, simplehandler.Handler is used.

    Raises:
        RuntimeError:
            if either state change returns anything other than SUCCESS

    Returns:
        None
    """
    handler_factories = (simplehandler.Handler,) if handlers is None else handlers
    mainloop = GObject.MainLoop()

    # Set handlers; only the most recently built one stays bound to a local
    # name, exactly one reference at a time
    for make_handler in handler_factories:
        _handler = make_handler(mainloop=mainloop, pipeline=pipeline)

    # NOTE(review): every result other than SUCCESS (including ASYNC and
    # NO_PREROLL) is treated as a failure below -- confirm this is intended.
    ready_result = pipeline.set_state(Gst.State.READY)
    if ready_result != Gst.StateChangeReturn.SUCCESS:
        raise RuntimeError("pipeline did not enter ready state")

    # seek() requires the pipeline to be in the READY state
    seek(pipeline, segment)

    playing_result = pipeline.set_state(Gst.State.PLAYING)
    if playing_result != Gst.StateChangeReturn.SUCCESS:
        raise RuntimeError("pipeline did not enter playing state")

    mainloop.run()
def seek(pipeline, segment: Tuple[Time, Time], flags=Gst.SeekFlags.FLUSH):
    """Seek a pipeline, and each of its source elements, to a segment.

    Args:
        pipeline:
            the pipeline to seek, which must be in the READY state
        segment:
            Tuple[Time, Time], the (start, end) times of the segment. Each
            bound is converted with seek_args.
        flags:
            Optional flags, see [2] for options

    Notes:
        In contrast to the documentation, we set the seek event directly on
        the pipeline sources. This is because of implementation issues in
        framecpp demux that prevent the full backward propagation of the seek
        event from sink -> source (as is done in the gstreamer
        documentation [1]).

    References:
        [1] https://gstreamer.freedesktop.org/documentation/additional/design/seeking.html?gi-language=python
        [2] Flags https://gstreamer.freedesktop.org/documentation/gstreamer/gstsegment.html?gi-language=python#GstSeekFlags

    Raises:
        ValueError:
            if the pipeline is not in the READY state

    Returns:
        None
    """
    start, end = segment
    start_type, start_time = seek_args(start)
    stop_type, stop_time = seek_args(end)

    if pipeline.current_state != Gst.State.READY:
        raise ValueError("pipeline must be in READY state")

    def _send_seek(target):
        # Identical seek parameters are used for the pipeline and its sources
        target.seek(
            rate=1.0,
            format=Gst.Format(Gst.Format.TIME),
            flags=flags,
            start_type=start_type,
            start=start_time,
            stop_type=stop_type,
            stop=stop_time,
        )

    # Pipeline first, then every source element directly (see Notes)
    _send_seek(pipeline)
    for source in pipeline.iterate_sources():
        _send_seek(source)
def seek_args(time: Time) -> Tuple[Gst.SeekType, int]:
    """Translate a time value into the (seek type, position) pair used by seek.

    Args:
        time:
            Time, either a number of seconds or a LIGOTimeGPS. None or -1
            means that no position is requested.

    Returns:
        Tuple[Gst.SeekType, int], (Gst.SeekType.NONE, -1) when no position is
        requested, otherwise (Gst.SeekType.SET, position) where position is
        time.ns() for a LIGOTimeGPS and int(float(time) * Gst.SECOND) for
        anything else
    """
    # No position requested
    if time is None or time == -1:
        # -1 == Gst.CLOCK_TIME_NONE
        return (Gst.SeekType.NONE, -1)

    if isinstance(time, LIGOTimeGPS):
        position = time.ns()
    else:
        position = int(float(time) * Gst.SECOND)

    return (Gst.SeekType.SET, position)