Worker
class kq.worker.Worker(topic: str, consumer: kafka.consumer.group.KafkaConsumer, callback: Optional[Callable[[...], Any]] = None, deserializer: Optional[Callable[[bytes], Any]] = None, logger: Optional[logging.Logger] = None)
Fetches jobs from Kafka topics and processes them.
Parameters:
- topic (str) – Name of the Kafka topic.
- consumer (kafka.KafkaConsumer) – Kafka consumer instance with a group ID (required). For more details on consumers, refer to kafka-python's documentation.
- callback (callable) – Callback function which is executed every time a job is processed. See here for more details.
- deserializer (callable) – Callable which takes a byte string and returns a deserialized job namedtuple. If not set, dill.loads is used by default. See here for more details.
- logger (logging.Logger) – Logger for recording worker activities. If not set, a logger named kq.worker is used with default settings (you need to define your own formatters and handlers). See here for more details.
Example:

    from kafka import KafkaConsumer
    from kq import Worker

    # Set up a Kafka consumer. Group ID is required.
    consumer = KafkaConsumer(
        bootstrap_servers='127.0.0.1:9092',
        group_id='group'
    )

    # Set up a worker.
    worker = Worker(topic='topic', consumer=consumer)

    # Start the worker to process jobs.
    worker.start()
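The default kq.worker logger comes without formatters or handlers, so they have to be attached before worker activity is reported in a readable form. A minimal sketch using only the standard logging module; the INFO level and the format string are illustrative choices, not kq defaults:

```python
import logging

# Fetch the logger by the name the worker uses when no logger is passed.
logger = logging.getLogger('kq.worker')

# Illustrative choices (assumptions, not kq defaults):
# a stream handler, a timestamped format, and the INFO level.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')
)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```

The configured logger object can also be handed to the worker explicitly through the logger parameter.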
hosts
Return comma-separated Kafka hosts and ports string.
Returns: Comma-separated Kafka hosts and ports.
Return type: str
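Since hosts is a single comma-separated string, code that needs the individual brokers has to split it. The sketch below assumes every entry has the form host:port, which the description above does not spell out; split_hosts is a hypothetical helper, not part of kq:

```python
from typing import List, Tuple


def split_hosts(hosts: str) -> List[Tuple[str, int]]:
    """Split a 'host:port,host:port' string into (host, port) pairs.

    Assumes each entry ends in ':<port>'; entries without a port
    raise ValueError when the port is converted to int.
    """
    pairs = []
    for entry in hosts.split(','):
        entry = entry.strip()
        if not entry:
            continue  # Skip empty fragments such as a trailing comma.
        # Split on the last colon so the host part is kept whole.
        host, _, port = entry.rpartition(':')
        pairs.append((host, int(port)))
    return pairs
```

With a worker at hand, this would be called as split_hosts(worker.hosts).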
topic
Return the name of the Kafka topic.
Returns: Name of the Kafka topic.
Return type: str
group
Return the Kafka consumer group ID.
Returns: Kafka consumer group ID.
Return type: str
consumer
Return the Kafka consumer instance.
Returns: Kafka consumer instance.
Return type: kafka.KafkaConsumer
deserializer
Return the deserializer function.
Returns: Deserializer function.
Return type: callable
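A deserializer is any callable that maps a byte string to a deserialized job. The sketch below wraps a loader with an input check; make_checked_deserializer is a hypothetical helper, and the standard pickle module stands in for dill only so that the example runs without extra dependencies:

```python
import pickle
from typing import Any, Callable


def make_checked_deserializer(
    loads: Callable[[bytes], Any] = pickle.loads,
) -> Callable[[bytes], Any]:
    """Wrap a loader so that non-bytes input fails with a clear error.

    pickle.loads is a stand-in default (an assumption of this sketch);
    pass dill.loads to mirror the default described above.
    """
    def deserializer(data: bytes) -> Any:
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f'expected bytes, got {type(data).__name__}')
        return loads(data)

    return deserializer


deserializer = make_checked_deserializer()
```

The resulting callable would be passed as the deserializer argument of Worker. Whatever loader is used has to match the serializer that produced the messages.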
callback
Return the callback function.
Returns: Callback function, or None if not set.
Return type: callable | None
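The signature above types the callback only as Callable[[...], Any], and the exact arguments it receives are documented elsewhere. A minimal sketch that therefore accepts any arguments and simply records each invocation; make_recording_callback is a hypothetical helper, not part of kq:

```python
from typing import Any, Callable, Dict, List, Tuple

Call = Tuple[Tuple[Any, ...], Dict[str, Any]]


def make_recording_callback() -> Tuple[Callable[..., None], List[Call]]:
    """Return a callback and the list it appends its arguments to."""
    calls: List[Call] = []

    def callback(*args: Any, **kwargs: Any) -> None:
        # Accept anything, because the argument list is not stated here.
        calls.append((args, kwargs))

    return callback, calls


callback, calls = make_recording_callback()
```

Passing callback to Worker and inspecting calls afterwards is one way to see what the worker reports for each processed job.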
start(max_messages: Optional[int] = None, commit_offsets: bool = True) → int
Start processing Kafka messages and executing jobs.
Parameters:
- max_messages (int | None) – Maximum number of Kafka messages to process before stopping. If not set, the worker runs until interrupted.
- commit_offsets (bool) – If set to True, consumer offsets are committed every time a message is processed (default: True).
Returns: Total number of messages processed.
Return type: int
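Because start() returns the number of messages processed and max_messages bounds a single run, a worker can be driven in fixed-size runs instead of running until interrupted. A rough sketch; run_in_batches is a hypothetical helper, and it assumes each call returns once its limit is reached:

```python
def run_in_batches(worker, batch_size: int, batches: int) -> int:
    """Run the worker in bounded runs and return the total processed.

    `worker` is any object exposing start(max_messages=..., commit_offsets=...),
    such as a Worker built as in the example above.
    """
    total = 0
    for _ in range(batches):
        # Each call processes at most batch_size messages, then returns.
        total += worker.start(max_messages=batch_size, commit_offsets=True)
    return total
```

Splitting the work this way leaves room between runs for housekeeping such as reporting the running total.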