Cucumber/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs

51 lines
1.2 KiB
C#

using Cloud.Services.Broker.Support;
using Confluent.Kafka;
using System.Text.Json;
namespace Cloud.Services.Broker.Implement.Kafka
{
public class KafkaConsumer : IBrokerConsumer
{
private readonly IConsumer<string, string> _consumer;
public KafkaConsumer(IConfiguration config)
{
var consumerConfig = new ConsumerConfig()
{
BootstrapServers = config["Kafka:BootstrapServers"],
GroupId = config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
}
public T? WaitMessage<T>(string topic)
where T : IBrokerResponse
{
try
{
_consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
if (consumeResult == null)
{
// No message received from Kafka within the specified timeout.
return default;
}
return JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
}
catch (Exception ex)
{
throw;
}
finally
{
_consumer.Close();
}
}
}
}