From 3fa35ba61714f3eb28686a87ff977383b32f91ec Mon Sep 17 00:00:00 2001 From: the Date: Mon, 28 Oct 2024 13:14:46 +0400 Subject: [PATCH 1/8] init --- GreenhouseDetector/detector.py | 0 GreenhouseManager/manager.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 GreenhouseDetector/detector.py create mode 100644 GreenhouseManager/manager.py diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py new file mode 100644 index 0000000..e69de29 diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py new file mode 100644 index 0000000..e69de29 From d16968bc988158596272610307fe9a4d9a91abfc Mon Sep 17 00:00:00 2001 From: the Date: Mon, 28 Oct 2024 14:47:20 +0400 Subject: [PATCH 2/8] compose + kafka ui --- GreenhouseManager/manager.py | 7 +++++ docker-compose.yml | 58 ++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+) create mode 100644 docker-compose.yml diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index e69de29..72b3332 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -0,0 +1,7 @@ +from kafka import KafkaProducer, KafkaConsumer +import kafka +import socket + +consumer = KafkaConsumer(bootstrap_servers = ['localhost:9092']) + +print(consumer.topics()) diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..0683811 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,58 @@ +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.4.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 2181:2181 + + kafka: + image: confluentinc/cp-kafka:7.4.0 + ports: + - 9092:9092 + - 9997:9997 + expose: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + depends_on: + - zookeeper + + init-kafka: + image: confluentinc/cp-kafka:7.4.0 + depends_on: + - kafka + entrypoint: [ '/bin/sh', '-c' ] + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_METRICS_PORT: 9997 \ No newline at end of file From 720cf4bd60153d776f563138b226ba28b8a64d81 Mon Sep 17 00:00:00 2001 From: the Date: Mon, 28 Oct 2024 19:43:13 +0400 Subject: [PATCH 3/8] connection test --- GreenhouseManager/manager.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 72b3332..74fed03 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -1,7 +1,27 @@ from kafka import KafkaProducer, KafkaConsumer import kafka import socket +from json import dumps, loads +import time -consumer = KafkaConsumer(bootstrap_servers = ['localhost:9092']) +consumer = KafkaConsumer( + 'commands', + bootstrap_servers=['localhost:9092'], + auto_offset_reset='earliest', + enable_auto_commit=True, + group_id='my-group', + value_deserializer=lambda x: loads(x.decode('utf-8'))) print(consumer.topics()) +consumer.subscribe(['commands']) +producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], + value_serializer=lambda x: + dumps(x).encode('utf-8')) + +data = {'message' : 'hello'} +producer.send('commands', value=data) + +while True: + for message in consumer: + print(message) + time.sleep(1) \ No newline at end of file From 08ee12aa8bef22ec1e987965a8a606e45b8d597a Mon Sep 17 00:00:00 2001 From: the Date: Tue, 29 Oct 2024 16:07:02 +0400 Subject: [PATCH 4/8] simple detector + manager --- GreenhouseDetector/detector.py | 40 ++++++++++++++++++++++++++++++++++ GreenhouseManager/manager.py | 18 ++++----------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index e69de29..1ecb671 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -0,0 +1,40 @@ +from random import random +from turtledemo.penrose import start + +from kafka import KafkaProducer, KafkaConsumer +import kafka +import socket +from json import dumps, loads +import time +import random as rnd + +class Detector: + def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): + self.id = id + self.moistureThresholdUpper = moistureThresholdUpper + self.moistureThresholdLower = moistureThresholdLower + self.tempThresholdUpper = tempThresholdUpper + self.tempThresholdLower = tempThresholdLower + self.moisture = 0 + self.temp = 0 + + self.producer = KafkaProducer( + bootstrap_servers=['localhost:9092'], + client_id=f'detector{self.id}', + value_serializer=lambda v: dumps(v).encode('utf-8') + ) + + def sendData(self): + message = {'id' : self.id, + 'moisture': self.moisture, + 'temperature' : self.temp } + self.producer.send('data', message) + + def cycle(self): + self.moisture += rnd.random() / 100 + self.temp += (rnd.random() - 0.5) / 100 + + +while True: + time.sleep(1) + diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 74fed03..dbda11b 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -4,24 +4,14 @@ import socket from json import dumps, loads import time -consumer = KafkaConsumer( - 'commands', +dataConsumer = KafkaConsumer( + 'data', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: loads(x.decode('utf-8'))) -print(consumer.topics()) -consumer.subscribe(['commands']) -producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], - value_serializer=lambda x: - dumps(x).encode('utf-8')) - -data = {'message' : 'hello'} -producer.send('commands', value=data) - while True: - for message in consumer: - print(message) - time.sleep(1) \ No newline at end of file + for message in dataConsumer: + print(message) \ No newline at end of file From 6291bb483c4fef9806732b330e5f9e7eafda94ed Mon Sep 17 00:00:00 2001 From: the Date: Tue, 29 Oct 2024 17:31:47 +0400 Subject: [PATCH 5/8] I MUST JOB --- GreenhouseDetector/detector.py | 10 +++++- GreenhouseManager/manager.py | 65 +++++++++++++++++++++++++++++----- docker-compose.yml | 1 + 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index 1ecb671..728ec1d 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -28,13 +28,21 @@ class Detector: message = {'id' : self.id, 'moisture': self.moisture, 'temperature' : self.temp } - self.producer.send('data', message) + self.producer.send('dataDetectors', message) def cycle(self): self.moisture += rnd.random() / 100 self.temp += (rnd.random() - 0.5) / 100 +detector1 = Detector(1, 0.6, 0.2, 40, 20) +detector2 = Detector(2, 0.7, 0.3, 40, 20) +detector3 = Detector(3, 0.9, 0.6, 40, 20) + +detectors = [detector1, detector2, detector3] while True: + for detector in detectors: + detector.cycle() + detector.sendData() time.sleep(1) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index dbda11b..4d1d836 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -3,15 +3,62 @@ import kafka import socket from json import dumps, loads import time +from enum import Enum -dataConsumer = KafkaConsumer( - 'data', - bootstrap_servers=['localhost:9092'], - auto_offset_reset='earliest', - enable_auto_commit=True, - group_id='my-group', - value_deserializer=lambda x: loads(x.decode('utf-8'))) +class Status(Enum): + UNKNOWN = -1 + OFF = 0 + ON = 1 + +class Manager: + def __init__(self, id : int, moisture : float = 0, temp : float = 20, valveStatus : Status = Status.UNKNOWN, heaterStatus : Status = Status.UNKNOWN, isAutoOn : bool = False): + self.id = id + self.moisture = moisture + self.temp = temp + self.valveStatus = valveStatus + self.heaterStatus = heaterStatus + self.isAutoOn = isAutoOn + + self.dataPublisher = KafkaProducer( + bootstrap_servers=['localhost:9092'], + client_id=f'manager{id}', + value_serializer=lambda v: dumps(v).encode('utf-8') + ) + + self.detectorConsumer = KafkaConsumer( + 'dataDetectors', + bootstrap_servers=['localhost:9092'], + auto_offset_reset='earliest', + enable_auto_commit=True, + group_id=f'manager{id}', + value_deserializer=lambda x: loads(x.decode('utf-8')) + ) + + def update(self): + for message in self.detectorConsumer: + if message.value['id'] == self.id: + print(f"Manager {self.id} received message: ") + print(message.value) + print("Updating info...\n") + self.moisture = message.value['moisture'] + self.temp = message.value['temperature'] + + dataMessage = { + 'id' : self.id, + 'moisture' : self.moisture, + 'temp' : self.temp, + 'valveStatus': str(self.valveStatus), + 'heaterStatus': str(self.heaterStatus), + 'isAutoOn' : self.isAutoOn + } + + self.dataPublisher.send('data', dataMessage) + +manager1 = Manager(id = 1) + +managers = [manager1] while True: - for message in dataConsumer: - print(message) \ No newline at end of file + time.sleep(1) + for manager in managers: + manager.update() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 0683811..995a9e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,6 +39,7 @@ services: echo -e 'Creating kafka topics' kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1 kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 echo -e 'Successfully created the following topics:' From 7f88f87722aa05768a11fd3df72fc13e29fa0860 Mon Sep 17 00:00:00 2001 From: the Date: Tue, 29 Oct 2024 17:51:20 +0400 Subject: [PATCH 6/8] fix, no more infinite loops for consumer --- GreenhouseManager/manager.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 4d1d836..5c2b975 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -21,7 +21,7 @@ class Manager: self.dataPublisher = KafkaProducer( bootstrap_servers=['localhost:9092'], - client_id=f'manager{id}', + client_id=f'manager{id}_producer', value_serializer=lambda v: dumps(v).encode('utf-8') ) @@ -30,6 +30,7 @@ class Manager: bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, + consumer_timeout_ms = 1000, group_id=f'manager{id}', value_deserializer=lambda x: loads(x.decode('utf-8')) ) @@ -39,20 +40,25 @@ class Manager: if message.value['id'] == self.id: print(f"Manager {self.id} received message: ") print(message.value) - print("Updating info...\n") self.moisture = message.value['moisture'] self.temp = message.value['temperature'] + print("Updating info...\n") - dataMessage = { - 'id' : self.id, - 'moisture' : self.moisture, - 'temp' : self.temp, + self.sendData() + + def sendData(self): + print("sending data...") + message = { + 'id': self.id, + 'moisture': self.moisture, + 'temp': self.temp, 'valveStatus': str(self.valveStatus), 'heaterStatus': str(self.heaterStatus), - 'isAutoOn' : self.isAutoOn + 'isAutoOn': self.isAutoOn } - self.dataPublisher.send('data', dataMessage) + print(message) + self.dataPublisher.send('data', message) manager1 = Manager(id = 1) From 091dcbd3a3bbda0af7b0b00ca439483c69e10476 Mon Sep 17 00:00:00 2001 From: dimazhelovanov Date: Tue, 29 Oct 2024 19:49:33 +0400 Subject: [PATCH 7/8] =?UTF-8?q?=D0=A3=D0=BF=D1=80=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=B2=D0=B5=D0=BD=D1=82=D0=B8=D0=BB=D1=8F?= =?UTF-8?q?=D0=BC=D0=B8/=D0=BD=D0=B0=D0=B3=D1=80=D0=B5=D0=B2=D0=B0=D1=82?= =?UTF-8?q?=D0=B5=D0=BB=D1=8F=D0=BC=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseController/ManageController.py | 47 ++++++++++++++++++++++++ GreenhouseManager/manager.py | 3 +- 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 GreenhouseController/ManageController.py diff --git a/GreenhouseController/ManageController.py b/GreenhouseController/ManageController.py new file mode 100644 index 0000000..37f60b5 --- /dev/null +++ b/GreenhouseController/ManageController.py @@ -0,0 +1,47 @@ +from json import dumps + + + +from json import dumps + +class ManageController: + def __init__(self, producer, topic='commands'): + self.valve_state = "closed" + self.heater_state = "off" + self.producer = producer + self.topic = topic + + def toggle_device(self, device, request_id, greenhouse_id): + + + if device == 'valve': + + if self.valve_state == 'closed': + self.valve_state = 'open' + print("Valve opened") + else: + self.valve_state = 'closed' + print("Valve closed") + + elif device == 'heater': + + if self.heater_state == 'off': + self.heater_state = 'on' + print("Heater turned on") + else: + self.heater_state = 'off' + print("Heater turned off") + + + self.send_status(request_id, greenhouse_id) + + def send_status(self, request_id, greenhouse_id): + + status = { + 'request_id': request_id, + 'greenhouse_id': greenhouse_id, + 'valve_state': self.valve_state, + 'heater_state': self.heater_state + } + self.producer.send(self.topic, value=status) + print(f"Sent device status: {status}") \ No newline at end of file diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 74fed03..9e96558 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -3,7 +3,8 @@ import kafka import socket from json import dumps, loads import time - +from ManageController import ManagerController +greenhouse_controller = GreenhouseController(producer) consumer = KafkaConsumer( 'commands', bootstrap_servers=['localhost:9092'], From 3f5bb31646a19fa7126f0a1a581dd5f1e64cac76 Mon Sep 17 00:00:00 2001 From: dimazhelovanov Date: Wed, 30 Oct 2024 11:10:39 +0400 Subject: [PATCH 8/8] =?UTF-8?q?=D0=9C=D0=B5=D0=BD=D0=B5=D0=B4=D0=B6=D0=B5?= =?UTF-8?q?=D1=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseController/ManageController.py | 5 +- GreenhouseManager/manager.py | 98 +++++++++++++++++++++--- 2 files changed, 89 insertions(+), 14 deletions(-) diff --git a/GreenhouseController/ManageController.py b/GreenhouseController/ManageController.py index 37f60b5..2d9ad10 100644 --- a/GreenhouseController/ManageController.py +++ b/GreenhouseController/ManageController.py @@ -43,5 +43,6 @@ class ManageController: 'valve_state': self.valve_state, 'heater_state': self.heater_state } - self.producer.send(self.topic, value=status) - print(f"Sent device status: {status}") \ No newline at end of file + + print(f"Sent device status: {status}") + return status \ No newline at end of file diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 5c2b975..c082d04 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -4,20 +4,23 @@ import socket from json import dumps, loads import time from enum import Enum +from GreenhouseDetector.detector import Detector class Status(Enum): UNKNOWN = -1 OFF = 0 ON = 1 + class Manager: - def __init__(self, id : int, moisture : float = 0, temp : float = 20, valveStatus : Status = Status.UNKNOWN, heaterStatus : Status = Status.UNKNOWN, isAutoOn : bool = False): + def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", + heater_state: str = "off"): self.id = id self.moisture = moisture self.temp = temp - self.valveStatus = valveStatus - self.heaterStatus = heaterStatus self.isAutoOn = isAutoOn + self.valve_state = valve_state + self.heater_state = heater_state self.dataPublisher = KafkaProducer( bootstrap_servers=['localhost:9092'], @@ -25,19 +28,33 @@ class Manager: value_serializer=lambda v: dumps(v).encode('utf-8') ) - self.detectorConsumer = KafkaConsumer( - 'dataDetectors', + # self.detectorConsumer = KafkaConsumer( + # 'dataDetectors', + # bootstrap_servers=['localhost:9092'], + # auto_offset_reset='earliest', + # enable_auto_commit=True, + # consumer_timeout_ms=1000, + #group_id=f'manager{id}', + # value_deserializer=lambda x: loads(x.decode('utf-8')) + #) + self.controllerConsumer = KafkaConsumer( + 'commands', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, - consumer_timeout_ms = 1000, + consumer_timeout_ms=2000, group_id=f'manager{id}', value_deserializer=lambda x: loads(x.decode('utf-8')) ) + self.controllerConsumerResponse = KafkaProducer( + bootstrap_servers=['localhost:9092'], + client_id=f'manager{id}_producer', + value_serializer=lambda v: dumps(v).encode('utf-8') + ) def update(self): for message in self.detectorConsumer: - if message.value['id'] == self.id: + print(f"Manager {self.id} received message: ") print(message.value) self.moisture = message.value['moisture'] @@ -52,19 +69,76 @@ class Manager: 'id': self.id, 'moisture': self.moisture, 'temp': self.temp, - 'valveStatus': str(self.valveStatus), - 'heaterStatus': str(self.heaterStatus), + 'valveStatus': str(self.valve_state), + 'heaterStatus': str(self.heater_state), 'isAutoOn': self.isAutoOn } print(message) self.dataPublisher.send('data', message) -manager1 = Manager(id = 1) + def toggle_device(self, device, request_id, greenhouse_id): + + if device == 'valve': + + if self.valve_state == 'closed': + self.valve_state = 'open' + print("Valve opened") + else: + self.valve_state = 'closed' + print("Valve closed") + + elif device == 'heater': + + if self.heater_state == 'off': + self.heater_state = 'on' + print("Heater turned on") + else: + self.heater_state = 'off' + print("Heater turned off") + + self.send_status(request_id, greenhouse_id) + + def send_status(self, request_id, greenhouse_id): + + status = { + 'request_id': request_id, + 'greenhouse_id': greenhouse_id, + 'valve_state': self.valve_state, + 'heater_state': self.heater_state + } + self.sendDataCommand(status) + print("Updating info...\n") + + def sendDataCommand(self, message): + print("sending data...") + + self.dataPublisher.send('response', message) + + def getCommand(self): + messages = self.controllerConsumer.poll(timeout_ms=1000) + + # Проверяем, есть ли сообщения + + for tp, msgs in messages.items(): + for message in msgs: + print(f"Manager {self.id} received message: ") + print(message.value) + self.request_id = message.value['request_id'] + self.greenhouse_id = message.value['greenhouse_id'] + self.command = message.value['command'] + self.toggle_device(self.command, self.request_id, self.greenhouse_id) + + + +manager1 = Manager(id=1) managers = [manager1] + while True: - time.sleep(1) + time.sleep(5) + manager1.sendData() for manager in managers: - manager.update() \ No newline at end of file + + manager.getCommand()