diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py
index 1ecb671..728ec1d 100644
--- a/GreenhouseDetector/detector.py
+++ b/GreenhouseDetector/detector.py
@@ -28,13 +28,21 @@ class Detector:
         message = {'id' : self.id,
                    'moisture': self.moisture,
                    'temperature' : self.temp
                    }
-        self.producer.send('data', message)
+        self.producer.send('dataDetectors', message)
 
     def cycle(self):
         self.moisture += rnd.random() / 100
         self.temp += (rnd.random() - 0.5) / 100
 
 
+detector1 = Detector(1, 0.6, 0.2, 40, 20)
+detector2 = Detector(2, 0.7, 0.3, 40, 20)
+detector3 = Detector(3, 0.9, 0.6, 40, 20)
+
+detectors = [detector1, detector2, detector3]
 while True:
+    for detector in detectors:
+        detector.cycle()
+        detector.sendData()
     time.sleep(1)
diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py
index dbda11b..4d1d836 100644
--- a/GreenhouseManager/manager.py
+++ b/GreenhouseManager/manager.py
@@ -3,15 +3,62 @@ import kafka
 import socket
 from json import dumps, loads
 import time
+from enum import Enum
 
-dataConsumer = KafkaConsumer(
-    'data',
-    bootstrap_servers=['localhost:9092'],
-    auto_offset_reset='earliest',
-    enable_auto_commit=True,
-    group_id='my-group',
-    value_deserializer=lambda x: loads(x.decode('utf-8')))
+class Status(Enum):
+    UNKNOWN = -1
+    OFF = 0
+    ON = 1
+
+class Manager:
+    def __init__(self, id : int, moisture : float = 0, temp : float = 20, valveStatus : Status = Status.UNKNOWN, heaterStatus : Status = Status.UNKNOWN, isAutoOn : bool = False):
+        self.id = id
+        self.moisture = moisture
+        self.temp = temp
+        self.valveStatus = valveStatus
+        self.heaterStatus = heaterStatus
+        self.isAutoOn = isAutoOn
+
+        self.dataPublisher = KafkaProducer(
+            bootstrap_servers=['localhost:9092'],
+            client_id=f'manager{id}',
+            value_serializer=lambda v: dumps(v).encode('utf-8')
+        )
+
+        self.detectorConsumer = KafkaConsumer(
+            'dataDetectors',
+            bootstrap_servers=['localhost:9092'],
+            auto_offset_reset='earliest',
+            enable_auto_commit=True,
+            group_id=f'manager{id}',
+            value_deserializer=lambda x: loads(x.decode('utf-8'))
+        )
+
+    def update(self):
+        for message in self.detectorConsumer:
+            if message.value['id'] == self.id:
+                print(f"Manager {self.id} received message: ")
+                print(message.value)
+                print("Updating info...\n")
+                self.moisture = message.value['moisture']
+                self.temp = message.value['temperature']
+
+                dataMessage = {
+                    'id' : self.id,
+                    'moisture' : self.moisture,
+                    'temp' : self.temp,
+                    'valveStatus': str(self.valveStatus),
+                    'heaterStatus': str(self.heaterStatus),
+                    'isAutoOn' : self.isAutoOn
+                }
+
+                self.dataPublisher.send('data', dataMessage)
+
+manager1 = Manager(id = 1)
+
+managers = [manager1]
 
 while True:
-    for message in dataConsumer:
-        print(message)
\ No newline at end of file
+    time.sleep(1)
+    for manager in managers:
+        manager.update()
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 0683811..995a9e2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,6 +39,7 @@ services:
       echo -e 'Creating kafka topics'
       kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
       kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
       kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
       echo -e 'Successfully created the following topics:'
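
Not part of the patch: a minimal smoke-test sketch for checking that the Manager's enriched messages actually reach the 'data' topic, assuming the same kafka-python client and a broker on localhost:9092 as in the code above. The group id 'smoke-test' is hypothetical and not used elsewhere in the repo.

# smoke_test_data_topic.py (hypothetical helper, not part of the patch)
from json import loads

from kafka import KafkaConsumer

# Subscribe to the 'data' topic that manager.py publishes to.
consumer = KafkaConsumer(
    'data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    group_id='smoke-test',  # hypothetical group id
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# Print whatever the Manager republishes; expected keys:
# id, moisture, temp, valveStatus, heaterStatus, isAutoOn.
for message in consumer:
    print(message.value)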