dimazhelovanov 2024-10-29 19:50:35 +04:00
commit 1c38c61fbc
3 changed files with 110 additions and 19 deletions


@ -0,0 +1,48 @@
from kafka import KafkaProducer
from json import dumps
import time
import random as rnd


class Detector:
    """Simulated greenhouse sensor that publishes moisture/temperature readings to Kafka."""

    def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
        self.id = id
        self.moistureThresholdUpper = moistureThresholdUpper
        self.moistureThresholdLower = moistureThresholdLower
        self.tempThresholdUpper = tempThresholdUpper
        self.tempThresholdLower = tempThresholdLower
        self.moisture = 0
        self.temp = 0
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            client_id=f'detector{self.id}',
            value_serializer=lambda v: dumps(v).encode('utf-8')
        )

    def sendData(self):
        # Publish the current reading to the 'dataDetectors' topic
        message = {'id': self.id,
                   'moisture': self.moisture,
                   'temperature': self.temp}
        self.producer.send('dataDetectors', message)

    def cycle(self):
        # Drift the simulated readings slightly on every tick
        self.moisture += rnd.random() / 100
        self.temp += (rnd.random() - 0.5) / 100


detector1 = Detector(1, 0.6, 0.2, 40, 20)
detector2 = Detector(2, 0.7, 0.3, 40, 20)
detector3 = Detector(3, 0.9, 0.6, 40, 20)
detectors = [detector1, detector2, detector3]

while True:
    for detector in detectors:
        detector.cycle()
        detector.sendData()
    time.sleep(1)
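Note that KafkaProducer.send() in kafka-python is asynchronous, so a detector process that exits right after a send can lose its last readings. A minimal sketch of a delivery-confirming send, assuming the same kafka-python client and topic as above (the helper name, timeout value, and error handling are illustrative and not part of the commit):

from kafka.errors import KafkaError

def send_data_blocking(detector, timeout_s=10):
    # send() returns a future; get() blocks until the broker acknowledges the record
    message = {'id': detector.id,
               'moisture': detector.moisture,
               'temperature': detector.temp}
    future = detector.producer.send('dataDetectors', message)
    try:
        future.get(timeout=timeout_s)
    except KafkaError as e:
        print(f"Detector {detector.id} failed to publish: {e}")
    # flush() before shutdown pushes any records still buffered in the producer
    detector.producer.flush()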


@ -3,26 +3,68 @@ import kafka
import socket
from json import dumps, loads
import time
-from ManageController import ManagerController
-greenhouse_controller = GreenhouseController(producer)
-consumer = KafkaConsumer(
-    'commands',
-    bootstrap_servers=['localhost:9092'],
-    auto_offset_reset='earliest',
-    enable_auto_commit=True,
-    group_id='my-group',
-    value_deserializer=lambda x: loads(x.decode('utf-8')))
-print(consumer.topics())
-consumer.subscribe(['commands'])
-producer = KafkaProducer(bootstrap_servers = ['localhost:9092'],
-                         value_serializer=lambda x:
-                         dumps(x).encode('utf-8'))
-data = {'message' : 'hello'}
-producer.send('commands', value=data)
+from enum import Enum
+
+
+class Status(Enum):
+    UNKNOWN = -1
+    OFF = 0
+    ON = 1
+
+
+class Manager:
+    def __init__(self, id: int, moisture: float = 0, temp: float = 20,
+                 valveStatus: Status = Status.UNKNOWN, heaterStatus: Status = Status.UNKNOWN,
+                 isAutoOn: bool = False):
+        self.id = id
+        self.moisture = moisture
+        self.temp = temp
+        self.valveStatus = valveStatus
+        self.heaterStatus = heaterStatus
+        self.isAutoOn = isAutoOn
+        self.dataPublisher = KafkaProducer(
+            bootstrap_servers=['localhost:9092'],
+            client_id=f'manager{id}_producer',
+            value_serializer=lambda v: dumps(v).encode('utf-8')
+        )
+        self.detectorConsumer = KafkaConsumer(
+            'dataDetectors',
+            bootstrap_servers=['localhost:9092'],
+            auto_offset_reset='earliest',
+            enable_auto_commit=True,
+            consumer_timeout_ms=1000,
+            group_id=f'manager{id}',
+            value_deserializer=lambda x: loads(x.decode('utf-8'))
+        )
+
+    def update(self):
+        for message in self.detectorConsumer:
+            if message.value['id'] == self.id:
+                print(f"Manager {self.id} received message: ")
+                print(message.value)
+                self.moisture = message.value['moisture']
+                self.temp = message.value['temperature']
+                print("Updating info...\n")
+                self.sendData()
+
+    def sendData(self):
+        print("sending data...")
+        message = {
+            'id': self.id,
+            'moisture': self.moisture,
+            'temp': self.temp,
+            'valveStatus': str(self.valveStatus),
+            'heaterStatus': str(self.heaterStatus),
+            'isAutoOn': self.isAutoOn
+        }
+        print(message)
+        self.dataPublisher.send('data', message)
+
+
+manager1 = Manager(id = 1)
+managers = [manager1]
while True:
-    for message in consumer:
-        print(message)
    time.sleep(1)
+    for manager in managers:
+        manager.update()
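For a quick end-to-end check of the pipeline, the 'data' topic the managers publish to can be read back with a throwaway consumer. A minimal sketch, assuming the same broker address and kafka-python client used above (the group id and timeout are illustrative):

from kafka import KafkaConsumer
from json import loads

checker = KafkaConsumer(
    'data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,   # stop iterating after 5 s without new records
    group_id='smoke-test',      # illustrative group id
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
for record in checker:
    print(record.value)         # e.g. {'id': 1, 'moisture': ..., 'temp': ..., ...}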


@ -39,6 +39,7 @@ services:
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
+kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
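Once the compose stack is up, a short host-side check can confirm that the init container created all four topics. A sketch using the same kafka-python client, assuming the broker is reachable on localhost:9092 as in the scripts above:

from kafka import KafkaConsumer

expected = {'commands', 'data', 'dataDetectors', 'response'}
existing = KafkaConsumer(bootstrap_servers=['localhost:9092']).topics()
missing = expected - existing
print('all topics present' if not missing else f'missing topics: {missing}')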