Cucumber/GreenhouseManager/manager.py

140 lines
4.3 KiB
Python

import os
from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
from json import dumps, loads
from flask import Flask, request
import time
from enum import Enum
import threading
app = Flask(__name__)
def start_manager():
return
class Manager:
def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
heater_state: str = "off"):
KAFKA_URL = os.environ.get('KAFKA_URL')
print("KAFKA_URL=", KAFKA_URL)
self._id = _id
self.moisture = moisture
self.temp = temp
self.isAutoOn = isAutoOn
self.valve_state = valve_state
self.heater_state = heater_state
self.dataPublisher = KafkaProducer(
bootstrap_servers=[KAFKA_URL],
client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
self.controllerConsumer = KafkaConsumer(
'commands',
bootstrap_servers=[KAFKA_URL],
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms=2000,
group_id=f'manager{self._id}',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
self.controllerConsumerResponse = KafkaProducer(
bootstrap_servers=[KAFKA_URL],
client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
def sendData(self):
print("sending data...")
message = {
'id': self._id,
'moisture': self.moisture,
'temp': self.temp,
'valveStatus': str(self.valve_state),
'heaterStatus': str(self.heater_state),
'isAutoOn': self.isAutoOn
}
print(message)
self.dataPublisher.send('data', message)
self.dataPublisher.flush()
def toggle_device(self, device, request_id, greenhouse_id):
if device == 'valve':
if self.valve_state == 'closed':
self.valve_state = 'open'
print("Valve opened")
else:
self.valve_state = 'closed'
print("Valve closed")
elif device == 'heater':
if self.heater_state == 'off':
self.heater_state = 'on'
print("Heater turned on")
else:
self.heater_state = 'off'
print("Heater turned off")
self.send_status(request_id, greenhouse_id)
def send_status(self, request_id, greenhouse_id):
status = {
'request_id': request_id,
'greenhouse_id': greenhouse_id,
'valve_state': self.valve_state,
'heater_state': self.heater_state
}
self.sendDataCommand(status)
print("Updating info...\n")
def sendDataCommand(self, message):
print("sending data...")
self.dataPublisher.send('response', message)
def getCommand(self):
messages = self.controllerConsumer.poll(timeout_ms=1000)
# Проверяем, есть ли сообщения
for tp, msgs in messages.items():
for message in msgs:
print(f"Manager {self._id} received message: ")
print(message.value)
self.request_id = message.value['request_id']
self.greenhouse_id = message.value['greenhouse_id']
self.command = message.value['command']
self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route(f'/webhook', methods=['POST'])
def webhook():
print("received webhook", request.args.get('id'))
for manager in managers:
print()
if int(request.args.get('id')) == manager._id and request.method == 'POST':
print("Data received from Webhook is", request.json)
body = request.json
for key, value in body.items():
setattr(manager, key, value)
manager.sendData()
return f"Webhook received for manager {manager._id}"
return "Webhook ignored"
t1 = threading.Thread(target=start_manager)
manager1 = Manager(_id=1)
managers = [manager1]
if __name__ == "__main__":
threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()