Getting Started

First, ensure that your Kafka instance is up and running:

~$ ./ -daemon

Define your KQ worker module:


import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
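formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.INFO)
logger.addHandler(stream_handler)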

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='',
    group_id='group',
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)
worker.start()

Start the worker:

~$ python
[INFO] Starting Worker(hosts=, topic=topic, group=group) ...

Enqueue a function call:

import requests

from kafka import KafkaProducer
from kq import Queue

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='')

# Set up a queue.
queue = Queue(topic='topic', producer=producer)

# Enqueue a function call.
job = queue.enqueue(requests.get, '')
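
To make the enqueue step more concrete, the sketch below shows one way a function call could be packaged into bytes for a Kafka message value and rebuilt on the consuming side. It is an illustration only: the standard-library pickle module, the helper name serialize_call, and the dict layout (id, func, args, kwargs) are assumptions made for this example and are not necessarily what KQ does internally.

```python
import pickle
import uuid


def serialize_call(func, *args, **kwargs):
    """Package a function call as bytes (hypothetical helper, not part of KQ)."""
    job = {
        'id': uuid.uuid4().hex,  # unique job ID
        'func': func,            # pickled by reference, so it must be importable
        'args': args,
        'kwargs': kwargs,
    }
    return job, pickle.dumps(job)


# Producer side: turn the call into a message value.
job, payload = serialize_call(pow, 2, 10)

# Consumer side: rebuild the call from the bytes and run it.
restored = pickle.loads(payload)
result = restored['func'](*restored['args'], **restored['kwargs'])
print(result)
```

Because the function is pickled by reference in this sketch, the process that rebuilds the job must be able to import the same function.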

Sit back and watch the worker process it in the background:

~$ python
[INFO] Starting Worker(hosts=, topic=topic, group=group) ...
[INFO] Processing Message(topic=topic, partition=0, offset=0) ...
[INFO] Executing job c7bf2359: requests.api.get('')
[INFO] Job c7bf2359 returned: <Response [200]>
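
The log lines above follow a deserialize, execute, report cycle. A minimal sketch of such a cycle is shown below, assuming jobs are pickled dicts and using a plain list in place of a Kafka consumer. The logger name, the process_message helper, and the message layout are hypothetical and chosen only for this example; KQ's own worker may differ.

```python
import logging
import operator
import pickle

logger = logging.getLogger('kq_sketch.worker')  # hypothetical logger name


def process_message(value):
    """Deserialize one message value, run the job, and report the outcome."""
    job = pickle.loads(value)
    func, args, kwargs = job['func'], job['args'], job['kwargs']
    logger.info('Executing job %s: %s', job['id'], func.__name__)
    try:
        result = func(*args, **kwargs)
    except Exception as exc:
        # A failing job must not take the worker loop down with it.
        logger.exception('Job %s raised an exception', job['id'])
        return ('error', exc)
    logger.info('Job %s returned: %r', job['id'], result)
    return ('ok', result)


# A plain list stands in for the messages a consumer would yield.
messages = [
    pickle.dumps({'id': 'a1', 'func': operator.add, 'args': (2, 3), 'kwargs': {}}),
    pickle.dumps({'id': 'b2', 'func': operator.truediv, 'args': (1, 0), 'kwargs': {}}),
]
results = [process_message(message) for message in messages]
```

Catching the exception inside the loop keeps one bad job from stopping the jobs queued behind it.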

You can also specify the job timeout, message key and partition:

job = queue.using(timeout=5, key=b'foo', partition=0).enqueue(requests.get, '')
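
To illustrate what a job timeout means in practice, the sketch below runs a callable in a helper thread and gives up waiting after a fixed number of seconds. This is one possible approach built on the standard-library concurrent.futures module, not a description of how KQ enforces timeouts; run_with_timeout is a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def run_with_timeout(func, args=(), kwargs=None, timeout=None):
    """Run func and raise TimeoutError if no result arrives within `timeout` seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **(kwargs or {}))
    try:
        return future.result(timeout=timeout)
    finally:
        # Do not block on an overrunning job; the thread itself cannot be killed.
        executor.shutdown(wait=False)


# A fast job finishes well within its timeout.
fast_result = run_with_timeout(pow, args=(2, 5), timeout=1)

# A slow job exceeds its timeout and is reported as timed out.
try:
    run_with_timeout(time.sleep, args=(0.5,), timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True

print(fast_result, timed_out)
```

With this approach the caller stops waiting, but the overrunning function keeps running in its thread until it returns on its own.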