using Cloud.Services.Broker.Support;
using Confluent.Kafka;
using System.Text.Json;

namespace Cloud.Services.Broker.Implement.Kafka
{
    /// <summary>
    /// Kafka-backed implementation of <see cref="IBrokerConsumer"/>.
    /// Each call to <see cref="WaitMessages{T}"/> polls a topic once and then closes the
    /// underlying Kafka consumer; a new consumer is built on demand for the next call.
    /// </summary>
    public class KafkaConsumer : IBrokerConsumer
    {
        /// <summary>Maximum time a single poll waits for a message.</summary>
        private static readonly TimeSpan ConsumeTimeout = TimeSpan.FromMilliseconds(1000);

        private readonly IConfiguration _config;

        // NOTE(review): the generic arguments of the consumer were not visible in the reviewed
        // source. The value is a string (it is handed to JsonSerializer); the key type is
        // assumed to be Ignore because the key is never read - confirm against the original.
        // Null whenever the previous consumer has been closed and not yet rebuilt.
        private IConsumer<Ignore, string>? _consumer;

        // Broker address most recently passed to ChangeBrokerIp; used to rebuild the consumer.
        private string? _brokerIp;

        /// <summary>
        /// Creates the consumer using the "Kafka:BootstrapServers" and "Kafka:GroupId"
        /// configuration entries.
        /// </summary>
        /// <param name="config">Application configuration holding the Kafka settings.</param>
        public KafkaConsumer(IConfiguration config)
        {
            _config = config;
            ChangeBrokerIp(_config["Kafka:BootstrapServers"]);
        }

        /// <summary>
        /// Subscribes to <paramref name="topic"/>, waits up to one second for a single message
        /// and deserializes its JSON payload.
        /// </summary>
        /// <typeparam name="T">Element type of the JSON array carried by the message.</typeparam>
        /// <param name="topic">Name of the topic to poll.</param>
        /// <returns>
        /// The deserialized payload, or <c>null</c> when no message arrived within the timeout
        /// (or the payload itself was JSON <c>null</c>).
        /// </returns>
        /// <remarks>
        /// Exceptions raised while subscribing, consuming or deserializing propagate to the caller.
        /// </remarks>
        public IEnumerable<T>? WaitMessages<T>(string topic) where T : IBrokerResponse
        {
            // The consumer is closed at the end of every call (see finally), so it has to be
            // rebuilt here; otherwise a second call would operate on a disposed consumer.
            var consumer = _consumer ?? BuildConsumer(_brokerIp);
            _consumer = consumer;

            try
            {
                consumer.Subscribe(topic);
                var consumeResult = consumer.Consume(ConsumeTimeout);

                if (consumeResult is null)
                {
                    // No message received from Kafka within the specified timeout.
                    return default;
                }

                Console.WriteLine($"================ Received message: {consumeResult.Message.Value}");

                return JsonSerializer.Deserialize<IEnumerable<T>>(consumeResult.Message.Value);
            }
            finally
            {
                // Forget the instance before closing it so a failing Close() cannot leave a
                // dead consumer behind for the next call.
                _consumer = null;
                consumer.Close();
            }
        }

        /// <summary>
        /// Points the consumer at a different broker. Any consumer that is still open is closed
        /// after its replacement has been built successfully.
        /// </summary>
        /// <param name="ip">Bootstrap server address(es) of the Kafka broker.</param>
        public void ChangeBrokerIp(string ip)
        {
            // Build first: if the new configuration is rejected, the current consumer stays intact.
            var replacement = BuildConsumer(ip);
            var previous = _consumer;

            _consumer = replacement;
            _brokerIp = ip;

            // Release the old connection instead of leaking it.
            previous?.Close();
        }

        /// <summary>Builds a consumer for the given broker using the configured group id.</summary>
        private IConsumer<Ignore, string> BuildConsumer(string? ip)
        {
            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = ip,
                GroupId = _config["Kafka:GroupId"],
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            return new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        }
    }
}