import socket
import time
from enum import Enum
from json import dumps, loads

import kafka
from kafka import KafkaConsumer, KafkaProducer

from GreenhouseDetector.detector import Detector


class Status(Enum):
    """Tri-state on/off status.

    NOTE(review): nothing in this file references this enum; ``Manager``
    tracks valve/heater state as plain strings instead.
    """

    UNKNOWN = -1
    OFF = 0
    ON = 1


class Manager:
    """Greenhouse manager that publishes its state and reacts to commands.

    The manager keeps the last known moisture/temperature readings and the
    state of two devices (valve and heater).  It publishes a snapshot of
    that state to the ``data`` topic, polls the ``commands`` topic for
    toggle requests, and answers each request on the ``response`` topic.
    """

    def __init__(self, id: int, moisture: float = 0, temp: float = 20,
                 isAutoOn: bool = False, valve_state: str = "closed",
                 heater_state: str = "off"):
        """Store the initial state and create the Kafka clients.

        Args:
            id: Manager identifier; used in the Kafka client/group ids and
                in every published data message.
            moisture: Initial moisture reading.
            temp: Initial temperature reading.
            isAutoOn: Initial value of the automatic-mode flag.
            valve_state: Initial valve state; ``toggle_device`` switches
                between ``"closed"`` and ``"open"``.
            heater_state: Initial heater state; ``toggle_device`` switches
                between ``"off"`` and ``"on"``.
        """
        self.id = id
        self.moisture = moisture
        self.temp = temp
        self.isAutoOn = isAutoOn
        self.valve_state = valve_state
        self.heater_state = heater_state

        self.dataPublisher = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

        # The detector consumer is currently disabled.  The attribute is
        # still defined (as None) so that update() can detect the disabled
        # state instead of failing with AttributeError.
        self.detectorConsumer = None
        # self.detectorConsumer = KafkaConsumer(
        #     'dataDetectors',
        #     bootstrap_servers=['localhost:9092'],
        #     auto_offset_reset='earliest',
        #     enable_auto_commit=True,
        #     consumer_timeout_ms=1000,
        #     group_id=f'manager{id}',
        #     value_deserializer=lambda x: loads(x.decode('utf-8'))
        # )

        self.controllerConsumer = KafkaConsumer(
            'commands',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            consumer_timeout_ms=2000,
            group_id=f'manager{id}',
            value_deserializer=lambda x: loads(x.decode('utf-8'))
        )

        # NOTE(review): this producer is created with the same client_id as
        # dataPublisher and is not used by any method in this file
        # (responses are sent through dataPublisher).
        self.controllerConsumerResponse = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

    def update(self) -> None:
        """Consume detector readings and republish the refreshed state.

        For every message from the detector consumer, copies its
        ``moisture`` and ``temperature`` values onto the manager and then
        calls ``sendData``.  Does nothing (apart from a notice) while the
        detector consumer is not configured.
        """
        if self.detectorConsumer is None:
            print(f"Manager {self.id} has no detector consumer configured; "
                  "skipping update")
            return

        for message in self.detectorConsumer:
            print(f"Manager {self.id} received message: ")
            print(message.value)
            self.moisture = message.value['moisture']
            self.temp = message.value['temperature']
            print("Updating info...\n")
            self.sendData()

    def sendData(self) -> None:
        """Publish a snapshot of the manager state to the ``data`` topic."""
        print("sending data...")
        message = {
            'id': self.id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valve_state),
            'heaterStatus': str(self.heater_state),
            'isAutoOn': self.isAutoOn
        }
        print(message)
        self.dataPublisher.send('data', message)

    def toggle_device(self, device, request_id, greenhouse_id) -> None:
        """Flip the state of ``device`` and report the resulting status.

        Args:
            device: ``'valve'`` or ``'heater'``.  Any other value leaves
                both devices untouched; the status is still sent.
            request_id: Echoed back in the status message.
            greenhouse_id: Echoed back in the status message.
        """
        if device == 'valve':
            if self.valve_state == 'closed':
                self.valve_state = 'open'
                print("Valve opened")
            else:
                self.valve_state = 'closed'
                print("Valve closed")
        elif device == 'heater':
            if self.heater_state == 'off':
                self.heater_state = 'on'
                print("Heater turned on")
            else:
                self.heater_state = 'off'
                print("Heater turned off")
        self.send_status(request_id, greenhouse_id)

    def send_status(self, request_id, greenhouse_id) -> None:
        """Send the current valve/heater state as a reply to a request.

        Args:
            request_id: Identifier copied into the reply.
            greenhouse_id: Identifier copied into the reply.
        """
        status = {
            'request_id': request_id,
            'greenhouse_id': greenhouse_id,
            'valve_state': self.valve_state,
            'heater_state': self.heater_state
        }
        self.sendDataCommand(status)
        print("Updating info...\n")

    def sendDataCommand(self, message) -> None:
        """Publish ``message`` to the ``response`` topic."""
        print("sending data...")
        self.dataPublisher.send('response', message)

    def getCommand(self) -> None:
        """Poll the ``commands`` topic once and execute what was received.

        Each command must carry ``request_id``, ``greenhouse_id`` and
        ``command`` keys; the command value is passed to ``toggle_device``
        as the device name.  A message without these keys is reported and
        skipped so that one malformed command does not stop the manager.
        """
        messages = self.controllerConsumer.poll(timeout_ms=1000)

        # Check whether there are any messages; only the record lists are
        # needed, not the keys they are grouped under.
        for records in messages.values():
            for message in records:
                print(f"Manager {self.id} received message: ")
                print(message.value)
                payload = message.value
                try:
                    request_id = payload['request_id']
                    greenhouse_id = payload['greenhouse_id']
                    command = payload['command']
                except (KeyError, TypeError) as err:
                    print(f"Manager {self.id} skipped malformed command: "
                          f"{err!r}")
                    continue

                # Remember the last processed command on the instance.
                self.request_id = request_id
                self.greenhouse_id = greenhouse_id
                self.command = command
                self.toggle_device(self.command, self.request_id,
                                   self.greenhouse_id)


# Driver script: runs at module level (also on import), forever.
manager1 = Manager(id=1)
managers = [manager1]

while True:
    time.sleep(5)
    manager1.sendData()
    for manager in managers:
        manager.getCommand()