import os
import socket
import threading
import time
from enum import Enum
from json import dumps, loads

import kafka
from flask import Flask, request
from kafka import KafkaConsumer, KafkaProducer

app = Flask(__name__)

# Manager attributes that a webhook payload is allowed to overwrite. These are
# exactly the state fields the manager publishes; anything else in the payload
# is ignored so a request cannot clobber `_id`, Kafka clients, or methods.
_WEBHOOK_FIELDS = frozenset(
    {"moisture", "temp", "isAutoOn", "valve_state", "heater_state"}
)


def start_manager():
    """Placeholder thread target; currently returns immediately."""
    return


class Manager:
    """Greenhouse manager that publishes its state and reacts to commands.

    State is published to the ``data`` topic, commands are consumed from the
    ``commands`` topic, and command acknowledgements are published to the
    ``response`` topic.
    """

    def __init__(
        self,
        _id: int,
        moisture: float = 0,
        temp: float = 20,
        isAutoOn: bool = False,
        valve_state: str = "closed",
        heater_state: str = "off",
    ):
        """Store the initial state and create the Kafka clients.

        Args:
            _id: Identifier of this manager; also used in Kafka client and
                group ids.
            moisture: Initial moisture reading.
            temp: Initial temperature reading.
            isAutoOn: Initial value of the automatic-mode flag.
            valve_state: Initial valve state (``"closed"`` or ``"open"``).
            heater_state: Initial heater state (``"off"`` or ``"on"``).
        """
        # The broker address comes from the environment; it is passed to the
        # Kafka clients as-is, including when the variable is unset (None).
        KAFKA_URL = os.environ.get('KAFKA_URL')
        print("KAFKA_URL=", KAFKA_URL)

        self._id = _id
        self.moisture = moisture
        self.temp = temp
        self.isAutoOn = isAutoOn
        self.valve_state = valve_state
        self.heater_state = heater_state

        # Producer used for both the 'data' and 'response' topics.
        self.dataPublisher = KafkaProducer(
            bootstrap_servers=[KAFKA_URL],
            client_id=f'manager{self._id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )
        # Consumer for incoming commands; values are decoded from JSON.
        self.controllerConsumer = KafkaConsumer(
            'commands',
            bootstrap_servers=[KAFKA_URL],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            consumer_timeout_ms=2000,
            group_id=f'manager{self._id}',
            value_deserializer=lambda x: loads(x.decode('utf-8'))
        )
        # Second producer with the same configuration as `dataPublisher`.
        # No method in this file uses it; kept so the attribute still exists.
        self.controllerConsumerResponse = KafkaProducer(
            bootstrap_servers=[KAFKA_URL],
            client_id=f'manager{self._id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

    def sendData(self):
        """Publish the current state to the ``data`` topic and flush."""
        print("sending data...")
        message = {
            'id': self._id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valve_state),
            'heaterStatus': str(self.heater_state),
            'isAutoOn': self.isAutoOn
        }
        print(message)
        self.dataPublisher.send('data', message)
        self.dataPublisher.flush()

    def toggle_device(self, device, request_id, greenhouse_id):
        """Flip the state of ``device`` and publish the resulting status.

        Args:
            device: ``'valve'`` or ``'heater'``. Any other value leaves the
                state untouched, but a status message is still sent.
            request_id: Identifier echoed back in the status message.
            greenhouse_id: Identifier echoed back in the status message.
        """
        if device == 'valve':
            if self.valve_state == 'closed':
                self.valve_state = 'open'
                print("Valve opened")
            else:
                self.valve_state = 'closed'
                print("Valve closed")
        elif device == 'heater':
            if self.heater_state == 'off':
                self.heater_state = 'on'
                print("Heater turned on")
            else:
                self.heater_state = 'off'
                print("Heater turned off")
        self.send_status(request_id, greenhouse_id)

    def send_status(self, request_id, greenhouse_id):
        """Publish the valve and heater state for the given request."""
        status = {
            'request_id': request_id,
            'greenhouse_id': greenhouse_id,
            'valve_state': self.valve_state,
            'heater_state': self.heater_state
        }
        self.sendDataCommand(status)
        print("Updating info...\n")

    def sendDataCommand(self, message):
        """Send ``message`` to the ``response`` topic."""
        print("sending data...")
        self.dataPublisher.send('response', message)

    def getCommand(self):
        """Poll the ``commands`` topic once and apply every valid command.

        Each message value is expected to provide ``request_id``,
        ``greenhouse_id`` and ``command``. A message lacking any of them is
        logged and skipped, so one malformed command does not abort the rest
        of the batch.
        """
        messages = self.controllerConsumer.poll(timeout_ms=1000)
        # Check whether there are any messages
        for msgs in messages.values():
            for message in msgs:
                print(f"Manager {self._id} received message: ")
                print(message.value)
                try:
                    request_id = message.value['request_id']
                    greenhouse_id = message.value['greenhouse_id']
                    command = message.value['command']
                except (KeyError, TypeError):
                    print(f"Manager {self._id} skipped a malformed command")
                    continue
                # Keep the last processed command on the instance.
                self.request_id = request_id
                self.greenhouse_id = greenhouse_id
                self.command = command
                self.toggle_device(command, request_id, greenhouse_id)


@app.route('/webhook', methods=['POST'])
def webhook():
    """Apply a JSON state update to the manager selected by ``?id=``.

    Returns:
        A confirmation string when a manager with that id exists and the
        update was applied, ``"Webhook ignored"`` when no manager matches,
        or a ``(message, 400)`` tuple when the id or the body is unusable.
    """
    raw_id = request.args.get('id')
    print("received webhook", raw_id)
    try:
        manager_id = int(raw_id)
    except (TypeError, ValueError):
        # Missing or non-numeric id: a client error, not a server crash.
        return "Missing or invalid 'id' query parameter", 400

    manager = next((m for m in managers if m._id == manager_id), None)
    if manager is None:
        return "Webhook ignored"

    body = request.get_json(silent=True)
    print("Data received from Webhook is", body)
    if not isinstance(body, dict):
        return "Webhook body must be a JSON object", 400

    ignored = []
    for key, value in body.items():
        # The body is untrusted input: only known state fields may be set.
        if key in _WEBHOOK_FIELDS:
            setattr(manager, key, value)
        else:
            ignored.append(key)
    if ignored:
        print("Ignoring unsupported webhook fields:", ignored)

    manager.sendData()
    return f"Webhook received for manager {manager._id}"


# NOTE(review): this thread is created but never started in the code shown
# here, and its target is a no-op - confirm whether it is still needed.
t1 = threading.Thread(target=start_manager)

# Creating the manager connects to Kafka at import time.
manager1 = Manager(_id=1)
managers = [manager1]

if __name__ == "__main__":
    threading.Thread(
        target=lambda: app.run(
            host="0.0.0.0", port=20002, debug=True, use_reloader=False
        )
    ).start()