From 9c1720e1316d6bcfb14a36f6ccfd2b3dc433f3a5 Mon Sep 17 00:00:00 2001 From: mfnefd Date: Wed, 4 Dec 2024 01:57:10 +0400 Subject: [PATCH] fix: kafka service --- Cloud/Controllers/ValveController.cs | 9 ++--- Cloud/Services/Broker/IBrokerConsumer.cs | 2 +- Cloud/Services/Broker/IBrokerService.cs | 5 ++- .../Broker/Implement/Kafka/KafkaConsumer.cs | 34 ++++++++++++------- .../Broker/Implement/Kafka/KafkaProducer.cs | 22 ++++++++---- .../Broker/Implement/Kafka/KafkaService.cs | 14 ++++++-- 6 files changed, 55 insertions(+), 31 deletions(-) diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs index dbc081a..9fc973a 100644 --- a/Cloud/Controllers/ValveController.cs +++ b/Cloud/Controllers/ValveController.cs @@ -1,5 +1,6 @@ using Cloud.Requests; using Cloud.Services; +using Cloud.Services.Broker; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; using System.ComponentModel.DataAnnotations; @@ -14,11 +15,11 @@ namespace Cloud.Controllers { //Контроллер вентиля - private readonly ProducerService _producerService; + private readonly IBrokerService _kafkaService; - public ValveController(ProducerService producerService) + public ValveController(IBrokerService kafkaService) { - _producerService = producerService; + _kafkaService = kafkaService; } [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] @@ -34,7 +35,7 @@ namespace Cloud.Controllers var message = JsonSerializer.Serialize(kafkaRequest); return Ok(kafkaRequest); - /*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message); + /*await _kafkaService.ProduceAsync("ValvesHeatersRequest", message); return Ok($"Valve status is {request.Action}");*/ } diff --git a/Cloud/Services/Broker/IBrokerConsumer.cs b/Cloud/Services/Broker/IBrokerConsumer.cs index 7997ad0..c152981 100644 --- a/Cloud/Services/Broker/IBrokerConsumer.cs +++ b/Cloud/Services/Broker/IBrokerConsumer.cs @@ -4,6 +4,6 @@ namespace Cloud.Services.Broker { public interface IBrokerConsumer { - T? WaitMessage(string topic) where T : IBrokerResponse; + IEnumerable? WaitMessages(string topic) where T : IBrokerResponse; } } \ No newline at end of file diff --git a/Cloud/Services/Broker/IBrokerService.cs b/Cloud/Services/Broker/IBrokerService.cs index f19aeb4..356157b 100644 --- a/Cloud/Services/Broker/IBrokerService.cs +++ b/Cloud/Services/Broker/IBrokerService.cs @@ -2,9 +2,8 @@ using Cloud.Services.Broker.Support; namespace Cloud.Services.Broker { - public interface IBrokerService + public interface IBrokerService : IBrokerProducer, IBrokerConsumer { - Task Produce(Command command); - T? Consume(string topic) where T : IBrokerResponse; + void ChangeBrokerIp(string ip); } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs index 2b31499..d2fe34c 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs @@ -7,35 +7,31 @@ namespace Cloud.Services.Broker.Implement.Kafka { public class KafkaConsumer : IBrokerConsumer { - private readonly IConsumer _consumer; + private IConsumer _consumer; + private readonly IConfiguration _config; public KafkaConsumer(IConfiguration config) { - var consumerConfig = new ConsumerConfig() - { - BootstrapServers = config["Kafka:BootstrapServers"], - GroupId = config["Kafka:GroupId"], - AutoOffsetReset = AutoOffsetReset.Earliest, - }; - - _consumer = new ConsumerBuilder(consumerConfig).Build(); + _config = config; + ChangeBrokerIp(_config["Kafka:BootstrapServers"]); } - public T? WaitMessage(string topic) + public IEnumerable? WaitMessages(string topic) where T : IBrokerResponse - { + { try { _consumer.Subscribe(topic); var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); - + Console.WriteLine($"================ Received message: {consumeResult?.Message.Value}"); if (consumeResult == null) { // No message received from Kafka within the specified timeout. return default; } - return JsonSerializer.Deserialize(consumeResult.Message.Value); + + return JsonSerializer.Deserialize>(consumeResult.Message.Value); } catch (Exception ex) @@ -47,5 +43,17 @@ namespace Cloud.Services.Broker.Implement.Kafka _consumer.Close(); } } + + public void ChangeBrokerIp(string ip) + { + var consumerConfig = new ConsumerConfig() + { + BootstrapServers = ip, + GroupId = _config["Kafka:GroupId"], + AutoOffsetReset = AutoOffsetReset.Earliest, + }; + + _consumer = new ConsumerBuilder(consumerConfig).Build(); + } } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs index b301084..0391cc4 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs @@ -5,17 +5,14 @@ namespace Cloud.Services.Broker.Implement.Kafka { public class KafkaProducer : IBrokerProducer { - private readonly IProducer _producer; + private IProducer _producer; + private readonly IConfiguration _config; public KafkaProducer(IConfiguration configuration) { - var producerConfig = new ProducerConfig - { - BootstrapServers = configuration["Kafka:BootstrapServers"] - }; + _config = configuration; - //Build the Producer - _producer = new ProducerBuilder(producerConfig).Build(); + ChangeBrokerIp(_config["Kafka:BootstrapServers"]); } public async Task ProduceAsync(string topic, Command command) { @@ -24,5 +21,16 @@ namespace Cloud.Services.Broker.Implement.Kafka //Produce the Message await _producer.ProduceAsync(topic, message); } + + public void ChangeBrokerIp(string ip) + { + var producerConfig = new ProducerConfig + { + BootstrapServers = ip + }; + + //Build the Producer + _producer = new ProducerBuilder(producerConfig).Build(); + } } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs index c0b90ba..8fc9174 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs @@ -13,10 +13,18 @@ namespace Cloud.Services.Broker.Implement.Kafka _consumer = new KafkaConsumer(configuration); } - public T? Consume(string topic) - where T : IBrokerResponse => _consumer.WaitMessage(topic); + public IEnumerable? WaitMessages(string topic) + where T : IBrokerResponse + => _consumer.WaitMessages(topic); - public async Task Produce(Command command) + + public async Task ProduceAsync(string topic, Command command) => await _producer.ProduceAsync("commands", command); + + public void ChangeBrokerIp(string ip) + { + _consumer.ChangeBrokerIp(ip); + _producer.ChangeBrokerIp(ip); + } } } \ No newline at end of file