Worker
class kq.worker.Worker(topic: str, consumer: kafka.consumer.group.KafkaConsumer, callback: Optional[Callable[..., Any]] = None, deserializer: Optional[Callable[[bytes], Any]] = None, logger: Optional[logging.Logger] = None)
Fetches jobs from a Kafka topic and processes them.
Parameters:
- topic (str) – Name of the Kafka topic.
- consumer (kafka.KafkaConsumer) – Kafka consumer instance with a group ID (required). For more details on consumers, refer to kafka-python’s documentation.
- callback (callable) – Callback function executed every time a job is processed. See here for more details.
- deserializer (callable) – Callable which takes a byte string and returns a deserialized job namedtuple. If not set, dill.loads is used by default. See here for more details.
- logger (logging.Logger) – Logger for recording worker activities. If not set, the logger named kq.worker is used with default settings, so you need to define your own formatters and handlers. See here for more details.
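Because the default kq.worker logger comes without handlers or formatters, the worker's activity is easy to miss until you attach some. A minimal sketch using only the standard logging module; the format string and the INFO level are arbitrary choices for illustration, not kq defaults.

```python
import logging

# The worker falls back to the logger named "kq.worker" when no logger is passed.
logger = logging.getLogger("kq.worker")
logger.setLevel(logging.INFO)

# Attach a handler and formatter so worker activity is actually printed.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(asctime)s][%(levelname)s] %(message)s"))
logger.addHandler(handler)
```

Configuring the named logger before creating the worker should be enough for the default case; passing it explicitly, as in Worker(topic='topic', consumer=consumer, logger=logger), makes the choice visible at the call site.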
Example:

```python
from kafka import KafkaConsumer
from kq import Worker

# Set up a Kafka consumer. Group ID is required.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)

# Start the worker to process jobs.
worker.start()
```
hosts
Return a comma-separated string of Kafka hosts and ports.
Returns: Comma-separated Kafka hosts and ports.
Return type: str
topic
Return the name of the Kafka topic.
Returns: Name of the Kafka topic.
Return type: str
group
Return the Kafka consumer group ID.
Returns: Kafka consumer group ID.
Return type: str
consumer
Return the Kafka consumer instance.
Returns: Kafka consumer instance.
Return type: kafka.KafkaConsumer
deserializer
Return the deserializer function.
Returns: Deserializer function.
Return type: callable
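The worker's deserializer has to mirror whatever serializer the producer used; the default dill.loads pairs with jobs serialized by dill. A minimal sketch of a swapped-in format using the standard pickle module instead. The names serializer and deserializer are hypothetical, and it is assumed that the producer side (for example kq's Queue, if your version accepts a serializer argument) is configured with the matching serializer function.

```python
import pickle
from typing import Any


def serializer(job: Any) -> bytes:
    """Producer side (hypothetical): turn a job object into the Kafka message value."""
    return pickle.dumps(job)


def deserializer(data: bytes) -> Any:
    """Worker side (hypothetical): rebuild the job object from the raw message value.

    Only use this on messages from trusted producers: unpickling
    can execute arbitrary code.
    """
    return pickle.loads(data)
```

It would then be passed as Worker(topic='topic', consumer=consumer, deserializer=deserializer). One design consequence: pickle stores module-level functions by reference, so unlike dill it cannot serialize lambdas or nested functions used as job targets.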
callback
Return the callback function.
Returns: Callback function, or None if not set.
Return type: callable | None
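A callback is a plain function the worker calls after each message. The sketch below assumes the argument order described in kq's callback documentation, (status, message, job, result, exception, stacktrace), with status being one of 'invalid', 'success', 'timeout' or 'failure'; verify this against the kq version you have installed. The logger name "myapp.jobs" is hypothetical, and the return value exists only to make the function easy to test.

```python
import logging
from typing import Any, Optional

# Hypothetical application logger, separate from kq's own "kq.worker" logger.
app_logger = logging.getLogger("myapp.jobs")


def callback(
    status: str,
    message: Any,
    job: Any,
    result: Any,
    exception: Optional[BaseException],
    stacktrace: Optional[str],
) -> str:
    """Record the outcome of one processed message (assumed kq callback signature)."""
    if status == "success":
        app_logger.info("job succeeded with result %r", result)
    elif status == "failure":
        app_logger.error("job failed: %s\n%s", exception, stacktrace)
    elif status == "timeout":
        app_logger.warning("job timed out")
    else:
        # 'invalid': the message could not be turned into a job.
        app_logger.warning("could not deserialize message %r", message)
    # Returned only so the function can be unit-tested in isolation.
    return status
```

It would be registered as Worker(topic='topic', consumer=consumer, callback=callback).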
start(max_messages: Optional[int] = None, commit_offsets: bool = True) → int
Start processing Kafka messages and executing jobs.
Parameters:
- max_messages (int | None) – Maximum number of Kafka messages to process before stopping. If not set, the worker runs until interrupted.
- commit_offsets (bool) – If set to True, consumer offsets are committed every time a message is processed (default: True).
Returns: Total number of messages processed.
Return type: int
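The interaction between max_messages and commit_offsets can be modeled with a plain loop. This is a simplified model, not kq's implementation: FakeConsumer and run are hypothetical names, and the fake consumer simply stops when it runs out of values, whereas a real KafkaConsumer keeps waiting for new records. Only commit() mirrors a real kafka-python KafkaConsumer method.

```python
from typing import Callable, Iterable, Iterator, Optional


class FakeConsumer:
    """Hypothetical in-memory stand-in for kafka.KafkaConsumer."""

    def __init__(self, values: Iterable[bytes]) -> None:
        self._values = list(values)
        self.commits = 0

    def __iter__(self) -> Iterator[bytes]:
        return iter(self._values)

    def commit(self) -> None:
        self.commits += 1


def run(
    consumer: FakeConsumer,
    handle: Callable[[bytes], None],
    max_messages: Optional[int] = None,
    commit_offsets: bool = True,
) -> int:
    """Model of start(): process, optionally commit, stop after max_messages."""
    count = 0
    records = iter(consumer)
    while max_messages is None or count < max_messages:
        try:
            value = next(records)
        except StopIteration:
            break  # the fake consumer ran dry; a real one would block instead
        handle(value)
        count += 1
        if commit_offsets:
            consumer.commit()  # commit after every processed message
    return count
```

In real use this corresponds to a call such as worker.start(max_messages=100). Committing after every message means that, roughly, only the message in flight during a crash is redelivered. With commit_offsets=False, offsets advance only if the consumer commits on its own, for example through kafka-python's enable_auto_commit setting.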