From 7c310d21f7aa316abc15a01bdcc37b3ebb5b5cc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D1=80=D1=82=D0=B5=D0=BC=20=D0=A5=D0=B0=D1=80=D0=BB?= =?UTF-8?q?=D0=B0=D0=BC=D0=BE=D0=B2?= Date: Wed, 13 Nov 2024 15:24:47 +0400 Subject: [PATCH 1/2] Add broker service --- Cloud/Cloud.csproj | 1 + Cloud/Controllers/ValveController.cs | 41 ++++++++++++++++++ Cloud/Enums/ValveEnum.cs | 9 ++++ Cloud/Program.cs | 11 +++++ Cloud/Requests/ValveRequest.cs | 7 ++++ Cloud/Services/BackgroundWorkerService.cs | 39 +++++++++++++++++ Cloud/Services/ConsumerService.cs | 51 +++++++++++++++++++++++ Cloud/Services/ProducerService.cs | 33 +++++++++++++++ Cloud/Validation/ValveValidator.cs | 16 +++++++ Cloud/appsettings.json | 4 ++ 10 files changed, 212 insertions(+) create mode 100644 Cloud/Controllers/ValveController.cs create mode 100644 Cloud/Enums/ValveEnum.cs create mode 100644 Cloud/Requests/ValveRequest.cs create mode 100644 Cloud/Services/BackgroundWorkerService.cs create mode 100644 Cloud/Services/ConsumerService.cs create mode 100644 Cloud/Services/ProducerService.cs create mode 100644 Cloud/Validation/ValveValidator.cs diff --git a/Cloud/Cloud.csproj b/Cloud/Cloud.csproj index 6a5fc81..e74339d 100644 --- a/Cloud/Cloud.csproj +++ b/Cloud/Cloud.csproj @@ -7,6 +7,7 @@ + diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs new file mode 100644 index 0000000..5708820 --- /dev/null +++ b/Cloud/Controllers/ValveController.cs @@ -0,0 +1,41 @@ +using Cloud.Requests; +using Cloud.Services; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; +using System.ComponentModel.DataAnnotations; +using System.Text.Json; + +namespace Cloud.Controllers +{ + [Authorize] + [ApiController] + [Route("api")] + public class ValveController : ControllerBase + { + //Контроллер вентиля + + private readonly ProducerService _producerService; + + public ValveController(ProducerService producerService) + { + _producerService = producerService; + } + + [HttpPost("farm/{farmId}/greenhouse/{greenhouseId}/watering")] + public async Task interactValve([FromBody] ValveRequest request, int farmId, int ghId) + { + var kafkaRequest = new + { + FarmId = farmId, + GreenHouseId = ghId, + SomeAction = request.Action, + }; + + var message = JsonSerializer.Serialize(kafkaRequest); + + await _producerService.ProduceMessageAsync("InventoryUpdates", message); + + return Ok($"Valve status is {request.Action}"); + } + } +} diff --git a/Cloud/Enums/ValveEnum.cs b/Cloud/Enums/ValveEnum.cs new file mode 100644 index 0000000..f04d64a --- /dev/null +++ b/Cloud/Enums/ValveEnum.cs @@ -0,0 +1,9 @@ +namespace Cloud.Enums +{ + public enum ValveEnum + { + Open, + Close, + Auto + } +} diff --git a/Cloud/Program.cs b/Cloud/Program.cs index c02b6fb..1c3e3a8 100644 --- a/Cloud/Program.cs +++ b/Cloud/Program.cs @@ -7,6 +7,7 @@ using FluentValidation; using FluentValidation.AspNetCore; using Cloud.Validation; using StackExchange.Redis; +using Cloud.Services; var builder = WebApplication.CreateBuilder(args); @@ -19,6 +20,15 @@ builder.Services.AddSingleton(sp => return ConnectionMultiplexer.Connect(configuration); }); +//Kafka producer service +builder.Services.AddSingleton(); + +//Kafka consumer service +builder.Services.AddSingleton(); + +//Add the BackgroundWorkerService +builder.Services.AddHostedService(); + //Jwt configuration var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get(); var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get(); @@ -58,6 +68,7 @@ builder.Services.AddFluentValidationClientsideAdapters(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); +builder.Services.AddValidatorsFromAssemblyContaining(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); diff --git a/Cloud/Requests/ValveRequest.cs b/Cloud/Requests/ValveRequest.cs new file mode 100644 index 0000000..dee6a5a --- /dev/null +++ b/Cloud/Requests/ValveRequest.cs @@ -0,0 +1,7 @@ +namespace Cloud.Requests +{ + public class ValveRequest + { + public string Action { get; set; } + } +} diff --git a/Cloud/Services/BackgroundWorkerService.cs b/Cloud/Services/BackgroundWorkerService.cs new file mode 100644 index 0000000..f7fbf34 --- /dev/null +++ b/Cloud/Services/BackgroundWorkerService.cs @@ -0,0 +1,39 @@ +namespace Cloud.Services +{ + public class BackgroundWorkerService : BackgroundService + { + public readonly ILogger _logger; + private readonly ConsumerService _consumerService; + + public BackgroundWorkerService(ILogger logger, ConsumerService consumer) + { + _logger = logger; + _consumerService = consumer; + } + + //Backghround Service, This will run continuously + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + try + { + while (!stoppingToken.IsCancellationRequested) + { + //_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now); + + string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message + + //After Consume the Order Request Can process the order + if (!string.IsNullOrEmpty(request)) + _logger.LogInformation("Valves-Heaters Request : {value}", request); + + + await Task.Delay(1000, stoppingToken); + } + } + catch (Exception ex) + { + _logger.LogError($"BackgroundWorkerService - Exception {ex}"); + } + } + } +} diff --git a/Cloud/Services/ConsumerService.cs b/Cloud/Services/ConsumerService.cs new file mode 100644 index 0000000..82d5bb2 --- /dev/null +++ b/Cloud/Services/ConsumerService.cs @@ -0,0 +1,51 @@ +using Confluent.Kafka; + +namespace Cloud.Services +{ + public class ConsumerService + { + private IConsumer _consumer; + private ConsumerConfig consumerConfig; + public ConsumerService(IConfiguration configuration) + { + consumerConfig = new ConsumerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"], + GroupId = configuration["Kafka:GroupId"], + AutoOffsetReset = AutoOffsetReset.Earliest, + }; + + _consumer = new ConsumerBuilder(consumerConfig).Build(); + } + + //Consume Method + public async TaskWaitMessage(string topic) + { + try + { + _consumer.Subscribe(topic); + + var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); + + if (consumeResult != null) + { + return consumeResult.Message.Value; + } + else + { + //No message received from Kafka within the specified timeout. + } + return ""; + + } + catch (Exception ex) + { + return ""; + } + finally + { + _consumer.Close(); + } + } + } +} diff --git a/Cloud/Services/ProducerService.cs b/Cloud/Services/ProducerService.cs new file mode 100644 index 0000000..073bdc6 --- /dev/null +++ b/Cloud/Services/ProducerService.cs @@ -0,0 +1,33 @@ +using Confluent.Kafka; + +namespace Cloud.Services +{ + public class ProducerService + { + private readonly IProducer _producer; + + public ProducerService(IConfiguration configuration) + { + var producerConfig = new ProducerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + + //Build the Producer + _producer = new ProducerBuilder(producerConfig).Build(); + } + + //Method for Produce the Message to Kafka Topic + public async Task ProduceMessageAsync(string topic, string value) + { + var kafkaMessage = new Message + { + Key = Guid.NewGuid().ToString(), + Value = value + }; + + //Produce the Message + await _producer.ProduceAsync(topic, kafkaMessage); + } + } +} diff --git a/Cloud/Validation/ValveValidator.cs b/Cloud/Validation/ValveValidator.cs new file mode 100644 index 0000000..4a19f58 --- /dev/null +++ b/Cloud/Validation/ValveValidator.cs @@ -0,0 +1,16 @@ +using Cloud.Enums; +using Cloud.Requests; +using FluentValidation; + +namespace Cloud.Validation +{ + public class ValveValidator : AbstractValidator + { + public ValveValidator() { + + RuleFor(request => request.Action) + .NotEmpty().WithMessage("Action can't be empty"). + IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct"); + } + } +} diff --git a/Cloud/appsettings.json b/Cloud/appsettings.json index b272a9c..e80f5b5 100644 --- a/Cloud/appsettings.json +++ b/Cloud/appsettings.json @@ -5,6 +5,10 @@ "Microsoft.AspNetCore": "Warning" } }, + "Kafka": { + "BootstrapServers": "localhost:9092", + "GroupId": "ValvesHeaters" + }, "AllowedHosts": "*", "Jwt": { "Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o", From 5e73961ad593fba43cf1f82bd390f74d32842978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D1=80=D1=82=D0=B5=D0=BC=20=D0=A5=D0=B0=D1=80=D0=BB?= =?UTF-8?q?=D0=B0=D0=BC=D0=BE=D0=B2?= Date: Tue, 19 Nov 2024 19:36:05 +0400 Subject: [PATCH 2/2] Add Valve Controller --- Cloud/Controllers/ValveController.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs index 5708820..dbc081a 100644 --- a/Cloud/Controllers/ValveController.cs +++ b/Cloud/Controllers/ValveController.cs @@ -21,7 +21,7 @@ namespace Cloud.Controllers _producerService = producerService; } - [HttpPost("farm/{farmId}/greenhouse/{greenhouseId}/watering")] + [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] public async Task interactValve([FromBody] ValveRequest request, int farmId, int ghId) { var kafkaRequest = new @@ -32,10 +32,11 @@ namespace Cloud.Controllers }; var message = JsonSerializer.Serialize(kafkaRequest); + return Ok(kafkaRequest); - await _producerService.ProduceMessageAsync("InventoryUpdates", message); + /*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message); - return Ok($"Valve status is {request.Action}"); + return Ok($"Valve status is {request.Action}");*/ } } }