Add broker service

This commit is contained in:
Артем Харламов 2024-11-13 15:24:47 +04:00
parent 80b002f12a
commit 7c310d21f7
10 changed files with 212 additions and 0 deletions

View File

@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.6.0" />
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" /> <PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" /> <PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" /> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />

View File

@ -0,0 +1,41 @@
using Cloud.Requests;
using Cloud.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Text.Json;
namespace Cloud.Controllers
{
[Authorize]
[ApiController]
[Route("api")]
public class ValveController : ControllerBase
{
//Контроллер вентиля
private readonly ProducerService _producerService;
public ValveController(ProducerService producerService)
{
_producerService = producerService;
}
[HttpPost("farm/{farmId}/greenhouse/{greenhouseId}/watering")]
public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId)
{
var kafkaRequest = new
{
FarmId = farmId,
GreenHouseId = ghId,
SomeAction = request.Action,
};
var message = JsonSerializer.Serialize(kafkaRequest);
await _producerService.ProduceMessageAsync("InventoryUpdates", message);
return Ok($"Valve status is {request.Action}");
}
}
}

9
Cloud/Enums/ValveEnum.cs Normal file
View File

@ -0,0 +1,9 @@
namespace Cloud.Enums
{
public enum ValveEnum
{
Open,
Close,
Auto
}
}

View File

@ -7,6 +7,7 @@ using FluentValidation;
using FluentValidation.AspNetCore; using FluentValidation.AspNetCore;
using Cloud.Validation; using Cloud.Validation;
using StackExchange.Redis; using StackExchange.Redis;
using Cloud.Services;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
@ -19,6 +20,15 @@ builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
return ConnectionMultiplexer.Connect(configuration); return ConnectionMultiplexer.Connect(configuration);
}); });
//Kafka producer service
builder.Services.AddSingleton<ProducerService, ProducerService>();
//Kafka consumer service
builder.Services.AddSingleton<ConsumerService, ConsumerService>();
//Add the BackgroundWorkerService
builder.Services.AddHostedService<BackgroundWorkerService>();
//Jwt configuration //Jwt configuration
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>(); var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>(); var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
@ -58,6 +68,7 @@ builder.Services.AddFluentValidationClientsideAdapters();
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer(); builder.Services.AddEndpointsApiExplorer();

View File

@ -0,0 +1,7 @@
namespace Cloud.Requests
{
public class ValveRequest
{
public string Action { get; set; }
}
}

View File

@ -0,0 +1,39 @@
namespace Cloud.Services
{
public class BackgroundWorkerService : BackgroundService
{
public readonly ILogger<BackgroundWorkerService> _logger;
private readonly ConsumerService _consumerService;
public BackgroundWorkerService(ILogger<BackgroundWorkerService> logger, ConsumerService consumer)
{
_logger = logger;
_consumerService = consumer;
}
//Backghround Service, This will run continuously
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
while (!stoppingToken.IsCancellationRequested)
{
//_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now);
string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message
//After Consume the Order Request Can process the order
if (!string.IsNullOrEmpty(request))
_logger.LogInformation("Valves-Heaters Request : {value}", request);
await Task.Delay(1000, stoppingToken);
}
}
catch (Exception ex)
{
_logger.LogError($"BackgroundWorkerService - Exception {ex}");
}
}
}
}

View File

@ -0,0 +1,51 @@
using Confluent.Kafka;
namespace Cloud.Services
{
public class ConsumerService
{
private IConsumer<string, string> _consumer;
private ConsumerConfig consumerConfig;
public ConsumerService(IConfiguration configuration)
{
consumerConfig = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = configuration["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
}
//Consume Method
public async Task<string>WaitMessage(string topic)
{
try
{
_consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
if (consumeResult != null)
{
return consumeResult.Message.Value;
}
else
{
//No message received from Kafka within the specified timeout.
}
return "";
}
catch (Exception ex)
{
return "";
}
finally
{
_consumer.Close();
}
}
}
}

View File

@ -0,0 +1,33 @@
using Confluent.Kafka;
namespace Cloud.Services
{
public class ProducerService
{
private readonly IProducer<string, string> _producer;
public ProducerService(IConfiguration configuration)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"]
};
//Build the Producer
_producer = new ProducerBuilder<string, string>(producerConfig).Build();
}
//Method for Produce the Message to Kafka Topic
public async Task ProduceMessageAsync(string topic, string value)
{
var kafkaMessage = new Message<string, string>
{
Key = Guid.NewGuid().ToString(),
Value = value
};
//Produce the Message
await _producer.ProduceAsync(topic, kafkaMessage);
}
}
}

View File

@ -0,0 +1,16 @@
using Cloud.Enums;
using Cloud.Requests;
using FluentValidation;
namespace Cloud.Validation
{
public class ValveValidator : AbstractValidator<ValveRequest>
{
public ValveValidator() {
RuleFor(request => request.Action)
.NotEmpty().WithMessage("Action can't be empty").
IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct");
}
}
}

View File

@ -5,6 +5,10 @@
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning"
} }
}, },
"Kafka": {
"BootstrapServers": "localhost:9092",
"GroupId": "ValvesHeaters"
},
"AllowedHosts": "*", "AllowedHosts": "*",
"Jwt": { "Jwt": {
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o", "Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",