From 3f5bb31646a19fa7126f0a1a581dd5f1e64cac76 Mon Sep 17 00:00:00 2001 From: dimazhelovanov Date: Wed, 30 Oct 2024 11:10:39 +0400 Subject: [PATCH] =?UTF-8?q?=D0=9C=D0=B5=D0=BD=D0=B5=D0=B4=D0=B6=D0=B5?= =?UTF-8?q?=D1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseController/ManageController.py | 5 +- GreenhouseManager/manager.py | 98 +++++++++++++++++++++--- 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/GreenhouseController/ManageController.py b/GreenhouseController/ManageController.py index 37f60b5..2d9ad10 100644 --- a/GreenhouseController/ManageController.py +++ b/GreenhouseController/ManageController.py @@ -43,5 +43,6 @@ class ManageController: 'valve_state': self.valve_state, 'heater_state': self.heater_state } - self.producer.send(self.topic, value=status) - print(f"Sent device status: {status}") \ No newline at end of file + + print(f"Sent device status: {status}") + return status \ No newline at end of file diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 5c2b975..c082d04 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -4,20 +4,23 @@ import socket from json import dumps, loads import time from enum import Enum +from GreenhouseDetector.detector import Detector class Status(Enum): UNKNOWN = -1 OFF = 0 ON = 1 + class Manager: - def __init__(self, id : int, moisture : float = 0, temp : float = 20, valveStatus : Status = Status.UNKNOWN, heaterStatus : Status = Status.UNKNOWN, isAutoOn : bool = False): + def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", + heater_state: str = "off"): self.id = id self.moisture = moisture self.temp = temp - self.valveStatus = valveStatus - self.heaterStatus = heaterStatus self.isAutoOn = isAutoOn + self.valve_state = valve_state + self.heater_state = heater_state self.dataPublisher = KafkaProducer( bootstrap_servers=['localhost:9092'], @@ -25,19 +28,33 @@ class Manager: value_serializer=lambda v: dumps(v).encode('utf-8') ) - self.detectorConsumer = KafkaConsumer( - 'dataDetectors', + # self.detectorConsumer = KafkaConsumer( + # 'dataDetectors', + # bootstrap_servers=['localhost:9092'], + # auto_offset_reset='earliest', + # enable_auto_commit=True, + # consumer_timeout_ms=1000, + #group_id=f'manager{id}', + # value_deserializer=lambda x: loads(x.decode('utf-8')) + #) + self.controllerConsumer = KafkaConsumer( + 'commands', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, - consumer_timeout_ms = 1000, + consumer_timeout_ms=2000, group_id=f'manager{id}', value_deserializer=lambda x: loads(x.decode('utf-8')) ) + self.controllerConsumerResponse = KafkaProducer( + bootstrap_servers=['localhost:9092'], + client_id=f'manager{id}_producer', + value_serializer=lambda v: dumps(v).encode('utf-8') + ) def update(self): for message in self.detectorConsumer: - if message.value['id'] == self.id: + print(f"Manager {self.id} received message: ") print(message.value) self.moisture = message.value['moisture'] @@ -52,19 +69,76 @@ class Manager: 'id': self.id, 'moisture': self.moisture, 'temp': self.temp, - 'valveStatus': str(self.valveStatus), - 'heaterStatus': str(self.heaterStatus), + 'valveStatus': str(self.valve_state), + 'heaterStatus': str(self.heater_state), 'isAutoOn': self.isAutoOn } print(message) self.dataPublisher.send('data', message) -manager1 = Manager(id = 1) + def toggle_device(self, device, request_id, greenhouse_id): + + if device == 'valve': + + if self.valve_state == 'closed': + self.valve_state = 'open' + print("Valve opened") + else: + self.valve_state = 'closed' + print("Valve closed") + + elif device == 'heater': + + if self.heater_state == 'off': + self.heater_state = 'on' + print("Heater turned on") + else: + self.heater_state = 'off' + print("Heater turned off") + + self.send_status(request_id, greenhouse_id) + + def send_status(self, request_id, greenhouse_id): + + status = { + 'request_id': request_id, + 'greenhouse_id': greenhouse_id, + 'valve_state': self.valve_state, + 'heater_state': self.heater_state + } + self.sendDataCommand(status) + print("Updating info...\n") + + def sendDataCommand(self, message): + print("sending data...") + + self.dataPublisher.send('response', message) + + def getCommand(self): + messages = self.controllerConsumer.poll(timeout_ms=1000) + + # Проверяем, есть ли сообщения + + for tp, msgs in messages.items(): + for message in msgs: + print(f"Manager {self.id} received message: ") + print(message.value) + self.request_id = message.value['request_id'] + self.greenhouse_id = message.value['greenhouse_id'] + self.command = message.value['command'] + self.toggle_device(self.command, self.request_id, self.greenhouse_id) + + + +manager1 = Manager(id=1) managers = [manager1] + while True: - time.sleep(1) + time.sleep(5) + manager1.sendData() for manager in managers: - manager.update() \ No newline at end of file + + manager.getCommand()