#!/usr/bin/env python3
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "a module for storing event processing utilities"
#-------------------------------------------------
### imports
import logging
import signal
import sys
import time
import timeit
import json
from gstlal.aggregator import now
try:
from confluent_kafka import Producer, Consumer, KafkaError
except ImportError:
raise ImportError('confluent_kafka is required for this module')
#-------------------------------------------------
### classes
class EventProcessor(object):
"""Base class for processing events via Kafka.
Parameters
----------
kafka_server : `str`
the host:port combination to connect to the Kafka broker
input_topic : `str`
the name of the input topic
process_cadence : `float`
maximum rate at which data is processed, defaults to 0.1s
request_timeout : `float`
timeout for requesting messages from a topic, defaults to 0.2s
num_messages : `int`
max number of messages to process per cadence, defaults to 10
tag : `str`
a nickname for the instance, defaults to 'default'
send_heartbeats : `bool`
send periodic heartbeat messages to Kafka for monitoring
heartbeat_cadence : `float`
cadence on which to write heartbeat messages to Kafka
heartbeat_topic : `str`
Kafka topic to send heartbeats to
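
    Example
    -------
    Derived classes implement :meth:`ingest` and :meth:`handle`. A minimal
    sketch, where the class and topic names are illustrative only::

        class Printer(EventProcessor):
            _name = 'printer'

            def ingest(self, message):
                print(message.value())

            def handle(self):
                pass

        Printer(kafka_server='localhost:9092', input_topic='events').start()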
"""
_name = 'processor'
def __init__(
self,
process_cadence=0.1,
request_timeout=0.2,
num_messages=10,
kafka_server=None,
input_topic=None,
tag='default',
send_heartbeats=False,
heartbeat_cadence=None,
heartbeat_topic=None,
):
        if not kafka_server:
            raise ValueError('kafka_server needs to be set')
self.is_source = not bool(input_topic)
if isinstance(input_topic, str):
input_topic = [input_topic]
### processing settings
self.process_cadence = process_cadence
self.request_timeout = request_timeout
self.num_messages = num_messages
self.is_running = False
### kafka settings
self.tag = tag
self.kafka_settings = {
'bootstrap.servers': kafka_server,
'group.id': '-'.join([self._name, tag])
}
self.producer_settings = {
'message.max.bytes': 5242880, # 5 MB
**self.kafka_settings,
}
self.producer = Producer(self.producer_settings)
if not self.is_source:
self.consumer = Consumer(self.kafka_settings)
            self.consumer.subscribe(list(input_topic))
### signal handler
for sig in [signal.SIGINT, signal.SIGTERM]:
signal.signal(sig, self.catch)
### heartbeat functions for monitoring
        self.send_heartbeats = send_heartbeats
        if self.send_heartbeats:
            if not (heartbeat_topic and heartbeat_cadence):
                raise ValueError('heartbeat_topic and heartbeat_cadence need to be set when send_heartbeats is enabled')
            self.heartbeat_topic = heartbeat_topic
            self.last_heartbeat = float(now())
            self.heartbeat_cadence = heartbeat_cadence
    def fetch(self):
        """Fetches messages from subscribed topics and processes them.
"""
messages = self.consumer.consume(
num_messages=self.num_messages,
timeout=self.request_timeout
)
for message in messages:
### only add to queue if no errors in receiving data
if message:
if not message.error():
self.ingest(message)
                elif message.error().code() != KafkaError._PARTITION_EOF:
logging.warning(f'Received message with error: {message.error()}')
    def process(self):
"""Processes events at the specified cadence.
"""
while self.is_running:
start = timeit.default_timer()
if not self.is_source:
self.fetch()
self.handle()
if self.send_heartbeats:
self.heartbeat()
elapsed = timeit.default_timer() - start
time.sleep(max(self.process_cadence - elapsed, 0))
    def start(self):
"""Starts the event loop.
"""
logging.info('starting {}...'.format(self._name.replace('_', ' ')))
self.is_running = True
self.process()
    def stop(self):
"""Stops the event loop.
"""
logging.info('shutting down {}...'.format(self._name.replace('_', ' ')))
self.finish()
self.is_running = False
    def catch(self, signum, frame):
"""Shuts down the event processor gracefully before exiting.
"""
logging.info("SIG {:d} received, attempting graceful shutdown...".format(signum))
self.stop()
sys.exit(0)
    def ingest(self, message):
        """Ingests a single event.

        NOTE: Derived classes need to implement this.
        """
        raise NotImplementedError
    def handle(self):
        """Handles ingested events.

        NOTE: Derived classes need to implement this.
        """
        raise NotImplementedError
    def finish(self):
"""Finish remaining events when stopped and/or shutting down.
NOTE: Derived classes may implement this if desired.
"""
pass
    def heartbeat(self):
        """Sends heartbeat messages to Kafka to monitor
the health of this process.
"""
time_now = float(now())
if time_now - self.last_heartbeat >= self.heartbeat_cadence:
self.last_heartbeat = time_now
self.producer.produce(
topic = self.heartbeat_topic,
value = json.dumps({
'time': [ time_now ],
'data': [ 1 ],
}),
key = self.tag
)
self.producer.poll(0)
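
# The flip side of EventProcessor.heartbeat above: a minimal monitoring sketch
# that consumes the JSON heartbeat messages and logs them. This function is an
# illustrative assumption, not part of the module's API; the group.id below is
# a placeholder name.
def watch_heartbeats(kafka_server, heartbeat_topic, timeout=1.0):
    """Log each heartbeat message seen on the given topic (sketch)."""
    consumer = Consumer({
        'bootstrap.servers': kafka_server,
        'group.id': 'heartbeat-monitor',
    })
    consumer.subscribe([heartbeat_topic])
    try:
        while True:
            message = consumer.poll(timeout)
            ### skip empty polls and broker-side errors
            if message is None or message.error():
                continue
            payload = json.loads(message.value())
            logging.info('heartbeat from %s at %.3f', message.key().decode(), payload['time'][0])
    finally:
        consumer.close()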
#-------------------------------------------------
### utilities
def append_args(parser):
    """Appends event processing specific options to an ArgumentParser instance.
    """
    group = parser.add_argument_group("Event processing options")
    group.add_argument("--tag", metavar = "string", default = "default",
        help = "Sets the name of the tag used. Default = 'default'")
    group.add_argument("--processing-cadence", type = float, default = 0.1,
        help = "Rate at which the event processor acquires and processes data. Default = 0.1 seconds.")
    group.add_argument("--request-timeout", type = float, default = 0.2,
        help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
    group.add_argument("--kafka-server", metavar = "string",
        help = "Sets the server url that the kafka topic is hosted on. Required.")
    group.add_argument("--input-topic", metavar = "string", action = "append",
        help = "Sets the input kafka topic. Can be given multiple times. If not set, the processor acts as a pure producer (source).")
    return parser
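
# A minimal sketch of how append_args and EventProcessor fit together in a
# command-line program. The Echo subclass and this __main__ block are
# illustrative only, not part of the module's API.
if __name__ == '__main__':
    from argparse import ArgumentParser

    class Echo(EventProcessor):
        """Example processor that logs every message it receives."""
        _name = 'echo'

        def ingest(self, message):
            logging.info('received: %s', message.value())

        def handle(self):
            pass

    parser = append_args(ArgumentParser(description = __description__))
    options = parser.parse_args()

    logging.basicConfig(level = logging.INFO)
    processor = Echo(
        process_cadence = options.processing_cadence,
        request_timeout = options.request_timeout,
        kafka_server = options.kafka_server,
        input_topic = options.input_topic,
        tag = options.tag,
    )
    processor.start()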