import socket
import time
from enum import Enum
from json import dumps, loads

import kafka
from kafka import KafkaProducer, KafkaConsumer


class Status(Enum):
    """Switch state reported for a manager's valve or heater."""

    UNKNOWN = -1
    OFF = 0
    ON = 1


class Manager:
    """Track one unit's sensor readings and republish them over Kafka.

    Each manager consumes JSON messages from the ``dataDetectors`` topic,
    keeps the readings whose ``id`` matches its own, and publishes a JSON
    snapshot of its state to the ``data`` topic.
    """

    def __init__(self, id: int, moisture: float = 0, temp: float = 20,
                 valveStatus: Status = Status.UNKNOWN,
                 heaterStatus: Status = Status.UNKNOWN,
                 isAutoOn: bool = False):
        """Store the initial state and open the Kafka producer and consumer.

        Args:
            id: Identifier matched against the ``id`` field of detector
                messages; also used to build the Kafka client and group ids.
            moisture: Initial moisture reading.
            temp: Initial temperature reading.
            valveStatus: Initial valve state.
            heaterStatus: Initial heater state.
            isAutoOn: Initial value of the automatic-mode flag.
        """
        self.id = id
        self.moisture = moisture
        self.temp = temp
        self.valveStatus = valveStatus
        self.heaterStatus = heaterStatus
        self.isAutoOn = isAutoOn

        # Outgoing snapshots are serialized as UTF-8 encoded JSON.
        self.dataPublisher = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}',
            value_serializer=lambda v: dumps(v).encode('utf-8'),
        )

        # A per-manager consumer group means every manager receives every
        # detector message and filters by id itself.
        self.detectorConsumer = KafkaConsumer(
            'dataDetectors',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id=f'manager{id}',
            value_deserializer=lambda x: loads(x.decode('utf-8')),
        )

    def _state_message(self) -> dict:
        """Return the JSON-serializable snapshot published to ``data``."""
        return {
            'id': self.id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valveStatus),
            'heaterStatus': str(self.heaterStatus),
            'isAutoOn': self.isAutoOn,
        }

    def update(self, timeout_ms: int = 100) -> None:
        """Process the detector messages currently available, then return.

        Readings addressed to this manager overwrite ``moisture`` and
        ``temp``; after each such reading the refreshed snapshot is sent to
        the ``data`` topic. Messages for other ids are ignored.

        Args:
            timeout_ms: Maximum time, in milliseconds, to wait for records
                when none are buffered.

        Raises:
            KeyError: If a message lacks ``id``, or a matching message lacks
                ``moisture`` or ``temperature``.
        """
        # Iterating the consumer directly blocks forever when no
        # consumer_timeout_ms is configured, which would starve every other
        # manager in the caller's loop. poll() waits at most timeout_ms and
        # returns a {TopicPartition: [records]} mapping instead.
        batches = self.detectorConsumer.poll(timeout_ms=timeout_ms)
        for records in batches.values():
            for message in records:
                payload = message.value
                if payload['id'] != self.id:
                    continue

                print(f"Manager {self.id} received message: ")
                print(payload)
                print("Updating info...\n")

                self.moisture = payload['moisture']
                self.temp = payload['temperature']
                self.dataPublisher.send('data', self._state_message())


if __name__ == "__main__":
    manager1 = Manager(id=1)
    managers = [manager1]

    # Service every manager roughly once per second.
    while True:
        time.sleep(1)
        for manager in managers:
            manager.update()