CallbackΒΆ

KQ allows you to assign a callback function to workers. The callback function is invoked every time a message is processed. It must take the following positional arguments:

  • status (str): Job status. Possible values are:

    • invalid : Job could not be deserialized, or was malformed.
    • failure : Job raised an exception.
    • timeout : Job took too long and timed out.
    • success : Job successfully finished and returned a result.
  • message (kq.Message): Kafka message.

  • job (kq.Job | None): Job object, or None if Kafka message was invalid or malformed.

  • result (object | None): Job result, or None if an exception was raised.

  • exception (Exception | None): Exception raised, or None if job finished successfully.

  • stacktrace (str | None): Exception stacktrace, or None if job finished successfully.

You can inject your callback function during worker initialization.

Example:

from kafka import KafkaConsumer
from kq import Worker


def callback(status, message, job, result, exception, stacktrace):
    """This is an example callback showing what arguments to expect."""

    assert status in ['invalid', 'success', 'timeout', 'failure']
    assert isinstance(message, kq.Message)

    if status == 'invalid':
        assert job is None
        assert result is None
        assert exception is None
        assert stacktrace is None

    if status == 'success':
        assert isinstance(job, kq.Job)
        assert exception is None
        assert stacktrace is None

    elif status == 'timeout':
        assert isinstance(job, kq.Job)
        assert result is None
        assert exception is None
        assert stacktrace is None

    elif status == 'failure':
        assert isinstance(job, kq.Job)
        assert result is None
        assert exception is not None
        assert stacktrace is not None

consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group'
)

# Inject your callback function during worker initialization.
worker = Worker('topic', consumer, callback=callback)