add: сервис брокера, сущности брокера

This commit is contained in:
mfnefd 2024-11-19 23:23:21 +04:00
parent 53d93635fc
commit c230d86404
11 changed files with 109 additions and 15 deletions

View File

@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.6.1" />
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" /> <PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" /> <PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" /> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />

View File

@ -1,7 +1,9 @@
using Cloud.Services.Broker.Support;
namespace Cloud.Services.Broker namespace Cloud.Services.Broker
{ {
public interface IBrokerConsumer public interface IBrokerConsumer
{ {
// TODO: добавить методы для получения данных T? WaitMessage<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -2,7 +2,7 @@ using Cloud.Services.Broker.Support;
namespace Cloud.Services.Broker namespace Cloud.Services.Broker
{ {
public interface IBrokerProdurcer public interface IBrokerProducer
{ {
Task ProduceAsync(string topic, Command command); Task ProduceAsync(string topic, Command command);
} }

View File

@ -4,7 +4,7 @@ namespace Cloud.Services.Broker
{ {
public interface IBrokerService public interface IBrokerService
{ {
Task<CommandResult> Produce(Command command); Task Produce(Command command);
Task<T> Consume<T>(string topic); T? Consume<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -0,0 +1,51 @@
using Cloud.Services.Broker.Support;
using Confluent.Kafka;
using System.Text.Json;
namespace Cloud.Services.Broker.Implement.Kafka
{
public class KafkaConsumer : IBrokerConsumer
{
private readonly IConsumer<string, string> _consumer;
public KafkaConsumer(IConfiguration config)
{
var consumerConfig = new ConsumerConfig()
{
BootstrapServers = config["Kafka:BootstrapServers"],
GroupId = config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
}
public T? WaitMessage<T>(string topic)
where T : IBrokerResponse
{
try
{
_consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
if (consumeResult == null)
{
// No message received from Kafka within the specified timeout.
return default;
}
return JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
}
catch (Exception ex)
{
throw;
}
finally
{
_consumer.Close();
}
}
}
}

View File

@ -0,0 +1,28 @@
using Cloud.Services.Broker.Support;
using Confluent.Kafka;
namespace Cloud.Services.Broker.Implement.Kafka
{
public class KafkaProducer : IBrokerProducer
{
private readonly IProducer<Guid, Command> _producer;
public KafkaProducer(IConfiguration configuration)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"]
};
//Build the Producer
_producer = new ProducerBuilder<Guid, Command>(producerConfig).Build();
}
public async Task ProduceAsync(string topic, Command command)
{
var message = new Message<Guid, Command> { Key = Guid.NewGuid(), Value = command };
//Produce the Message
await _producer.ProduceAsync(topic, message);
}
}
}

View File

@ -4,14 +4,19 @@ namespace Cloud.Services.Broker.Implement.Kafka
{ {
public class KafkaService : IBrokerService public class KafkaService : IBrokerService
{ {
public Task<T> Consume<T>(string topic) private readonly KafkaProducer _producer;
private readonly KafkaConsumer _consumer;
public KafkaService(IConfiguration configuration)
{ {
throw new NotImplementedException(); _producer = new KafkaProducer(configuration);
_consumer = new KafkaConsumer(configuration);
} }
public Task<CommandResult> Produce(Command command) public T? Consume<T>(string topic)
{ where T : IBrokerResponse => _consumer.WaitMessage<T>(topic);
throw new NotImplementedException();
} public async Task Produce(Command command)
=> await _producer.ProduceAsync("commands", command);
} }
} }

View File

@ -1,9 +1,10 @@
using System.Text.Json;
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class Command public class Command
{ {
public int Id { get; set; } public Guid GreenhouseId { get; set; }
public int GreenhouseId { get; set; }
public string CommandName { get; set; } = null!; public string CommandName { get; set; } = null!;
} }
} }

View File

@ -1,6 +1,6 @@
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class CommandResult public class CommandResult : IBrokerResponse
{ {
public int CommandId { get; set; } public int CommandId { get; set; }
public int GreenhouseId { get; set; } public int GreenhouseId { get; set; }

View File

@ -1,6 +1,6 @@
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class GreenhouseInfo public class GreenhouseInfo : IBrokerResponse
{ {
public int Id { get; set; } public int Id { get; set; }
public int PercentWater { get; set; } public int PercentWater { get; set; }

View File

@ -0,0 +1,6 @@
namespace Cloud.Services.Broker.Support
{
public interface IBrokerResponse
{
}
}