Source code for events

#!/usr/bin/env python3

__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "a module for storing event processing utilities"

#-------------------------------------------------
### imports

import logging
import signal
import sys
import time
import timeit
import json

from gstlal.aggregator import now

try:
	from confluent_kafka import Producer, Consumer, KafkaError
except ImportError:
	raise ImportError('confluent_kafka is required for this module')


#-------------------------------------------------
### classes

class EventProcessor(object):
	"""Base class for processing events via Kafka.

	Parameters
	----------
	kafka_server : `str`
		the host:port combination to connect to the Kafka broker
	input_topic : `str`
		the name of the input topic
	process_cadence : `float`
		maximum rate at which data is processed, defaults to 0.1s
	request_timeout : `float`
		timeout for requesting messages from a topic, defaults to 0.2s
	num_messages : `int`
		max number of messages to process per cadence, defaults to 10
	tag : `str`
		a nickname for the instance, defaults to 'default'
	send_heartbeats : `bool`
		send periodic heartbeat messages to Kafka for monitoring
	heartbeat_cadence : `float`
		cadence on which to write heartbeat messages to Kafka
	heartbeat_topic : `str`
		Kafka topic to send heartbeats to

	"""
	_name = 'processor'

	def __init__(
		self,
		process_cadence=0.1,
		request_timeout=0.2,
		num_messages=10,
		kafka_server=None,
		input_topic=None,
		tag='default',
		send_heartbeats=False,
		heartbeat_cadence=None,
		heartbeat_topic=None,
	):
		assert kafka_server, 'kafka_server needs to be set'

		self.is_source = not bool(input_topic)
		if isinstance(input_topic, str):
			input_topic = [input_topic]

		### processing settings
		self.process_cadence = process_cadence
		self.request_timeout = request_timeout
		self.num_messages = num_messages
		self.is_running = False

		### kafka settings
		self.tag = tag
		self.kafka_settings = {
			'bootstrap.servers': kafka_server,
			'group.id': '-'.join([self._name, tag])
		}
		self.producer_settings = {
			'message.max.bytes': 5242880,  # 5 MB
			**self.kafka_settings,
		}
		self.producer = Producer(self.producer_settings)
		if not self.is_source:
			self.consumer = Consumer(self.kafka_settings)
			self.consumer.subscribe(list(input_topic))

		### signal handler
		for sig in [signal.SIGINT, signal.SIGTERM]:
			signal.signal(sig, self.catch)

		### heartbeat functions for monitoring
		self.send_heartbeats = send_heartbeats
		if self.send_heartbeats:
			self.heartbeat_topic = heartbeat_topic
			self.last_heartbeat = float(now())
			self.heartbeat_cadence = heartbeat_cadence
	def fetch(self):
		"""Fetches messages from a topic and processes them.

		"""
		messages = self.consumer.consume(
			num_messages=self.num_messages,
			timeout=self.request_timeout
		)
		for message in messages:
			### only ingest if no errors in receiving data
			if message:
				if not message.error():
					self.ingest(message)
				elif message.error().code() != KafkaError._PARTITION_EOF:
					logging.warning(f'Received message with error: {message.error()}')
	def process(self):
		"""Processes events at the specified cadence.

		"""
		while self.is_running:
			start = timeit.default_timer()
			if not self.is_source:
				self.fetch()
			self.handle()
			if self.send_heartbeats:
				self.heartbeat()
			elapsed = timeit.default_timer() - start
			time.sleep(max(self.process_cadence - elapsed, 0))
	def start(self):
		"""Starts the event loop.

		"""
		logging.info('starting {}...'.format(self._name.replace('_', ' ')))
		self.is_running = True
		self.process()
	def stop(self):
		"""Stops the event loop.

		"""
		logging.info('shutting down {}...'.format(self._name.replace('_', ' ')))
		self.finish()
		self.is_running = False
	def catch(self, signum, frame):
		"""Shuts down the event processor gracefully before exiting.

		"""
		logging.info("SIG {:d} received, attempting graceful shutdown...".format(signum))
		self.stop()
		sys.exit(0)
	def ingest(self, message):
		"""Ingests a single event.

		NOTE: Derived classes need to implement this.
		"""
		raise NotImplementedError
	def handle(self):
		"""Handles ingested events.

		NOTE: Derived classes need to implement this.
		"""
		raise NotImplementedError
	def finish(self):
		"""Finishes remaining events when stopped and/or shutting down.

		NOTE: Derived classes may implement this if desired.
		"""
		pass
	def heartbeat(self):
		"""Sends heartbeat messages to Kafka to monitor the health of this process.

		"""
		time_now = float(now())
		if time_now - self.last_heartbeat >= self.heartbeat_cadence:
			self.last_heartbeat = time_now
			self.producer.produce(
				topic = self.heartbeat_topic,
				value = json.dumps({
					'time': [ time_now ],
					'data': [ 1 ],
				}),
				key = self.tag
			)
			self.producer.poll(0)
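
### The following is a minimal sketch, not part of the module, of how a
### derived class might implement ingest() and handle(): it decodes each
### Kafka message payload as JSON, queues it, and logs the queued events
### once per processing cadence. The class name and topic/field contents
### are hypothetical stand-ins.

class ExampleProcessor(EventProcessor):
	_name = 'example_processor'

	def __init__(self, **kwargs):
		super().__init__(**kwargs)
		self.events = []

	def ingest(self, message):
		### decode the raw Kafka message payload and queue it
		self.events.append(json.loads(message.value()))

	def handle(self):
		### drain the events accumulated over this cadence
		for event in self.events:
			logging.info('processing event: {}'.format(event))
		self.events = []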
#-------------------------------------------------
### utilities
def append_args(parser):
	"""Appends event processing specific options to an ArgumentParser instance.
	"""
	group = parser.add_argument_group("Event processing options")
	group.add_argument("--tag", metavar = "string", default = "default",
		help = "Sets the name of the tag used. Default = 'default'")
	group.add_argument("--processing-cadence", type = float, default = 0.1,
		help = "Rate at which the event uploader acquires and processes data. Default = 0.1 seconds.")
	group.add_argument("--request-timeout", type = float, default = 0.2,
		help = "Timeout for requesting messages from a topic. Default = 0.2 seconds.")
	group.add_argument("--kafka-server", metavar = "string",
		help = "Sets the server url that the kafka topic is hosted on. Required.")
	group.add_argument("--input-topic", metavar = "string", action = "append",
		help = "Sets the input kafka topic. Required.")
	return parser
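
#-------------------------------------------------
### example usage

### A minimal sketch, not part of the module, wiring append_args() to the
### ExampleProcessor sketched above; the broker address and topic names are
### whatever the user supplies on the command line.

if __name__ == '__main__':
	from argparse import ArgumentParser

	parser = append_args(ArgumentParser())
	options = parser.parse_args()

	### map the parsed options onto EventProcessor keyword arguments
	processor = ExampleProcessor(
		kafka_server=options.kafka_server,
		input_topic=options.input_topic,
		process_cadence=options.processing_cadence,
		request_timeout=options.request_timeout,
		tag=options.tag,
	)
	processor.start()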