From 7f5262575e81e24965fc48f3b9b89cec0d4640d1 Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 16:18:14 +0400 Subject: [PATCH 01/14] webhook test --- GreenhouseDetector/detector.py | 46 ++++++++++++++-------------------- GreenhouseManager/manager.py | 34 ++++++++++--------------- 2 files changed, 32 insertions(+), 48 deletions(-) diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index 728ec1d..57545e9 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -1,13 +1,19 @@ -from random import random -from turtledemo.penrose import start - -from kafka import KafkaProducer, KafkaConsumer -import kafka -import socket -from json import dumps, loads import time import random as rnd +from flask import Flask, jsonify +import requests +import threading + +app = Flask(__name__) + +def start_detector(): + while True: + for detector in detectors: + detector.cycle() + detector.sendData() + time.sleep(1) + class Detector: def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): self.id = id @@ -18,31 +24,17 @@ class Detector: self.moisture = 0 self.temp = 0 - self.producer = KafkaProducer( - bootstrap_servers=['localhost:9092'], - client_id=f'detector{self.id}', - value_serializer=lambda v: dumps(v).encode('utf-8') - ) - - def sendData(self): - message = {'id' : self.id, - 'moisture': self.moisture, - 'temperature' : self.temp } - self.producer.send('dataDetectors', message) - def cycle(self): self.moisture += rnd.random() / 100 self.temp += (rnd.random() - 0.5) / 100 detector1 = Detector(1, 0.6, 0.2, 40, 20) -detector2 = Detector(2, 0.7, 0.3, 40, 20) -detector3 = Detector(3, 0.9, 0.6, 40, 20) -detectors = [detector1, detector2, detector3] +detectors = [detector1] -while True: - for detector in detectors: - detector.cycle() - detector.sendData() - time.sleep(1) +t1 = threading.Thread(target=start_detector) + +if __name__ =="__main__": + t1.start() + app.run(host='0.0.0.0', port=20001, debug=True) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index c082d04..b3d21bf 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -2,15 +2,15 @@ from kafka import KafkaProducer, KafkaConsumer import kafka import socket from json import dumps, loads +from flask import Flask, request import time from enum import Enum -from GreenhouseDetector.detector import Detector +import threading -class Status(Enum): - UNKNOWN = -1 - OFF = 0 - ON = 1 +app = Flask(__name__) +def start_manager(): + return class Manager: def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", @@ -28,15 +28,6 @@ class Manager: value_serializer=lambda v: dumps(v).encode('utf-8') ) - # self.detectorConsumer = KafkaConsumer( - # 'dataDetectors', - # bootstrap_servers=['localhost:9092'], - # auto_offset_reset='earliest', - # enable_auto_commit=True, - # consumer_timeout_ms=1000, - #group_id=f'manager{id}', - # value_deserializer=lambda x: loads(x.decode('utf-8')) - #) self.controllerConsumer = KafkaConsumer( 'commands', bootstrap_servers=['localhost:9092'], @@ -129,16 +120,17 @@ class Manager: self.command = message.value['command'] self.toggle_device(self.command, self.request_id, self.greenhouse_id) - +@app.route('/webhook', methods=['POST']) +def webhook(): + if request.method == 'POST': + print("Data received from Webhook is") + return "Webhook received" manager1 = Manager(id=1) managers = [manager1] +t1 = threading.Thread(target=start_manager) -while True: - time.sleep(5) - manager1.sendData() - for manager in managers: - - manager.getCommand() +if __name__ == "__main__": + app.run(host="0.0.0.0", port=20002) \ No newline at end of file From b7f4aa3f9f145a1e5c14fa63a97cf9af1a50b975 Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 16:19:37 +0400 Subject: [PATCH 02/14] webhook fix --- GreenhouseManager/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index b3d21bf..1152700 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -123,7 +123,7 @@ class Manager: @app.route('/webhook', methods=['POST']) def webhook(): if request.method == 'POST': - print("Data received from Webhook is") + print("Data received from Webhook is", request.json) return "Webhook received" manager1 = Manager(id=1) From f82c8daa92541b2fdc60878a64ba978c8535e92a Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 17:04:24 +0400 Subject: [PATCH 03/14] detector webhooks --- GreenhouseDetector/detector.py | 23 +++++++++---------- GreenhouseManager/manager.py | 41 +++++++++++++++------------------- 2 files changed, 29 insertions(+), 35 deletions(-) diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index 57545e9..d092373 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -1,19 +1,12 @@ import time import random as rnd -from flask import Flask, jsonify +from flask import Flask import requests import threading app = Flask(__name__) -def start_detector(): - while True: - for detector in detectors: - detector.cycle() - detector.sendData() - time.sleep(1) - class Detector: def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): self.id = id @@ -28,13 +21,19 @@ class Detector: self.moisture += rnd.random() / 100 self.temp += (rnd.random() - 0.5) / 100 + def sendData(self): + data = {"moisture": self.moisture, + "temp": self.temp} + requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data) + detector1 = Detector(1, 0.6, 0.2, 40, 20) detectors = [detector1] -t1 = threading.Thread(target=start_detector) - if __name__ =="__main__": - t1.start() - app.run(host='0.0.0.0', port=20001, debug=True) + while True: + for detector in detectors: + detector.cycle() + detector.sendData() + time.sleep(1) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 1152700..457c483 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -13,9 +13,9 @@ def start_manager(): return class Manager: - def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", + def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", heater_state: str = "off"): - self.id = id + self._id = _id self.moisture = moisture self.temp = temp self.isAutoOn = isAutoOn @@ -24,7 +24,7 @@ class Manager: self.dataPublisher = KafkaProducer( bootstrap_servers=['localhost:9092'], - client_id=f'manager{id}_producer', + client_id=f'manager{self._id}_producer', value_serializer=lambda v: dumps(v).encode('utf-8') ) @@ -34,30 +34,19 @@ class Manager: auto_offset_reset='earliest', enable_auto_commit=True, consumer_timeout_ms=2000, - group_id=f'manager{id}', + group_id=f'manager{self._id}', value_deserializer=lambda x: loads(x.decode('utf-8')) ) self.controllerConsumerResponse = KafkaProducer( bootstrap_servers=['localhost:9092'], - client_id=f'manager{id}_producer', + client_id=f'manager{self._id}_producer', value_serializer=lambda v: dumps(v).encode('utf-8') ) - def update(self): - for message in self.detectorConsumer: - - print(f"Manager {self.id} received message: ") - print(message.value) - self.moisture = message.value['moisture'] - self.temp = message.value['temperature'] - print("Updating info...\n") - - self.sendData() - def sendData(self): print("sending data...") message = { - 'id': self.id, + 'id': self._id, 'moisture': self.moisture, 'temp': self.temp, 'valveStatus': str(self.valve_state), @@ -113,21 +102,27 @@ class Manager: for tp, msgs in messages.items(): for message in msgs: - print(f"Manager {self.id} received message: ") + print(f"Manager {self._id} received message: ") print(message.value) self.request_id = message.value['request_id'] self.greenhouse_id = message.value['greenhouse_id'] self.command = message.value['command'] self.toggle_device(self.command, self.request_id, self.greenhouse_id) -@app.route('/webhook', methods=['POST']) +@app.route(f'/webhook', methods=['POST']) def webhook(): - if request.method == 'POST': - print("Data received from Webhook is", request.json) - return "Webhook received" + for manager in managers: + if request.args.get('id') == manager._id and request.method == 'POST': + print("Data received from Webhook is", request.json) -manager1 = Manager(id=1) + body = request.json + for key, value in body.items(): + setattr(manager, key, value) + return f"Webhook received for manager {manager._id}" + return "Webhook ignored" + +manager1 = Manager(_id=1) managers = [manager1] t1 = threading.Thread(target=start_manager) From 4747f975c53d19befc73d28da032cac98f0ae69f Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 18:03:45 +0400 Subject: [PATCH 04/14] kafka network --- docker-compose.yml | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 995a9e2..e05fdef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,7 @@ services: zookeeper: + networks: + - vpn image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 @@ -8,17 +10,18 @@ services: - 2181:2181 kafka: + networks: + - vpn image: confluentinc/cp-kafka:7.4.0 ports: - - 9092:9092 - - 9997:9997 - expose: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000 @@ -27,6 +30,8 @@ services: - zookeeper init-kafka: + networks: + - vpn image: confluentinc/cp-kafka:7.4.0 depends_on: - kafka @@ -34,19 +39,20 @@ services: command: | " # blocks until kafka is reachable - kafka-topics --bootstrap-server kafka:29092 --list + kafka-topics --bootstrap-server kafka:9092 --list echo -e 'Creating kafka topics' - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 echo -e 'Successfully created the following topics:' - kafka-topics --bootstrap-server kafka:29092 --list + kafka-topics --bootstrap-server kafka:9092 --list " kafka-ui: + networks: + - vpn container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: @@ -56,4 +62,8 @@ services: environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 - KAFKA_CLUSTERS_0_METRICS_PORT: 9997 \ No newline at end of file + KAFKA_CLUSTERS_0_METRICS_PORT: 9997 + +networks: + vpn: + name: kafkaVPN \ No newline at end of file From ce3f3a4dc64ed5a987f4194f6707f0e6bbea85a5 Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 22:31:57 +0400 Subject: [PATCH 05/14] kafka network fix --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index e05fdef..f17fc16 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,8 +18,8 @@ services: environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092 - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29093 + KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29092,INTERNAL://:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 From 57e05aba909768bf6518fd37849150b6c52b6f86 Mon Sep 17 00:00:00 2001 From: the Date: Tue, 12 Nov 2024 22:34:06 +0400 Subject: [PATCH 06/14] kafka network unfix --- docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f17fc16..e05fdef 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,8 +18,8 @@ services: environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29092,INTERNAL://:9092 - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29092 + KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 From ffef39d409773dde7850fdd759f5ac5aaf2ba007 Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 00:30:33 +0400 Subject: [PATCH 07/14] =?UTF-8?q?=D0=A1=D0=A2=D0=9E=D0=9B=D0=AC=D0=9A?= =?UTF-8?q?=D0=9E=20=D0=9C=D0=A3=D0=A7=D0=95=D0=9D=D0=98=D0=99=20=D0=A0?= =?UTF-8?q?=D0=90=D0=94=D0=98=20=D0=9E=D0=94=D0=9D=D0=9E=D0=99=20=D0=A1?= =?UTF-8?q?=D0=A2=D0=A0=D0=9E=D0=A7=D0=9A=D0=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseManager/manager.py | 3 +++ docker-compose.yml | 47 ++++++++++++++++++++---------------- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 457c483..794fbff 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -56,6 +56,7 @@ class Manager: print(message) self.dataPublisher.send('data', message) + self.dataPublisher.flush() def toggle_device(self, device, request_id, greenhouse_id): @@ -119,6 +120,8 @@ def webhook(): for key, value in body.items(): setattr(manager, key, value) + manager.sendData() + return f"Webhook received for manager {manager._id}" return "Webhook ignored" diff --git a/docker-compose.yml b/docker-compose.yml index e05fdef..a0a4521 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ services: zookeeper: - networks: - - vpn +# networks: +# - vpn image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 @@ -10,18 +10,23 @@ services: - 2181:2181 kafka: - networks: - - vpn +# networks: +# - vpn image: confluentinc/cp-kafka:7.4.0 ports: - - 29092:29092 + - 9092:9092 + - 9997:9997 + + expose: + - 29092:29092 + environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: EXTERNAL_SAME_HOST://:29092,EXTERNAL_DIFFERENT_HOST://:29093,INTERNAL://:9092 - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092,EXTERNAL_DIFFERENT_HOST://157.245.80.232:29093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT,EXTERNAL_DIFFERENT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000 @@ -30,8 +35,8 @@ services: - zookeeper init-kafka: - networks: - - vpn +# networks: +# - vpn image: confluentinc/cp-kafka:7.4.0 depends_on: - kafka @@ -39,20 +44,20 @@ services: command: | " # blocks until kafka is reachable - kafka-topics --bootstrap-server kafka:9092 --list + kafka-topics --bootstrap-server kafka:29092 --list echo -e 'Creating kafka topics' - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 echo -e 'Successfully created the following topics:' - kafka-topics --bootstrap-server kafka:9092 --list + kafka-topics --bootstrap-server kafka:29092 --list " kafka-ui: - networks: - - vpn +# networks: +# - vpn container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: @@ -64,6 +69,6 @@ services: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 -networks: - vpn: - name: kafkaVPN \ No newline at end of file +#networks: +# vpn: +# name: kafkaVPN \ No newline at end of file From 73961934f0081de042370a133fd2fee49f94b2aa Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 00:31:21 +0400 Subject: [PATCH 08/14] =?UTF-8?q?=D0=A1=D0=A2=D0=9E=D0=9B=D0=AC=D0=9A?= =?UTF-8?q?=D0=9E=20=D0=9C=D0=A3=D0=A7=D0=95=D0=9D=D0=98=D0=99=20=D0=A0?= =?UTF-8?q?=D0=90=D0=94=D0=98=20=D0=9E=D0=94=D0=9D=D0=9E=D0=99=20=D0=A1?= =?UTF-8?q?=D0=A2=D0=A0=D0=9E=D0=A7=D0=9A=D0=98,=20=D0=95=D0=A9=D0=81=20?= =?UTF-8?q?=D0=98=20=D0=9D=D0=95=D0=A2=D0=92=D0=9E=D0=A0=D0=9A=20=D0=9D?= =?UTF-8?q?=D0=90=20=D0=A1=D0=90=D0=9C=D0=9E=D0=9C=20=D0=94=D0=95=D0=9B?= =?UTF-8?q?=D0=95=20=D0=9D=D0=95=20=D0=A0=D0=90=D0=91=D0=9E=D0=A2=D0=90?= =?UTF-8?q?=D0=95=D0=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseManager/manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 794fbff..efee642 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -58,6 +58,7 @@ class Manager: self.dataPublisher.send('data', message) self.dataPublisher.flush() + def toggle_device(self, device, request_id, greenhouse_id): if device == 'valve': From b009ebdd0ced34f1628dc368c28b8aaa88318797 Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 00:36:31 +0400 Subject: [PATCH 09/14] =?UTF-8?q?=D0=90=D0=90=D0=90=D0=90=D0=90=D0=90?= =?UTF-8?q?=D0=90=D0=90=D0=90=D0=90=D0=90=D0=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index a0a4521..56659ba 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,7 +24,7 @@ services: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092 - KAFKA_ADVERTISED_LISTENERS: HOST://localhost:9092,DOCKER://kafka:29092 + KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 From 83aec339c9875de463d6d3fadd72519d0ae528d9 Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 00:53:31 +0400 Subject: [PATCH 10/14] =?UTF-8?q?=D1=84=D0=B8=D0=BA=D1=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- GreenhouseManager/manager.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index efee642..d9772d1 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -113,8 +113,10 @@ class Manager: @app.route(f'/webhook', methods=['POST']) def webhook(): + print("received webhook", request.args.get('id')) for manager in managers: - if request.args.get('id') == manager._id and request.method == 'POST': + print() + if int(request.args.get('id')) == manager._id and request.method == 'POST': print("Data received from Webhook is", request.json) body = request.json @@ -126,10 +128,9 @@ def webhook(): return f"Webhook received for manager {manager._id}" return "Webhook ignored" +t1 = threading.Thread(target=start_manager) manager1 = Manager(_id=1) managers = [manager1] -t1 = threading.Thread(target=start_manager) - if __name__ == "__main__": - app.run(host="0.0.0.0", port=20002) \ No newline at end of file + threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start() \ No newline at end of file From c0322536999c239e455efbadcb1724d174c8c6c7 Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 01:00:16 +0400 Subject: [PATCH 11/14] =?UTF-8?q?=D1=84=D0=B8=D0=BA=D1=812?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 56659ba..32dc5a9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ services: zookeeper: -# networks: -# - vpn + networks: + - vpn image: confluentinc/cp-zookeeper:7.4.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 @@ -10,8 +10,8 @@ services: - 2181:2181 kafka: -# networks: -# - vpn + networks: + - vpn image: confluentinc/cp-kafka:7.4.0 ports: - 9092:9092 @@ -35,8 +35,8 @@ services: - zookeeper init-kafka: -# networks: -# - vpn + networks: + - vpn image: confluentinc/cp-kafka:7.4.0 depends_on: - kafka @@ -56,8 +56,8 @@ services: " kafka-ui: -# networks: -# - vpn + networks: + - vpn container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: @@ -69,6 +69,6 @@ services: KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 -#networks: -# vpn: -# name: kafkaVPN \ No newline at end of file +networks: + vpn: + name: kafkaVPN \ No newline at end of file From 5fa9c76b996d3b79ed60ee3d4c6623271400b6fa Mon Sep 17 00:00:00 2001 From: the Date: Wed, 13 Nov 2024 01:14:32 +0400 Subject: [PATCH 12/14] test changes --- docker-compose.yml | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 32dc5a9..a1e5c42 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,3 +1,12 @@ +networks: + vpn: + name: kafkaVPN + driver: bridge + ipam: + config: + - subnet: "192.168.2.0/24" + gateway: "192.168.2.1" + services: zookeeper: networks: @@ -11,7 +20,8 @@ services: kafka: networks: - - vpn + vpn: + ipv4_address: 192.168.2.10 image: confluentinc/cp-kafka:7.4.0 ports: - 9092:9092 @@ -68,7 +78,3 @@ services: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - -networks: - vpn: - name: kafkaVPN \ No newline at end of file From 7c310d21f7aa316abc15a01bdcc37b3ebb5b5cc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D1=80=D1=82=D0=B5=D0=BC=20=D0=A5=D0=B0=D1=80=D0=BB?= =?UTF-8?q?=D0=B0=D0=BC=D0=BE=D0=B2?= Date: Wed, 13 Nov 2024 15:24:47 +0400 Subject: [PATCH 13/14] Add broker service --- Cloud/Cloud.csproj | 1 + Cloud/Controllers/ValveController.cs | 41 ++++++++++++++++++ Cloud/Enums/ValveEnum.cs | 9 ++++ Cloud/Program.cs | 11 +++++ Cloud/Requests/ValveRequest.cs | 7 ++++ Cloud/Services/BackgroundWorkerService.cs | 39 +++++++++++++++++ Cloud/Services/ConsumerService.cs | 51 +++++++++++++++++++++++ Cloud/Services/ProducerService.cs | 33 +++++++++++++++ Cloud/Validation/ValveValidator.cs | 16 +++++++ Cloud/appsettings.json | 4 ++ 10 files changed, 212 insertions(+) create mode 100644 Cloud/Controllers/ValveController.cs create mode 100644 Cloud/Enums/ValveEnum.cs create mode 100644 Cloud/Requests/ValveRequest.cs create mode 100644 Cloud/Services/BackgroundWorkerService.cs create mode 100644 Cloud/Services/ConsumerService.cs create mode 100644 Cloud/Services/ProducerService.cs create mode 100644 Cloud/Validation/ValveValidator.cs diff --git a/Cloud/Cloud.csproj b/Cloud/Cloud.csproj index 6a5fc81..e74339d 100644 --- a/Cloud/Cloud.csproj +++ b/Cloud/Cloud.csproj @@ -7,6 +7,7 @@ + diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs new file mode 100644 index 0000000..5708820 --- /dev/null +++ b/Cloud/Controllers/ValveController.cs @@ -0,0 +1,41 @@ +using Cloud.Requests; +using Cloud.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.ComponentModel.DataAnnotations; +using System.Text.Json; + +namespace Cloud.Controllers +{ + [Authorize] + [ApiController] + [Route("api")] + public class ValveController : ControllerBase + { + //Контроллер вентиля + + private readonly ProducerService _producerService; + + public ValveController(ProducerService producerService) + { + _producerService = producerService; + } + + [HttpPost("farm/{farmId}/greenhouse/{greenhouseId}/watering")] + public async Task interactValve([FromBody] ValveRequest request, int farmId, int ghId) + { + var kafkaRequest = new + { + FarmId = farmId, + GreenHouseId = ghId, + SomeAction = request.Action, + }; + + var message = JsonSerializer.Serialize(kafkaRequest); + + await _producerService.ProduceMessageAsync("InventoryUpdates", message); + + return Ok($"Valve status is {request.Action}"); + } + } +} diff --git a/Cloud/Enums/ValveEnum.cs b/Cloud/Enums/ValveEnum.cs new file mode 100644 index 0000000..f04d64a --- /dev/null +++ b/Cloud/Enums/ValveEnum.cs @@ -0,0 +1,9 @@ +namespace Cloud.Enums +{ + public enum ValveEnum + { + Open, + Close, + Auto + } +} diff --git a/Cloud/Program.cs b/Cloud/Program.cs index c02b6fb..1c3e3a8 100644 --- a/Cloud/Program.cs +++ b/Cloud/Program.cs @@ -7,6 +7,7 @@ using FluentValidation; using FluentValidation.AspNetCore; using Cloud.Validation; using StackExchange.Redis; +using Cloud.Services; var builder = WebApplication.CreateBuilder(args); @@ -19,6 +20,15 @@ builder.Services.AddSingleton(sp => return ConnectionMultiplexer.Connect(configuration); }); +//Kafka producer service +builder.Services.AddSingleton(); + +//Kafka consumer service +builder.Services.AddSingleton(); + +//Add the BackgroundWorkerService +builder.Services.AddHostedService(); + //Jwt configuration var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get(); var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get(); @@ -58,6 +68,7 @@ builder.Services.AddFluentValidationClientsideAdapters(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); +builder.Services.AddValidatorsFromAssemblyContaining(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); diff --git a/Cloud/Requests/ValveRequest.cs b/Cloud/Requests/ValveRequest.cs new file mode 100644 index 0000000..dee6a5a --- /dev/null +++ b/Cloud/Requests/ValveRequest.cs @@ -0,0 +1,7 @@ +namespace Cloud.Requests +{ + public class ValveRequest + { + public string Action { get; set; } + } +} diff --git a/Cloud/Services/BackgroundWorkerService.cs b/Cloud/Services/BackgroundWorkerService.cs new file mode 100644 index 0000000..f7fbf34 --- /dev/null +++ b/Cloud/Services/BackgroundWorkerService.cs @@ -0,0 +1,39 @@ +namespace Cloud.Services +{ + public class BackgroundWorkerService : BackgroundService + { + public readonly ILogger _logger; + private readonly ConsumerService _consumerService; + + public BackgroundWorkerService(ILogger logger, ConsumerService consumer) + { + _logger = logger; + _consumerService = consumer; + } + + //Backghround Service, This will run continuously + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + try + { + while (!stoppingToken.IsCancellationRequested) + { + //_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now); + + string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message + + //After Consume the Order Request Can process the order + if (!string.IsNullOrEmpty(request)) + _logger.LogInformation("Valves-Heaters Request : {value}", request); + + + await Task.Delay(1000, stoppingToken); + } + } + catch (Exception ex) + { + _logger.LogError($"BackgroundWorkerService - Exception {ex}"); + } + } + } +} diff --git a/Cloud/Services/ConsumerService.cs b/Cloud/Services/ConsumerService.cs new file mode 100644 index 0000000..82d5bb2 --- /dev/null +++ b/Cloud/Services/ConsumerService.cs @@ -0,0 +1,51 @@ +using Confluent.Kafka; + +namespace Cloud.Services +{ + public class ConsumerService + { + private IConsumer _consumer; + private ConsumerConfig consumerConfig; + public ConsumerService(IConfiguration configuration) + { + consumerConfig = new ConsumerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + GroupId = configuration["Kafka:GroupId"], + AutoOffsetReset = AutoOffsetReset.Earliest, + }; + + _consumer = new ConsumerBuilder(consumerConfig).Build(); + } + + //Consume Method + public async TaskWaitMessage(string topic) + { + try + { + _consumer.Subscribe(topic); + + var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); + + if (consumeResult != null) + { + return consumeResult.Message.Value; + } + else + { + //No message received from Kafka within the specified timeout. + } + return ""; + + } + catch (Exception ex) + { + return ""; + } + finally + { + _consumer.Close(); + } + } + } +} diff --git a/Cloud/Services/ProducerService.cs b/Cloud/Services/ProducerService.cs new file mode 100644 index 0000000..073bdc6 --- /dev/null +++ b/Cloud/Services/ProducerService.cs @@ -0,0 +1,33 @@ +using Confluent.Kafka; + +namespace Cloud.Services +{ + public class ProducerService + { + private readonly IProducer _producer; + + public ProducerService(IConfiguration configuration) + { + var producerConfig = new ProducerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + + //Build the Producer + _producer = new ProducerBuilder(producerConfig).Build(); + } + + //Method for Produce the Message to Kafka Topic + public async Task ProduceMessageAsync(string topic, string value) + { + var kafkaMessage = new Message + { + Key = Guid.NewGuid().ToString(), + Value = value + }; + + //Produce the Message + await _producer.ProduceAsync(topic, kafkaMessage); + } + } +} diff --git a/Cloud/Validation/ValveValidator.cs b/Cloud/Validation/ValveValidator.cs new file mode 100644 index 0000000..4a19f58 --- /dev/null +++ b/Cloud/Validation/ValveValidator.cs @@ -0,0 +1,16 @@ +using Cloud.Enums; +using Cloud.Requests; +using FluentValidation; + +namespace Cloud.Validation +{ + public class ValveValidator : AbstractValidator + { + public ValveValidator() { + + RuleFor(request => request.Action) + .NotEmpty().WithMessage("Action can't be empty"). + IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct"); + } + } +} diff --git a/Cloud/appsettings.json b/Cloud/appsettings.json index b272a9c..e80f5b5 100644 --- a/Cloud/appsettings.json +++ b/Cloud/appsettings.json @@ -5,6 +5,10 @@ "Microsoft.AspNetCore": "Warning" } }, + "Kafka": { + "BootstrapServers": "localhost:9092", + "GroupId": "ValvesHeaters" + }, "AllowedHosts": "*", "Jwt": { "Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o", From 5e73961ad593fba43cf1f82bd390f74d32842978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D1=80=D1=82=D0=B5=D0=BC=20=D0=A5=D0=B0=D1=80=D0=BB?= =?UTF-8?q?=D0=B0=D0=BC=D0=BE=D0=B2?= Date: Tue, 19 Nov 2024 19:36:05 +0400 Subject: [PATCH 14/14] Add Valve Controller --- Cloud/Controllers/ValveController.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs index 5708820..dbc081a 100644 --- a/Cloud/Controllers/ValveController.cs +++ b/Cloud/Controllers/ValveController.cs @@ -21,7 +21,7 @@ namespace Cloud.Controllers _producerService = producerService; } - [HttpPost("farm/{farmId}/greenhouse/{greenhouseId}/watering")] + [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] public async Task interactValve([FromBody] ValveRequest request, int farmId, int ghId) { var kafkaRequest = new @@ -32,10 +32,11 @@ namespace Cloud.Controllers }; var message = JsonSerializer.Serialize(kafkaRequest); + return Ok(kafkaRequest); - await _producerService.ProduceMessageAsync("InventoryUpdates", message); + /*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message); - return Ok($"Valve status is {request.Action}"); + return Ok($"Valve status is {request.Action}");*/ } } }