diff --git a/GreenhouseController/ManageController.py b/GreenhouseController/ManageController.py
new file mode 100644
index 0000000..2d9ad10
--- /dev/null
+++ b/GreenhouseController/ManageController.py
@@ -0,0 +1,39 @@
+class ManageController:
+    """Tracks greenhouse device states and publishes them via a Kafka producer."""
+
+    def __init__(self, producer, topic='commands'):
+        self.valve_state = "closed"
+        self.heater_state = "off"
+        self.producer = producer
+        self.topic = topic
+
+    def toggle_device(self, device, request_id, greenhouse_id):
+        """Flip the named device ('valve' or 'heater') and publish the new status."""
+        if device == 'valve':
+            if self.valve_state == 'closed':
+                self.valve_state = 'open'
+                print("Valve opened")
+            else:
+                self.valve_state = 'closed'
+                print("Valve closed")
+        elif device == 'heater':
+            if self.heater_state == 'off':
+                self.heater_state = 'on'
+                print("Heater turned on")
+            else:
+                self.heater_state = 'off'
+                print("Heater turned off")
+        self.send_status(request_id, greenhouse_id)
+
+    def send_status(self, request_id, greenhouse_id):
+        """Publish the current device states for a request and return the payload."""
+        status = {
+            'request_id': request_id,
+            'greenhouse_id': greenhouse_id,
+            'valve_state': self.valve_state,
+            'heater_state': self.heater_state
+        }
+        # Actually publish the status; the previous version only printed it.
+        self.producer.send(self.topic, status)
+        print(f"Sent device status: {status}")
+        return status
diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py
new file mode 100644
index 0000000..728ec1d
--- /dev/null
+++ b/GreenhouseDetector/detector.py
@@ -0,0 +1,50 @@
+import time
+import random as rnd
+from json import dumps
+
+from kafka import KafkaProducer
+
+
+class Detector:
+    """Simulated greenhouse sensor that publishes moisture/temperature readings."""
+
+    def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
+        self.id = id
+        self.moistureThresholdUpper = moistureThresholdUpper
+        self.moistureThresholdLower = moistureThresholdLower
+        self.tempThresholdUpper = tempThresholdUpper
+        self.tempThresholdLower = tempThresholdLower
+        self.moisture = 0
+        self.temp = 0
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=['localhost:9092'],
+            client_id=f'detector{self.id}',
+            value_serializer=lambda v: dumps(v).encode('utf-8')
+        )
+
+    def sendData(self):
+        """Publish the current readings to the 'dataDetectors' topic."""
+        message = {'id': self.id,
+                   'moisture': self.moisture,
+                   'temperature': self.temp}
+        self.producer.send('dataDetectors', message)
+
+    def cycle(self):
+        """Advance the simulation one step with a small random drift."""
+        self.moisture += rnd.random() / 100
+        self.temp += (rnd.random() - 0.5) / 100
+
+
+if __name__ == "__main__":
+    detectors = [
+        Detector(1, 0.6, 0.2, 40, 20),
+        Detector(2, 0.7, 0.3, 40, 20),
+        Detector(3, 0.9, 0.6, 40, 20),
+    ]
+
+    while True:
+        for detector in detectors:
+            detector.cycle()
+            detector.sendData()
+        time.sleep(1)
diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py
new file mode 100644
index 0000000..c082d04
--- /dev/null
+++ b/GreenhouseManager/manager.py
@@ -0,0 +1,134 @@
+import time
+from enum import Enum
+from json import dumps, loads
+
+from kafka import KafkaProducer, KafkaConsumer
+
+
+class Status(Enum):
+    UNKNOWN = -1
+    OFF = 0
+    ON = 1
+
+
+class Manager:
+    """Bridges detector readings and controller commands for one greenhouse."""
+
+    def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False,
+                 valve_state: str = "closed", heater_state: str = "off"):
+        self.id = id
+        self.moisture = moisture
+        self.temp = temp
+        self.isAutoOn = isAutoOn
+        self.valve_state = valve_state
+        self.heater_state = heater_state
+
+        self.dataPublisher = KafkaProducer(
+            bootstrap_servers=['localhost:9092'],
+            client_id=f'manager{id}_producer',
+            value_serializer=lambda v: dumps(v).encode('utf-8')
+        )
+
+        # update() iterates this consumer, so it must exist (it was commented out before).
+        self.detectorConsumer = KafkaConsumer(
+            'dataDetectors',
+            bootstrap_servers=['localhost:9092'],
+            auto_offset_reset='earliest',
+            enable_auto_commit=True,
+            consumer_timeout_ms=1000,
+            group_id=f'manager{id}',
+            value_deserializer=lambda x: loads(x.decode('utf-8'))
+        )
+        self.controllerConsumer = KafkaConsumer(
+            'commands',
+            bootstrap_servers=['localhost:9092'],
+            auto_offset_reset='earliest',
+            enable_auto_commit=True,
+            consumer_timeout_ms=2000,
+            group_id=f'manager{id}',
+            value_deserializer=lambda x: loads(x.decode('utf-8'))
+        )
+        self.controllerConsumerResponse = KafkaProducer(
+            bootstrap_servers=['localhost:9092'],
+            client_id=f'manager{id}_response_producer',
+            value_serializer=lambda v: dumps(v).encode('utf-8')
+        )
+
+    def update(self):
+        """Consume detector readings, refresh local state, then publish it."""
+        for message in self.detectorConsumer:
+            print(f"Manager {self.id} received message: ")
+            print(message.value)
+            self.moisture = message.value['moisture']
+            self.temp = message.value['temperature']
+            print("Updating info...\n")
+            self.sendData()
+
+    def sendData(self):
+        """Publish the manager's current state to the 'data' topic."""
+        print("sending data...")
+        message = {
+            'id': self.id,
+            'moisture': self.moisture,
+            'temp': self.temp,
+            'valveStatus': str(self.valve_state),
+            'heaterStatus': str(self.heater_state),
+            'isAutoOn': self.isAutoOn
+        }
+        print(message)
+        self.dataPublisher.send('data', message)
+
+    def toggle_device(self, device, request_id, greenhouse_id):
+        """Flip the named device ('valve' or 'heater') and report the result."""
+        if device == 'valve':
+            if self.valve_state == 'closed':
+                self.valve_state = 'open'
+                print("Valve opened")
+            else:
+                self.valve_state = 'closed'
+                print("Valve closed")
+        elif device == 'heater':
+            if self.heater_state == 'off':
+                self.heater_state = 'on'
+                print("Heater turned on")
+            else:
+                self.heater_state = 'off'
+                print("Heater turned off")
+        self.send_status(request_id, greenhouse_id)
+
+    def send_status(self, request_id, greenhouse_id):
+        """Publish the device states for a handled command."""
+        status = {
+            'request_id': request_id,
+            'greenhouse_id': greenhouse_id,
+            'valve_state': self.valve_state,
+            'heater_state': self.heater_state
+        }
+        self.sendDataCommand(status)
+        print("Updating info...\n")
+
+    def sendDataCommand(self, message):
+        """Publish a command response to the 'response' topic."""
+        print("sending data...")
+        self.dataPublisher.send('response', message)
+
+    def getCommand(self):
+        """Poll for controller commands and apply each one."""
+        messages = self.controllerConsumer.poll(timeout_ms=1000)
+        for tp, msgs in messages.items():
+            for message in msgs:
+                print(f"Manager {self.id} received message: ")
+                print(message.value)
+                self.toggle_device(message.value['command'],
+                                   message.value['request_id'],
+                                   message.value['greenhouse_id'])
+
+
+if __name__ == "__main__":
+    managers = [Manager(id=1)]
+
+    while True:
+        time.sleep(5)
+        for manager in managers:
+            manager.sendData()
+            manager.getCommand()
diff --git a/docker-compose.yml b/docker-compose.yml
index 71e50fa..fa85db4 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -30,8 +30,63 @@
         - ping
       retries: 3
       timeout: 5s
+  kafka:
+    image: confluentinc/cp-kafka:7.4.0
+    ports:
+      - 9092:9092
+      - 9997:9997
+    expose:
+      - "29092"
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+    depends_on:
+      - zookeeper
+  init-kafka:
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - kafka
+    entrypoint: [ '/bin/sh', '-c' ]
+    command: |
+      "
+      # blocks until kafka is reachable
+      kafka-topics --bootstrap-server kafka:29092 --list
+
+      echo -e 'Creating kafka topics'
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
+
+      echo -e 'Successfully created the following topics:'
+      kafka-topics --bootstrap-server kafka:29092 --list
+      "
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    depends_on:
+      - kafka
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+  zookeeper:
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 2181:2181
 volumes:
   postgres_data:
     driver: local
   cloud-redis:
     driver: local