From 08ee12aa8bef22ec1e987965a8a606e45b8d597a Mon Sep 17 00:00:00 2001 From: the Date: Tue, 29 Oct 2024 16:07:02 +0400 Subject: [PATCH] simple detector + manager --- GreenhouseDetector/detector.py | 40 ++++++++++++++++++++++++++++++++++ GreenhouseManager/manager.py | 18 ++++----------- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py index e69de29..1ecb671 100644 --- a/GreenhouseDetector/detector.py +++ b/GreenhouseDetector/detector.py @@ -0,0 +1,40 @@ +from random import random +from turtledemo.penrose import start + +from kafka import KafkaProducer, KafkaConsumer +import kafka +import socket +from json import dumps, loads +import time +import random as rnd + +class Detector: + def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): + self.id = id + self.moistureThresholdUpper = moistureThresholdUpper + self.moistureThresholdLower = moistureThresholdLower + self.tempThresholdUpper = tempThresholdUpper + self.tempThresholdLower = tempThresholdLower + self.moisture = 0 + self.temp = 0 + + self.producer = KafkaProducer( + bootstrap_servers=['localhost:9092'], + client_id=f'detector{self.id}', + value_serializer=lambda v: dumps(v).encode('utf-8') + ) + + def sendData(self): + message = {'id' : self.id, + 'moisture': self.moisture, + 'temperature' : self.temp } + self.producer.send('data', message) + + def cycle(self): + self.moisture += rnd.random() / 100 + self.temp += (rnd.random() - 0.5) / 100 + + +while True: + time.sleep(1) + diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 74fed03..dbda11b 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -4,24 +4,14 @@ import socket from json import dumps, loads import time -consumer = KafkaConsumer( - 'commands', +dataConsumer = KafkaConsumer( + 'data', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest', enable_auto_commit=True, group_id='my-group', value_deserializer=lambda x: loads(x.decode('utf-8'))) -print(consumer.topics()) -consumer.subscribe(['commands']) -producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], - value_serializer=lambda x: - dumps(x).encode('utf-8')) - -data = {'message' : 'hello'} -producer.send('commands', value=data) - while True: - for message in consumer: - print(message) - time.sleep(1) \ No newline at end of file + for message in dataConsumer: + print(message) \ No newline at end of file