93 lines
3.0 KiB
C#
93 lines
3.0 KiB
C#
|
|
using Cloud.Services.Broker.Support;
|
|
using Confluent.Kafka;
|
|
using System.Diagnostics;
|
|
using System.Text.Json;
|
|
|
|
namespace Cloud.Services.Broker.Implement.Kafka
{
    /// <summary>
    /// <see cref="IBrokerConsumer"/> implementation that reads JSON-encoded messages from Kafka
    /// through a single, reusable Confluent consumer.
    /// </summary>
    /// <remarks>
    /// The instance owns its underlying consumer; call <see cref="Dispose"/> when the instance
    /// is no longer needed so the consumer is closed.
    /// </remarks>
    public class KafkaConsumer : IBrokerConsumer, IDisposable
    {
        /// <summary>Maximum time to wait for the cluster metadata request.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(20);

        /// <summary>Overall time limit for receiving one message from one partition.</summary>
        private static readonly TimeSpan ConsumeTimeout = TimeSpan.FromMinutes(1);

        private readonly IConfiguration _config;
        private IConsumer<string, string> _consumer;

        // Bootstrap servers the current consumer was built with. Kept so that metadata
        // lookups talk to the same broker after ChangeBrokerIp has been called.
        private string? _bootstrapServers;
        private bool _disposed;

        /// <summary>
        /// Creates the consumer and connects it to the broker address stored under the
        /// <c>KAFKA_URL</c> configuration key.
        /// </summary>
        /// <param name="config">
        /// Application configuration; <c>KAFKA_URL</c> and <c>Kafka:GroupId</c> are read from it.
        /// </param>
        public KafkaConsumer(IConfiguration config)
        {
            _config = config;

            var brokerUrl = _config["KAFKA_URL"];
            Console.WriteLine($"KafkaConsumer created. IP:{brokerUrl}");
            ChangeBrokerIp(brokerUrl);
        }

        /// <summary>
        /// Reads one message from every partition of <paramref name="topic"/> and deserializes
        /// each of them from JSON into <typeparamref name="T"/>.
        /// </summary>
        /// <typeparam name="T">Type the JSON payload is deserialized into.</typeparam>
        /// <param name="topic">Name of the topic to read from.</param>
        /// <returns>
        /// One message per partition, in the order the partitions are listed in the topic
        /// metadata; <c>null</c> as soon as a partition yields no message within the time
        /// limit or a payload deserializes to <c>null</c>.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// The topic is not present in the cluster metadata.
        /// </exception>
        public IEnumerable<T>? WaitMessages<T>(string topic)
            where T : IBrokerResponse
        {
            // Use one consumer instance for the whole call, even if the field is swapped meanwhile.
            IConsumer<string, string> consumer = _consumer;

            List<PartitionMetadata> partitions = GetPartitions(topic);
            List<T> received = new(partitions.Count);

            // NOTE(review): Subscribe (group-managed assignment) and Assign (manual assignment)
            // are both used on the same consumer. Confirm that a group rebalance cannot replace
            // the manual single-partition assignment while a message is being awaited.
            consumer.Subscribe(topic);
            try
            {
                foreach (PartitionMetadata partition in partitions)
                {
                    consumer.Assign(new TopicPartition(topic, partition.PartitionId));

                    T? message = ConsumeNext<T>(consumer);
                    if (message == null)
                    {
                        return null;
                    }

                    received.Add(message);
                }

                return received;
            }
            finally
            {
                // Runs on the timeout return and on exceptions as well, so the shared consumer
                // never keeps a stale assignment or subscription for the next call.
                consumer.Unassign();
                consumer.Unsubscribe();
            }
        }

        /// <summary>
        /// Replaces the underlying consumer with one connected to <paramref name="ip"/>.
        /// The group id is read from the <c>Kafka:GroupId</c> configuration key and the
        /// offset reset policy is <see cref="AutoOffsetReset.Earliest"/>.
        /// </summary>
        /// <param name="ip">Bootstrap servers string for the new consumer.</param>
        /// <exception cref="ObjectDisposedException">The instance has been disposed.</exception>
        public void ChangeBrokerIp(string ip)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaConsumer));
            }

            var consumerConfig = new ConsumerConfig
            {
                BootstrapServers = ip,
                GroupId = _config["Kafka:GroupId"],
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            // Build the replacement before touching the current consumer: if the build
            // throws, the field must not be left pointing at an already closed consumer.
            IConsumer<string, string> replacement =
                new ConsumerBuilder<string, string>(consumerConfig).Build();
            IConsumer<string, string>? previous = _consumer;

            _consumer = replacement;
            _bootstrapServers = ip;

            Shutdown(previous);
        }

        /// <summary>
        /// Closes the underlying consumer. Calling this method more than once is a no-op.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Shutdown(_consumer);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Looks up the partitions of <paramref name="topic"/> using a short-lived admin client
        /// connected to the same bootstrap servers as the current consumer.
        /// </summary>
        private List<PartitionMetadata> GetPartitions(string topic)
        {
            using var adminClient = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = _bootstrapServers }).Build();

            Metadata metadata = adminClient.GetMetadata(MetadataTimeout);
            TopicMetadata topicMetadata = metadata.Topics.SingleOrDefault(t => t.Topic == topic)
                ?? throw new InvalidOperationException($"Topic {topic} not found");

            return topicMetadata.Partitions;
        }

        /// <summary>
        /// Waits for the next record that carries a value on the current assignment and
        /// deserializes it from JSON.
        /// </summary>
        /// <returns>
        /// The deserialized message, or the default value of <typeparamref name="T"/> when no
        /// record with a value arrives before <see cref="ConsumeTimeout"/> elapses.
        /// </returns>
        /// <remarks>
        /// Failures (consume or deserialization errors) propagate to the caller; the consumer
        /// is intentionally left open so the instance stays usable for later calls.
        /// </remarks>
        private static T? ConsumeNext<T>(IConsumer<string, string> consumer)
            where T : IBrokerResponse
        {
            var elapsed = Stopwatch.StartNew();

            while (true)
            {
                // Time limit: poll only for what is left of the budget, so records without a
                // value cannot stretch the total wait beyond ConsumeTimeout.
                TimeSpan remaining = ConsumeTimeout - elapsed.Elapsed;
                if (remaining <= TimeSpan.Zero)
                {
                    return default;
                }

                string? payload = consumer.Consume(remaining)?.Message?.Value;
                if (payload != null)
                {
                    return JsonSerializer.Deserialize<T>(payload);
                }
            }
        }

        /// <summary>
        /// Closes <paramref name="consumer"/> if there is one; Dispose is invoked as well so
        /// the consumer is released even when Close throws.
        /// </summary>
        private static void Shutdown(IConsumer<string, string>? consumer)
        {
            if (consumer == null)
            {
                return;
            }

            try
            {
                consumer.Close();
            }
            finally
            {
                consumer.Dispose();
            }
        }
    }
}