using Cloud.Services.Broker.Support;
using Confluent.Kafka;
using System.Diagnostics;
using System.Text.Json;

namespace Cloud.Services.Broker.Implement.Kafka
{
    /// <summary>
    /// Kafka-backed <see cref="IBrokerConsumer"/> that reads one JSON message from
    /// every partition of a topic and deserializes each into the requested type.
    /// </summary>
    public class KafkaConsumer : IBrokerConsumer
    {
        /// <summary>Total time budget for obtaining one message from one partition.</summary>
        private static readonly TimeSpan ConsumeTimeout = TimeSpan.FromMinutes(1);

        /// <summary>Time allowed for the cluster metadata request.</summary>
        private static readonly TimeSpan MetadataTimeout = TimeSpan.FromSeconds(20);

        // NOTE(review): the generic arguments were not visible in the reviewed text.
        // The value type is string because the payload is handed to JsonSerializer as
        // a string; the key is never read, so Ignore is assumed - confirm.
        private IConsumer<Ignore, string> _consumer;
        private readonly IConfiguration _config;

        /// <summary>
        /// Creates the consumer and connects it to the broker configured under
        /// the <c>KAFKA_URL</c> configuration key.
        /// </summary>
        /// <param name="config">Application configuration (reads <c>KAFKA_URL</c> and <c>Kafka:GroupId</c>).</param>
        public KafkaConsumer(IConfiguration config)
        {
            _config = config;
            Console.WriteLine($"KafkaConsumer created. IP:{_config["KAFKA_URL"]}");
            ChangeBrokerIp(_config["KAFKA_URL"]);
        }

        /// <summary>
        /// Reads one message from each partition of <paramref name="topic"/>.
        /// </summary>
        /// <typeparam name="T">Type every message payload is deserialized into.</typeparam>
        /// <param name="topic">Name of the topic to read from.</param>
        /// <returns>
        /// The messages in partition-metadata order, or <c>null</c> when any partition
        /// yields no message within the time budget or a payload deserializes to null.
        /// </returns>
        /// <exception cref="InvalidOperationException">The topic is absent from the cluster metadata.</exception>
        public IEnumerable<T>? WaitMessages<T>(string topic) where T : IBrokerResponse
        {
            using var adminClient = new AdminClientBuilder(
                new AdminClientConfig { BootstrapServers = _config["KAFKA_URL"] }).Build();

            var meta = adminClient.GetMetadata(MetadataTimeout);
            var currentTopic = meta.Topics.SingleOrDefault(t => t.Topic == topic)
                ?? throw new InvalidOperationException($"Topic {topic} not found");

            // Manual assignment only: combining Subscribe() with Assign() lets the
            // group rebalance replace the partition chosen here, which would make
            // the per-partition read below non-deterministic.
            List<T> res = new();
            bool complete = true;

            foreach (var partition in currentTopic.Partitions)
            {
                _consumer.Assign(new TopicPartition(topic, partition.PartitionId));

                T? message = _consume<T>();
                if (message == null)
                {
                    complete = false;
                    break;
                }

                res.Add(message);
            }

            // Release the assignment on the timeout path as well as on success.
            // Not done in a finally block: when _consume throws it has already
            // closed the consumer, and touching it again would mask that exception.
            _consumer.Unassign();

            return complete ? res : null;
        }

        /// <summary>
        /// Waits for the next record carrying a value on the current assignment and
        /// deserializes it from JSON.
        /// </summary>
        /// <typeparam name="T">Type the payload is deserialized into.</typeparam>
        /// <returns>
        /// The deserialized message, or <c>default</c> when the time budget elapses
        /// without a record that has a value.
        /// </returns>
        /// <remarks>On any exception the consumer is closed and the exception is rethrown.</remarks>
        private T? _consume<T>() where T : IBrokerResponse
        {
            var timer = Stopwatch.StartNew();
            try
            {
                while (true)
                {
                    // Time limit: hand Consume only what is left of the budget so the
                    // overall wait cannot exceed ConsumeTimeout.
                    var remaining = ConsumeTimeout - timer.Elapsed;
                    if (remaining <= TimeSpan.Zero)
                    {
                        return default;
                    }

                    var consumeResult = _consumer.Consume(remaining);
                    string? jsonObj = consumeResult?.Message?.Value;
                    if (jsonObj == null)
                    {
                        // Timed out or a record without a value; re-check the budget.
                        continue;
                    }

                    return JsonSerializer.Deserialize<T>(jsonObj);
                }
            }
            catch
            {
                _consumer.Close();
                throw;
            }
        }

        /// <summary>
        /// Closes the current consumer, if any, and builds a new one connected to
        /// <paramref name="ip"/>.
        /// </summary>
        /// <param name="ip">Bootstrap server address for the new consumer.</param>
        public void ChangeBrokerIp(string ip)
        {
            var consumerConfig = new ConsumerConfig()
            {
                BootstrapServers = ip,
                GroupId = _config["Kafka:GroupId"],
                AutoOffsetReset = AutoOffsetReset.Earliest,
            };

            try
            {
                _consumer?.Close();
            }
            catch (ObjectDisposedException)
            {
                // The previous consumer was already closed by the _consume error
                // path; there is nothing left to release.
            }

            _consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
        }
    }
}