# Source code for stream

# Copyright (C) 2020  Patrick Godwin
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""High-level tools to build GStreamer pipelines.

"""
import os
import uuid
from collections import namedtuple
from collections.abc import Mapping
from typing import Callable, Iterable, Optional, Tuple, Union
from typing import Mapping as MappingType

import gi

gi.require_version('Gst', '1.0')
gi.require_version('GstAudio', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GLib

from lal import LIGOTimeGPS

from gstlal import datasource
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import simplehandler
from gstlal.utilities.element_registry import ElementRegistry


# Record describing where a Stream's data comes from.  Stream.from_datasource()
# fills it with the data source name, a live/offline flag, the GPS segment and
# the optional per-detector state vector / DQ vector / iDQ series elements.
SourceElem = namedtuple(
    "SourceElem",
    [
        "datasource",
        "is_live",
        "gps_range",
        "state_vector",
        "dq_vector",
        "idq_series",
    ],
)

# Record handed to bufsink() callbacks: the name of the element the sample was
# pulled from, the buffer start time and duration, the decoded payload (None
# for gap buffers), the sample's caps and the gap flag.
Buffer = namedtuple(
    "Buffer",
    [
        "name",
        "t0",
        "duration",
        "data",
        "caps",
        "is_gap",
    ],
)

# Module-level alias for Gst.MessageType.  The visible code in this module
# always spells out Gst.MessageType, so this is presumably exported for callers
# that register bus callbacks -- TODO confirm.
MessageType = Gst.MessageType


class Stream(ElementRegistry):
    """Class for building a GStreamer-based pipeline.

    A Stream bundles a GStreamer pipeline, the GLib main loop driving it, a
    bus message handler and a "head": either a single element or a mapping
    of keys to elements marking the current end of the pipeline graph.
    """

    # Class-wide flag recording whether Gst.init() has already been called.
    _gst_init = False
    # Not referenced in this class; presumably consumed by ElementRegistry
    # -- TODO confirm.
    _has_elements = False
    # Maps caps strings to callables decoding raw bytes into table rows;
    # filled in by _load_caps_buffer_map() on first instantiation.
    _caps_buffer_map = None

    def __init__(
        self,
        name: Optional[str] = None,
        mainloop: Optional[GLib.MainLoop] = None,
        pipeline: Optional[Gst.Pipeline] = None,
        handler: Optional["StreamHandler"] = None,
        source: Optional["SourceElem"] = None,
        head: Union[MappingType[str, Gst.Element], Gst.Element, None] = None,
    ) -> None:
        """Create a Stream that can be used to build a GStreamer-based pipeline.

        Args:
            name:
                str, a name for the GStreamer pipeline (optional). If not
                set, generates a unique name.
            mainloop:
                GLib.MainLoop, the GLib event loop to drive the GStreamer
                pipeline. If not set, one will be created.
            pipeline:
                Gst.Pipeline, the GStreamer pipeline object that contains
                the pipeline graph. If not set, one will be created.
            handler:
                StreamHandler, a handler which registers callbacks upon
                new bus messages and stops the event loop upon EOS. If not
                set, one will be created.
            source:
                SourceElem, an object that stores source information as
                well as state/DQ vector elements. If not set, the stream
                has no source information (``None``).
            head:
                Union[MappingType[str, Gst.Element], Gst.Element], a
                pointer to the current element in the pipeline. If not
                set, the Stream will not have any elements attached to the
                pipeline upon instantiation.

        """
        # initialize GStreamer if needed
        if not self._gst_init:
            Gst.init(None)
            # Record this on the class rather than the instance: an instance
            # attribute would leave the class flag False, so every new Stream
            # (including those spawned by __getitem__() and clear()) would
            # initialize GStreamer again.
            Stream._gst_init = True

        # register caps to buffer mapping
        if self._caps_buffer_map is None:
            self._load_caps_buffer_map()

        # set up gstreamer pipeline
        self.name = name if name else str(uuid.uuid1())
        self.mainloop = mainloop if mainloop else GLib.MainLoop()
        self.pipeline = pipeline if pipeline else Gst.Pipeline(self.name)
        self.handler = handler if handler else StreamHandler(self.mainloop, self.pipeline)
        self.head = head

        # set up source elem properties
        self.source = source if source else None

    def start(self) -> None:
        """Start the main event loop for this stream.

        The pipeline is moved to READY, seeked to the source's GPS range
        when the source is not live, moved to PLAYING, and then the main
        loop is run. Requires ``self.source`` to be set.

        Raises:
            RuntimeError:
                If the pipeline failed to transition to READY or PLAYING.

        """
        if self.source.is_live:
            simplehandler.OneTimeSignalHandler(self.pipeline)
        self.set_state(Gst.State.READY)
        if not self.source.is_live:
            self._seek_gps()
        self.set_state(Gst.State.PLAYING)

        ## Debugging output
        if os.environ.get("GST_DEBUG_DUMP_DOT_DIR", False):
            name = self.pipeline.get_name()
            pipeparts.write_dump_dot(self.pipeline, f"{name}_PLAYING", verbose=True)

            ## Setup a signal handler to intercept SIGINT in order to write
            ## the pipeline graph at ctrl+C before cleanly shutting down
            class SigHandler(simplehandler.OneTimeSignalHandler):
                def do_on_call(self, signum, frame):
                    pipeparts.write_dump_dot(self.pipeline, f"{name}_SIGINT", verbose=True)

            # bound to a local so the handler object stays referenced while
            # the main loop below is running
            sighandler = SigHandler(self.pipeline)

        self.mainloop.run()

    @classmethod
    def from_datasource(
        cls,
        data_source_info: datasource.DataSourceInfo,
        ifos: Union[str, Iterable[str]],
        name: Optional[str] = None,
        verbose: bool = False,
        state_vector: bool = False,
        dq_vector: bool = False,
        idq_series: bool = False
    ) -> "Stream":
        """Construct a Stream from a datasource.DataSourceInfo object.

        Args:
            data_source_info:
                DataSourceInfo, the object to construct this stream with.
            ifos:
                Union[str, Iterable[str]], the detectors to read timeseries
                data for.
            name:
                str, a name for the GStreamer pipeline (optional). If not
                set, generates a unique name.
            verbose:
                bool, default False, whether to display logging/progress
                information.
            state_vector:
                bool, default False, whether to attach state vector
                information to this Stream
            dq_vector:
                bool, default False, whether to attach data quality vector
                information to this Stream
            idq_series:
                bool, default False, whether to fetch idq data information
                with this Stream

        Returns:
            Stream, the newly created stream. If a single detector was given
            as a string, the stream for that detector is returned; otherwise
            a stream keyed by detector.

        """
        is_live = data_source_info.data_source in datasource.KNOWN_LIVE_DATASOURCES

        # a bare string selects a single, un-keyed stream
        if isinstance(ifos, str):
            ifos = [ifos]
            keyed = False
        else:
            keyed = True

        stream = cls(name=name, head={})
        state_vectors = {}
        dq_vectors = {}
        idq_series_data = {}
        for ifo in ifos:
            src, state_vectors[ifo], dq_vectors[ifo], idq_series_data[ifo] = datasource.mkbasicsrc(
                stream.pipeline,
                data_source_info,
                ifo,
                verbose=verbose
            )
            stream[ifo] = cls(
                name=stream.name,
                mainloop=stream.mainloop,
                pipeline=stream.pipeline,
                handler=stream.handler,
                head=src,
            )

        # The source is attached only after every detector has been added:
        # __setitem__() checks that parent and child share the same source,
        # and the children above are created without one.
        stream.source = SourceElem(
            datasource=data_source_info.data_source,
            is_live=is_live,
            gps_range=data_source_info.seg,
            state_vector=state_vectors if state_vector else None,
            dq_vector=dq_vectors if dq_vector else None,
            idq_series=idq_series_data if idq_series else None,
        )

        if keyed:
            return stream
        else:
            return stream[ifos[0]]

    def connect(self, *args, **kwargs) -> None:
        """Attach a callback to one of this element's signals.

        All arguments are forwarded to the head element's ``connect``.

        """
        self.head.connect(*args, **kwargs)

    def bufsink(
        self,
        func: Callable[[Buffer], None],
        caps: Optional[Gst.Caps] = None
    ) -> None:
        """Terminate this stream with an appsink element and process new
        buffers with a callback.

        Args:
            func:
                Callable[[Buffer], None], a callback that gets invoked when
                a new buffer is available
            caps:
                Gst.Caps, how to interpret the contents of the raw buffers.
                If not set, defaults to raw audio buffers (audio/x-raw).

        """
        def sample_handler(elem: Gst.Element):
            buf = self._pull_buffer(elem, caps=caps)
            func(buf)
            return Gst.FlowReturn.OK

        if isinstance(self.head, Mapping):
            # keyed stream: one sink per key, all managed by a single AppSync
            self._appsync = pipeparts.AppSync(appsink_new_buffer=sample_handler)
            for key in self.keys():
                self._appsync.add_sink(self.pipeline, self.head[key], name=key)
        else:
            sink = pipeparts.mkappsink(self.pipeline, self.head, max_buffers=1, sync=False)
            sink.connect("new-sample", sample_handler)
            sink.connect("new-preroll", self._preroll_handler)

    def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
        """Attach a callback which get invoked when new bus messages are
        available.

        Args:
            msg_type:
                Gst.MessageType, the type of message to invoke a callback for.
            *args:
                extra arguments, forwarded to the handler's ``add_callback``

        """
        self.handler.add_callback(msg_type, *args)

    def set_state(self, state: Gst.State) -> None:
        """Set pipeline state, checking for errors.

        Args:
            state:
                Gst.State: The state to set this stream's pipeline to.

        Raises:
            RuntimeError:
                If the pipeline failed to transition to the state specified.

        """
        if self.pipeline.set_state(state) == Gst.StateChangeReturn.FAILURE:
            raise RuntimeError(f"pipeline failed to enter {state.value_name}")

    def get_element_by_name(self, name: str) -> Gst.Element:
        """Retrieve an element from the stream's pipeline by name.

        Args:
            name:
                str, the name of the element to retrieve

        Returns:
            Gst.Element, the element associated with the name given, as
            returned by the pipeline's ``get_by_name``.

        """
        return self.pipeline.get_by_name(name)

    def post_message(self, msg_name: str, timestamp: Optional[int] = None) -> None:
        """Post a new application message to this stream's bus.

        Args:
            msg_name:
                str, the name of the application message to send.
            timestamp:
                (int, optional), the timestamp to attach to this message.
                Any value other than None is attached, including 0.

        """
        s = Gst.Structure.new_empty(msg_name)
        message = Gst.Message.new_application(self.pipeline, s)
        # compare against None explicitly: 0 is a valid timestamp and must
        # not be silently dropped by a truthiness test
        if timestamp is not None:
            message.timestamp = timestamp
        self.pipeline.get_bus().post(message)

    def __getitem__(self, key: str) -> "Stream":
        """Retrieves a new Stream with specified key.

        The new Stream shares this stream's name, main loop, pipeline,
        handler and source. If the key is not yet present in the head
        mapping, it is inserted with an empty mapping as its value.

        """
        return self.__class__(
            name=self.name,
            mainloop=self.mainloop,
            pipeline=self.pipeline,
            handler=self.handler,
            source=self.source,
            head=self.head.setdefault(key, {}),
        )

    def __setitem__(self, key: str, value: "Stream") -> None:
        """Attach a new Stream with specified key/value pair.

        Only the head of ``value`` is stored; it must otherwise belong to
        the same pipeline as this stream.

        """
        if self.pipeline:
            assert self.name == value.name, "streams have different names"
            assert self.mainloop is value.mainloop, "streams have different main loops"
            assert self.pipeline is value.pipeline, "streams have different pipelines"
            assert self.handler is value.handler, "streams have different handlers"
            assert self.source is value.source, "streams have different sources"
        else:
            self.name = value.name
            self.mainloop = value.mainloop
            self.pipeline = value.pipeline
            self.handler = value.handler
            self.source = value.source

        self.head[key] = value.head

    def keys(self) -> Iterable[str]:
        """Iterate over the keys of this stream's head mapping.

        """
        yield from self.head.keys()

    def values(self) -> Iterable["Stream"]:
        """Iterate over the Streams associated with each key.

        """
        for key in self.keys():
            yield self[key]

    def items(self) -> Iterable[Tuple[str, "Stream"]]:
        """Iterate over (key, Stream) pairs.

        """
        for key in self.keys():
            yield key, self[key]

    def clear(self) -> "Stream":
        """Return a new stream with all pointers to elements cleared out.

        """
        return self.__class__(
            name=self.name,
            mainloop=self.mainloop,
            pipeline=self.pipeline,
            handler=self.handler,
            source=self.source,
            head={},
        )

    def _seek_gps(self) -> None:
        """Seek pipeline to the given gps start/end times.

        """
        start, end = self.source.gps_range
        datasource.pipeline_seek_for_gps(self.pipeline, start, end)

    @classmethod
    def _pull_buffer(
        cls,
        elem: Gst.Element,
        caps: Optional[Gst.Caps] = None
    ):
        """Pull the next sample from an appsink and wrap it in a Buffer.

        Args:
            elem:
                Gst.Element, the element to pull the sample from.
            caps:
                Gst.Caps, if set, the buffer's memory is decoded with the
                function registered for these caps in the caps-to-buffer
                map. Otherwise the sample is read as an audio array.

        Returns:
            Buffer, the decoded buffer. Its data is None for gap buffers.

        """
        # get buffer
        sample = elem.emit("pull-sample")
        buf = sample.get_buffer()
        buftime = LIGOTimeGPS(0, buf.pts)

        is_gap = bool(buf.mini_object.flags & Gst.BufferFlags.GAP)
        if is_gap:
            data = None
        else:
            # read from buffer
            if caps:
                # the caps string does not change between memory chunks
                caps_str = caps.to_string()
                data = []
                for i in range(buf.n_memory()):
                    memory = buf.peek_memory(i)
                    success, mapinfo = memory.map(Gst.MapFlags.READ)
                    assert success
                    try:
                        if mapinfo.data:
                            # FIXME: gst-python 1.18 returns a memoryview
                            # instead of a read-only bytes-like object, so
                            # cast to bytes. this is likely inefficient but
                            # a proper solution will require .from_buffer()
                            # to leverage the buffer protocol instead
                            rows = cls._caps_buffer_map[caps_str](bytes(mapinfo.data))
                            data.extend(rows)
                    finally:
                        # always release the mapping, even if decoding fails
                        memory.unmap(mapinfo)
            else:
                data = pipeio.array_from_audio_sample(sample)

        return Buffer(
            name=elem.name,
            t0=buftime,
            duration=buf.duration,
            data=data,
            caps=sample.get_caps(),
            is_gap=is_gap,
        )

    @classmethod
    def _load_caps_buffer_map(cls) -> None:
        """Build the mapping from caps strings to row-decoding functions.

        Each table type is optional: it is registered only if the module
        providing it can be imported.

        """
        bufmap = {}

        # load table definitions if available
        # FIXME: this is really ugly, revisit this with importlib or similar
        try:
            from gstlal.snglinspiraltable import GSTLALSnglInspiral
        except ImportError:
            pass
        else:
            bufmap["application/x-lal-snglinspiral"] = GSTLALSnglInspiral.from_buffer
        try:
            from gstlal.snglbursttable import GSTLALSnglBurst
        except ImportError:
            pass
        else:
            bufmap["application/x-lal-snglburst"] = GSTLALSnglBurst.from_buffer
        try:
            from gstlal.sngltriggertable import GSTLALSnglTrigger
        except ImportError:
            pass
        else:
            bufmap["application/gstlal-sngltrigger"] = GSTLALSnglTrigger.from_buffer

        cls._caps_buffer_map = bufmap

    @staticmethod
    def _preroll_handler(elem: Gst.Element) -> Gst.FlowReturn:
        """Pull and discard the preroll sample from an appsink.

        """
        buf = elem.emit("pull-preroll")
        del buf
        return Gst.FlowReturn.OK
class StreamHandler(simplehandler.Handler):
    """Bus message handler that dispatches to user-registered callbacks.

    Callbacks are stored per message type and, within a type, per message
    name, where a name of None stands for "no specific name".
    """

    def __init__(self, *args, **kwargs):
        """Set up the handler and an empty callback table.

        All arguments are forwarded to the parent class.

        """
        super().__init__(*args, **kwargs)

        # one {message name: callback} table per supported message type
        supported_types = (
            Gst.MessageType.ELEMENT,
            Gst.MessageType.APPLICATION,
            Gst.MessageType.EOS,
        )
        self.callbacks = {supported: {} for supported in supported_types}

    def add_callback(self, msg_type: Gst.MessageType, *args) -> None:
        """Attach a callback which get invoked when new bus messages are
        available.

        Args:
            msg_type:
                Gst.MessageType, the type of message to invoke a callback for.
            *args:
                extra arguments: either ``callback`` alone, or
                ``msg_name, callback``.

        Raises:
            ValueError:
                If a callback is already registered for this message
                type/name.

        """
        # FIXME: would be better to rearrange the method signature so
        #        this extra step to determine args doesn't need to be done
        if len(args) == 1:
            (callback,) = args
            msg_name = None
        else:
            msg_name, callback = args

        registered = self.callbacks[msg_type]
        if msg_name in registered:
            raise ValueError("callback already registered for message type/name")
        registered[msg_name] = callback

    def do_on_message(self, bus: Gst.Bus, message: Gst.Message):
        """Invoke registered callbacks when new bus messages are received.

        Args:
            bus:
                Gst.Bus, the GStreamer bus.
            message:
                Gst.Message, the message received.

        Returns:
            bool, whether further message handling is performed by the parent
            class with default cases for EOS, INFO, WARNING and ERROR
            messages. This implementation always returns False.

        """
        # message types without a callback table are never dispatched here
        if message.type not in self.callbacks:
            return False

        if message.type == Gst.MessageType.EOS:
            # EOS messages don't have specific subtypes so we don't
            # parse the message's structure to determine how to proceed
            message_name = None
        else:
            structure = message.get_structure()
            if not structure:
                return False
            message_name = structure.get_name()

        handlers = self.callbacks[message.type]
        if message_name in handlers:
            handlers[message_name](message)

        return False