"""Kafka bridge that mirrors detector readings into per-manager state.

Each Manager consumes readings from the 'dataDetectors' topic, keeps the
latest moisture/temperature for the detector whose id matches its own, and
republishes its full state on the 'data' topic.
"""

import socket
import time
from enum import Enum
from json import dumps, loads

import kafka
from kafka import KafkaProducer, KafkaConsumer


class Status(Enum):
    """State of an actuator; UNKNOWN is the default a Manager starts with."""

    UNKNOWN = -1
    OFF = 0
    ON = 1


class Manager:
    """Holds the current state of one managed unit and syncs it over Kafka."""

    def __init__(self, id: int, moisture: float = 0, temp: float = 20,
                 valveStatus: Status = Status.UNKNOWN,
                 heaterStatus: Status = Status.UNKNOWN,
                 isAutoOn: bool = False):
        """Store the initial state and open the Kafka producer and consumer.

        Args:
            id: Identifier matched against the 'id' field of incoming
                detector messages; also used to name the Kafka client
                and consumer group.
            moisture: Initial moisture value.
            temp: Initial temperature value.
            valveStatus: Initial valve state.
            heaterStatus: Initial heater state.
            isAutoOn: Initial value of the automatic-mode flag.
        """
        self.id = id
        self.moisture = moisture
        self.temp = temp
        self.valveStatus = valveStatus
        self.heaterStatus = heaterStatus
        self.isAutoOn = isAutoOn

        # Outgoing state is JSON-encoded as UTF-8 bytes.
        self.dataPublisher = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

        # consumer_timeout_ms bounds how long iteration waits for the next
        # message (see kafka-python docs), so update() returns when the
        # topic is idle instead of blocking indefinitely.
        self.detectorConsumer = KafkaConsumer(
            'dataDetectors',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            consumer_timeout_ms=1000,
            group_id=f'manager{id}',
            value_deserializer=lambda x: loads(x.decode('utf-8'))
        )

    def update(self):
        """Apply pending detector readings addressed to this manager.

        Messages for other ids, and payloads that are not JSON objects, are
        ignored. A matching message that lacks 'moisture' or 'temperature'
        is reported and skipped without touching the stored state, so one
        malformed message cannot stop the polling loop or leave the state
        half-updated. Every applied reading is followed by sendData().
        """
        for message in self.detectorConsumer:
            reading = message.value
            if not isinstance(reading, dict) or reading.get('id') != self.id:
                continue

            print(f"Manager {self.id} received message: ")
            print(reading)

            # Read both fields before assigning either one.
            try:
                moisture = reading['moisture']
                temperature = reading['temperature']
            except KeyError as missing:
                print(f"Manager {self.id} skipped message without {missing} field")
                continue

            self.moisture = moisture
            self.temp = temperature
            print("Updating info...\n")
            self.sendData()

    def sendData(self):
        """Publish the manager's current state to the 'data' topic."""
        print("sending data...")
        # str() of a Status member gives e.g. 'Status.UNKNOWN'; that exact
        # text is what goes on the wire.
        payload = {
            'id': self.id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valveStatus),
            'heaterStatus': str(self.heaterStatus),
            'isAutoOn': self.isAutoOn
        }
        print(payload)
        self.dataPublisher.send('data', payload)

    def close(self):
        """Close the Kafka consumer and producer owned by this manager."""
        self.detectorConsumer.close()
        self.dataPublisher.close()


def main():
    """Poll every manager roughly once per second until interrupted."""
    managers = [Manager(id=1)]
    try:
        while True:
            time.sleep(1)
            for manager in managers:
                manager.update()
    finally:
        # Runs on Ctrl-C or any error so Kafka connections are released.
        for manager in managers:
            manager.close()


if __name__ == "__main__":
    main()