Cucumber/GreenhouseManager/manager.py
2024-11-12 16:19:37 +04:00

136 lines
4.1 KiB
Python

from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
from json import dumps, loads
from flask import Flask, request
import time
from enum import Enum
import threading
# Flask application object; the /webhook route defined in this module is registered on it,
# and it is what gets served when the module is run as a script.
app = Flask(__name__)
def start_manager():
    """Placeholder worker entry point; performs no work and returns None."""
    return None
class Manager:
    """Greenhouse manager: holds sensor readings and actuator state, and talks to Kafka.

    The manager publishes its state on the 'data' topic, reads commands from the
    'commands' topic and answers each command on the 'response' topic.
    """

    def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False,
                 valve_state: str = "closed", heater_state: str = "off"):
        """Store the initial state and create the Kafka clients.

        Args:
            id: Manager (greenhouse) identifier; also used in Kafka client/group ids.
            moisture: Initial moisture reading.
            temp: Initial temperature reading.
            isAutoOn: Initial value of the automatic-mode flag.
            valve_state: Initial valve state, 'closed' or 'open'.
            heater_state: Initial heater state, 'off' or 'on'.
        """
        self.id = id
        self.moisture = moisture
        self.temp = temp
        self.isAutoOn = isAutoOn
        self.valve_state = valve_state
        self.heater_state = heater_state

        # Producer for state snapshots sent to the 'data' topic.
        self.dataPublisher = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )
        # Consumer for incoming commands.
        self.controllerConsumer = KafkaConsumer(
            'commands',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            consumer_timeout_ms=2000,
            group_id=f'manager{id}',
            value_deserializer=lambda x: loads(x.decode('utf-8'))
        )
        # Producer for command replies sent to the 'response' topic.
        self.controllerConsumerResponse = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'manager{id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )
        # No detector feed is wired up by default. update() used to read this
        # attribute without it ever being assigned, which raised AttributeError.
        # Assign an iterable of messages here to feed readings into update().
        self.detectorConsumer = None

    def update(self):
        """Apply readings from the detector feed (if any), then publish the state.

        Each message is expected to carry 'moisture' and 'temperature' keys in
        its ``value``. When no detector feed is attached, the current state is
        published unchanged.
        """
        if self.detectorConsumer is not None:
            for message in self.detectorConsumer:
                print(f"Manager {self.id} received message: ")
                print(message.value)
                self.moisture = message.value['moisture']
                self.temp = message.value['temperature']
                print("Updating info...\n")
        self.sendData()

    def sendData(self):
        """Publish a snapshot of the current state on the 'data' topic."""
        print("sending data...")
        message = {
            'id': self.id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valve_state),
            'heaterStatus': str(self.heater_state),
            'isAutoOn': self.isAutoOn
        }
        print(message)
        self.dataPublisher.send('data', message)

    def toggle_device(self, device, request_id, greenhouse_id):
        """Flip the state of ``device`` and report the resulting status.

        Args:
            device: 'valve' or 'heater'. Any other value changes nothing.
            request_id: Identifier of the request, echoed in the reply.
            greenhouse_id: Identifier of the greenhouse, echoed in the reply.
        """
        if device == 'valve':
            if self.valve_state == 'closed':
                self.valve_state = 'open'
                print("Valve opened")
            else:
                self.valve_state = 'closed'
                print("Valve closed")
        elif device == 'heater':
            if self.heater_state == 'off':
                self.heater_state = 'on'
                print("Heater turned on")
            else:
                self.heater_state = 'off'
                print("Heater turned off")
        else:
            # Make an unrecognised command visible instead of ignoring it silently.
            print(f"Unknown device {device!r}; no state changed")
        # A status reply is sent for every request, whether or not anything changed.
        self.send_status(request_id, greenhouse_id)

    def send_status(self, request_id, greenhouse_id):
        """Send the current valve/heater state as the reply to a request."""
        status = {
            'request_id': request_id,
            'greenhouse_id': greenhouse_id,
            'valve_state': self.valve_state,
            'heater_state': self.heater_state
        }
        self.sendDataCommand(status)
        print("Updating info...\n")

    def sendDataCommand(self, message):
        """Publish ``message`` on the 'response' topic via the reply producer."""
        print("sending data...")
        # Use the dedicated reply producer; it was previously created but never used.
        self.controllerConsumerResponse.send('response', message)

    def getCommand(self):
        """Poll the command consumer once and execute every well-formed command.

        A command is a mapping with 'request_id', 'greenhouse_id' and 'command'
        keys. Malformed commands are reported and skipped so that one bad
        message does not discard the rest of the polled batch.
        """
        messages = self.controllerConsumer.poll(timeout_ms=1000)
        # Check whether there are any messages; an empty result runs no iterations.
        for msgs in messages.values():
            for message in msgs:
                print(f"Manager {self.id} received message: ")
                print(message.value)
                try:
                    request_id = message.value['request_id']
                    greenhouse_id = message.value['greenhouse_id']
                    command = message.value['command']
                except (KeyError, TypeError):
                    print(f"Manager {self.id} skipped malformed command")
                    continue
                # Kept as instance attributes: they record the last handled command.
                self.request_id = request_id
                self.greenhouse_id = greenhouse_id
                self.command = command
                self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route('/webhook', methods=['POST'])
def webhook():
    """Handle a POST to /webhook: print the JSON payload and acknowledge it."""
    # The route only accepts POST; this guard mirrors the original method check.
    if request.method != 'POST':
        return None
    payload = request.json
    print("Data received from Webhook is", payload)
    return "Webhook received"
# Manager with id 1, built at import time; its constructor creates the Kafka
# producers and the command consumer.
manager1 = Manager(id=1)
# List of the managers created by this module.
managers = [manager1]
# Thread object targeting start_manager.
# NOTE(review): nothing in the visible code calls t1.start() — confirm whether it should run.
t1 = threading.Thread(target=start_manager)
if __name__ == "__main__":
    # Serve the Flask app on all interfaces, port 20002, when run as a script.
    app.run(port=20002, host="0.0.0.0")