brokerService #6
@ -7,6 +7,7 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
<PackageReference Include="Confluent.Kafka" Version="2.6.0" />
|
||||||
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
|
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
|
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />
|
||||||
|
42
Cloud/Controllers/ValveController.cs
Normal file
42
Cloud/Controllers/ValveController.cs
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
using Cloud.Requests;
|
||||||
|
using Cloud.Services;
|
||||||
|
using Microsoft.AspNetCore.Authorization;
|
||||||
|
using Microsoft.AspNetCore.Mvc;
|
||||||
|
using System.ComponentModel.DataAnnotations;
|
||||||
|
using System.Text.Json;
|
||||||
|
|
||||||
|
namespace Cloud.Controllers
|
||||||
|
{
|
||||||
|
[Authorize]
|
||||||
|
[ApiController]
|
||||||
|
[Route("api")]
|
||||||
|
public class ValveController : ControllerBase
|
||||||
|
{
|
||||||
|
//Контроллер вентиля
|
||||||
|
|
||||||
|
private readonly ProducerService _producerService;
|
||||||
|
|
||||||
|
public ValveController(ProducerService producerService)
|
||||||
|
{
|
||||||
|
_producerService = producerService;
|
||||||
|
}
|
||||||
|
|
||||||
|
[HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
|
||||||
|
public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId)
|
||||||
|
{
|
||||||
|
var kafkaRequest = new
|
||||||
|
{
|
||||||
|
FarmId = farmId,
|
||||||
|
GreenHouseId = ghId,
|
||||||
|
SomeAction = request.Action,
|
||||||
|
};
|
||||||
|
|
||||||
|
var message = JsonSerializer.Serialize(kafkaRequest);
|
||||||
|
return Ok(kafkaRequest);
|
||||||
|
|
||||||
|
/*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message);
|
||||||
|
|
||||||
|
return Ok($"Valve status is {request.Action}");*/
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
9
Cloud/Enums/ValveEnum.cs
Normal file
9
Cloud/Enums/ValveEnum.cs
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
namespace Cloud.Enums
|
||||||
|
{
|
||||||
|
public enum ValveEnum
|
||||||
|
{
|
||||||
|
Open,
|
||||||
|
Close,
|
||||||
|
Auto
|
||||||
|
}
|
||||||
|
}
|
@ -7,6 +7,7 @@ using FluentValidation;
|
|||||||
using FluentValidation.AspNetCore;
|
using FluentValidation.AspNetCore;
|
||||||
using Cloud.Validation;
|
using Cloud.Validation;
|
||||||
using StackExchange.Redis;
|
using StackExchange.Redis;
|
||||||
|
using Cloud.Services;
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
@ -19,6 +20,15 @@ builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
|
|||||||
return ConnectionMultiplexer.Connect(configuration);
|
return ConnectionMultiplexer.Connect(configuration);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
//Kafka producer service
|
||||||
|
builder.Services.AddSingleton<ProducerService, ProducerService>();
|
||||||
|
|
||||||
|
//Kafka consumer service
|
||||||
|
builder.Services.AddSingleton<ConsumerService, ConsumerService>();
|
||||||
|
|
||||||
|
//Add the BackgroundWorkerService
|
||||||
|
builder.Services.AddHostedService<BackgroundWorkerService>();
|
||||||
|
|
||||||
//Jwt configuration
|
//Jwt configuration
|
||||||
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
|
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
|
||||||
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
|
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
|
||||||
@ -58,6 +68,7 @@ builder.Services.AddFluentValidationClientsideAdapters();
|
|||||||
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
|
||||||
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
|
||||||
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
|
||||||
|
builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
|
||||||
|
|
||||||
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
|
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
|
||||||
builder.Services.AddEndpointsApiExplorer();
|
builder.Services.AddEndpointsApiExplorer();
|
||||||
|
7
Cloud/Requests/ValveRequest.cs
Normal file
7
Cloud/Requests/ValveRequest.cs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
namespace Cloud.Requests
|
||||||
|
{
|
||||||
|
public class ValveRequest
|
||||||
|
{
|
||||||
|
public string Action { get; set; }
|
||||||
|
}
|
||||||
|
}
|
39
Cloud/Services/BackgroundWorkerService.cs
Normal file
39
Cloud/Services/BackgroundWorkerService.cs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
namespace Cloud.Services
|
||||||
|
{
|
||||||
|
public class BackgroundWorkerService : BackgroundService
|
||||||
|
{
|
||||||
|
public readonly ILogger<BackgroundWorkerService> _logger;
|
||||||
|
private readonly ConsumerService _consumerService;
|
||||||
|
|
||||||
|
public BackgroundWorkerService(ILogger<BackgroundWorkerService> logger, ConsumerService consumer)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_consumerService = consumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Backghround Service, This will run continuously
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
//_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now);
|
||||||
|
|
||||||
|
string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message
|
||||||
|
|
||||||
|
//After Consume the Order Request Can process the order
|
||||||
|
if (!string.IsNullOrEmpty(request))
|
||||||
|
_logger.LogInformation("Valves-Heaters Request : {value}", request);
|
||||||
|
|
||||||
|
|
||||||
|
await Task.Delay(1000, stoppingToken);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError($"BackgroundWorkerService - Exception {ex}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
51
Cloud/Services/ConsumerService.cs
Normal file
51
Cloud/Services/ConsumerService.cs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
using Confluent.Kafka;
|
||||||
|
|
||||||
|
namespace Cloud.Services
|
||||||
|
{
|
||||||
|
public class ConsumerService
|
||||||
|
{
|
||||||
|
private IConsumer<string, string> _consumer;
|
||||||
|
private ConsumerConfig consumerConfig;
|
||||||
|
public ConsumerService(IConfiguration configuration)
|
||||||
|
{
|
||||||
|
consumerConfig = new ConsumerConfig
|
||||||
|
{
|
||||||
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||||
|
GroupId = configuration["Kafka:GroupId"],
|
||||||
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||||
|
};
|
||||||
|
|
||||||
|
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Consume Method
|
||||||
|
public async Task<string>WaitMessage(string topic)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
_consumer.Subscribe(topic);
|
||||||
|
|
||||||
|
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
|
||||||
|
|
||||||
|
if (consumeResult != null)
|
||||||
|
{
|
||||||
|
return consumeResult.Message.Value;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
//No message received from Kafka within the specified timeout.
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_consumer.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
33
Cloud/Services/ProducerService.cs
Normal file
33
Cloud/Services/ProducerService.cs
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
using Confluent.Kafka;
|
||||||
|
|
||||||
|
namespace Cloud.Services
|
||||||
|
{
|
||||||
|
public class ProducerService
|
||||||
|
{
|
||||||
|
private readonly IProducer<string, string> _producer;
|
||||||
|
|
||||||
|
public ProducerService(IConfiguration configuration)
|
||||||
|
{
|
||||||
|
var producerConfig = new ProducerConfig
|
||||||
|
{
|
||||||
|
BootstrapServers = configuration["Kafka:BootstrapServers"]
|
||||||
|
};
|
||||||
|
|
||||||
|
//Build the Producer
|
||||||
|
_producer = new ProducerBuilder<string, string>(producerConfig).Build();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Method for Produce the Message to Kafka Topic
|
||||||
|
public async Task ProduceMessageAsync(string topic, string value)
|
||||||
|
{
|
||||||
|
var kafkaMessage = new Message<string, string>
|
||||||
|
{
|
||||||
|
Key = Guid.NewGuid().ToString(),
|
||||||
|
Value = value
|
||||||
|
};
|
||||||
|
|
||||||
|
//Produce the Message
|
||||||
|
await _producer.ProduceAsync(topic, kafkaMessage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
16
Cloud/Validation/ValveValidator.cs
Normal file
16
Cloud/Validation/ValveValidator.cs
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
using Cloud.Enums;
|
||||||
|
using Cloud.Requests;
|
||||||
|
using FluentValidation;
|
||||||
|
|
||||||
|
namespace Cloud.Validation
|
||||||
|
{
|
||||||
|
public class ValveValidator : AbstractValidator<ValveRequest>
|
||||||
|
{
|
||||||
|
public ValveValidator() {
|
||||||
|
|
||||||
|
RuleFor(request => request.Action)
|
||||||
|
.NotEmpty().WithMessage("Action can't be empty").
|
||||||
|
IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -5,6 +5,10 @@
|
|||||||
"Microsoft.AspNetCore": "Warning"
|
"Microsoft.AspNetCore": "Warning"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"Kafka": {
|
||||||
|
"BootstrapServers": "localhost:9092",
|
||||||
|
"GroupId": "ValvesHeaters"
|
||||||
|
},
|
||||||
"AllowedHosts": "*",
|
"AllowedHosts": "*",
|
||||||
"Jwt": {
|
"Jwt": {
|
||||||
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",
|
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",
|
||||||
|
Loading…
Reference in New Issue
Block a user