# Copyright (C) 2020 Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""High-level tools to build GStreamer pipelines.
"""
import os
import uuid
from collections import namedtuple
from collections.abc import Mapping
from typing import Callable, Iterable, Optional, Tuple, Union
from typing import Mapping as MappingType
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstAudio', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib
from lal import LIGOTimeGPS
from gstlal import datasource
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import simplehandler
from gstlal.utilities.element_registry import ElementRegistry
SourceElem = namedtuple("SourceElem", "datasource is_live gps_range state_vector dq_vector idq_series")
Buffer = namedtuple("Buffer", "name t0 duration data caps is_gap")
MessageType = Gst.MessageType
[docs]class Stream(ElementRegistry):
"""Class for building a GStreamer-based pipeline.
"""
_gst_init = False
_has_elements = False
_caps_buffer_map = None
def __init__(
self,
name: Optional[str] = None,
mainloop: Optional[GLib.MainLoop] = None,
pipeline: Optional[Gst.Pipeline] = None,
handler: Optional["StreamHandler"] = None,
source: Optional["SourceElem"] = None,
head: Union[MappingType[str, Gst.Element], Gst.Element, None] = None,
) -> None:
"""Create a Stream that can be used to build a GStreamer-based pipeline.
Args:
name:
str, a name for the GStreamer pipeline (optional).
If not set, generates a unique name.
mainloop:
GLib.MainLoop, the GLib event loop to drive the GStreamer pipeline.
If not set, one will be created.
pipeline:
Gst.Pipeline, the GStreamer pipeline object that contains the pipeline graph.
If not set, one will be created.
handler:
StreamHandler, a handler which registers callbacks upon new bus messages and
stops the event loop upon EOS. If not set, one will be created.
source:
SourceElem, an object that stores source information as well as state/DQ vector
elements. If not set, one will be created.
head:
Union[MappingType[str, Gst.Element], Gst.Element], a pointer to the current
element in the pipeline. If not set, the Stream will not have any elements
attached to the pipeline upon instantiation.
"""
# initialize GStreamer if needed
if not self._gst_init:
Gst.init(None)
self._gst_init = True
# register caps to buffer mapping
if self._caps_buffer_map is None:
self._load_caps_buffer_map()
# set up gstreamer pipeline
self.name = name if name else str(uuid.uuid1())
self.mainloop = mainloop if mainloop else GLib.MainLoop()
self.pipeline = pipeline if pipeline else Gst.Pipeline(self.name)
self.handler = handler if handler else StreamHandler(self.mainloop, self.pipeline)
self.head = head
# set up source elem properties
self.source = source if source else None
[docs] def start(self) -> None:
"""Start the main event loop for this stream.
"""
if self.source.is_live:
simplehandler.OneTimeSignalHandler(self.pipeline)
self.set_state(Gst.State.READY)
if not self.source.is_live:
self._seek_gps()
self.set_state(Gst.State.PLAYING)
## Debugging output
if os.environ.get("GST_DEBUG_DUMP_DOT_DIR", False):
name = self.pipeline.get_name()
pipeparts.write_dump_dot(self.pipeline, f"{name}_PLAYING", verbose=True)
## Setup a signal handler to intercept SIGINT in order to write
## the pipeline graph at ctrl+C before cleanly shutting down
class SigHandler(simplehandler.OneTimeSignalHandler):
def do_on_call(self, signum, frame):
pipeparts.write_dump_dot(self.pipeline, f"{name}_SIGINT", verbose=True)
sighandler = SigHandler(self.pipeline)
self.mainloop.run()
[docs] @classmethod
def from_datasource(
cls,
data_source_info: datasource.DataSourceInfo,
ifos: Union[str, Iterable[str]],
name: Optional[str] = None,
verbose: bool = False,
state_vector: bool = False,
dq_vector: bool = False,
idq_series: bool = False
) -> "Stream":
"""Construct a Stream from a datasource.DataSourceInfo object.
Args:
data_source_info:
DataSourceInfo, the object to construct this stream with.
ifos:
Union[str, Iterable[str]], the detectors read timeseries data for.
name:
str, a name for the GStreamer pipeline (optional).
If not set, generates a unique name.
verbose:
bool, default False, whether to display logging/progress information.
state_vector:
bool, default False, whether to attach state vector information to this Stream
dq_vector:
bool, default False, whether to attach data quality vector information to this Stream
idq_series:
bool, default False, whether to fetch idq data information with this Stream
Returns:
Stream, the newly created stream.
"""
is_live = data_source_info.data_source in datasource.KNOWN_LIVE_DATASOURCES
if isinstance(ifos, str):
ifos = [ifos]
keyed = False
else:
keyed = True
stream = cls(name=name, head={})
state_vectors = {}
dq_vectors = {}
idq_series_data = {}
for ifo in ifos:
src, state_vectors[ifo], dq_vectors[ifo], idq_series_data[ifo] = datasource.mkbasicsrc(
stream.pipeline,
data_source_info,
ifo,
verbose=verbose
)
stream[ifo] = cls(
name=stream.name,
mainloop=stream.mainloop,
pipeline=stream.pipeline,
handler=stream.handler,
head=src,
)
stream.source = SourceElem(
datasource=data_source_info.data_source,
is_live=is_live,
gps_range=data_source_info.seg,
state_vector=state_vectors if state_vector else None,
dq_vector=dq_vectors if dq_vector else None,
idq_series = idq_series_data if idq_series else None,
)
if keyed:
return stream
else:
return stream[ifos[0]]
[docs] def connect(self, *args, **kwargs) -> None:
"""Attach a callback to one of this element's signals.
"""
self.head.connect(*args, **kwargs)
[docs] def bufsink(self,
func: Callable[[Buffer], None],
caps: Optional[Gst.Caps] = None
) -> None:
"""Terminate this stream with an appsink element and process new buffers with a callback.
Args:
func:
Callable[[Buffer], None], a callback that gets invoked when a new buffer is available
caps:
Gst.Caps, how to interpret the contents of the raw buffers.
If not set, defaults to raw audio buffers (audio/x-raw).
"""
def sample_handler(elem: Gst.Element):
buf = self._pull_buffer(elem, caps=caps)
func(buf)
return Gst.FlowReturn.OK
if isinstance(self.head, Mapping):
self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
for key in self.keys():
self._appsync.add_sink(self.pipeline, self.head[key], name=key)
else:
sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
sink.connect("new-sample", sample_handler)
sink.connect("new-preroll", self._preroll_handler)
[docs] def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
"""Attach a callback which get invoked when new bus messages are available.
Args:
msg_type:
Gst.MessageType, the type of message to invoke a callback for.
*args:
extra arguments
"""
self.handler.add_callback(msg_type, *args)
[docs] def set_state(self, state: Gst.State) -> None:
"""Set pipeline state, checking for errors.
Args:
state:
Gst.State: The state to set this stream's pipeline to.
Raises:
RuntimeError:
If the pipeline failed to transition to the state specified.
"""
if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
raise RuntimeError(f"pipeline failed to enter {state.value_name}")
[docs] def get_element_by_name(self, name: str) -> Gst.Element:
"""Retrieve an element from the stream's pipeline by name.
Args:
name:
str, the name of the element to retrieve
Returns:
Gst.Element, the element associated with the name given.
"""
return self.pipeline.get_by_name(name)
[docs] def post_message(self, msg_name: None, timestamp: Optional[int] = None) -> None:
"""Post a new application message to this stream's bus.
Args:
msg_name:
str, the name of the application message to send.
timestamp:
(int, optional), the timestamp to attach to this message.
"""
s = Gst.Structure.new_empty(msg_name)
message = Gst.Message.new_application(self.pipeline, s)
if timestamp:
message.timestamp = timestamp
self.pipeline.get_bus().post(message)
def __getitem__(self, key: str) -> "Stream":
"""Retrieves a new Stream with specified key.
"""
return self.__class__(
name=self.name,
mainloop=self.mainloop,
pipeline=self.pipeline,
handler=self.handler,
source=self.source,
head=self.head.setdefault(key, {}),
)
def __setitem__(self, key: str, value: "Stream") -> None:
"""Attach a new Stream with specified key/value pair.
"""
if self.pipeline:
assert self.name == value.name
assert self.mainloop is value.mainloop
assert self.pipeline is value.pipeline
assert self.handler is value.handler
assert self.source is value.source
else:
self.name = value.name
self.mainloop = value.mainloop
self.pipeline = value.pipeline
self.handler = value.handler
self.source = value.source
self.head[key] = value.head
[docs] def keys(self) -> Iterable[str]:
yield from self.head.keys()
[docs] def values(self) -> Iterable["Stream"]:
for key in self.keys():
yield self[key]
[docs] def items(self) -> Iterable[Tuple[str, "Stream"]]:
for key in self.keys():
yield key, self[key]
[docs] def clear(self) -> "Stream":
"""Return a new stream with all pointers to elements cleared out.
"""
return self.__class__(
name=self.name,
mainloop=self.mainloop,
pipeline=self.pipeline,
handler=self.handler,
source=self.source,
head={},
)
def _seek_gps(self) -> None:
"""Seek pipeline to the given gps start/end times.
"""
start, end = self.source.gps_range
datasource.pipeline_seek_for_gps(self.pipeline, start, end)
@classmethod
def _pull_buffer(cls, elem: Gst.Element, caps: Optional[Gst.Caps] = None):
# get buffer
sample = elem.emit("pull-sample")
buf = sample.get_buffer()
buftime = LIGOTimeGPS(0, buf.pts)
is_gap = bool(buf.mini_object.flags & Gst.BufferFlags.GAP)
if is_gap:
data = None
else:
# read from buffer
if caps:
data = []
for i in range(buf.n_memory()):
memory = buf.peek_memory(i)
success, mapinfo = memory.map(Gst.MapFlags.READ)
assert success
if mapinfo.data:
# FIXME: gst-python 1.18 returns a memoryview
# instead of a read-only bytes-like object, so
# cast to bytes. this is likely inefficient but
# a proper solution will require .from_buffer()
# to leverage the buffer protocol instead
rows = cls._caps_buffer_map[caps.to_string()](bytes(mapinfo.data))
data.extend(rows)
memory.unmap(mapinfo)
else:
data = pipeio.array_from_audio_sample(sample)
return Buffer(
name=elem.name,
t0=buftime,
duration=buf.duration,
data=data,
caps=sample.get_caps(),
is_gap=is_gap,
)
@classmethod
def _load_caps_buffer_map(cls) -> None:
bufmap = {}
# load table definitions if available
# FIXME: this is really ugly, revisit this with importlib or similar
try:
from gstlal.snglinspiraltable import GSTLALSnglInspiral
except ImportError:
pass
else:
bufmap["application/x-lal-snglinspiral"] = GSTLALSnglInspiral.from_buffer
try:
from gstlal.snglbursttable import GSTLALSnglBurst
except ImportError:
pass
else:
bufmap["application/x-lal-snglburst"] = GSTLALSnglBurst.from_buffer
try:
from gstlal.sngltriggertable import GSTLALSnglTrigger
except ImportError:
pass
else:
bufmap["application/gstlal-sngltrigger"] = GSTLALSnglTrigger.from_buffer
cls._caps_buffer_map = bufmap
@staticmethod
def _preroll_handler(elem: Gst.Element) -> Gst.FlowReturn:
buf = elem.emit("pull-preroll")
del buf
return Gst.FlowReturn.OK
[docs]class StreamHandler(simplehandler.Handler):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# set up callbacks
self.callbacks = {
Gst.MessageType.ELEMENT: {},
Gst.MessageType.APPLICATION: {},
Gst.MessageType.EOS: {},
}
[docs] def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
"""Attach a callback which get invoked when new bus messages are available.
Args:
msg_type:
Gst.MessageType, the type of message to invoke a callback for.
*args:
extra arguments
"""
# FIXME: would be better to rearrange the method signature so
# this extra step to determine args doesn't need to be done
if len(args) == 1:
msg_name = None
callback = args[0]
else:
msg_name, callback = args
if msg_name in self.callbacks[msg_type]:
raise ValueError("callback already registered for message type/name")
self.callbacks[msg_type][msg_name] = callback
[docs] def do_on_message(self, bus: Gst.Bus, message: Gst.Message):
"""Invoke registered callbacks when new bus messages are received.
Args:
bus:
Gst.Bus, the GStreamer bus.
message:
Gst.Message, the message received.
Returns:
bool, whether further message handling is performed by the parent class
with default cases for EOS, INFO, WARNING and ERROR messages.
"""
if message.type in self.callbacks:
if message.type == Gst.MessageType.EOS:
# EOS messages don't have specific subtypes so we don't
# parse the message's structure to determine how to proceed
message_name = None
elif message.get_structure():
message_name = message.get_structure().get_name()
else:
return False
if message_name in self.callbacks[message.type]:
self.callbacks[message.type][message_name](message)
return False