"""Miscellaneous utilities for working with Gstreamer Pipelines
References:
[1] 1.0 API: https://lazka.github.io/pgi-docs/Gst-1.0/index.html
"""
from typing import Any, Union, Tuple, Iterable, Optional, Callable
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from lal import LIGOTimeGPS
from gstlal import simplehandler
from gstlal import pipeio
# Segment with neither a start nor a stop bound. It is the default segment of
# run_pipeline, and seek_args maps each None entry to Gst.SeekType.NONE.
LIVE_SEGMENT = (None, None)
# Accepted representations of a time value in this module's annotations.
Time = Union[int, float, LIGOTimeGPS]
def is_element(x: Any) -> bool:
    """Report whether an object is an instance of Gst.Element.

    TODO: do these belong somewhere more general?

    Args:
        x:
            Any, the candidate object

    Returns:
        bool, True when x is a Gst.Element instance, False otherwise
    """
    return isinstance(x, Gst.Element)
def is_pad(x: Any) -> bool:
    """Report whether an object is an instance of Gst.Pad.

    TODO: do these belong somewhere more general?

    Args:
        x:
            Any, the candidate object

    Returns:
        bool, True when x is a Gst.Pad instance, False otherwise
    """
    return isinstance(x, Gst.Pad)
def to_caps(x: Union[str, Gst.Caps]) -> Gst.Caps:
    """Coerce a value to Gst.Caps.

    Args:
        x:
            str or Caps. A Caps instance is returned unchanged; a string is
            parsed with Gst.Caps.from_string.

    Returns:
        Caps

    Raises:
        ValueError: if x is neither a str nor a Gst.Caps instance
    """
    # Already the target type: hand it straight back.
    if isinstance(x, Gst.Caps):
        return x
    if isinstance(x, str):
        return Gst.Caps.from_string(x)
    raise ValueError('Cannot coerce type {} to Caps: {}'.format(type(x), str(x)))
def make_element(type_name: str, name: Optional[str] = None, **properties: Any) -> Gst.Element:
    """Create a new element of the type defined by the given element factory.

    If name is None, then the element will receive a guaranteed unique name,
    consisting of the element factory name and a number. If name is given, the
    element will be given the name supplied.

    Args:
        type_name:
            str, the name of the element factory (the type of element)
        name:
            Optional[str], default None, the name of the element instance
        properties:
            keyword arguments to set as properties of the element; each value
            is passed through pipeio.format_property before being set

    References:
        [1] ElementFactory.make: https://lazka.github.io/pgi-docs/Gst-1.0/classes/ElementFactory.html#Gst.ElementFactory.make

    Returns:
        Gst.Element

    Raises:
        RuntimeError: if Gst.ElementFactory.make returns None
    """
    elem = Gst.ElementFactory.make(type_name, name)
    if elem is None:
        # The factory gives no reason for the failure, so the message points
        # at the most likely one.
        raise RuntimeError("Unknown failure creating {} element: confirm that the correct plugins are being loaded".format(type_name))

    # Set element properties
    for key, value in properties.items():
        elem.set_property(key, pipeio.format_property(value))
    return elem
def make_pipeline(name: str) -> Gst.Pipeline:
    """Construct a named Gst.Pipeline, the top-level container for Gstreamer elements.

    Args:
        name:
            str, the name to give the pipeline

    References:
        [1] https://gstreamer.freedesktop.org/documentation/application-development/introduction/basics.html?gi-language=c#bins-and-pipelines

    Returns:
        Pipeline
    """
    pipeline = Gst.Pipeline(name)
    return pipeline
def run_pipeline(pipeline: Gst.Pipeline, segment: Tuple[Time, Time] = LIVE_SEGMENT, handlers: Optional[Iterable[Callable]] = None):
    """Drive a pipeline to PLAYING and block in a GObject main loop.

    The pipeline is first set to READY, then seeked to the requested segment,
    then set to PLAYING, after which the main loop is run.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to run
        segment:
            Tuple[Time, Time], the (start, end) playback segment, passed to seek
        handlers:
            Iterable[Callable], default None, callables that are each invoked
            with the keyword arguments "mainloop" and "pipeline". When None,
            simplehandler.Handler is used as the only handler.

    Returns:
        None

    Raises:
        RuntimeError: if either state change does not return
            Gst.StateChangeReturn.SUCCESS
    """
    handler_factories = (simplehandler.Handler,) if handlers is None else handlers
    loop = GObject.MainLoop()

    # Instantiate each handler against this loop and pipeline. Only the most
    # recently created handler stays bound to a local name.
    for make_handler in handler_factories:
        latest_handler = make_handler(mainloop=loop, pipeline=pipeline)

    ready_result = pipeline.set_state(Gst.State.READY)
    if ready_result != Gst.StateChangeReturn.SUCCESS:
        raise RuntimeError("pipeline did not enter ready state")

    # seek() requires the pipeline to be in READY, so it runs between the two
    # state changes.
    seek(pipeline, segment)

    playing_result = pipeline.set_state(Gst.State.PLAYING)
    if playing_result != Gst.StateChangeReturn.SUCCESS:
        raise RuntimeError("pipeline did not enter playing state")

    loop.run()
def seek(pipeline, segment: Tuple[Time, Time], flags=Gst.SeekFlags.FLUSH):
    """Seek a pipeline, and each of its source elements, to a time segment.

    Args:
        pipeline:
            the pipeline to seek; it must currently be in the READY state
        segment:
            Tuple[Time, Time], the (start, end) times of the segment. Each
            entry is converted to a seek type and time with seek_args.
        flags:
            Optional flags, see [2] for options

    Notes:
        In contrast to the documentation, we set the seek event directly on the pipeline sources. This is because of implementation
        issues in framecpp demux that prevent the full backward propagation of the seek event from sink -> source (as is done in the
        gstreamer documentation [1]).

    References:
        [1] https://gstreamer.freedesktop.org/documentation/additional/design/seeking.html?gi-language=python
        [2] Flags https://gstreamer.freedesktop.org/documentation/gstreamer/gstsegment.html?gi-language=python#GstSeekFlags

    Returns:
        None

    Raises:
        ValueError: if the pipeline is not in the READY state
    """
    gps_start, gps_end = segment
    start_type, start_time = seek_args(gps_start)
    stop_type, stop_time = seek_args(gps_end)

    if pipeline.current_state != Gst.State.READY:
        raise ValueError("pipeline must be in READY state")

    # The same seek parameters are applied to the pipeline and to every source.
    seek_kwargs = dict(
        rate=1.0,
        format=Gst.Format(Gst.Format.TIME),
        flags=flags,
        start_type=start_type,
        start=start_time,
        stop_type=stop_type,
        stop=stop_time,
    )

    # Return values of the seek calls are not inspected.
    pipeline.seek(**seek_kwargs)
    for source in pipeline.iterate_sources():
        source.seek(**seek_kwargs)
def seek_args(time: Time) -> Tuple[Gst.SeekType, int]:
    """Translate a time value into the (seek type, time) pair used when seeking.

    Args:
        time:
            Time, an int, float or LIGOTimeGPS. None or -1 means "no bound".

    Returns:
        Tuple[Gst.SeekType, int]: (Gst.SeekType.NONE, -1) when time is None or
        equal to -1; otherwise (Gst.SeekType.SET, t), where t is time.ns() for
        a LIGOTimeGPS and int(float(time) * Gst.SECOND) for anything else
    """
    unbounded = time is None or time == -1
    if unbounded:
        # -1 stands in for "no time" (noted upstream as Gst.CLOCK_TIME_NONE)
        return (Gst.SeekType.NONE, -1)

    if isinstance(time, LIGOTimeGPS):
        converted = time.ns()
    else:
        converted = int(float(time) * Gst.SECOND)
    return (Gst.SeekType.SET, converted)