Worker

class kq.worker.Worker(topic: str, consumer: kafka.consumer.group.KafkaConsumer, callback: Optional[Callable[[...], Any]] = None, deserializer: Optional[Callable[[bytes], Any]] = None, logger: Optional[logging.Logger] = None)

Fetches jobs from Kafka topics and processes them.

Parameters:
  • topic (str) – Name of the Kafka topic.
  • consumer (kafka.KafkaConsumer) – Kafka consumer instance with a group ID (required). For more details on consumers, refer to kafka-python’s documentation.
  • callback (callable) – Callback function which is executed every time a job is processed. See the callback section of the KQ documentation for more details.
  • deserializer (callable) – Callable which takes a byte string and returns a deserialized job namedtuple. If not set, dill.loads is used. See the serializer section of the KQ documentation for more details.
  • logger (logging.Logger) – Logger for recording worker activities. If not set, a logger named kq.worker is used with default settings (you need to define your own formatters and handlers). See the logging section of the KQ documentation for more details.
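
The optional callback and logger parameters can be sketched as follows. This is a minimal illustration rather than the library's reference usage: the six-argument callback signature (status, message, job, result, exception, stacktrace) is an assumption not stated in this section, and the handler and format string are arbitrary choices.

```python
import logging

# Configure the "kq.worker" logger named in the parameter description.
# The handler and format string are illustrative choices, not KQ defaults.
logger = logging.getLogger('kq.worker')
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)


def callback(status, message, job, result, exception, stacktrace):
    """Hypothetical callback; the argument list is an assumption."""
    if exception is None:
        logger.info('job ended with status %r, result %r', status, result)
    else:
        logger.error('job ended with status %r: %r', status, exception)
```

Both objects would then be passed to the constructor, for example Worker(topic='topic', consumer=consumer, callback=callback, logger=logger).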

Example:

from kafka import KafkaConsumer
from kq import Worker

# Set up a Kafka consumer. Group ID is required.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)

# Start the worker to process jobs.
worker.start()
hosts

Return the Kafka hosts and ports as a comma-separated string.

Returns: Comma-separated Kafka hosts and ports.
Return type: str
topic

Return the name of the Kafka topic.

Returns: Name of the Kafka topic.
Return type: str
group

Return the Kafka consumer group ID.

Returns: Kafka consumer group ID.
Return type: str
consumer

Return the Kafka consumer instance.

Returns: Kafka consumer instance.
Return type: kafka.KafkaConsumer
deserializer

Return the deserializer function.

Returns: Deserializer function.
Return type: callable
callback

Return the callback function.

Returns: Callback function, or None if not set.
Return type: callable | None
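
The read-only attributes documented above (hosts, topic, group, callback) can be combined into a short summary, for instance when logging a worker's configuration at startup. In this sketch, describe_worker is a hypothetical helper and the SimpleNamespace object is a stand-in for a real Worker, so that it runs without a Kafka broker.

```python
from types import SimpleNamespace


def describe_worker(worker):
    """Summarize a worker using the attributes documented above."""
    return {
        # hosts is a comma-separated string, so split it into a list.
        'hosts': worker.hosts.split(','),
        'topic': worker.topic,
        'group': worker.group,
        'has_callback': worker.callback is not None,
    }


# Stand-in object (not a real kq.Worker) with the documented attributes.
stub = SimpleNamespace(
    hosts='127.0.0.1:9092,127.0.0.1:9093',
    topic='topic',
    group='group',
    callback=None,
)
summary = describe_worker(stub)
print(summary)
```
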
start(max_messages: Optional[int] = None, commit_offsets: bool = True) → int

Start processing Kafka messages and executing jobs.

Parameters:
  • max_messages (int | None) – Maximum number of Kafka messages to process before stopping. If not set, the worker runs until interrupted.
  • commit_offsets (bool) – If set to True, consumer offsets are committed every time a message is processed (default: True).
Returns: Total number of messages processed.
Return type: int
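
Because start returns the number of messages processed, max_messages can be used to run a worker in bounded batches. The sketch below assumes that start may be called again after it returns, which this section does not state. process_in_batches and FakeWorker are hypothetical names: FakeWorker stands in for a real Worker so that the example runs without a Kafka broker.

```python
def process_in_batches(worker, batch_size, batches):
    """Call start() repeatedly and add up the processed-message counts."""
    total = 0
    for _ in range(batches):
        total += worker.start(max_messages=batch_size, commit_offsets=True)
    return total


class FakeWorker:
    """Stand-in for kq.Worker; pretends every batch is fully processed."""

    def start(self, max_messages=None, commit_offsets=True):
        return max_messages


total = process_in_batches(FakeWorker(), batch_size=10, batches=3)
print(total)  # 30
```

With a real worker, each call would block until batch_size messages have been processed or the worker is interrupted.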