Source code for pipeparts.sink

""""Module for producing sink elements

"""
import math
import os
import sys
import threading
from typing import Tuple

import numpy
from lal import LIGOTimeGPS
from lal.utils import CacheEntry
from ligo import segments

import gi

gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst

GObject.threads_init()
Gst.init(None)

from gstlal.pipeparts import pipetools, pipedot, mux, encode, filters, transform

BYTE_ORDER = 'LE' if sys.byteorder == "little" else 'BE'


def framecpp_filesink_ldas_path_handler(elem: pipetools.Element, pspec, path_digits: Tuple[str, int]):
    """Point a frame file sink at an LDAS-style output directory

    Builds the directory name "<instrument>-<frame-type>-<leading digits>"
    beneath the given output path, creates it if necessary, and stores it
    in the element's "path" property. The leading digits are taken from
    the element's "timestamp" property after truncation to integer
    seconds.

    Args:
        elem:
            Element, the element whose "timestamp", "instrument" and
            "frame-type" properties are read and whose "path" property
            is set
        pspec:
            Unknown, not used by this handler
        path_digits:
            Tuple[str, int], a string outpath and a directory digits int

    Examples:
        >>> filesinkelem.connect("notify::timestamp", framecpp_filesink_ldas_path_handler, (".", 5))

    Returns:
        None, the formatted outpath is attached to the element as the
        "path" property
    """
    outpath, dir_digits = path_digits

    # get timestamp and truncate to integer seconds
    timestamp = elem.get_property("timestamp") // Gst.SECOND

    # extract leading digits.  log10() is undefined at 0, so a timestamp
    # that truncates to 0 seconds is handled explicitly instead of
    # raising a math domain error
    if timestamp:
        leading_digits = timestamp // 10 ** int(math.log10(timestamp) + 1 - dir_digits)
    else:
        leading_digits = 0

    # get other metadata
    instrument = elem.get_property("instrument")
    frame_type = elem.get_property("frame-type")

    # make target directory, and set path.  exist_ok avoids the race
    # between checking for the directory and creating it
    path = os.path.join(outpath, "%s-%s-%d" % (instrument, frame_type, leading_digits))
    os.makedirs(path, exist_ok=True)
    elem.set_property("path", path)
def framecpp_filesink_cache_entry_from_mfs_message(message):
    """Build a lal.utils.CacheEntry for the file named in a multifilesink message

    The message is expected to be an element message posted by the
    multifilesink element inside a framecpp_filesink bin.

    Args:
        message:
            the element message; its structure supplies the "timestamp",
            "duration" and "filename" fields

    Returns:
        CacheEntry, describing the file being written by the
        multifilesink element
    """
    structure = message.get_structure()

    # segment spanned by the file, taken directly from the message.  both
    # values are passed as the second argument of LIGOTimeGPS
    start = LIGOTimeGPS(0, structure["timestamp"])
    end = start + LIGOTimeGPS(0, structure["duration"])
    span = segments.segment(start, end)

    # the framecpp_filesink bin carries the instrument/observatory and
    # the frame file type
    sink_bin = message.src.get_parent()

    url = "file://localhost%s" % os.path.abspath(structure["filename"])
    return CacheEntry(
        sink_bin.get_property("instrument"),
        sink_bin.get_property("frame-type"),
        span,
        url
    )
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-multifilesink.html">multifilesink</a> element to a pipeline with useful default properties
def multi_file(pipeline: pipetools.Pipeline, src: pipetools.Element, next_file: int = 0, sync: bool = False, async_: bool = False, **properties) -> pipetools.Element:
    """Append a multifilesink element to a pipeline

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        next_file:
            int, default 0, forwarded as the "next_file" property
        sync:
            bool, default False, forwarded as the "sync" property
        async_:
            bool, default False, forwarded as the "async" property
        **properties:
            any further element properties; an "async" entry given here
            is overridden by async_

    Returns:
        Element
    """
    # "async" is a Python keyword, so it has to be supplied by dict key
    sink_properties = {**properties, "async": async_}
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "multifilesink",
        next_file=next_file,
        sync=sync,
        **sink_properties
    )
def gwf(pipeline: pipetools.Pipeline, src: pipetools.Element, message_forward: bool = True, **properties) -> pipetools.Element:
    """Append a framecpp_filesink element, which writes GWF files, to a pipeline

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        message_forward:
            bool, default True, forwarded as the "message_forward" property
        **properties:
            any further element properties. A "post_messages" entry
            (default True) is not forwarded to the bin; it is applied to
            the child element named "multifilesink" instead

    References:
        Implementation: gstlal-ugly/gst/framecpp/framecpp_filesink.c

    Returns:
        Element
    """
    forward_to_child = properties.pop("post_messages", True)
    sink_bin = pipetools.make_element_with_src(
        pipeline,
        src,
        "framecpp_filesink",
        message_forward=message_forward,
        **properties
    )

    # FIXME:  there's supposed to be some sort of proxy mechanism for
    # setting properties on child elements, but we can't seem to get
    # anything to work
    writer = sink_bin.get_by_name("multifilesink")
    writer.set_property("post-messages", forward_to_child)
    return sink_bin
## Adds a <a href="@gstdoc/gstreamer-plugins-fakesink.html">fakesink</a> element to a pipeline with useful default properties
def fake(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Append a fakesink element to a pipeline

    The element is created with both "sync" and "async" set to False.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    Returns:
        Element
    """
    # "async" is a Python keyword, so the properties are passed as a dict
    sink_properties = {"sync": False, "async": False}
    return pipetools.make_element_with_src(pipeline, src, "fakesink", **sink_properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-filesink.html">filesink</a> element to a pipeline with useful default properties
def file(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str, sync: bool = False, async_: bool = False) -> pipetools.Element:
    """Append a filesink element to a pipeline

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, the name of the output file, forwarded as "location"
        sync:
            bool, default False, forwarded as the "sync" property
        async_:
            bool, default False, forwarded as the "async" property

    Returns:
        Element
    """
    sink_properties = {
        "sync": sync,
        # NOTE(review): 2 presumably selects unbuffered writes --- confirm
        # against the filesink documentation
        "buffer_mode": 2,
        "location": filename,
        # "async" is a Python keyword, hence the dict
        "async": async_,
    }
    return pipetools.make_element_with_src(pipeline, src, "filesink", **sink_properties)
## Adds a <a href="@gstlalgtkdoc/GstTSVEnc.html">lal_nxydump</a> element to a pipeline with useful default properties
def tsv(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str, segment: pipetools.Segment = None) -> pipetools.Element:
    """Write an audio time-series to a tab-separated ascii text file

    A lal_nxydump element followed by a file sink is appended to the
    pipeline. The output is multi-column tab-separated ASCII text, a
    format compatible with most plotting utilities. The first column is
    the time, the remaining columns are the values of the channels in
    order.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, the filename of the output text file
        segment:
            Segment, default None, a ligo.segments.segment. When given,
            the .ns() values of its two ends are passed to lal_nxydump
            as "start_time" and "stop_time"

    Returns:
        Element, the file sink
    """
    dump_properties = {}
    if segment is not None:
        dump_properties["start_time"] = segment[0].ns()
        dump_properties["stop_time"] = segment[1].ns()

    dump = pipetools.make_element_with_src(pipeline, src, "lal_nxydump", **dump_properties)
    return file(pipeline, dump, filename)
def ogm_video(pipeline: pipetools.Pipeline, videosrc: pipetools.Element, filename: str, audiosrc: pipetools.Element = None, verbose: bool = False):
    """Make a ogm video sink element

    The video stream is colorspace-converted, constrained to I420 raw
    video, theora-encoded and fed to an ogg muxer whose output is written
    to a file. If an audio source is given, it is converted, constrained
    to 24-bit signed integer samples in native byte order, flac-encoded
    and linked into the same muxer.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        videosrc:
            Gst.Element, the video source element
        filename:
            str, the name of the output video file
        audiosrc:
            Gst.Element, default None, the audio source element
        verbose:
            bool, default False, if True a progress report element named
            after the file is inserted ahead of the file sink

    Returns:
        Element, the sink element
    """
    src = transform.colorspace(pipeline, videosrc)
    # GStreamer 1.0 caps syntax:  the 0.10 media type "video/x-raw-yuv"
    # with a fourcc format field no longer exists, raw video is
    # "video/x-raw" with a string format field
    src = filters.caps(pipeline, src, "video/x-raw, format=I420")
    src = encode.theora(
        pipeline,
        src,
        # border=2,
        quality=48,
        # quick=False
    )
    src = mux.ogg_mux(pipeline, src)

    if audiosrc is not None:
        audio = transform.audio_convert(pipeline, audiosrc)
        audio = filters.caps(pipeline, audio, "audio/x-raw, format=S24%s" % BYTE_ORDER)
        encode.flac(pipeline, audio).link(src)

    if verbose:
        # NOTE(review): progress_report is neither defined nor imported in
        # the visible part of this module --- confirm where it comes from
        src = progress_report(pipeline, src, filename)

    return file(pipeline, src, filename)
def auto_video(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Append a colorspace converter and an autovideosink to a pipeline

    autovideosink automatically detects an appropriate video sink to use.
    It does so by scanning the registry for all elements that have "Sink"
    and "Video" in the class field of their element information, and also
    have a non-zero autoplugging rank.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/autodetect/autovideosink.html?gi-language=python

    Returns:
        Element
    """
    converted = transform.colorspace(pipeline, src)
    return pipetools.make_element_with_src(pipeline, converted, "autovideosink")
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-autoaudiosink.html">autoaudiosink</a> element to a pipeline with useful default properties
def auto_audio(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Append a queue and an autoaudiosink to a pipeline

    autoaudiosink automatically detects an appropriate audio sink to use.
    It does so by scanning the registry for all elements that have "Sink"
    and "Audio" in the class field of their element information, and also
    have a non-zero autoplugging rank.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/autodetect/autoaudiosink.html?gi-language=python

    Returns:
        Element
    """
    queued = transform.queue(pipeline, src)
    return pipetools.make_element_with_src(pipeline, queued, "autoaudiosink")
def playback(pipeline: pipetools.Pipeline, src: pipetools.Element, amplification: float = 0.1) -> pipetools.Element:
    """Create a playback chain and add it to an existing pipeline

    The chain is audioconvert, capsfilter (32-bit float samples in native
    byte order), audioamplify, audioconvert, queue, autoaudiosink. All
    six elements are added to the pipeline and linked in that order
    downstream of src.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new elements will be added
        src:
            Gst.Element, the source element
        amplification:
            float, default 0.1, the "amplification" property of the
            audioamplify element

    Raises:
        RuntimeError:
            if one of the elements cannot be created, or if two
            neighbouring elements cannot be linked

    Returns:
        Element, the autoaudiosink at the end of the chain
    """
    factory_names = ("audioconvert", "capsfilter", "audioamplify", "audioconvert", "queue", "autoaudiosink")
    elems = tuple(Gst.ElementFactory.make(name, None) for name in factory_names)

    # ElementFactory.make() yields None for an unavailable factory;
    # report that here rather than failing obscurely further down
    missing = [name for name, elem in zip(factory_names, elems) if elem is None]
    if missing:
        raise RuntimeError("cannot create element(s): %s" % ", ".join(missing))

    elems[1].set_property("caps", Gst.Caps.from_string("audio/x-raw, format=F32%s" % BYTE_ORDER))
    elems[2].set_property("amplification", amplification)
    elems[4].set_property("max-size-time", 1 * Gst.SECOND)
    pipeline.add(*elems)

    # Gst.element_link_many() is not available in the GStreamer 1.0
    # bindings, so link each neighbouring pair explicitly
    chain = (src,) + elems
    for upstream, downstream in zip(chain, chain[1:]):
        if not upstream.link(downstream):
            raise RuntimeError("failed to link %s to %s" % (repr(upstream), repr(downstream)))

    return elems[-1]
def tsv_tee(pipeline: pipetools.Pipeline, src: pipetools.Element, *args, **properties) -> pipetools.Element:
    """Tee off a copy of the stream into an nxy text dump

    A tee is placed after src, and one branch of it is sent through a
    queue into tsv().

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        *args:
            positional arguments forwarded to tsv()
        **properties:
            keyword arguments forwarded to tsv()

    Returns:
        Element, the tee, to which further branches can be attached
    """
    splitter = transform.tee(pipeline, src)
    dump_branch = transform.queue(pipeline, splitter)
    tsv(pipeline, dump_branch, *args, **properties)
    return splitter
def trigger_xml_writer(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str):
    """Append a lal_triggerxmlwriter element to a pipeline

    The element is created with both "sync" and "async" set to False.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, output path for xml file, forwarded as "location"

    Returns:
        Element
    """
    # "async" is a Python keyword, so the properties are passed as a dict
    writer_properties = {"location": filename, "sync": False, "async": False}
    return pipetools.make_element_with_src(pipeline, src, "lal_triggerxmlwriter", **writer_properties)
# FIXME no specific alias for this url since this library only has one element.
# DO NOT DOCUMENT OTHER CODES THIS WAY! Use @gstdoc @gstpluginsbasedoc etc.
## Adds a <a href="http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-libs/html/gstreamer-app.html">appsink</a> element to a pipeline with useful default properties
def app(pipeline: pipetools.Pipeline, src: pipetools.Element, max_buffers: int = 1, drop: bool = False, sync: bool = False, async_: bool = False, **properties):
    """Append an appsink element to a pipeline

    Appsink is a sink plugin that supports many different methods for
    making the application get a handle on the GStreamer data in a
    pipeline. Unlike most GStreamer elements, Appsink provides external
    API functions. The element is always created with "emit_signals" set
    to True.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        max_buffers:
            int, default 1, forwarded as the "max_buffers" property
        drop:
            bool, default False, forwarded as the "drop" property
        sync:
            bool, default False, forwarded as the "sync" property
        async_:
            bool, default False, forwarded as the "async" property
        **properties:
            any further element properties; an "async" entry given here
            is overridden by async_

    References:
        [1] https://gstreamer.freedesktop.org/documentation/app/appsink.html?gi-language=python

    Returns:
        Element
    """
    # "async" is a Python keyword, so it has to be supplied by dict key
    sink_properties = {**properties, "async": async_}
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "appsink",
        sync=sync,
        emit_signals=True,
        max_buffers=max_buffers,
        drop=drop,
        **sink_properties
    )
class AppSync(object):
    """Hand buffers from several appsink elements to a callback in time order

    Each attached appsink has the timestamp of its current buffer
    recorded when it signals a new sample. Once every appsink that is
    not at EOS has a buffer available, the appsink holding the oldest
    timestamp is passed to the appsink_new_buffer callback.
    """

    def __init__(self, appsink_new_buffer, appsinks=None):
        """
        appsink_new_buffer is the handler to invoke, with the appsink
        element as its only argument, on availability of a new
        time-ordered buffer. appsinks is an optional iterable of appsink
        elements to attach immediately.
        """
        self.lock = threading.Lock()
        # handler to invoke on availability of new time-ordered
        # buffer
        self.appsink_new_buffer = appsink_new_buffer
        # element --> timestamp of current buffer or None if no
        # buffer yet available
        self.appsinks = {}
        # set of sink elements that are currently at EOS
        self.at_eos = set()
        # attach handlers to appsink elements provided at this time.
        # the default is None rather than a shared mutable list
        if appsinks is not None:
            for elem in appsinks:
                self.attach(elem)

    def add_sink(self, pipeline, src, drop=False, **properties):
        """
        create an appsink downstream of src with app(), attach it to
        this AppSync, and return it.
        """
        return self.attach(app(pipeline, src, drop=drop, **properties))

    def attach(self, appsink):
        """
        connect this AppSync's signal handlers to the given appsink
        element.  the element's max-buffers property will be set to 1
        (required for AppSync to work).  raises ValueError if the
        element has already been attached.  returns the element.
        """
        if appsink in self.appsinks:
            raise ValueError("duplicate appsinks %s" % repr(appsink))
        appsink.set_property("max-buffers", 1)
        handler_id = appsink.connect("new-preroll", self.new_preroll_handler)
        assert handler_id > 0
        handler_id = appsink.connect("new-sample", self.new_sample_handler)
        assert handler_id > 0
        handler_id = appsink.connect("eos", self.eos_handler)
        assert handler_id > 0
        self.appsinks[appsink] = None
        return appsink

    def new_preroll_handler(self, elem):
        """
        "new-preroll" signal handler.  the preroll buffer is pulled and
        discarded.
        """
        with self.lock:
            # clear eos status
            self.at_eos.discard(elem)
            # ignore preroll buffers
            elem.emit("pull-preroll")
            return Gst.FlowReturn.OK

    def new_sample_handler(self, elem):
        """
        "new-sample" signal handler.  records the timestamp of the
        element's buffer, then delivers whatever buffers have become
        deliverable.
        """
        with self.lock:
            # clear eos status, and retrieve buffer timestamp
            self.at_eos.discard(elem)
            assert self.appsinks[elem] is None
            self.appsinks[elem] = elem.get_last_sample().get_buffer().pts
            # pull available buffers from appsink elements
            return self.pull_buffers(elem)

    def eos_handler(self, elem):
        """
        "eos" signal handler.  marks the element as being at EOS, then
        delivers whatever buffers have become deliverable.
        """
        with self.lock:
            # set eos status
            self.at_eos.add(elem)
            # pull available buffers from appsink elements
            return self.pull_buffers(elem)

    def pull_buffers(self, elem):
        """
        for internal use.  must be called with lock held.

        repeatedly passes the element with the oldest recorded
        timestamp to the appsink_new_buffer handler.  returns
        Gst.FlowReturn.OK as soon as some element that is not at EOS
        has no buffer, and Gst.FlowReturn.EOS once every element is at
        EOS with no buffer left.  the elem argument is not used.
        """
        # keep looping while we can process buffers
        while True:
            # retrieve the timestamps of all elements that
            # aren't at eos and all elements at eos that still
            # have buffers in them
            timestamps = [(t, e) for e, t in self.appsinks.items() if e not in self.at_eos or t is not None]
            # if all elements are at eos and none have buffers,
            # then we're at eos
            if not timestamps:
                return Gst.FlowReturn.EOS
            # find the element with the oldest timestamp.  a
            # missing timestamp (None) is ranked as -inf, so
            # we'll find any element (that isn't at eos) that
            # doesn't yet have a buffer (elements at eos and
            # that are without buffers aren't in the list)
            timestamp, elem_with_oldest = min(timestamps, key=lambda x: x[0] if x[0] is not None else -numpy.inf)
            # if there's an element without a buffer, quit for
            # now --- we require all non-eos elements to have
            # buffers before proceeding
            if timestamp is None:
                return Gst.FlowReturn.OK
            # clear timestamp and pass element to handler func.
            # function call is done last so that all of our
            # book-keeping has been taken care of in case an
            # exception gets raised
            self.appsinks[elem_with_oldest] = None
            self.appsink_new_buffer(elem_with_oldest)
class ConnectAppsinkDumpDot(object):
    """Write a pipeline graph once every given appsink has prerolled

    A handler is connected after the "new-preroll" signal of each
    appsink. The caps in the pipeline graph are not fully negotiated
    until data comes out the end, so a graph written at this point shows
    the final formats on all links.
    """

    def __init__(self, pipeline, appsinks, basename, verbose=False):
        self.pipeline = pipeline
        self.filestem = "%s.%s" % (basename, "TRIGGERS")
        self.verbose = verbose

        # appsink element --> ID of the handler connected to it, for the
        # appsinks that have not fired yet
        self.remaining_lock = threading.Lock()
        self.remaining = {}
        for sink in appsinks:
            handler_id = sink.connect_after("new-preroll", self.execute)
            self.remaining[sink] = handler_id
            assert handler_id > 0

    def execute(self, elem):
        """
        signal handler.  removes elem from the set of pending appsinks,
        writes the graph if it was the last one, and disconnects itself
        from elem.
        """
        with self.remaining_lock:
            handler_id = self.remaining.pop(elem)
            last_one = not self.remaining
            if last_one:
                pipedot.write_dump_dot(self.pipeline, self.filestem, verbose=self.verbose)
        elem.disconnect(handler_id)
        return Gst.FlowReturn.OK
def tcp_server(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
    """Append a tcpserversink element to a pipeline

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        **properties:
            any further element properties, passed in addition to the
            defaults set here

    References:
        [1] https://gstreamer.freedesktop.org/documentation/tcp/tcpserversink.html?gi-language=python

    Returns:
        Element
    """
    # units_soft_max = 1 GB
    one_gigabyte = 1024 ** 3

    # FIXME:  are these sensible defaults?
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "tcpserversink",
        sync=True,
        sync_method="latest-keyframe",
        recover_policy="keyframe",
        unit_type="bytes",
        units_soft_max=one_gigabyte,
        **properties
    )