Serializer
You can use custom functions for serialization. By default, KQ uses the dill library.
The serializer function must take a job namedtuple and return a byte string. You can inject it during queue initialization.
Example:
# Let's use pickle instead of dill
import pickle
from kafka import KafkaProducer
from kq import Queue
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
# Inject your serializer function during queue initialization.
queue = Queue('topic', producer, serializer=pickle.dumps)
The deserializer function must take a byte string and return a job namedtuple. You can inject it during worker initialization.
Example:
# Let's use pickle instead of dill
import pickle
from kafka import KafkaConsumer
from kq import Worker
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group'
)
# Inject your deserializer function during worker initialization.
worker = Worker('topic', consumer, deserializer=pickle.loads)
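The serializer and deserializer have to be inverses of each other: whatever bytes the queue writes, the worker must be able to turn back into the same job. The sketch below illustrates that contract with a hand-written pair that compresses the pickled payload with zlib. It is only an illustration under stated assumptions: the `Job` namedtuple here is a simplified stand-in with made-up fields, not KQ's actual job type, and the names `serialize` and `deserialize` are hypothetical.

```python
import pickle
import zlib
from collections import namedtuple

# Hypothetical stand-in for KQ's job namedtuple (the real one has its own fields).
Job = namedtuple('Job', ['id', 'func', 'args', 'kwargs'])


def serialize(job):
    """Take a job namedtuple and return a compressed byte string."""
    # Flatten to a plain tuple so the payload does not depend on where
    # the stand-in Job class is defined.
    return zlib.compress(pickle.dumps(tuple(job)))


def deserialize(data):
    """Take a byte string and return a job namedtuple."""
    return Job(*pickle.loads(zlib.decompress(data)))


# Round trip: the deserializer must undo exactly what the serializer did.
job = Job(id='1234', func=len, args=([1, 2, 3],), kwargs={})
payload = serialize(job)
restored = deserialize(payload)
```

A pair like this would be passed the same way as in the examples above, with `serializer=serialize` on the queue and `deserializer=deserialize` on the worker. If only one side is changed, the worker will receive bytes it cannot decode.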