Cucumber/GreenhouseManager/manager.py

140 lines
4.3 KiB
Python
Raw Normal View History

import os
2024-10-28 14:47:20 +04:00
from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
2024-10-28 19:43:13 +04:00
from json import dumps, loads
2024-11-12 16:18:14 +04:00
from flask import Flask, request
2024-10-28 19:43:13 +04:00
import time
2024-10-29 17:31:47 +04:00
from enum import Enum
2024-11-12 16:18:14 +04:00
import threading
2024-10-28 14:47:20 +04:00
2024-11-12 16:18:14 +04:00
app = Flask(__name__)
2024-10-29 17:31:47 +04:00
2024-11-12 16:18:14 +04:00
def start_manager():
    """Placeholder thread target for a manager worker; it currently does nothing."""
    return None
2024-10-30 11:10:39 +04:00
2024-10-29 17:31:47 +04:00
class Manager:
    """Greenhouse manager that reports state to Kafka and reacts to commands.

    State (moisture, temperature, auto mode, valve and heater state) is
    published to the ``data`` topic, commands are read from the ``commands``
    topic, and the resulting device status is published to ``response``.
    """

    def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False,
                 valve_state: str = "closed", heater_state: str = "off"):
        """Store the initial state and create the Kafka clients.

        Args:
            _id: Identifier of this manager; used in client ids, the consumer
                group id and outgoing messages.
            moisture: Initial moisture reading.
            temp: Initial temperature reading.
            isAutoOn: Whether automatic mode is enabled.
            valve_state: Initial valve state (``"closed"`` or ``"open"``).
            heater_state: Initial heater state (``"off"`` or ``"on"``).

        Raises:
            RuntimeError: If the ``KAFKA_URL`` environment variable is not set.
        """
        kafka_url = os.environ.get('KAFKA_URL')
        print("KAFKA_URL=", kafka_url)
        if kafka_url is None:
            # Fail fast with a clear message instead of handing [None] to the
            # Kafka clients as the bootstrap server list.
            raise RuntimeError("KAFKA_URL environment variable is not set")

        self._id = _id
        self.moisture = moisture
        self.temp = temp
        self.isAutoOn = isAutoOn
        self.valve_state = valve_state
        self.heater_state = heater_state

        # Producer used for both state reports and command responses.
        self.dataPublisher = KafkaProducer(
            bootstrap_servers=[kafka_url],
            client_id=f'manager{self._id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )
        # Consumer of the 'commands' topic; each manager uses its own group id.
        self.controllerConsumer = KafkaConsumer(
            'commands',
            bootstrap_servers=[kafka_url],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            consumer_timeout_ms=2000,
            group_id=f'manager{self._id}',
            value_deserializer=lambda x: loads(x.decode('utf-8'))
        )
        # NOTE(review): this second producer shares its client_id with
        # dataPublisher and is not used by any method of this class; kept so
        # that existing references to the attribute keep working.
        self.controllerConsumerResponse = KafkaProducer(
            bootstrap_servers=[kafka_url],
            client_id=f'manager{self._id}_producer',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

    def sendData(self) -> None:
        """Publish the current state to the ``data`` topic and flush."""
        print("sending data...")
        message = {
            'id': self._id,
            'moisture': self.moisture,
            'temp': self.temp,
            'valveStatus': str(self.valve_state),
            'heaterStatus': str(self.heater_state),
            'isAutoOn': self.isAutoOn
        }
        print(message)
        self.dataPublisher.send('data', message)
        self.dataPublisher.flush()

    def toggle_device(self, device, request_id, greenhouse_id) -> None:
        """Flip the state of ``device`` and publish the resulting status.

        Args:
            device: ``'valve'`` or ``'heater'``; any other value changes nothing.
            request_id: Identifier echoed back in the status message.
            greenhouse_id: Identifier echoed back in the status message.
        """
        if device == 'valve':
            if self.valve_state == 'closed':
                self.valve_state = 'open'
                print("Valve opened")
            else:
                self.valve_state = 'closed'
                print("Valve closed")
        elif device == 'heater':
            if self.heater_state == 'off':
                self.heater_state = 'on'
                print("Heater turned on")
            else:
                self.heater_state = 'off'
                print("Heater turned off")
        else:
            # State is untouched; the status is still sent so the request
            # receives a response.
            print(f"Unknown device: {device}")
        self.send_status(request_id, greenhouse_id)

    def send_status(self, request_id, greenhouse_id) -> None:
        """Publish the current valve and heater state for a request."""
        status = {
            'request_id': request_id,
            'greenhouse_id': greenhouse_id,
            'valve_state': self.valve_state,
            'heater_state': self.heater_state
        }
        self.sendDataCommand(status)
        print("Updating info...\n")

    def sendDataCommand(self, message) -> None:
        """Publish ``message`` to the ``response`` topic and flush."""
        print("sending data...")
        self.dataPublisher.send('response', message)
        # Flush like sendData does, so the response is not left sitting in
        # the producer's buffer.
        self.dataPublisher.flush()

    def getCommand(self) -> None:
        """Poll the ``commands`` topic once and apply every received command.

        Each message value must provide ``request_id``, ``greenhouse_id`` and
        ``command``; messages that do not are reported and skipped so that one
        malformed message does not abort the rest of the batch.
        """
        batches = self.controllerConsumer.poll(timeout_ms=1000)
        # Walk over whatever messages the poll returned (possibly none).
        for msgs in batches.values():
            for message in msgs:
                print(f"Manager {self._id} received message: ")
                print(message.value)
                try:
                    request_id = message.value['request_id']
                    greenhouse_id = message.value['greenhouse_id']
                    command = message.value['command']
                except (KeyError, TypeError) as err:
                    print(f"Manager {self._id} skipped malformed command: {err!r}")
                    continue
                # The last processed command stays available on the instance.
                self.request_id = request_id
                self.greenhouse_id = greenhouse_id
                self.command = command
                self.toggle_device(command, request_id, greenhouse_id)
2024-11-12 17:04:24 +04:00
@app.route('/webhook', methods=['POST'])
def webhook():
    """Apply a webhook payload to the manager selected by the ``id`` query arg.

    The JSON object in the request body is copied onto the matching manager,
    which then publishes its state via ``sendData()``.

    Returns:
        A confirmation string when a manager matched, ``"Webhook ignored"``
        when none did, or a ``(message, 400)`` tuple when the ``id`` query
        argument is missing or not an integer, or the body is not a JSON object.
    """
    # The body comes from an untrusted client, so only the manager's state
    # fields may be overwritten; anything else (for example ``_id``, the Kafka
    # clients or method names) is ignored rather than set on the instance.
    allowed_fields = ('moisture', 'temp', 'isAutoOn', 'valve_state', 'heater_state')

    raw_id = request.args.get('id')
    print("received webhook", raw_id)
    try:
        target_id = int(raw_id)
    except (TypeError, ValueError):
        # Missing or non-numeric id: reject instead of failing with a 500.
        return "Invalid or missing manager id", 400

    for manager in managers:
        if manager._id != target_id:
            continue
        body = request.json
        print("Data received from Webhook is", body)
        if not isinstance(body, dict):
            return "Webhook body must be a JSON object", 400
        for key, value in body.items():
            if key in allowed_fields:
                setattr(manager, key, value)
            else:
                print("Ignoring unsupported webhook field", key)
        manager.sendData()
        return f"Webhook received for manager {manager._id}"
    return "Webhook ignored"
2024-10-29 17:31:47 +04:00
2024-11-13 00:53:31 +04:00
# Thread object wrapping start_manager; it is created here but not started.
t1 = threading.Thread(target=start_manager)

# The single manager instance served by the webhook endpoint.
manager1 = Manager(_id=1)
managers = [manager1]

if __name__ == "__main__":
    # Run the Flask app on a separate thread, with the reloader disabled.
    flask_thread = threading.Thread(
        target=app.run,
        kwargs={"host": "0.0.0.0", "port": 20002, "debug": True, "use_reloader": False},
    )
    flask_thread.start()