Queue

class kq.queue.Queue(topic: str, producer: kafka.producer.kafka.KafkaProducer, serializer: Optional[Callable[[...], bytes]] = None, timeout: int = 0, logger: Optional[logging.Logger] = None)

Enqueues function calls in Kafka topics as jobs.

Parameters:
  • topic (str) – Name of the Kafka topic.
  • producer (kafka.KafkaProducer) – Kafka producer instance. For more details on producers, refer to kafka-python’s documentation.
  • serializer (callable) – Callable which takes a job namedtuple and returns a serialized byte string. If not set, dill.dumps is used by default. See here for more details.
  • timeout (int | float) – Default job timeout threshold in seconds. If left at 0 (default), jobs run until completion. This value can be overridden when enqueueing jobs.
  • logger (logging.Logger) – Logger for recording queue activities. If not set, a logger named kq.queue is used with default settings (you need to define your own formatters and handlers). See here for more details.

Example:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer, timeout=3600)

# Enqueue a function call.
job = queue.enqueue(requests.get, 'https://www.google.com/')
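
Because the default kq.queue logger comes without formatters or handlers, a minimal sketch of configuring it with the standard logging module might look like the following. The log level and format string are illustrative choices, not defaults of the library.

```python
import logging

# Fetch the logger that the queue falls back to when none is passed in.
logger = logging.getLogger('kq.queue')
logger.setLevel(logging.DEBUG)

# Attach a stream handler with an illustrative format (an assumption, not a kq default).
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
logger.addHandler(handler)
```

The same logger object can also be passed explicitly through the logger parameter.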
hosts

Return the Kafka hosts and ports as a comma-separated string.

Returns: Comma-separated Kafka hosts and ports.
Return type: str
topic

Return the name of the Kafka topic.

Returns: Name of the Kafka topic.
Return type: str
producer

Return the Kafka producer instance.

Returns: Kafka producer instance.
Return type: kafka.KafkaProducer
serializer

Return the serializer function.

Returns: Serializer function.
Return type: callable
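
As an illustration of the serializer contract (a job namedtuple in, a byte string out), here is a minimal sketch that uses pickle from the standard library instead of dill. JobStandIn is a hypothetical stand-in with only a few fields so that the snippet runs without kq installed; it is not the real kq.Job.

```python
import pickle
from collections import namedtuple

# Hypothetical stand-in for kq.Job, so the sketch runs on its own.
JobStandIn = namedtuple('JobStandIn', ['id', 'func', 'args', 'kwargs'])


def serialize(job):
    """Take a job namedtuple and return a serialized byte string."""
    # Pickle the fields as a plain tuple so decoding does not depend on
    # where the stand-in class happens to be defined.
    return pickle.dumps(tuple(job))


job = JobStandIn(id='abc123', func=len, args=['hello'], kwargs={})
payload = serialize(job)

# Round-trip to confirm the payload decodes back to the same job fields.
restored = JobStandIn(*pickle.loads(payload))
```

A callable like serialize would be passed to the queue through its serializer parameter.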
timeout

Return the default job timeout threshold in seconds.

Returns: Default job timeout threshold in seconds.
Return type: float | int
enqueue(func: Callable[[...], bytes], *args, **kwargs) → kq.job.Job

Enqueue a function call or a job.

Parameters:
  • func (callable | kq.Job) – Function or a job object. Must be serializable and available to workers.
  • args – Positional arguments for the function. Ignored if func is a job object.
  • kwargs – Keyword arguments for the function. Ignored if func is a job object.
Returns: Enqueued job.
Return type: kq.Job

Example:

import requests

from kafka import KafkaProducer
from kq import Job, Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
queue.enqueue(requests.get, 'https://www.google.com/')

# Enqueue a job object.
job = Job(func=requests.get, args=['https://www.google.com/'])
queue.enqueue(job)

Note

The following rules apply when enqueueing a job:

  • If Job.id is not set, a random one is generated.
  • If Job.timestamp is set, it is replaced with the current time.
  • If Job.topic is set, it is replaced with the current topic.
  • If Job.timeout is set, its value overrides the queue default and any timeout set with using().
  • If Job.key is set, its value overrides any key set with using().
  • If Job.partition is set, its value overrides any partition set with using().
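
The rules above can be sketched in plain Python as follows. This is only an illustration of the stated behaviour, not the library's implementation: JobStandIn and apply_enqueue_rules are hypothetical names, and the millisecond timestamp unit is an assumption.

```python
import time
import uuid
from collections import namedtuple

# Hypothetical stand-in for kq.Job with just the fields named in the rules.
JobStandIn = namedtuple(
    'JobStandIn',
    ['id', 'timestamp', 'topic', 'timeout', 'key', 'partition'],
)


def apply_enqueue_rules(job, topic, default_timeout=0, key=None, partition=None):
    """Return a copy of the job with the enqueue rules applied (illustrative only)."""
    return job._replace(
        # A missing ID is filled in with a random one.
        id=uuid.uuid4().hex if job.id is None else job.id,
        # Timestamp and topic are always replaced (milliseconds are an assumption).
        timestamp=int(time.time() * 1000),
        topic=topic,
        # Values set on the job win over the queue-level ones.
        timeout=default_timeout if job.timeout is None else job.timeout,
        key=key if job.key is None else job.key,
        partition=partition if job.partition is None else job.partition,
    )


job = JobStandIn(id=None, timestamp=1, topic='old', timeout=5, key=None, partition=None)
result = apply_enqueue_rules(job, topic='topic', default_timeout=10, key=b'foo')
```

Here the job keeps its own timeout of 5, while the unset key falls back to the queue-level value.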
using(timeout: Union[float, int, None] = None, key: Optional[bytes] = None, partition: Optional[int] = None) → kq.queue.EnqueueSpec

Set enqueue specifications such as timeout, key and partition.

Parameters:
  • timeout (int | float) – Job timeout threshold in seconds. If not set, default timeout (specified during queue initialization) is used instead.
  • key (bytes) – Kafka message key. Jobs with the same key are sent to the same topic partition and executed sequentially. Applies only if the partition parameter is not set and the producer’s partitioner configuration is left as default. For more details on producers, refer to kafka-python’s documentation.
  • partition (int) – Topic partition the message is sent to. If not set, the producer’s partitioner selects the partition. For more details on producers, refer to kafka-python’s documentation.
Returns: Enqueue specification object which has an enqueue method with the same signature as kq.queue.Queue.enqueue().
Return type: EnqueueSpec

Example:

import requests

from kafka import KafkaProducer
from kq import Job, Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

url = 'https://www.google.com/'

# Enqueue a function call in partition 0 with message key 'foo'.
queue.using(partition=0, key=b'foo').enqueue(requests.get, url)

# Enqueue a function call with a timeout of 10 seconds.
queue.using(timeout=10).enqueue(requests.get, url)

# Job values are preferred over values set with the "using" method.
job = Job(func=requests.get, args=[url], timeout=5)
queue.using(timeout=10).enqueue(job)  # timeout is still 5
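
To illustrate why jobs that share a key end up on the same partition, the sketch below maps a key to a partition index by hashing it modulo the partition count. It uses zlib.crc32 purely as a stand-in hash and is not the partitioner that kafka-python ships; pick_partition and num_partitions are hypothetical names.

```python
import zlib


def pick_partition(key, num_partitions):
    """Map a message key to a partition index deterministically."""
    # Stand-in hash for illustration; a real producer uses its own partitioner.
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition.
first = pick_partition(b'foo', 6)
second = pick_partition(b'foo', 6)
```

Any deterministic hash gives this property, which is what makes sequential execution per key possible.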