diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index 728ec1d..d092373 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -1,13 +1,12 @@ -from random import random -from turtledemo.penrose import start - -from kafka import KafkaProducer, KafkaConsumer -import kafka -import socket -from json import dumps, loads import time import random as rnd +from flask import Flask +import requests +import threading + +app = Flask(__name__) + class Detector: def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): self.id = id @@ -18,31 +17,23 @@ class Detector: self.moisture = 0 self.temp = 0 - self.producer = KafkaProducer( - bootstrap_servers=['localhost:9092'], - client_id=f'detector{self.id}', - value_serializer=lambda v: dumps(v).encode('utf-8') - ) - - def sendData(self): - message = {'id' : self.id, - 'moisture': self.moisture, - 'temperature' : self.temp } - self.producer.send('dataDetectors', message) - def cycle(self): self.moisture += rnd.random() / 100 self.temp += (rnd.random() - 0.5) / 100 + def sendData(self): + data = {"moisture": self.moisture, + "temp": self.temp} + requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data) + detector1 = Detector(1, 0.6, 0.2, 40, 20) -detector2 = Detector(2, 0.7, 0.3, 40, 20) -detector3 = Detector(3, 0.9, 0.6, 40, 20) -detectors = [detector1, detector2, detector3] +detectors = [detector1] -while True: - for detector in detectors: - detector.cycle() - detector.sendData() - time.sleep(1) +if __name__ =="__main__": + while True: + for detector in detectors: + detector.cycle() + detector.sendData() + time.sleep(1) diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index c082d04..d9772d1 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -2,20 +2,20 @@ from kafka import KafkaProducer, KafkaConsumer import kafka import socket from json import dumps, loads +from flask import Flask, request import time from enum import Enum -from GreenhouseDetector.detector import Detector +import threading -class Status(Enum): - UNKNOWN = -1 - OFF = 0 - ON = 1 +app = Flask(__name__) +def start_manager(): + return class Manager: - def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", + def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", heater_state: str = "off"): - self.id = id + self._id = _id self.moisture = moisture self.temp = temp self.isAutoOn = isAutoOn @@ -24,49 +24,29 @@ class Manager: self.dataPublisher = KafkaProducer( bootstrap_servers=['localhost:9092'], - client_id=f'manager{id}_producer', + client_id=f'manager{self._id}_producer', value_serializer=lambda v: dumps(v).encode('utf-8') ) - # self.detectorConsumer = KafkaConsumer( - # 'dataDetectors', - # bootstrap_servers=['localhost:9092'], - # auto_offset_reset='earliest', - # enable_auto_commit=True, - # consumer_timeout_ms=1000, - #group_id=f'manager{id}', - # value_deserializer=lambda x: loads(x.decode('utf-8')) - #) self.controllerConsumer = KafkaConsumer( 'commands', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, consumer_timeout_ms=2000, - group_id=f'manager{id}', + group_id=f'manager{self._id}', value_deserializer=lambda x: loads(x.decode('utf-8')) ) self.controllerConsumerResponse = KafkaProducer( bootstrap_servers=['localhost:9092'], - client_id=f'manager{id}_producer', + client_id=f'manager{self._id}_producer', value_serializer=lambda v: dumps(v).encode('utf-8') ) - def update(self): - for message in self.detectorConsumer: - - print(f"Manager {self.id} received message: ") - print(message.value) - self.moisture = message.value['moisture'] - self.temp = message.value['temperature'] - print("Updating info...\n") - - self.sendData() - def sendData(self): print("sending data...") message = { - 'id': self.id, + 'id': self._id, 'moisture': self.moisture, 'temp': self.temp, 'valveStatus': str(self.valve_state), @@ -76,6 +56,8 @@ class Manager: print(message) self.dataPublisher.send('data', message) + self.dataPublisher.flush() + def toggle_device(self, device, request_id, greenhouse_id): @@ -122,23 +104,33 @@ class Manager: for tp, msgs in messages.items(): for message in msgs: - print(f"Manager {self.id} received message: ") + print(f"Manager {self._id} received message: ") print(message.value) self.request_id = message.value['request_id'] self.greenhouse_id = message.value['greenhouse_id'] self.command = message.value['command'] self.toggle_device(self.command, self.request_id, self.greenhouse_id) +@app.route(f'/webhook', methods=['POST']) +def webhook(): + print("received webhook", request.args.get('id')) + for manager in managers: + print() + if int(request.args.get('id')) == manager._id and request.method == 'POST': + print("Data received from Webhook is", request.json) + body = request.json + for key, value in body.items(): + setattr(manager, key, value) -manager1 = Manager(id=1) + manager.sendData() + return f"Webhook received for manager {manager._id}" + return "Webhook ignored" + +t1 = threading.Thread(target=start_manager) +manager1 = Manager(_id=1) managers = [manager1] - -while True: - time.sleep(5) - manager1.sendData() - for manager in managers: - - manager.getCommand() +if __name__ == "__main__": + threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index fa85db4..af1ab66 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,3 +1,12 @@ +networks: + vpn: + name: kafkaVPN + driver: bridge + ipam: + config: + - subnet: "192.168.2.0/24" + gateway: "192.168.2.1" + services: cloud: build: ./Cloud/ @@ -20,64 +29,81 @@ services: redis: image: 'redis:latest' ports: - - '6379:6379' + - '6379:6379' volumes: - - 'cloud-redis:/data' + - 'cloud-redis:/data' healthcheck: - test: - - CMD - - redis-cli - - ping - retries: 3 - timeout: 5s - kafka: - image: confluentinc/cp-kafka:7.4.0 - ports: - - 9092:9092 - - 9997:9997 - expose: - - 29092:29092 - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - depends_on: - - zookeeper - init-kafka: - image: confluentinc/cp-kafka:7.4.0 - depends_on: - - kafka - entrypoint: [ '/bin/sh', '-c' ] - command: | - " - # blocks until kafka is reachable - kafka-topics --bootstrap-server kafka:29092 --list - - echo -e 'Creating kafka topics' - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1 - kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 - - echo -e 'Successfully created the following topics:' - kafka-topics --bootstrap-server kafka:29092 --list - " - kafka-ui: - container_name: kafka-ui - image: provectuslabs/kafka-ui:latest + test: + - CMD + - redis-cli + - ping + retries: 3 + timeout: 5s + zookeeper: + networks: + - vpn + image: confluentinc/cp-zookeeper:7.4.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 ports: - - 8080:8080 + - 2181:2181 + kafka: + networks: + vpn: + ipv4_address: 192.168.2.10 + image: confluentinc/cp-kafka:7.4.0 + ports: + - 9092:9092 + - 9997:9997 + expose: + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092 + KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + depends_on: + - zookeeper + init-kafka: + networks: + - vpn + image: confluentinc/cp-kafka:7.4.0 depends_on: - kafka - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 - KAFKA_CLUSTERS_0_METRICS_PORT: 9997 + entrypoint: [ '/bin/sh', '-c' ] + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1 + kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " + kafka-ui: + networks: + - vpn + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + depends_on: + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_METRICS_PORT: 9997 volumes: postgres_data: driver: local @@ -89,4 +115,4 @@ volumes: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - - 2181:2181 + - 2181:2181 \ No newline at end of file