Compare commits

...

21 Commits

Author SHA1 Message Date
the
5fa9c76b99 test changes 2024-11-13 01:14:32 +04:00
the
c032253699 фикс2 2024-11-13 01:00:16 +04:00
the
83aec339c9 фикс 2024-11-13 00:53:31 +04:00
the
b009ebdd0c АААААААААААА 2024-11-13 00:36:31 +04:00
the
73961934f0 СТОЛЬКО МУЧЕНИЙ РАДИ ОДНОЙ СТРОЧКИ, ЕЩЁ И НЕТВОРК НА САМОМ ДЕЛЕ НЕ РАБОТАЕТ 2024-11-13 00:31:21 +04:00
the
ffef39d409 СТОЛЬКО МУЧЕНИЙ РАДИ ОДНОЙ СТРОЧКИ 2024-11-13 00:30:33 +04:00
the
57e05aba90 kafka network unfix 2024-11-12 22:34:06 +04:00
the
ce3f3a4dc6 kafka network fix 2024-11-12 22:31:57 +04:00
the
4747f975c5 kafka network 2024-11-12 18:03:45 +04:00
the
f82c8daa92 detector webhooks 2024-11-12 17:04:24 +04:00
the
b7f4aa3f9f webhook fix 2024-11-12 16:19:37 +04:00
the
7f5262575e webhook test 2024-11-12 16:18:14 +04:00
3f5bb31646 Менеджер 2024-10-30 11:10:39 +04:00
1c38c61fbc о 2024-10-29 19:50:35 +04:00
091dcbd3a3 Управление вентилями/нагревателями 2024-10-29 19:49:33 +04:00
the
7f88f87722 fix, no more infinite loops for consumer 2024-10-29 17:51:20 +04:00
the
6291bb483c I MUST JOB 2024-10-29 17:31:47 +04:00
the
08ee12aa8b simple detector + manager 2024-10-29 16:07:02 +04:00
the
720cf4bd60 connection test 2024-10-28 19:43:13 +04:00
the
d16968bc98 compose + kafka ui 2024-10-28 14:47:20 +04:00
the
3fa35ba617 init 2024-10-28 13:14:46 +04:00
4 changed files with 303 additions and 0 deletions

View File

@ -0,0 +1,48 @@
from json import dumps
from json import dumps
class ManageController:
def __init__(self, producer, topic='commands'):
self.valve_state = "closed"
self.heater_state = "off"
self.producer = producer
self.topic = topic
def toggle_device(self, device, request_id, greenhouse_id):
if device == 'valve':
if self.valve_state == 'closed':
self.valve_state = 'open'
print("Valve opened")
else:
self.valve_state = 'closed'
print("Valve closed")
elif device == 'heater':
if self.heater_state == 'off':
self.heater_state = 'on'
print("Heater turned on")
else:
self.heater_state = 'off'
print("Heater turned off")
self.send_status(request_id, greenhouse_id)
def send_status(self, request_id, greenhouse_id):
status = {
'request_id': request_id,
'greenhouse_id': greenhouse_id,
'valve_state': self.valve_state,
'heater_state': self.heater_state
}
print(f"Sent device status: {status}")
return status

View File

@ -0,0 +1,39 @@
import time
import random as rnd
from flask import Flask
import requests
import threading
app = Flask(__name__)
class Detector:
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
self.id = id
self.moistureThresholdUpper = moistureThresholdUpper
self.moistureThresholdLower = moistureThresholdLower
self.tempThresholdUpper = tempThresholdUpper
self.tempThresholdLower = tempThresholdLower
self.moisture = 0
self.temp = 0
def cycle(self):
self.moisture += rnd.random() / 100
self.temp += (rnd.random() - 0.5) / 100
def sendData(self):
data = {"moisture": self.moisture,
"temp": self.temp}
requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
detector1 = Detector(1, 0.6, 0.2, 40, 20)
detectors = [detector1]
if __name__ =="__main__":
while True:
for detector in detectors:
detector.cycle()
detector.sendData()
time.sleep(1)

View File

@ -0,0 +1,136 @@
from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
from json import dumps, loads
from flask import Flask, request
import time
from enum import Enum
import threading
app = Flask(__name__)
def start_manager():
return
class Manager:
def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
heater_state: str = "off"):
self._id = _id
self.moisture = moisture
self.temp = temp
self.isAutoOn = isAutoOn
self.valve_state = valve_state
self.heater_state = heater_state
self.dataPublisher = KafkaProducer(
bootstrap_servers=['localhost:9092'],
client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
self.controllerConsumer = KafkaConsumer(
'commands',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
consumer_timeout_ms=2000,
group_id=f'manager{self._id}',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
self.controllerConsumerResponse = KafkaProducer(
bootstrap_servers=['localhost:9092'],
client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
def sendData(self):
print("sending data...")
message = {
'id': self._id,
'moisture': self.moisture,
'temp': self.temp,
'valveStatus': str(self.valve_state),
'heaterStatus': str(self.heater_state),
'isAutoOn': self.isAutoOn
}
print(message)
self.dataPublisher.send('data', message)
self.dataPublisher.flush()
def toggle_device(self, device, request_id, greenhouse_id):
if device == 'valve':
if self.valve_state == 'closed':
self.valve_state = 'open'
print("Valve opened")
else:
self.valve_state = 'closed'
print("Valve closed")
elif device == 'heater':
if self.heater_state == 'off':
self.heater_state = 'on'
print("Heater turned on")
else:
self.heater_state = 'off'
print("Heater turned off")
self.send_status(request_id, greenhouse_id)
def send_status(self, request_id, greenhouse_id):
status = {
'request_id': request_id,
'greenhouse_id': greenhouse_id,
'valve_state': self.valve_state,
'heater_state': self.heater_state
}
self.sendDataCommand(status)
print("Updating info...\n")
def sendDataCommand(self, message):
print("sending data...")
self.dataPublisher.send('response', message)
def getCommand(self):
messages = self.controllerConsumer.poll(timeout_ms=1000)
# Проверяем, есть ли сообщения
for tp, msgs in messages.items():
for message in msgs:
print(f"Manager {self._id} received message: ")
print(message.value)
self.request_id = message.value['request_id']
self.greenhouse_id = message.value['greenhouse_id']
self.command = message.value['command']
self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route(f'/webhook', methods=['POST'])
def webhook():
print("received webhook", request.args.get('id'))
for manager in managers:
print()
if int(request.args.get('id')) == manager._id and request.method == 'POST':
print("Data received from Webhook is", request.json)
body = request.json
for key, value in body.items():
setattr(manager, key, value)
manager.sendData()
return f"Webhook received for manager {manager._id}"
return "Webhook ignored"
t1 = threading.Thread(target=start_manager)
manager1 = Manager(_id=1)
managers = [manager1]
if __name__ == "__main__":
threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()

80
docker-compose.yml Normal file
View File

@ -0,0 +1,80 @@
networks:
vpn:
name: kafkaVPN
driver: bridge
ipam:
config:
- subnet: "192.168.2.0/24"
gateway: "192.168.2.1"
services:
zookeeper:
networks:
- vpn
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
kafka:
networks:
vpn:
ipv4_address: 192.168.2.10
image: confluentinc/cp-kafka:7.4.0
ports:
- 9092:9092
- 9997:9997
expose:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
networks:
- vpn
image: confluentinc/cp-kafka:7.4.0
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
networks:
- vpn
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997