
KQ lets you assign a callback function to workers. The callback function is invoked each time a message is processed. It must accept the following positional arguments:

  • status (str): Job status. Possible values are:

    • invalid : Job could not be deserialized, or was malformed.
    • failure : Job raised an exception.
    • timeout : Job took too long and timed out.
    • success : Job successfully finished and returned a result.
  • message (kq.Message): Kafka message.

  • job (kq.Job | None): Job object, or None if Kafka message was invalid or malformed.

  • result (object | None): Job result, or None if an exception was raised.

  • exception (Exception | None): Exception raised, or None if job finished successfully.

  • stacktrace (str | None): Exception stacktrace, or None if job finished successfully.

You can assign your callback function during worker initialization.


from kafka import KafkaConsumer
from kq import Worker

def callback(status, message, job, result, exception, stacktrace):
    """This is an example callback showing what arguments to expect."""

    assert status in ['invalid', 'success', 'timeout', 'failure']
    assert isinstance(message, kq.Message)

    if status == 'invalid':
        assert job is None
        assert result is None
        assert exception is None
        assert stacktrace is None

    if status == 'success':
        assert isinstance(job, kq.Job)
        assert exception is None
        assert stacktrace is None

    elif status == 'timeout':
        assert isinstance(job, kq.Job)
        assert result is None
        assert exception is None
        assert stacktrace is None

    elif status == 'failure':
        assert isinstance(job, kq.Job)
        assert result is None
        assert exception is not None
        assert stacktrace is not None

consumer = KafkaConsumer(

# Inject your callback function during worker initialization.
worker = Worker('topic', consumer, callback=callback)