You can inject your own functions for serializing (pickling) and deserializing (unpickling) jobs. By default, KQ uses the dill library.
The serializer function must take a job namedtuple and return a byte string. You can inject it during queue initialization.
    # Let's use pickle instead of dill
    import pickle

    from kafka import KafkaProducer
    from kq import Queue

    producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

    # Inject your serializer function during queue initialization.
    queue = Queue('topic', producer, serializer=pickle.dumps)
The deserializer function must take a byte string and return a job namedtuple. You can inject it during worker initialization.
    # Let's use pickle instead of dill
    import pickle

    from kafka import KafkaConsumer
    from kq import Worker

    consumer = KafkaConsumer(
        bootstrap_servers='127.0.0.1:9092',
        group_id='group'
    )

    # Inject your deserializer function during worker initialization.
    worker = Worker('topic', consumer, deserializer=pickle.loads)
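The serializer and deserializer do not have to be library functions; any matching pair that satisfies the contract above should work. As a minimal sketch, the pair below compresses pickled jobs with zlib. The `Job` namedtuple here is a simplified, hypothetical stand-in rather than KQ's actual job type, and the names `serialize` and `deserialize` are illustrative only.

```python
import pickle
import zlib
from collections import namedtuple

# Hypothetical stand-in for KQ's job namedtuple (assumption: the real
# job type has more fields than the ones shown here).
Job = namedtuple('Job', ['id', 'func', 'args', 'kwargs'])


def serialize(job):
    """Take a job namedtuple and return a compressed byte string."""
    # Pickle the fields as a plain tuple, then compress the result.
    return zlib.compress(pickle.dumps(tuple(job)))


def deserialize(data):
    """Take a byte string and return a job namedtuple."""
    # Decompress, unpickle the field tuple, and rebuild the namedtuple.
    return Job(*pickle.loads(zlib.decompress(data)))


# Round trip: the worker side should get back an equal job.
job = Job(id='abc123', func=max, args=(1, 2), kwargs={})
payload = serialize(job)
restored = deserialize(payload)
```

A pair like this would be passed as `serializer=serialize` when creating the queue and `deserializer=deserialize` when creating the worker. The two functions must be exact inverses of each other, since the worker can only decode what the queue encoded.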