"""Module for producing sink elements
"""
import math
import os
import sys
import threading
from typing import Tuple
import numpy
from lal import LIGOTimeGPS
from lal.utils import CacheEntry
from ligo import segments
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject
from gi.repository import Gst
GObject.threads_init()
Gst.init(None)
from gstlal.pipeparts import pipetools, pipedot, mux, encode, filters, transform
# Host endianness as "LE" or "BE"; appended to raw-audio caps format names
# (e.g. "S24%s", "F32%s") further down in this module.
BYTE_ORDER = 'LE' if sys.byteorder == "little" else 'BE'
def framecpp_filesink_ldas_path_handler(elem: pipetools.Element, pspec, path_digits: Tuple[str, int]):
    """Point a frame file sink at a directory named after the leading digits of its timestamp

    The directory is named "<instrument>-<frame-type>-<leading digits>", where the
    leading digits are the first ``dir_digits`` decimal digits of the element's
    "timestamp" property expressed in integer seconds. The directory is created
    below ``outpath`` when it does not exist yet, and is then stored in the
    element's "path" property.

    Args:
        elem:
            Element, the element whose "timestamp", "instrument" and "frame-type"
            properties are read and whose "path" property is set
        pspec:
            the parameter spec handed over by the notify signal, not used
        path_digits:
            Tuple[str, int], the root output directory and the number of leading
            timestamp digits used in the directory name

    Examples:
        >>> filesinkelem.connect("notify::timestamp", framecpp_filesink_ldas_path_handler, (".", 5))

    Returns:
        None, the element is updated in place
    """
    outpath, dir_digits = path_digits

    # get timestamp and truncate to integer seconds
    timestamp = int(elem.get_property("timestamp") // Gst.SECOND)

    # extract leading digits with integer arithmetic only. counting the decimal
    # digits avoids math.log10(), which raises for a timestamp of 0, and clamping
    # the exponent at 0 keeps the result an integer when the timestamp has fewer
    # than dir_digits digits (the whole timestamp is used in that case)
    exponent = max(len(str(timestamp)) - dir_digits, 0)
    leading_digits = timestamp // 10 ** exponent

    # get other metadata
    instrument = elem.get_property("instrument")
    frame_type = elem.get_property("frame-type")

    # make target directory, and set path. exist_ok avoids the race between
    # checking for the directory and creating it
    path = os.path.join(outpath, "%s-%s-%d" % (instrument, frame_type, leading_digits))
    os.makedirs(path, exist_ok=True)
    elem.set_property("path", path)
def framecpp_filesink_cache_entry_from_mfs_message(message):
    """Build a lal.utils.CacheEntry describing the file announced by a multifilesink message

    The message is expected to come from the multifilesink element that lives
    inside a framecpp_filesink bin.

    Args:
        message:
            the element message; its structure supplies the "timestamp",
            "duration" and "filename" fields, and the parent of its source
            supplies the "instrument" and "frame-type" properties

    Returns:
        CacheEntry, for the file being written
    """
    structure = message.get_structure()

    # the span of the file is read straight out of the message
    seg_start = LIGOTimeGPS(0, structure["timestamp"])
    seg_end = seg_start + LIGOTimeGPS(0, structure["duration"])
    span = segments.segment(seg_start, seg_end)

    # the enclosing framecpp_filesink bin knows the instrument/observatory
    # and the frame file type
    sink_bin = message.src.get_parent()
    observatory = sink_bin.get_property("instrument")
    description = sink_bin.get_property("frame-type")

    url = "file://localhost%s" % os.path.abspath(structure["filename"])
    return CacheEntry(observatory, description, span, url)
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-multifilesink.html">multifilesink</a> element to a pipeline with useful default properties
def multi_file(pipeline: pipetools.Pipeline, src: pipetools.Element, next_file: int = 0, sync: bool = False, async_: bool = False, **properties) -> pipetools.Element:
    """Attach a multifilesink element to src

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        next_file:
            int, default 0, forwarded as the next_file property
        sync:
            bool, default False, forwarded as the sync property
        async_:
            bool, default False, forwarded as the "async" property (the
            trailing underscore only avoids the Python keyword)
        **properties:
            any further properties to set on the element

    Returns:
        Element
    """
    # "async" is a reserved word, so it can only travel inside a mapping; it
    # replaces any "async" entry already present in properties
    sink_properties = {**properties, "async": async_}
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "multifilesink",
        next_file=next_file,
        sync=sync,
        **sink_properties
    )
def gwf(pipeline: pipetools.Pipeline, src: pipetools.Element, message_forward: bool = True, **properties) -> pipetools.Element:
    """Attach a framecpp_filesink element, which writes GWF files, to src

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        message_forward:
            bool, default True, forwarded as the message_forward property
        **properties:
            any further properties to set on the element. A "post_messages"
            entry (default True) is removed from this mapping and applied to
            the child element named "multifilesink" instead

    References:
        Implementation: gstlal-ugly/gst/framecpp/framecpp_filesink.c

    Returns:
        Element
    """
    # post_messages must not reach the framecpp_filesink itself, so take it
    # out of the property set before the element is built
    post_messages = properties.pop("post_messages", True)

    sink_bin = pipetools.make_element_with_src(
        pipeline,
        src,
        "framecpp_filesink",
        message_forward=message_forward,
        **properties
    )

    # FIXME: there's supposed to be some sort of proxy mechanism for
    # setting properties on child elements, but we can't seem to get
    # anything to work
    inner_sink = sink_bin.get_by_name("multifilesink")
    inner_sink.set_property("post-messages", post_messages)

    return sink_bin
## Adds a <a href="@gstdoc/gstreamer-plugins-fakesink.html">fakesink</a> element to a pipeline with useful default properties
def fake(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Attach a fakesink element to src, with sync and async both switched off

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    Returns:
        Element
    """
    # "async" is a reserved word, hence the mapping
    sink_properties = {"sync": False, "async": False}
    return pipetools.make_element_with_src(pipeline, src, "fakesink", **sink_properties)
## Adds a <a href="@gstdoc/gstreamer-plugins-filesink.html">filesink</a> element to a pipeline with useful default properties
def file(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str, sync: bool = False, async_: bool = False) -> pipetools.Element:
    """Attach a filesink element writing to filename to src

    The element is always created with buffer_mode=2.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, the name of the output file, forwarded as the location property
        sync:
            bool, default False, forwarded as the sync property
        async_:
            bool, default False, forwarded as the "async" property

    Returns:
        Element
    """
    # "async" is a reserved word, hence the mapping
    sink_properties = {
        "sync": sync,
        "buffer_mode": 2,
        "location": filename,
        "async": async_,
    }
    return pipetools.make_element_with_src(pipeline, src, "filesink", **sink_properties)
## Adds a <a href="@gstlalgtkdoc/GstTSVEnc.html">lal_nxydump</a> element to a pipeline with useful default properties
def tsv(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str, segment: pipetools.Segment = None) -> pipetools.Element:
    """Write an audio time-series to a file as tab-separated ascii text

    The data passes through a lal_nxydump element and then into a file sink.
    The output is multi-column tab-separated ASCII text, a format compatible
    with most plotting utilities. The first column is the time, the remaining
    columns are the values of the channels in order.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, the filename of the output text file
        segment:
            Segment, default None, a ligo.segments.segment. When given, the
            ns() values of its two ends are set as the start_time and
            stop_time of the lal_nxydump element

    Returns:
        Element, the file sink element
    """
    # only restrict the dump to a time window when one was requested
    dump_properties = {}
    if segment is not None:
        dump_properties["start_time"] = segment[0].ns()
        dump_properties["stop_time"] = segment[1].ns()

    dumper = pipetools.make_element_with_src(pipeline, src, "lal_nxydump", **dump_properties)
    return file(pipeline, dumper, filename)
def ogm_video(pipeline: pipetools.Pipeline, videosrc: pipetools.Element, filename: str, audiosrc: pipetools.Element = None, verbose: bool = False):
    """Encode video, and optionally audio, into an Ogg file

    The video is colorspace-converted, constrained to I420, theora encoded and
    fed into an ogg muxer. When an audio source is given it is converted to
    24-bit samples in the host byte order, flac encoded and linked to the same
    muxer. The muxed stream is written to filename.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        videosrc:
            Gst.Element, the video source element
        filename:
            str, the name of the output video file
        audiosrc:
            Gst.Element, default None, the audio source element
        verbose:
            bool, default False, if True a progressreport element named after
            the output file is inserted ahead of the file sink

    Returns:
        Element, the sink element
    """
    src = transform.colorspace(pipeline, videosrc)
    # GStreamer 1.0 caps: the 0.10 "video/x-raw-yuv" media type and fourcc
    # format field do not exist any more
    src = filters.caps(pipeline, src, "video/x-raw, format=I420")
    src = encode.theora(pipeline, src,
        # border=2,
        quality=48,
        # quick=False
    )
    src = mux.ogg_mux(pipeline, src)

    if audiosrc is not None:
        audio = transform.audio_convert(pipeline, audiosrc)
        audio = filters.caps(pipeline, audio, "audio/x-raw, format=S24%s" % BYTE_ORDER)
        encode.flac(pipeline, audio).link(src)

    if verbose:
        # no progress_report helper is defined or imported in this module, so
        # build the element directly
        src = pipetools.make_element_with_src(pipeline, src, "progressreport", update_freq=1, name=filename)

    return file(pipeline, src, filename)
def auto_video(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Attach an autovideosink element to src, behind a colorspace conversion

    autovideosink automatically detects an appropriate video sink to use. It
    does so by scanning the registry for all elements that have "Sink" and
    "Video" in the class field of their element information, and also have a
    non-zero autoplugging rank.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/autodetect/autovideosink.html?gi-language=python

    Returns:
        Element
    """
    converted = transform.colorspace(pipeline, src)
    return pipetools.make_element_with_src(pipeline, converted, "autovideosink")
## Adds a <a href="@gstpluginsgooddoc/gst-plugins-good-plugins-autoaudiosink.html">autoaudiosink</a> element to a pipeline with useful default properties
def auto_audio(pipeline: pipetools.Pipeline, src: pipetools.Element) -> pipetools.Element:
    """Attach an autoaudiosink element to src, behind a queue

    autoaudiosink automatically detects an appropriate audio sink to use. It
    does so by scanning the registry for all elements that have "Sink" and
    "Audio" in the class field of their element information, and also have a
    non-zero autoplugging rank.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/autodetect/autoaudiosink.html?gi-language=python

    Returns:
        Element
    """
    queued = transform.queue(pipeline, src)
    return pipetools.make_element_with_src(pipeline, queued, "autoaudiosink")
def playback(pipeline: pipetools.Pipeline, src: pipetools.Element, amplification: float = 0.1) -> pipetools.Element:
    """Create an audio playback chain and attach it to src

    The chain is audioconvert, capsfilter (32-bit float samples in the host
    byte order), audioamplify, audioconvert, queue (at most one second of
    data), autoaudiosink.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new elements will be added
        src:
            Gst.Element, the source element
        amplification:
            float, default 0.1, the amplification property of the audioamplify element

    Raises:
        RuntimeError:
            if one of the elements cannot be created or two neighbouring
            elements cannot be linked

    Returns:
        Element, the autoaudiosink at the end of the chain
    """
    factory_names = ("audioconvert", "capsfilter", "audioamplify", "audioconvert", "queue", "autoaudiosink")

    elems = []
    for factory_name in factory_names:
        elem = Gst.ElementFactory.make(factory_name, None)
        # make() hands back None when the factory is unavailable
        if elem is None:
            raise RuntimeError("failed to create element \"%s\"" % factory_name)
        elems.append(elem)

    elems[1].set_property("caps", Gst.Caps.from_string("audio/x-raw, format=F32%s" % BYTE_ORDER))
    elems[2].set_property("amplification", amplification)
    elems[4].set_property("max-size-time", 1 * Gst.SECOND)

    pipeline.add(*elems)

    # Gst.element_link_many() is not available in the GStreamer 1.0 bindings,
    # so link the chain one pair at a time
    upstream = src
    for elem in elems:
        if not upstream.link(elem):
            raise RuntimeError("failed to link \"%s\" to \"%s\"" % (upstream.get_name(), elem.get_name()))
        upstream = elem

    return upstream
def tsv_tee(pipeline: pipetools.Pipeline, src: pipetools.Element, *args, **properties) -> pipetools.Element:
    """Tee src and dump one branch, through a queue, to a tab-separated text file

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        *args:
            positional arguments handed on to tsv() after the pipeline and source
        **properties:
            keyword arguments handed on to tsv()

    Returns:
        Element, the tee element, to which further branches can be attached
    """
    splitter = transform.tee(pipeline, src)
    dump_branch = transform.queue(pipeline, splitter)
    tsv(pipeline, dump_branch, *args, **properties)
    return splitter
def trigger_xml_writer(pipeline: pipetools.Pipeline, src: pipetools.Element, filename: str):
    """Attach a lal_triggerxmlwriter element to src, with sync and async both switched off

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        filename:
            str, output path for xml file, forwarded as the location property

    Returns:
        Element
    """
    # "async" is a reserved word, hence the mapping
    writer_properties = {
        "location": filename,
        "sync": False,
        "async": False,
    }
    return pipetools.make_element_with_src(pipeline, src, "lal_triggerxmlwriter", **writer_properties)
# FIXME no specific alias for this url since this library only has one element.
# DO NOT DOCUMENT OTHER CODES THIS WAY! Use @gstdoc @gstpluginsbasedoc etc.
## Adds a <a href="http://gstreamer.freedesktop.org/data/doc/gstreamer/head/gst-plugins-base-libs/html/gstreamer-app.html">appsink</a> element to a pipeline with useful default properties
def app(pipeline: pipetools.Pipeline, src: pipetools.Element, max_buffers: int = 1, drop: bool = False, sync: bool = False, async_: bool = False, **properties):
    """Attach an appsink element, with signal emission enabled, to src

    Appsink is a sink plugin that supports many different methods for making
    the application get a handle on the GStreamer data in a pipeline. Unlike
    most GStreamer elements, Appsink provides external API functions.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        max_buffers:
            int, default 1, forwarded as the max_buffers property
        drop:
            bool, default False, forwarded as the drop property
        sync:
            bool, default False, forwarded as the sync property
        async_:
            bool, default False, forwarded as the "async" property
        **properties:
            any further properties to set on the element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/app/appsink.html?gi-language=python

    Returns:
        Element
    """
    # "async" is a reserved word, so it can only travel inside a mapping; it
    # replaces any "async" entry already present in properties
    sink_properties = {**properties, "async": async_}
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "appsink",
        sync=sync,
        emit_signals=True,
        max_buffers=max_buffers,
        drop=drop,
        **sink_properties
    )
class AppSync(object):
    """Hand the buffers of several appsink elements to one handler in timestamp order

    Each attached appsink records the timestamp of the buffer it currently
    holds. Whenever every appsink that is not at EOS holds a buffer, the
    handler is called with the element whose buffer has the oldest timestamp,
    and this repeats for as long as that condition holds.
    """

    def __init__(self, appsink_new_buffer, appsinks=()):
        """
        Args:
            appsink_new_buffer:
                callable, invoked with an appsink element when it holds the
                oldest available buffer
            appsinks:
                iterable of appsink elements to attach() right away, default
                empty
        """
        self.lock = threading.Lock()
        # handler to invoke on availability of new time-ordered
        # buffer
        self.appsink_new_buffer = appsink_new_buffer
        # element --> timestamp of current buffer or None if no
        # buffer yet available
        self.appsinks = {}
        # set of sink elements that are currently at EOS
        self.at_eos = set()
        # attach handlers to appsink elements provided at this time
        for elem in appsinks:
            self.attach(elem)

    def add_sink(self, pipeline, src, drop=False, **properties):
        """
        create an appsink element downstream of src with app(), attach
        this AppSync's signal handlers to it and return it.
        """
        return self.attach(app(pipeline, src, drop=drop, **properties))

    def attach(self, appsink):
        """
        connect this AppSync's signal handlers to the given appsink
        element. the element's max-buffers property will be set to
        1 (required for AppSync to work). raises ValueError if the
        element has already been attached. returns the element.
        """
        if appsink in self.appsinks:
            raise ValueError("duplicate appsinks %s" % repr(appsink))
        appsink.set_property("max-buffers", 1)
        handler_id = appsink.connect("new-preroll", self.new_preroll_handler)
        assert handler_id > 0
        handler_id = appsink.connect("new-sample", self.new_sample_handler)
        assert handler_id > 0
        handler_id = appsink.connect("eos", self.eos_handler)
        assert handler_id > 0
        self.appsinks[appsink] = None
        return appsink

    def new_preroll_handler(self, elem):
        """
        "new-preroll" signal handler. the preroll buffer is pulled
        and discarded.
        """
        with self.lock:
            # clear eos status
            self.at_eos.discard(elem)
            # ignore preroll buffers
            elem.emit("pull-preroll")
            return Gst.FlowReturn.OK

    def new_sample_handler(self, elem):
        """
        "new-sample" signal handler. records the timestamp of the
        element's new buffer, then dispatches whatever buffers have
        become available.
        """
        with self.lock:
            # clear eos status, and retrieve buffer timestamp
            self.at_eos.discard(elem)
            assert self.appsinks[elem] is None
            self.appsinks[elem] = elem.get_last_sample().get_buffer().pts
            # pull available buffers from appsink elements
            return self.pull_buffers(elem)

    def eos_handler(self, elem):
        """
        "eos" signal handler. marks the element as being at EOS,
        then dispatches whatever buffers have become available.
        """
        with self.lock:
            # set eos status
            self.at_eos.add(elem)
            # pull available buffers from appsink elements
            return self.pull_buffers(elem)

    def pull_buffers(self, elem):
        """
        for internal use. must be called with lock held. elem is
        the element whose signal triggered the call; it is not
        otherwise used. returns Gst.FlowReturn.EOS once every
        element is at EOS and none holds a buffer, otherwise
        Gst.FlowReturn.OK.
        """
        # keep looping while we can process buffers
        while True:
            # retrieve the timestamps of all elements that
            # aren't at eos and all elements at eos that still
            # have buffers in them
            timestamps = [(t, e) for e, t in self.appsinks.items() if e not in self.at_eos or t is not None]
            # if all elements are at eos and none have buffers,
            # then we're at eos
            if not timestamps:
                return Gst.FlowReturn.EOS
            # find the element with the oldest timestamp. the
            # key maps a missing timestamp (None) to -inf, so
            # we'll find any element (that isn't at eos) that
            # doesn't yet have a buffer (elements at eos and
            # that are without buffers aren't in the list)
            timestamp, elem_with_oldest = min(timestamps, key=lambda x: x[0] if x[0] is not None else -numpy.inf)
            # if there's an element without a buffer, quit for
            # now --- we require all non-eos elements to have
            # buffers before proceeding
            if timestamp is None:
                return Gst.FlowReturn.OK
            # clear timestamp and pass element to handler func.
            # function call is done last so that all of our
            # book-keeping has been taken care of in case an
            # exception gets raised
            self.appsinks[elem_with_oldest] = None
            self.appsink_new_buffer(elem_with_oldest)
class ConnectAppsinkDumpDot(object):
    """Write a pipeline graph once every given appsink has seen its first preroll

    A handler is connected to the "new-preroll" signal of each appsink. The
    caps in the pipeline graph are not fully negotiated until data comes out
    the end, so a graph written at this point shows the final formats on all
    links.
    """

    def __init__(self, pipeline, appsinks, basename, verbose=False):
        """
        Args:
            pipeline:
                the pipeline whose graph is written
            appsinks:
                iterable of appsink elements to wait for
            basename:
                str, the graph is written with the file stem "<basename>.TRIGGERS"
            verbose:
                bool, default False, passed on to pipedot.write_dump_dot()
        """
        self.pipeline = pipeline
        self.filestem = "%s.%s" % (basename, "TRIGGERS")
        self.verbose = verbose

        # element --> ID of the handler connected to it, for the elements
        # that have not fired yet
        self.remaining_lock = threading.Lock()
        self.remaining = {}
        for appsink in appsinks:
            handler_id = appsink.connect_after("new-preroll", self.execute)
            self.remaining[appsink] = handler_id
            assert self.remaining[appsink] > 0

    def execute(self, elem):
        """
        "new-preroll" signal handler. removes elem from the set of
        elements being waited for, writes the graph if it was the
        last one, and disconnects itself from elem.
        """
        with self.remaining_lock:
            handler_id = self.remaining.pop(elem)
            was_last = not self.remaining
            if was_last:
                pipedot.write_dump_dot(self.pipeline, self.filestem, verbose=self.verbose)
        elem.disconnect(handler_id)
        return Gst.FlowReturn.OK
def tcp_server(pipeline: pipetools.Pipeline, src: pipetools.Element, **properties) -> pipetools.Element:
    """Attach a tcpserversink element to src

    The element is created with sync=True, sync_method="latest-keyframe",
    recover_policy="keyframe", unit_type="bytes" and units_soft_max=1024 ** 3.

    Args:
        pipeline:
            Gst.Pipeline, the pipeline to which the new element will be added
        src:
            Gst.Element, the source element
        **properties:
            any further properties to set on the element

    References:
        [1] https://gstreamer.freedesktop.org/documentation/tcp/tcpserversink.html?gi-language=python

    Returns:
        Element
    """
    # units_soft_max = 1 GB
    soft_limit = 1024 ** 3

    # FIXME: are these sensible defaults?
    return pipetools.make_element_with_src(
        pipeline,
        src,
        "tcpserversink",
        sync=True,
        sync_method="latest-keyframe",
        recover_policy="keyframe",
        unit_type="bytes",
        units_soft_max=soft_limit,
        **properties
    )