KQ lets you assign a callback function to a worker. The worker invokes the callback every time it processes a message. The callback must accept the following positional arguments, in this order:
status (str): Job status. Possible values are:
    invalid: Job could not be deserialized, or was malformed.
    failure: Job raised an exception.
    timeout: Job took too long and timed out.
    success: Job successfully finished and returned a result.
message (kq.Message): Kafka message.
job (kq.Job | None): Job object, or None if Kafka message was invalid or malformed.
result (object | None): Job result, or None if the job did not finish successfully (status is invalid, failure, or timeout).
exception (Exception | None): Exception raised by the job, or None if no exception was raised (status is invalid, timeout, or success).
stacktrace (str | None): Exception stacktrace, or None if no exception was raised.
You can inject your callback function during worker initialization.
    import kq
    from kafka import KafkaConsumer
    from kq import Worker


    def callback(status, message, job, result, exception, stacktrace):
        """This is an example callback showing what arguments to expect."""

        assert status in ['invalid', 'success', 'timeout', 'failure']
        assert isinstance(message, kq.Message)

        if status == 'invalid':
            # The message could not be turned into a job.
            assert job is None
            assert result is None
            assert exception is None
            assert stacktrace is None

        elif status == 'success':
            # The job ran to completion; result holds its return value.
            assert isinstance(job, kq.Job)
            assert exception is None
            assert stacktrace is None

        elif status == 'timeout':
            # The job exceeded its time limit; no exception is reported.
            assert isinstance(job, kq.Job)
            assert result is None
            assert exception is None
            assert stacktrace is None

        elif status == 'failure':
            # The job raised; exception and stacktrace are both set.
            assert isinstance(job, kq.Job)
            assert result is None
            assert exception is not None
            assert stacktrace is not None


    consumer = KafkaConsumer(
        bootstrap_servers='127.0.0.1:9092',
        group_id='group'
    )

    # Inject your callback function during worker initialization.
    worker = Worker('topic', consumer, callback=callback)
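As a further illustration, here is a minimal sketch of a callback that does something more practical than asserting: it tallies outcomes per status and logs the problem cases. The names `tally_callback` and `status_counts` are hypothetical and not part of KQ, and the sketch relies only on the six positional arguments described above, so it makes no assumptions about the attributes of the message or job objects.

```python
import logging
from collections import Counter

logger = logging.getLogger(__name__)

# Hypothetical module-level tally of outcomes, keyed by status string.
status_counts = Counter()


def tally_callback(status, message, job, result, exception, stacktrace):
    """Count job outcomes and log anything that did not succeed."""
    status_counts[status] += 1

    if status == 'failure':
        # exception and stacktrace are only set when the job raised.
        logger.error('Job failed with %r\n%s', exception, stacktrace)
    elif status == 'timeout':
        logger.warning('Job timed out: %r', job)
    elif status == 'invalid':
        # job is None here, so only the raw message can be reported.
        logger.warning('Could not deserialize message: %r', message)
```

A callback like this would be passed to the worker in the same way as the example above, through the callback keyword argument.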