import _thread
import logging
import threading
import traceback
from typing import Any, Callable, Optional
import dill
from kafka import KafkaConsumer
from kq.job import Job
from kq.message import Message
from kq.utils import get_call_repr, is_none_or_func, is_none_or_logger, is_str
class Worker:
    """Fetches :doc:`jobs <job>` from Kafka topics and processes them.

    :param topic: Name of the Kafka topic.
    :type topic: str
    :param consumer: Kafka consumer instance with a group ID (required). For
        more details on consumers, refer to kafka-python's documentation_.
    :type consumer: kafka.KafkaConsumer_
    :param callback: Callback function which is executed every time a job is
        processed. See :doc:`here <callback>` for more details.
    :type callback: callable
    :param deserializer: Callable which takes a byte string and returns a
        deserialized :doc:`job <job>` namedtuple. If not set, ``dill.loads``
        is used by default. See :doc:`here <serializer>` for more details.
    :type deserializer: callable
    :param logger: Logger for recording worker activities. If not set, logger
        named ``kq.worker`` is used with default settings (you need to define
        your own formatters and handlers). See :doc:`here <logging>` for more
        details.
    :type logger: logging.Logger

    **Example:**

    .. code-block:: python

        from kafka import KafkaConsumer
        from kq import Worker

        # Set up a Kafka consumer. Group ID is required.
        consumer = KafkaConsumer(
            bootstrap_servers='127.0.0.1:9092',
            group_id='group'
        )

        # Set up a worker.
        worker = Worker(topic='topic', consumer=consumer)

        # Start the worker to process jobs.
        worker.start()

    .. _documentation:
        http://kafka-python.rtfd.io/en/master/#kafkaconsumer
    .. _kafka.KafkaConsumer:
        http://kafka-python.rtfd.io/en/master/apidoc/KafkaConsumer.html
    """

    def __init__(
        self,
        topic: str,
        consumer: KafkaConsumer,
        callback: Optional[Callable[..., Any]] = None,
        deserializer: Optional[Callable[[bytes], Any]] = None,
        logger: Optional[logging.Logger] = None,
    ):
        assert is_str(topic), "topic must be a str"
        assert isinstance(consumer, KafkaConsumer), "bad consumer instance"
        assert consumer.config["group_id"], "consumer must have group_id"
        assert is_none_or_func(callback), "callback must be a callable"
        assert is_none_or_func(deserializer), "deserializer must be a callable"
        assert is_none_or_logger(logger), "bad logger instance"

        self._topic = topic
        self._hosts: str = consumer.config["bootstrap_servers"]
        self._group: str = consumer.config["group_id"]
        self._consumer = consumer
        self._callback = callback
        self._deserializer = deserializer or dill.loads
        self._logger = logger or logging.getLogger("kq.worker")

    def __repr__(self) -> str:
        """Return the string representation of the worker.

        :return: String representation of the worker.
        :rtype: str
        """
        return f"Worker(hosts={self._hosts}, topic={self._topic}, group={self._group})"

    def __del__(self) -> None:  # pragma: no cover
        # Best-effort cleanup: the consumer may already be closed or in a
        # partially torn-down state during interpreter shutdown.
        # noinspection PyBroadException
        try:
            self._consumer.close()
        except Exception:
            pass

    def _execute_callback(
        self,
        status: str,
        message: Message,
        job: Optional[Job],
        res: Any,
        err: Optional[Exception],
        stacktrace: Optional[str],
    ) -> None:
        """Execute the callback.

        :param status: Job status. Possible values are "invalid" (job could not
            be deserialized or was malformed), "failure" (job raised an error),
            "timeout" (job timed out), or "success" (job finished successfully
            and returned a result).
        :type status: str
        :param message: Kafka message.
        :type message: :doc:`kq.Message <message>`
        :param job: Job object, or None if **status** was "invalid".
        :type job: kq.Job
        :param res: Job result, or None if an exception was raised.
        :type res: object | None
        :param err: Exception raised by job, or None if there was none.
        :type err: Exception | None
        :param stacktrace: Exception traceback, or None if there was none.
        :type stacktrace: str | None
        """
        if self._callback is not None:
            try:
                self._logger.info("Executing callback ...")
                self._callback(status, message, job, res, err, stacktrace)
            except Exception as exc:
                # NOTE: caught as ``exc`` (not ``err``) to avoid shadowing the
                # ``err`` parameter above. A misbehaving callback must never
                # crash the worker loop, so it is logged and swallowed here.
                self._logger.exception(f"Callback raised an exception: {exc}")

    def _process_message(self, msg: Message) -> None:
        """De-serialize the message and execute the job.

        :param msg: Kafka message.
        :type msg: :doc:`kq.Message <message>`
        """
        self._logger.info(
            "Processing Message(topic={}, partition={}, offset={}) ...".format(
                msg.topic, msg.partition, msg.offset
            )
        )
        try:
            job = self._deserializer(msg.value)
            job_repr = get_call_repr(job.func, *job.args, **job.kwargs)
        except Exception as err:
            self._logger.exception(f"Job was invalid: {err}")
            self._execute_callback("invalid", msg, None, None, None, None)
        else:
            self._logger.info(f"Executing job {job.id}: {job_repr}")

            # Enforce the job timeout (if any) by raising KeyboardInterrupt
            # in the main thread once the timer fires.
            timer: Optional[threading.Timer]
            if job.timeout:
                timer = threading.Timer(job.timeout, _thread.interrupt_main)
                timer.start()
            else:
                timer = None
            try:
                res = job.func(*job.args, **job.kwargs)
            except KeyboardInterrupt:
                self._logger.error(f"Job {job.id} timed out or was interrupted")
                self._execute_callback("timeout", msg, job, None, None, None)
            except Exception as err:
                self._logger.exception(f"Job {job.id} raised an exception:")
                tb = traceback.format_exc()
                self._execute_callback("failure", msg, job, None, err, tb)
            else:
                self._logger.info(f"Job {job.id} returned: {res}")
                self._execute_callback("success", msg, job, res, None, None)
            finally:
                # Always cancel the timer so it cannot interrupt a later job.
                if timer is not None:
                    timer.cancel()

    @property
    def hosts(self) -> str:
        """Return comma-separated Kafka hosts and ports string.

        :return: Comma-separated Kafka hosts and ports.
        :rtype: str
        """
        return self._hosts

    @property
    def topic(self) -> str:
        """Return the name of the Kafka topic.

        :return: Name of the Kafka topic.
        :rtype: str
        """
        return self._topic

    @property
    def group(self) -> str:
        """Return the Kafka consumer group ID.

        :return: Kafka consumer group ID.
        :rtype: str
        """
        return self._group

    @property
    def consumer(self) -> KafkaConsumer:
        """Return the Kafka consumer instance.

        :return: Kafka consumer instance.
        :rtype: kafka.KafkaConsumer
        """
        return self._consumer

    @property
    def deserializer(self) -> Callable[[bytes], Any]:
        """Return the deserializer function.

        :return: Deserializer function.
        :rtype: callable
        """
        return self._deserializer

    @property
    def callback(self) -> Optional[Callable[..., Any]]:
        """Return the callback function.

        :return: Callback function, or None if not set.
        :rtype: callable | None
        """
        return self._callback

    def start(
        self, max_messages: Optional[int] = None, commit_offsets: bool = True
    ) -> int:
        """Start processing Kafka messages and executing jobs.

        :param max_messages: Maximum number of Kafka messages to process before
            stopping. If not set, worker runs until interrupted.
        :type max_messages: int | None
        :param commit_offsets: If set to True, consumer offsets are committed
            every time a message is processed (default: True).
        :type commit_offsets: bool
        :return: Total number of messages processed.
        :rtype: int
        """
        self._logger.info(f"Starting {self} ...")

        self._consumer.unsubscribe()
        self._consumer.subscribe([self.topic])

        messages_processed = 0
        while max_messages is None or messages_processed < max_messages:
            record = next(self._consumer)

            message = Message(
                topic=record.topic,
                partition=record.partition,
                offset=record.offset,
                key=record.key,
                value=record.value,
            )
            self._process_message(message)

            if commit_offsets:
                self._consumer.commit()

            messages_processed += 1

        return messages_processed