Compare commits

..

14 Commits

3 changed files with 130 additions and 121 deletions

View File

@ -1,13 +1,12 @@
from random import random
from turtledemo.penrose import start
from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
from json import dumps, loads
import time import time
import random as rnd import random as rnd
from flask import Flask
import requests
import threading
app = Flask(__name__)
class Detector: class Detector:
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
self.id = id self.id = id
@ -18,31 +17,23 @@ class Detector:
self.moisture = 0 self.moisture = 0
self.temp = 0 self.temp = 0
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
client_id=f'detector{self.id}',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
def sendData(self):
message = {'id' : self.id,
'moisture': self.moisture,
'temperature' : self.temp }
self.producer.send('dataDetectors', message)
def cycle(self): def cycle(self):
self.moisture += rnd.random() / 100 self.moisture += rnd.random() / 100
self.temp += (rnd.random() - 0.5) / 100 self.temp += (rnd.random() - 0.5) / 100
def sendData(self):
data = {"moisture": self.moisture,
"temp": self.temp}
requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
detector1 = Detector(1, 0.6, 0.2, 40, 20) detector1 = Detector(1, 0.6, 0.2, 40, 20)
detector2 = Detector(2, 0.7, 0.3, 40, 20)
detector3 = Detector(3, 0.9, 0.6, 40, 20)
detectors = [detector1, detector2, detector3] detectors = [detector1]
while True: if __name__ =="__main__":
for detector in detectors: while True:
detector.cycle() for detector in detectors:
detector.sendData() detector.cycle()
time.sleep(1) detector.sendData()
time.sleep(1)

View File

@ -2,20 +2,20 @@ from kafka import KafkaProducer, KafkaConsumer
import kafka import kafka
import socket import socket
from json import dumps, loads from json import dumps, loads
from flask import Flask, request
import time import time
from enum import Enum from enum import Enum
from GreenhouseDetector.detector import Detector import threading
class Status(Enum): app = Flask(__name__)
UNKNOWN = -1
OFF = 0
ON = 1
def start_manager():
return
class Manager: class Manager:
def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
heater_state: str = "off"): heater_state: str = "off"):
self.id = id self._id = _id
self.moisture = moisture self.moisture = moisture
self.temp = temp self.temp = temp
self.isAutoOn = isAutoOn self.isAutoOn = isAutoOn
@ -24,49 +24,29 @@ class Manager:
self.dataPublisher = KafkaProducer( self.dataPublisher = KafkaProducer(
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
client_id=f'manager{id}_producer', client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8') value_serializer=lambda v: dumps(v).encode('utf-8')
) )
# self.detectorConsumer = KafkaConsumer(
# 'dataDetectors',
# bootstrap_servers=['localhost:9092'],
# auto_offset_reset='earliest',
# enable_auto_commit=True,
# consumer_timeout_ms=1000,
#group_id=f'manager{id}',
# value_deserializer=lambda x: loads(x.decode('utf-8'))
#)
self.controllerConsumer = KafkaConsumer( self.controllerConsumer = KafkaConsumer(
'commands', 'commands',
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', auto_offset_reset='earliest',
enable_auto_commit=True, enable_auto_commit=True,
consumer_timeout_ms=2000, consumer_timeout_ms=2000,
group_id=f'manager{id}', group_id=f'manager{self._id}',
value_deserializer=lambda x: loads(x.decode('utf-8')) value_deserializer=lambda x: loads(x.decode('utf-8'))
) )
self.controllerConsumerResponse = KafkaProducer( self.controllerConsumerResponse = KafkaProducer(
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
client_id=f'manager{id}_producer', client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8') value_serializer=lambda v: dumps(v).encode('utf-8')
) )
def update(self):
for message in self.detectorConsumer:
print(f"Manager {self.id} received message: ")
print(message.value)
self.moisture = message.value['moisture']
self.temp = message.value['temperature']
print("Updating info...\n")
self.sendData()
def sendData(self): def sendData(self):
print("sending data...") print("sending data...")
message = { message = {
'id': self.id, 'id': self._id,
'moisture': self.moisture, 'moisture': self.moisture,
'temp': self.temp, 'temp': self.temp,
'valveStatus': str(self.valve_state), 'valveStatus': str(self.valve_state),
@ -76,6 +56,8 @@ class Manager:
print(message) print(message)
self.dataPublisher.send('data', message) self.dataPublisher.send('data', message)
self.dataPublisher.flush()
def toggle_device(self, device, request_id, greenhouse_id): def toggle_device(self, device, request_id, greenhouse_id):
@ -122,23 +104,33 @@ class Manager:
for tp, msgs in messages.items(): for tp, msgs in messages.items():
for message in msgs: for message in msgs:
print(f"Manager {self.id} received message: ") print(f"Manager {self._id} received message: ")
print(message.value) print(message.value)
self.request_id = message.value['request_id'] self.request_id = message.value['request_id']
self.greenhouse_id = message.value['greenhouse_id'] self.greenhouse_id = message.value['greenhouse_id']
self.command = message.value['command'] self.command = message.value['command']
self.toggle_device(self.command, self.request_id, self.greenhouse_id) self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route(f'/webhook', methods=['POST'])
def webhook():
print("received webhook", request.args.get('id'))
for manager in managers:
print()
if int(request.args.get('id')) == manager._id and request.method == 'POST':
print("Data received from Webhook is", request.json)
body = request.json
for key, value in body.items():
setattr(manager, key, value)
manager1 = Manager(id=1) manager.sendData()
return f"Webhook received for manager {manager._id}"
return "Webhook ignored"
t1 = threading.Thread(target=start_manager)
manager1 = Manager(_id=1)
managers = [manager1] managers = [manager1]
if __name__ == "__main__":
while True: threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()
time.sleep(5)
manager1.sendData()
for manager in managers:
manager.getCommand()

View File

@ -1,3 +1,12 @@
networks:
vpn:
name: kafkaVPN
driver: bridge
ipam:
config:
- subnet: "192.168.2.0/24"
gateway: "192.168.2.1"
services: services:
cloud: cloud:
build: ./Cloud/ build: ./Cloud/
@ -20,64 +29,81 @@ services:
redis: redis:
image: 'redis:latest' image: 'redis:latest'
ports: ports:
- '6379:6379' - '6379:6379'
volumes: volumes:
- 'cloud-redis:/data' - 'cloud-redis:/data'
healthcheck: healthcheck:
test: test:
- CMD - CMD
- redis-cli - redis-cli
- ping - ping
retries: 3 retries: 3
timeout: 5s timeout: 5s
kafka: zookeeper:
image: confluentinc/cp-kafka:7.4.0 networks:
ports: - vpn
- 9092:9092 image: confluentinc/cp-zookeeper:7.4.0
- 9997:9997 environment:
expose: ZOOKEEPER_CLIENT_PORT: 2181
- 29092:29092 ZOOKEEPER_TICK_TIME: 2000
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports: ports:
- 8080:8080 - 2181:2181
kafka:
networks:
vpn:
ipv4_address: 192.168.2.10
image: confluentinc/cp-kafka:7.4.0
ports:
- 9092:9092
- 9997:9997
expose:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
networks:
- vpn
image: confluentinc/cp-kafka:7.4.0
depends_on: depends_on:
- kafka - kafka
environment: entrypoint: [ '/bin/sh', '-c' ]
KAFKA_CLUSTERS_0_NAME: local command: |
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 "
KAFKA_CLUSTERS_0_METRICS_PORT: 9997 # blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
networks:
- vpn
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
volumes: volumes:
postgres_data: postgres_data:
driver: local driver: local
@ -89,4 +115,4 @@ volumes:
ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000 ZOOKEEPER_TICK_TIME: 2000
ports: ports:
- 2181:2181 - 2181:2181