Queue
class kq.queue.Queue(topic: str, producer: kafka.producer.kafka.KafkaProducer, serializer: Optional[Callable[[...], bytes]] = None, timeout: int = 0, logger: Optional[logging.Logger] = None)
Enqueues function calls in Kafka topics as jobs.
Parameters:
- topic (str) – Name of the Kafka topic.
- producer (kafka.KafkaProducer) – Kafka producer instance. For more details on producers, refer to kafka-python’s documentation.
- serializer (callable) – Callable which takes a job namedtuple and returns a serialized byte string. If not set, dill.dumps is used by default.
- timeout (int | float) – Default job timeout threshold in seconds. If left at 0 (default), jobs run until completion. This value can be overridden when enqueueing jobs.
- logger (logging.Logger) – Logger for recording queue activities. If not set, a logger named kq.queue is used with default settings (you need to define your own formatters and handlers).
Example:
    import requests
    from kafka import KafkaProducer
    from kq import Queue

    # Set up a Kafka producer.
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

    # Set up a queue.
    queue = Queue(topic='topic', producer=producer, timeout=3600)

    # Enqueue a function call.
    job = queue.enqueue(requests.get, 'https://www.google.com/')
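Because the default kq.queue logger comes without formatters or handlers, queue activity may not show up anywhere until one is attached. The following is a minimal sketch using only the standard library; the format string and log level are arbitrary choices for illustration, not settings prescribed by kq.

```python
import logging

# 'kq.queue' is the default logger name given in the parameter description.
logger = logging.getLogger('kq.queue')
logger.setLevel(logging.INFO)

# Attach a stream handler with an explicit format (the format is an assumption).
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
logger.addHandler(handler)
```

The same object can also be passed explicitly through the logger parameter of the Queue constructor.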
hosts
Return comma-separated Kafka hosts and ports string.
Returns: Comma-separated Kafka hosts and ports.
Return type: str
topic
Return the name of the Kafka topic.
Returns: Name of the Kafka topic.
Return type: str
producer
Return the Kafka producer instance.
Returns: Kafka producer instance.
Return type: kafka.KafkaProducer
serializer
Return the serializer function.
Returns: Serializer function.
Return type: callable
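To make the serializer contract concrete (a callable that takes a job namedtuple and returns bytes), here is a rough sketch of a JSON-based alternative to the default. DemoJob is a hypothetical stand-in with only a few fields, not the real kq job type, and storing the function by its dotted name is an assumption made purely so the payload fits into JSON.

```python
import json
from collections import namedtuple

# Hypothetical stand-in for a job record; the real kq job namedtuple may differ.
DemoJob = namedtuple('DemoJob', ['id', 'func', 'args', 'kwargs', 'timeout'])


def json_serializer(job) -> bytes:
    """Serialize a job-like namedtuple to UTF-8 encoded JSON bytes."""
    payload = dict(job._asdict())
    func = payload['func']
    # Functions are not JSON-serializable, so record the dotted name instead.
    payload['func'] = f'{func.__module__}.{func.__qualname__}'
    return json.dumps(payload, sort_keys=True).encode('utf-8')


data = json_serializer(
    DemoJob(id='abc', func=len, args=['hello'], kwargs={}, timeout=0)
)
```

A callable like this would be passed as serializer=json_serializer when constructing the queue. Whatever consumes the topic would then need a matching deserializer that resolves the dotted name back to a function.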
timeout
Return the default job timeout threshold in seconds.
Returns: Default job timeout threshold in seconds.
Return type: float | int
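enqueue(func: Callable[[...], bytes], *args, **kwargs) → kq.job.Job
Enqueue a function call or a job.
Parameters:
- func – Function to call, or a job object to enqueue.
- args – Positional arguments for the function call.
- kwargs – Keyword arguments for the function call.
Returns: Enqueued job.
Return type: kq.job.Job
Example: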
    import requests
    from kafka import KafkaProducer
    from kq import Job, Queue

    # Set up a Kafka producer.
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

    # Set up a queue.
    queue = Queue(topic='topic', producer=producer)

    # Enqueue a function call.
    queue.enqueue(requests.get, 'https://www.google.com/')

    # Enqueue a job object.
    job = Job(func=requests.get, args=['https://www.google.com/'])
    queue.enqueue(job)
Note
The following rules apply when enqueueing a job:
- If Job.id is not set, a random one is generated.
- If Job.timestamp is set, it is replaced with the current time.
- If Job.topic is set, it is replaced with the current topic.
- If Job.timeout is set, its value takes precedence over the timeout set with the using method and over the queue default.
- If Job.key is set, its value takes precedence over the key set with the using method.
- If Job.partition is set, its value takes precedence over the partition set with the using method.
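The rules in this note can be modelled in a few lines of plain Python. This is only an illustrative sketch and not kq's implementation: resolve_job_fields, the dict-based job, and the spec argument (standing in for values set with the using method) are all hypothetical, and the timestamp unit is an arbitrary choice.

```python
import time
import uuid
from typing import Any, Dict, Optional


def resolve_job_fields(
    job: Dict[str, Any],
    queue_topic: str,
    default_timeout: float = 0,
    spec: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Apply the enqueue rules to a job-like dict (hypothetical model)."""
    spec = spec or {}
    resolved = dict(job)

    # A missing ID gets a random value.
    if resolved.get('id') is None:
        resolved['id'] = uuid.uuid4().hex

    # Timestamp and topic are always replaced with current values.
    resolved['timestamp'] = time.time()
    resolved['topic'] = queue_topic

    # Job values win; otherwise fall back to the enqueue specification.
    for field in ('timeout', 'key', 'partition'):
        if resolved.get(field) is None:
            resolved[field] = spec.get(field)

    # The timeout finally falls back to the queue-level default.
    if resolved['timeout'] is None:
        resolved['timeout'] = default_timeout
    return resolved


job = {'id': None, 'timestamp': 1, 'topic': 'old',
       'timeout': 5, 'key': None, 'partition': None}
out = resolve_job_fields(job, queue_topic='topic', default_timeout=3600,
                         spec={'timeout': 10, 'key': b'foo'})
```

In this run the job keeps its own timeout of 5, picks up the key from the specification because it had none, and has its stale topic and timestamp replaced.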
using(timeout: Union[float, int, None] = None, key: Optional[bytes] = None, partition: Optional[int] = None) → kq.queue.EnqueueSpec
Set enqueue specifications such as timeout, key and partition.
Parameters:
- timeout (int | float) – Job timeout threshold in seconds. If not set, the default timeout (specified during queue initialization) is used instead.
- key (bytes) – Kafka message key. Jobs with the same key are sent to the same topic partition and executed sequentially. Applies only if the partition parameter is not set and the producer’s partitioner configuration is left as default. For more details on producers, refer to kafka-python’s documentation.
- partition (int) – Topic partition the message is sent to. If not set, the producer’s partitioner selects the partition. For more details on producers, refer to kafka-python’s documentation.
Returns: Enqueue specification object which has an enqueue method with the same signature as kq.queue.Queue.enqueue().
Return type: EnqueueSpec
Example:
    import requests
    from kafka import KafkaProducer
    from kq import Job, Queue

    # Set up a Kafka producer.
    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

    # Set up a queue.
    queue = Queue(topic='topic', producer=producer)

    url = 'https://www.google.com/'

    # Enqueue a function call in partition 0 with message key 'foo'.
    queue.using(partition=0, key=b'foo').enqueue(requests.get, url)

    # Enqueue a function call with a timeout of 10 seconds.
    queue.using(timeout=10).enqueue(requests.get, url)

    # Job values are preferred over values set with "using" method.
    job = Job(func=requests.get, args=[url], timeout=5)
    queue.using(timeout=10).enqueue(job)  # timeout is still 5
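The statement that jobs with the same key land in the same partition follows from key-based partitioning being deterministic. The toy model below illustrates the idea with CRC32 as a stand-in hash. This is explicitly not the hashing scheme used by kafka-python's default partitioner, and toy_partition is a hypothetical helper.

```python
import zlib


def toy_partition(key: bytes, num_partitions: int) -> int:
    """Map a key to a partition index with a deterministic hash.

    CRC32 is used here only for illustration; it is NOT Kafka's algorithm.
    """
    return zlib.crc32(key) % num_partitions


keys = [b'foo', b'bar', b'foo']
partitions = [toy_partition(key, num_partitions=4) for key in keys]
```

Both b'foo' entries map to the same index, which is why jobs sharing a key can be processed in order by whichever consumer owns that partition.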