dev #9
@ -0,0 +1,40 @@
|
|||||||
|
from random import random
|
||||||
|
from turtledemo.penrose import start
|
||||||
|
|
||||||
|
from kafka import KafkaProducer, KafkaConsumer
|
||||||
|
import kafka
|
||||||
|
import socket
|
||||||
|
from json import dumps, loads
|
||||||
|
import time
|
||||||
|
import random as rnd
|
||||||
|
|
||||||
|
class Detector:
|
||||||
|
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
|
||||||
|
self.id = id
|
||||||
|
self.moistureThresholdUpper = moistureThresholdUpper
|
||||||
|
self.moistureThresholdLower = moistureThresholdLower
|
||||||
|
self.tempThresholdUpper = tempThresholdUpper
|
||||||
|
self.tempThresholdLower = tempThresholdLower
|
||||||
|
self.moisture = 0
|
||||||
|
self.temp = 0
|
||||||
|
|
||||||
|
self.producer = KafkaProducer(
|
||||||
|
bootstrap_servers=['localhost:9092'],
|
||||||
|
client_id=f'detector{self.id}',
|
||||||
|
value_serializer=lambda v: dumps(v).encode('utf-8')
|
||||||
|
)
|
||||||
|
|
||||||
|
def sendData(self):
|
||||||
|
message = {'id' : self.id,
|
||||||
|
'moisture': self.moisture,
|
||||||
|
'temperature' : self.temp }
|
||||||
|
self.producer.send('data', message)
|
||||||
|
|
||||||
|
def cycle(self):
|
||||||
|
self.moisture += rnd.random() / 100
|
||||||
|
self.temp += (rnd.random() - 0.5) / 100
|
||||||
|
|
||||||
|
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
|
@ -4,24 +4,14 @@ import socket
|
|||||||
from json import dumps, loads
|
from json import dumps, loads
|
||||||
import time
|
import time
|
||||||
|
|
||||||
consumer = KafkaConsumer(
|
dataConsumer = KafkaConsumer(
|
||||||
'commands',
|
'data',
|
||||||
bootstrap_servers=['localhost:9092'],
|
bootstrap_servers=['localhost:9092'],
|
||||||
auto_offset_reset='earliest',
|
auto_offset_reset='earliest',
|
||||||
enable_auto_commit=True,
|
enable_auto_commit=True,
|
||||||
group_id='my-group',
|
group_id='my-group',
|
||||||
value_deserializer=lambda x: loads(x.decode('utf-8')))
|
value_deserializer=lambda x: loads(x.decode('utf-8')))
|
||||||
|
|
||||||
print(consumer.topics())
|
|
||||||
consumer.subscribe(['commands'])
|
|
||||||
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'],
|
|
||||||
value_serializer=lambda x:
|
|
||||||
dumps(x).encode('utf-8'))
|
|
||||||
|
|
||||||
data = {'message' : 'hello'}
|
|
||||||
producer.send('commands', value=data)
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
for message in consumer:
|
for message in dataConsumer:
|
||||||
print(message)
|
print(message)
|
||||||
time.sleep(1)
|
|
Loading…
x
Reference in New Issue
Block a user