# Copyright (C) 2009--2016 Kipp Cannon
# Copyright (C) 2016 Chad Hanna
# Copyright (C) 2016 Patrick Brockill
# Copyright (C) 2016 Sarah Caudill
# Copyright (C) 2015 Ryan Everett
# Copyright (C) 2010 Leo Singer
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
## @file
## @package pipeio
#
# =============================================================================
#
# Preamble
#
# =============================================================================
#
from collections.abc import Iterable
import numpy
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstAudio', '1.0')
from gi.repository import GObject
from gi.repository import Gst
from gi.repository import GstAudio
GObject.threads_init()
Gst.init(None)
import lal
from ligo.segments import segment
__author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>, Drew Keppel <drew.keppel@ligo.org>"
__version__ = "FIXME"
__date__ = "FIXME"
#
# =============================================================================
#
# Properties
#
# =============================================================================
#
[docs]def repack_complex_array_to_real(arr):
"""
Repack a complex-valued array into a real-valued array with twice
as many columns. Used to set complex arrays as values on elements
that expose them as real-valued array properties (gobject doesn't
understand complex numbers). The return value is a view into the
input array.
"""
# FIXME: this function shouldn't exist, we should add complex
# types to gobject
if arr.dtype.kind != "c":
raise TypeError(arr)
assert arr.dtype.itemsize % 2 == 0
return arr.view(dtype = numpy.dtype("f%d" % (arr.dtype.itemsize // 2)))
[docs]def repack_real_array_to_complex(arr):
"""
Repack a real-valued array into a complex-valued array with half as
many columns. Used to retrieve complex arrays from elements that
expose them as real-valued array properties (gobject doesn't
understand complex numbers). The return value is a view into the
input array.
"""
# FIXME: this function shouldn't exist, we should add complex
# types to gobject
if arr.dtype.kind != "f":
raise TypeError(arr)
return arr.view(dtype = numpy.dtype("c%d" % (arr.dtype.itemsize * 2)))
#
# =============================================================================
#
# Buffers
#
# =============================================================================
#
[docs]def get_unit_size(caps):
struct = caps[0]
name = struct.get_name()
if name == "audio/x-raw":
try:
info = GstAudio.AudioInfo()
info.from_caps(caps)
except NotImplementedError: # removed since gstreamer 1.22
success, info = GstAudio.audio_info_from_caps(caps)
assert success
return info.bpf
elif name == "video/x-raw" and struct["format"] in ("RGB", "RGBA", "ARGB", "ABGR"):
return struct["width"] * struct["height"] * (3 if struct["format"] == "RGB" else 4)
raise ValueError(caps)
[docs]def numpy_dtype_from_caps(caps):
formats_dict = {
GstAudio.AudioFormat.F32: numpy.dtype("float32"),
GstAudio.AudioFormat.F64: numpy.dtype("float64"),
GstAudio.AudioFormat.S8: numpy.dtype("int8"),
GstAudio.AudioFormat.U8: numpy.dtype("uint8"),
GstAudio.AudioFormat.S16: numpy.dtype("int16"),
GstAudio.AudioFormat.U16: numpy.dtype("uint16"),
GstAudio.AudioFormat.S32: numpy.dtype("int32"),
GstAudio.AudioFormat.U32: numpy.dtype("uint32")
}
custom_formats_dict = {
"Z64LE" : numpy.dtype("complex64"),
"Z128LE": numpy.dtype("complex128")
}
try:
info = GstAudio.AudioInfo()
info.from_caps(caps)
except NotImplementedError: # removed since gstreamer 1.22
success, info = GstAudio.audio_info_from_caps(caps)
assert success
if info.finfo.format in formats_dict:
return formats_dict[info.finfo.format]
elif caps.get_structure(0).get_string("format") in custom_formats_dict:
return custom_formats_dict[caps.get_structure(0).get_string("format")]
else:
raise ValueError("unknown GstAudioFormat : %s" % caps.get_structure(0).get_string("format"))
[docs]def caps_from_array(arr, rate = None):
return Gst.Caps.from_string("audio/x-raw, format=(string)%s, rate=(int)%d, channels=(int)%d, layout=(string)interleaved, channel-mask=(bitmask)0" % (format_string_from_numpy_dtype(arr.dtype), rate, arr.shape[1]))
[docs]def array_from_audio_sample(sample):
caps = sample.get_caps()
success, channels = caps.get_structure(0).get_int("channels")
assert success
buf = sample.get_buffer()
success, mapinfo = buf.map(Gst.MapFlags.READ)
assert success
a = numpy.frombuffer(mapinfo.data, dtype = numpy_dtype_from_caps(caps))
buf.unmap(mapinfo)
a.shape = len(a) // channels, channels
return a
[docs]def audio_buffer_from_array(arr, timestamp, offset, rate):
buf = Gst.Buffer.new_wrapped(arr.tobytes())
buf.pts = timestamp
buf.duration = (Gst.SECOND * arr.shape[0] + rate // 2) // rate
buf.offset = offset
buf.offset_end = offset + arr.shape[0]
return buf
#
# =============================================================================
#
# Messages
#
# =============================================================================
#
[docs]def parse_spectrum_message(message):
"""
Parse a "spectrum" message from the lal_whiten element, return a
LAL REAL8FrequencySeries containing the strain spectral density.
"""
s = message.get_structure()
psd = lal.CreateREAL8FrequencySeries(
name = s["instrument"] if s.has_field("instrument") else "",
epoch = lal.LIGOTimeGPS(0, message.timestamp),
f0 = 0.0,
deltaF = s["delta-f"],
sampleUnits = lal.Unit(s["sample-units"].strip()),
length = len(s["magnitude"])
)
psd.data.data = numpy.array(s["magnitude"])
return psd
#
# =============================================================================
#
# Tags
#
# =============================================================================
#
# From Patrick Godwin
# https://git.ligo.org/lscsoft/spiir/-/issues/70#note_602061
# From Patrick Godwin
# https://git.ligo.org/lscsoft/spiir/-/issues/70#note_602061
[docs]def to_gvalue_array(arr):
"""
Converts a list-like object to a GValueArray.
"""
# handle segments as guint64
if isinstance(arr, segment):
st = Gst.Structure(f"converter, array=(guint64) < {arr[0]:d}, {arr[1]:d} >")
elif isinstance(arr, numpy.ndarray):
arr = arr.tolist()
st = Gst.Structure.new_empty("converter")
st["array"] = Gst.ValueArray(list(arr))
else:
st = Gst.Structure.new_empty("converter")
st["array"] = Gst.ValueArray(list(arr))
result, val = st.get_array("array")
if not result:
raise ValueError("could not convert input to GValueArray")
return val
# From Patrick Godwin
# https://git.ligo.org/lscsoft/spiir/-/issues/70#note_602061
[docs]def is_nested_listlike(obj):
"""
Check if object is a nested list-like object.
"""
if isinstance(obj, numpy.ndarray) and obj.ndim > 2:
raise ValueError("Only 1D or 2D numpy arrays are supported")
elif isinstance(obj, numpy.ndarray) and obj.ndim == 2:
return True
elif is_listlike(obj):
return any(is_listlike(row) for row in obj)
else:
return False
# From Patrick Godwin
# https://git.ligo.org/lscsoft/spiir/-/issues/70#note_602061
[docs]def is_listlike(obj):
"""
Check if object is a list-like object.
"""
if isinstance(obj, numpy.ndarray) and obj.ndim == 1:
return True
else:
return isinstance(obj, Iterable) and not isinstance(obj, str)