fix: kafka service

This commit is contained in:
mfnefd 2024-12-04 01:57:10 +04:00
parent fed96d5b86
commit 9c1720e131
6 changed files with 55 additions and 31 deletions

View File

@ -1,5 +1,6 @@
using Cloud.Requests; using Cloud.Requests;
using Cloud.Services; using Cloud.Services;
using Cloud.Services.Broker;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
@ -14,11 +15,11 @@ namespace Cloud.Controllers
{ {
//Контроллер вентиля //Контроллер вентиля
private readonly ProducerService _producerService; private readonly IBrokerService _kafkaService;
public ValveController(ProducerService producerService) public ValveController(IBrokerService kafkaService)
{ {
_producerService = producerService; _kafkaService = kafkaService;
} }
[HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
@ -34,7 +35,7 @@ namespace Cloud.Controllers
var message = JsonSerializer.Serialize(kafkaRequest); var message = JsonSerializer.Serialize(kafkaRequest);
return Ok(kafkaRequest); return Ok(kafkaRequest);
/*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message); /*await _kafkaService.ProduceAsync("ValvesHeatersRequest", message);
return Ok($"Valve status is {request.Action}");*/ return Ok($"Valve status is {request.Action}");*/
} }

View File

@ -4,6 +4,6 @@ namespace Cloud.Services.Broker
{ {
public interface IBrokerConsumer public interface IBrokerConsumer
{ {
T? WaitMessage<T>(string topic) where T : IBrokerResponse; IEnumerable<T>? WaitMessages<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -2,9 +2,8 @@ using Cloud.Services.Broker.Support;
namespace Cloud.Services.Broker namespace Cloud.Services.Broker
{ {
public interface IBrokerService public interface IBrokerService : IBrokerProducer, IBrokerConsumer
{ {
Task Produce(Command command); void ChangeBrokerIp(string ip);
T? Consume<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -7,35 +7,31 @@ namespace Cloud.Services.Broker.Implement.Kafka
{ {
public class KafkaConsumer : IBrokerConsumer public class KafkaConsumer : IBrokerConsumer
{ {
private readonly IConsumer<string, string> _consumer; private IConsumer<string, string> _consumer;
private readonly IConfiguration _config;
public KafkaConsumer(IConfiguration config) public KafkaConsumer(IConfiguration config)
{ {
var consumerConfig = new ConsumerConfig() _config = config;
{ ChangeBrokerIp(_config["Kafka:BootstrapServers"]);
BootstrapServers = config["Kafka:BootstrapServers"],
GroupId = config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
} }
public T? WaitMessage<T>(string topic) public IEnumerable<T>? WaitMessages<T>(string topic)
where T : IBrokerResponse where T : IBrokerResponse
{ {
try try
{ {
_consumer.Subscribe(topic); _consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
Console.WriteLine($"================ Received message: {consumeResult?.Message.Value}");
if (consumeResult == null) if (consumeResult == null)
{ {
// No message received from Kafka within the specified timeout. // No message received from Kafka within the specified timeout.
return default; return default;
} }
return JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
return JsonSerializer.Deserialize<IEnumerable<T>>(consumeResult.Message.Value);
} }
catch (Exception ex) catch (Exception ex)
@ -47,5 +43,17 @@ namespace Cloud.Services.Broker.Implement.Kafka
_consumer.Close(); _consumer.Close();
} }
} }
public void ChangeBrokerIp(string ip)
{
var consumerConfig = new ConsumerConfig()
{
BootstrapServers = ip,
GroupId = _config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
}
} }
} }

View File

@ -5,17 +5,14 @@ namespace Cloud.Services.Broker.Implement.Kafka
{ {
public class KafkaProducer : IBrokerProducer public class KafkaProducer : IBrokerProducer
{ {
private readonly IProducer<Guid, Command> _producer; private IProducer<Guid, Command> _producer;
private readonly IConfiguration _config;
public KafkaProducer(IConfiguration configuration) public KafkaProducer(IConfiguration configuration)
{ {
var producerConfig = new ProducerConfig _config = configuration;
{
BootstrapServers = configuration["Kafka:BootstrapServers"]
};
//Build the Producer ChangeBrokerIp(_config["Kafka:BootstrapServers"]);
_producer = new ProducerBuilder<Guid, Command>(producerConfig).Build();
} }
public async Task ProduceAsync(string topic, Command command) public async Task ProduceAsync(string topic, Command command)
{ {
@ -24,5 +21,16 @@ namespace Cloud.Services.Broker.Implement.Kafka
//Produce the Message //Produce the Message
await _producer.ProduceAsync(topic, message); await _producer.ProduceAsync(topic, message);
} }
public void ChangeBrokerIp(string ip)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = ip
};
//Build the Producer
_producer = new ProducerBuilder<Guid, Command>(producerConfig).Build();
}
} }
} }

View File

@ -13,10 +13,18 @@ namespace Cloud.Services.Broker.Implement.Kafka
_consumer = new KafkaConsumer(configuration); _consumer = new KafkaConsumer(configuration);
} }
public T? Consume<T>(string topic) public IEnumerable<T>? WaitMessages<T>(string topic)
where T : IBrokerResponse => _consumer.WaitMessage<T>(topic); where T : IBrokerResponse
=> _consumer.WaitMessages<T>(topic);
public async Task Produce(Command command)
public async Task ProduceAsync(string topic, Command command)
=> await _producer.ProduceAsync("commands", command); => await _producer.ProduceAsync("commands", command);
public void ChangeBrokerIp(string ip)
{
_consumer.ChangeBrokerIp(ip);
_producer.ChangeBrokerIp(ip);
}
} }
} }