from kafka import KafkaProducer, KafkaConsumer import kafka import socket from json import dumps, loads import time consumer = KafkaConsumer( 'commands', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: loads(x.decode('utf-8'))) print(consumer.topics()) consumer.subscribe(['commands']) producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8')) data = {'message' : 'hello'} producer.send('commands', value=data) while True: for message in consumer: print(message) time.sleep(1)