52 lines
1021 B
C#
52 lines
1021 B
C#
using Confluent.Kafka;
|
|
|
|
namespace Cloud.Services
|
|
{
|
|
public class ConsumerService
|
|
{
|
|
private IConsumer<string, string> _consumer;
|
|
private ConsumerConfig consumerConfig;
|
|
public ConsumerService(IConfiguration configuration)
|
|
{
|
|
consumerConfig = new ConsumerConfig
|
|
{
|
|
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
|
GroupId = configuration["Kafka:GroupId"],
|
|
AutoOffsetReset = AutoOffsetReset.Earliest,
|
|
};
|
|
|
|
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
|
|
}
|
|
|
|
//Consume Method
|
|
public async Task<string>WaitMessage(string topic)
|
|
{
|
|
try
|
|
{
|
|
_consumer.Subscribe(topic);
|
|
|
|
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
|
|
|
|
if (consumeResult != null)
|
|
{
|
|
return consumeResult.Message.Value;
|
|
}
|
|
else
|
|
{
|
|
//No message received from Kafka within the specified timeout.
|
|
}
|
|
return "";
|
|
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
return "";
|
|
}
|
|
finally
|
|
{
|
|
_consumer.Close();
|
|
}
|
|
}
|
|
}
|
|
}
|