diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs
new file mode 100644
index 0000000..dbc081a
--- /dev/null
+++ b/Cloud/Controllers/ValveController.cs
@@ -0,0 +1,42 @@
+using Cloud.Requests;
+using Cloud.Services;
+using Microsoft.AspNetCore.Authorization;
+using Microsoft.AspNetCore.Mvc;
+using System.ComponentModel.DataAnnotations;
+using System.Text.Json;
+
+namespace Cloud.Controllers
+{
+    [Authorize]
+    [ApiController]
+    [Route("api")]
+    public class ValveController : ControllerBase
+    {
+        // Valve controller
+
+        private readonly ProducerService _producerService;
+
+        public ValveController(ProducerService producerService)
+        {
+            _producerService = producerService;
+        }
+
+        [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
+        public async Task<IActionResult> InteractValve([FromBody] ValveRequest request, int farmId, int ghId)
+        {
+            var kafkaRequest = new
+            {
+                FarmId = farmId,
+                GreenHouseId = ghId,
+                SomeAction = request.Action,
+            };
+
+            var message = JsonSerializer.Serialize(kafkaRequest);
+            return Ok(kafkaRequest);
+
+            /*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message);
+
+            return Ok($"Valve status is {request.Action}");*/
+        }
+    }
+}
diff --git a/Cloud/Enums/ValveEnum.cs b/Cloud/Enums/ValveEnum.cs
new file mode 100644
index 0000000..f04d64a
--- /dev/null
+++ b/Cloud/Enums/ValveEnum.cs
@@ -0,0 +1,9 @@
+namespace Cloud.Enums
+{
+    public enum ValveEnum
+    {
+        Open,
+        Close,
+        Auto
+    }
+}
diff --git a/Cloud/Program.cs b/Cloud/Program.cs
index 7ab00f6..6e22ecd 100644
--- a/Cloud/Program.cs
+++ b/Cloud/Program.cs
@@ -9,6 +9,7 @@ using Cloud.Validation;
 using StackExchange.Redis;
 using Cloud.Services.Broker.Implement.Kafka;
 using Cloud.Services.Broker;
+using Cloud.Services;
 
 var builder = WebApplication.CreateBuilder(args);
 
@@ -21,6 +22,15 @@ builder.Services.AddSingleton(sp =>
     return ConnectionMultiplexer.Connect(configuration);
 });
 
+//Kafka producer service
+builder.Services.AddSingleton<ProducerService>();
+
+//Kafka consumer service
+builder.Services.AddSingleton<ConsumerService>();
+
+//Add the BackgroundWorkerService
+builder.Services.AddHostedService<BackgroundWorkerService>();
+
 //Jwt configuration
 var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
 var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
@@ -60,6 +70,7 @@ builder.Services.AddFluentValidationClientsideAdapters();
 builder.Services.AddValidatorsFromAssemblyContaining();
 builder.Services.AddValidatorsFromAssemblyContaining();
 builder.Services.AddValidatorsFromAssemblyContaining();
+builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
 
 // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
 builder.Services.AddEndpointsApiExplorer();
diff --git a/Cloud/Requests/ValveRequest.cs b/Cloud/Requests/ValveRequest.cs
new file mode 100644
index 0000000..dee6a5a
--- /dev/null
+++ b/Cloud/Requests/ValveRequest.cs
@@ -0,0 +1,7 @@
+namespace Cloud.Requests
+{
+    public class ValveRequest
+    {
+        public string Action { get; set; }
+    }
+}
diff --git a/Cloud/Services/BackgroundWorkerService.cs b/Cloud/Services/BackgroundWorkerService.cs
new file mode 100644
index 0000000..f7fbf34
--- /dev/null
+++ b/Cloud/Services/BackgroundWorkerService.cs
@@ -0,0 +1,39 @@
+namespace Cloud.Services
+{
+    public class BackgroundWorkerService : BackgroundService
+    {
+        private readonly ILogger<BackgroundWorkerService> _logger;
+        private readonly ConsumerService _consumerService;
+
+        public BackgroundWorkerService(ILogger<BackgroundWorkerService> logger, ConsumerService consumer)
+        {
+            _logger = logger;
+            _consumerService = consumer;
+        }
+
+        // Background service; this loop runs continuously until shutdown
+        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+        {
+            try
+            {
+                while (!stoppingToken.IsCancellationRequested)
+                {
+                    //_logger.LogInformation("Background Service is Running at: {time}", DateTimeOffset.Now);
+
+                    // Consume the next Kafka message
+                    string request = await _consumerService.WaitMessage("ValvesHeatersRequest");
+
+                    // Once a request has been consumed, it can be processed
+                    if (!string.IsNullOrEmpty(request))
+                        _logger.LogInformation("Valves-Heaters Request: {value}", request);
+
+                    await Task.Delay(1000, stoppingToken);
+                }
+            }
+            catch (Exception ex)
+            {
+                _logger.LogError(ex, "BackgroundWorkerService - Exception");
+            }
+        }
+    }
+}
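A note on the worker above: `WaitMessage` (in the ConsumerService that follows) polls with a fixed one-second timeout, so shutdown can lag `stoppingToken` by up to a second per iteration. A minimal variant sketch — not part of this patch — that forwards the token into the consumer, using the same `_consumer` field as ConsumerService:

```csharp
// Hypothetical variant of ConsumerService.WaitMessage that honors cancellation.
// Consume(CancellationToken) blocks until a message arrives or the token fires.
public Task<string> WaitMessage(string topic, CancellationToken ct)
{
    _consumer.Subscribe(topic);
    try
    {
        var result = _consumer.Consume(ct);
        return Task.FromResult(result?.Message.Value ?? "");
    }
    catch (OperationCanceledException)
    {
        // Host is shutting down; report "no message".
        return Task.FromResult("");
    }
}
```

The call site in `ExecuteAsync` would then pass `stoppingToken`, and the extra `Task.Delay` for the no-message case could be dropped since the blocking consume already paces the loop.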
diff --git a/Cloud/Services/ConsumerService.cs b/Cloud/Services/ConsumerService.cs
new file mode 100644
index 0000000..82d5bb2
--- /dev/null
+++ b/Cloud/Services/ConsumerService.cs
@@ -0,0 +1,51 @@
+using Confluent.Kafka;
+
+namespace Cloud.Services
+{
+    public class ConsumerService
+    {
+        private readonly IConsumer<Ignore, string> _consumer;
+        private readonly ConsumerConfig _consumerConfig;
+
+        public ConsumerService(IConfiguration configuration)
+        {
+            _consumerConfig = new ConsumerConfig
+            {
+                BootstrapServers = configuration["Kafka:BootstrapServers"],
+                GroupId = configuration["Kafka:GroupId"],
+                AutoOffsetReset = AutoOffsetReset.Earliest,
+            };
+
+            _consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build();
+        }
+
+        // Consume method: polls the topic once and returns the message value, or "" on timeout
+        public async Task<string> WaitMessage(string topic)
+        {
+            try
+            {
+                _consumer.Subscribe(topic);
+
+                var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
+
+                if (consumeResult != null)
+                {
+                    return consumeResult.Message.Value;
+                }
+
+                // No message received from Kafka within the specified timeout.
+                return "";
+            }
+            catch (Exception)
+            {
+                return "";
+            }
+            // The consumer stays open between calls: this method is invoked in a loop
+            // by BackgroundWorkerService, and calling Close() here would leave the
+            // consumer unusable for the next call.
+        }
+    }
+}
diff --git a/Cloud/Services/ProducerService.cs b/Cloud/Services/ProducerService.cs
new file mode 100644
index 0000000..073bdc6
--- /dev/null
+++ b/Cloud/Services/ProducerService.cs
@@ -0,0 +1,33 @@
+using Confluent.Kafka;
+
+namespace Cloud.Services
+{
+    public class ProducerService
+    {
+        private readonly IProducer<string, string> _producer;
+
+        public ProducerService(IConfiguration configuration)
+        {
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = configuration["Kafka:BootstrapServers"]
+            };
+
+            // Build the producer
+            _producer = new ProducerBuilder<string, string>(producerConfig).Build();
+        }
+
+        // Produces a message to the given Kafka topic
+        public async Task ProduceMessageAsync(string topic, string value)
+        {
+            var kafkaMessage = new Message<string, string>
+            {
+                Key = Guid.NewGuid().ToString(),
+                Value = value
+            };
+
+            // Produce the message
+            await _producer.ProduceAsync(topic, kafkaMessage);
+        }
+    }
+}
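`ProducerService` never flushes or disposes its `IProducer`, so messages still buffered in librdkafka when the host stops can be lost. A sketch of a disposable variant (an assumed addition, not in the patch); the DI container disposes singletons that implement `IDisposable` on shutdown:

```csharp
using Confluent.Kafka;

// Sketch: same service as above, plus a Dispose that drains the send buffer.
public class ProducerService : IDisposable
{
    private readonly IProducer<string, string> _producer;

    public ProducerService(IConfiguration configuration)
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = configuration["Kafka:BootstrapServers"]
        };
        _producer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    public async Task ProduceMessageAsync(string topic, string value)
    {
        var kafkaMessage = new Message<string, string>
        {
            Key = Guid.NewGuid().ToString(),
            Value = value
        };
        await _producer.ProduceAsync(topic, kafkaMessage);
    }

    public void Dispose()
    {
        // Wait up to 5s for queued and in-flight messages before releasing the handle.
        _producer.Flush(TimeSpan.FromSeconds(5));
        _producer.Dispose();
    }
}
```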
diff --git a/Cloud/Validation/ValveValidator.cs b/Cloud/Validation/ValveValidator.cs
new file mode 100644
index 0000000..4a19f58
--- /dev/null
+++ b/Cloud/Validation/ValveValidator.cs
@@ -0,0 +1,16 @@
+using Cloud.Enums;
+using Cloud.Requests;
+using FluentValidation;
+
+namespace Cloud.Validation
+{
+    public class ValveValidator : AbstractValidator<ValveRequest>
+    {
+        public ValveValidator()
+        {
+            RuleFor(request => request.Action)
+                .NotEmpty().WithMessage("Action can't be empty")
+                .IsEnumName(typeof(ValveEnum)).WithMessage("Action is not correct");
+        }
+    }
+}
diff --git a/Cloud/appsettings.json b/Cloud/appsettings.json
index b272a9c..e80f5b5 100644
--- a/Cloud/appsettings.json
+++ b/Cloud/appsettings.json
@@ -5,6 +5,10 @@
       "Microsoft.AspNetCore": "Warning"
     }
   },
+  "Kafka": {
+    "BootstrapServers": "localhost:9092",
+    "GroupId": "ValvesHeaters"
+  },
   "AllowedHosts": "*",
   "Jwt": {
     "Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",
diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py
index 728ec1d..d092373 100644
--- a/GreenhouseDetector/detector.py
+++ b/GreenhouseDetector/detector.py
@@ -1,13 +1,12 @@
-from random import random
-from turtledemo.penrose import start
-
-from kafka import KafkaProducer, KafkaConsumer
-import kafka
-import socket
-from json import dumps, loads
 import time
 import random as rnd
 
+import requests
+
 class Detector:
     def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
         self.id = id
@@ -18,31 +17,23 @@ class Detector:
         self.moisture = 0
         self.temp = 0
 
-        self.producer = KafkaProducer(
-            bootstrap_servers=['localhost:9092'],
-            client_id=f'detector{self.id}',
-            value_serializer=lambda v: dumps(v).encode('utf-8')
-        )
-
-    def sendData(self):
-        message = {'id' : self.id,
-                   'moisture': self.moisture,
-                   'temperature' : self.temp }
-        self.producer.send('dataDetectors', message)
-
     def cycle(self):
         self.moisture += rnd.random() / 100
         self.temp += (rnd.random() - 0.5) / 100
 
+    def sendData(self):
+        data = {"moisture": self.moisture,
+                "temp": self.temp}
+        requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
+
 detector1 = Detector(1, 0.6, 0.2, 40, 20)
-detector2 = Detector(2, 0.7, 0.3, 40, 20)
-detector3 = Detector(3, 0.9, 0.6, 40, 20)
 
-detectors = [detector1, detector2, detector3]
+detectors = [detector1]
 
-while True:
-    for detector in detectors:
-        detector.cycle()
-        detector.sendData()
-    time.sleep(1)
+if __name__ == "__main__":
+    while True:
+        for detector in detectors:
+            detector.cycle()
+            detector.sendData()
+        time.sleep(1)
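The detector now reports over HTTP instead of Kafka: readings go to the manager's webhook with the detector id as a query parameter and the readings as a JSON body. For cross-checking that contract from the .NET side, the same request in C# (a sketch; the URL comes from detector.py, the sample values are made up):

```csharp
using System.Net.Http.Json;

// Mirrors detector.py's sendData(): POST /webhook?id=<detector id> with a JSON body.
using var http = new HttpClient();
var payload = new { moisture = 0.42, temp = 21.3 }; // sample readings, not from the patch
var response = await http.PostAsJsonAsync("http://127.0.0.1:20002/webhook?id=1", payload);
Console.WriteLine(response.StatusCode);
```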
diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py
index c082d04..d9772d1 100644
--- a/GreenhouseManager/manager.py
+++ b/GreenhouseManager/manager.py
@@ -2,20 +2,20 @@ from kafka import KafkaProducer, KafkaConsumer
 import kafka
 import socket
 from json import dumps, loads
+from flask import Flask, request
 import time
 from enum import Enum
-from GreenhouseDetector.detector import Detector
+import threading
 
-class Status(Enum):
-    UNKNOWN = -1
-    OFF = 0
-    ON = 1
+app = Flask(__name__)
 
 class Manager:
-    def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
+    def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
                  heater_state: str = "off"):
-        self.id = id
+        self._id = _id
         self.moisture = moisture
         self.temp = temp
         self.isAutoOn = isAutoOn
@@ -24,49 +24,29 @@ class Manager:
 
         self.dataPublisher = KafkaProducer(
             bootstrap_servers=['localhost:9092'],
-            client_id=f'manager{id}_producer',
+            client_id=f'manager{self._id}_producer',
             value_serializer=lambda v: dumps(v).encode('utf-8')
         )
 
-        # self.detectorConsumer = KafkaConsumer(
-        #     'dataDetectors',
-        #     bootstrap_servers=['localhost:9092'],
-        #     auto_offset_reset='earliest',
-        #     enable_auto_commit=True,
-        #     consumer_timeout_ms=1000,
-        #     group_id=f'manager{id}',
-        #     value_deserializer=lambda x: loads(x.decode('utf-8'))
-        # )
         self.controllerConsumer = KafkaConsumer(
             'commands',
             bootstrap_servers=['localhost:9092'],
             auto_offset_reset='earliest',
             enable_auto_commit=True,
             consumer_timeout_ms=2000,
-            group_id=f'manager{id}',
+            group_id=f'manager{self._id}',
             value_deserializer=lambda x: loads(x.decode('utf-8'))
         )
         self.controllerConsumerResponse = KafkaProducer(
             bootstrap_servers=['localhost:9092'],
-            client_id=f'manager{id}_producer',
+            client_id=f'manager{self._id}_producer',
             value_serializer=lambda v: dumps(v).encode('utf-8')
         )
 
-    def update(self):
-        for message in self.detectorConsumer:
-
-            print(f"Manager {self.id} received message: ")
-            print(message.value)
-            self.moisture = message.value['moisture']
-            self.temp = message.value['temperature']
-            print("Updating info...\n")
-
-            self.sendData()
-
     def sendData(self):
         print("sending data...")
         message = {
-            'id': self.id,
+            'id': self._id,
             'moisture': self.moisture,
             'temp': self.temp,
             'valveStatus': str(self.valve_state),
@@ -76,6 +56,8 @@ class Manager:
         print(message)
         self.dataPublisher.send('data', message)
 
+        self.dataPublisher.flush()
+
     def toggle_device(self, device, request_id, greenhouse_id):
 
@@ -122,23 +104,33 @@ class Manager:
 
         for tp, msgs in messages.items():
             for message in msgs:
-                print(f"Manager {self.id} received message: ")
+                print(f"Manager {self._id} received message: ")
                 print(message.value)
                 self.request_id = message.value['request_id']
                 self.greenhouse_id = message.value['greenhouse_id']
                 self.command = message.value['command']
                 self.toggle_device(self.command, self.request_id, self.greenhouse_id)
 
+@app.route('/webhook', methods=['POST'])
+def webhook():
+    print("received webhook", request.args.get('id'))
+    for manager in managers:
+        if int(request.args.get('id')) == manager._id:
+            print("Data received from Webhook is", request.json)
+            body = request.json
+            for key, value in body.items():
+                setattr(manager, key, value)
+
+            manager.sendData()
+            return f"Webhook received for manager {manager._id}"
+    return "Webhook ignored"
+
-manager1 = Manager(id=1)
+manager1 = Manager(_id=1)
 
 managers = [manager1]
 
-while True:
-    time.sleep(5)
-    manager1.sendData()
-    for manager in managers:
-
-        manager.getCommand()
+if __name__ == "__main__":
+    threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()
\ No newline at end of file
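The manager publishes its state to the `data` topic with the shape built in `sendData()`. If the Cloud side ever consumes that topic, the payload could bind to a record like this sketch (field list taken from `sendData()`; the diff hunk cuts off after `valveStatus`, so the elided trailing fields such as heater status would be added the same way):

```csharp
using System.Text.Json;

// Hypothetical sample payload in the shape manager.py publishes to 'data'.
var json = "{\"id\": 1, \"moisture\": 0.5, \"temp\": 20.0, \"valveStatus\": \"closed\"}";

var data = JsonSerializer.Deserialize<GreenhouseData>(json,
    new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
Console.WriteLine(data);

// Fields shown in Manager.sendData(); extend with the fields the hunk elides.
public record GreenhouseData(int Id, double Moisture, double Temp, string ValveStatus);
```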
diff --git a/docker-compose.yml b/docker-compose.yml
index fa85db4..af1ab66 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,3 +1,12 @@
+networks:
+  vpn:
+    name: kafkaVPN
+    driver: bridge
+    ipam:
+      config:
+        - subnet: "192.168.2.0/24"
+          gateway: "192.168.2.1"
+
 services:
   cloud:
     build: ./Cloud/
@@ -20,64 +29,81 @@ services:
   redis:
     image: 'redis:latest'
     ports:
-        - '6379:6379'
+      - '6379:6379'
     volumes:
-        - 'cloud-redis:/data'
+      - 'cloud-redis:/data'
     healthcheck:
-        test:
-            - CMD
-            - redis-cli
-            - ping
-        retries: 3
-        timeout: 5s
-  kafka:
-    image: confluentinc/cp-kafka:7.4.0
-    ports:
-      - 9092:9092
-      - 9997:9997
-    expose:
-      - 29092:29092
-    environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
-      KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
-      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
-    depends_on:
-      - zookeeper
-  init-kafka:
-    image: confluentinc/cp-kafka:7.4.0
-    depends_on:
-      - kafka
-    entrypoint: [ '/bin/sh', '-c' ]
-    command: |
-      "
-      # blocks until kafka is reachable
-      kafka-topics --bootstrap-server kafka:29092 --list
-
-      echo -e 'Creating kafka topics'
-      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
-      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
-      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
-      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
-
-      echo -e 'Successfully created the following topics:'
-      kafka-topics --bootstrap-server kafka:29092 --list
-      "
-  kafka-ui:
-    container_name: kafka-ui
-    image: provectuslabs/kafka-ui:latest
+      test:
+        - CMD
+        - redis-cli
+        - ping
+      retries: 3
+      timeout: 5s
+  zookeeper:
+    networks:
+      - vpn
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 8080:8080
+      - 2181:2181
+  kafka:
+    networks:
+      vpn:
+        ipv4_address: 192.168.2.10
+    image: confluentinc/cp-kafka:7.4.0
+    ports:
+      - 9092:9092
+      - 9997:9997
+    expose:
+      - 29092:29092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
+      KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+    depends_on:
+      - zookeeper
+  init-kafka:
+    networks:
+      - vpn
+    image: confluentinc/cp-kafka:7.4.0
     depends_on:
       - kafka
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
-      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+    entrypoint: [ '/bin/sh', '-c' ]
+    command: |
+      "
+      # blocks until kafka is reachable
+      kafka-topics --bootstrap-server kafka:29092 --list
+
+      echo -e 'Creating kafka topics'
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
+
+      echo -e 'Successfully created the following topics:'
+      kafka-topics --bootstrap-server kafka:29092 --list
+      "
+  kafka-ui:
+    networks:
+      - vpn
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8080:8080
+    depends_on:
+      - kafka
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
 volumes:
   postgres_data:
     driver: local
@@ -89,4 +89,4 @@
     ZOOKEEPER_CLIENT_PORT: 2181
     ZOOKEEPER_TICK_TIME: 2000
     ports:
-      - 2181:2181
+      - 2181:2181
\ No newline at end of file
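Two things worth double-checking in this compose change: the broker now advertises `HOST://192.168.1.5:9092` (a LAN address, not on the new `192.168.2.0/24` subnet) while `Cloud/appsettings.json` still targets `localhost:9092`, and `init-kafka` no longer creates the `dataDetectors` topic (consistent with the detector moving to webhooks). A quick connectivity smoke test from the host, as a sketch — substitute whichever address you actually advertise:

```csharp
using Confluent.Kafka;

// Fetch cluster metadata through the advertised listener and list the topics.
var config = new AdminClientConfig { BootstrapServers = "192.168.1.5:9092" }; // assumed address
using var admin = new AdminClientBuilder(config).Build();

var metadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var topic in metadata.Topics)
    Console.WriteLine($"{topic.Topic}: {topic.Partitions.Count} partition(s)");
```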