diff --git a/Cloud/Program.cs b/Cloud/Program.cs index 5daec1f..16d0c36 100644 --- a/Cloud/Program.cs +++ b/Cloud/Program.cs @@ -20,7 +20,7 @@ using Cloud.Middlewares; var builder = WebApplication.CreateBuilder(args); // Add services to the container. -builder.Services.AddTransient(); +builder.Services.AddSingleton(); builder.Services.AddTransient(); //Redis configuration diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs index c6b0b30..f29a4b8 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs @@ -1,6 +1,7 @@ using Cloud.Services.Broker.Support; using Confluent.Kafka; +using System.Diagnostics; using System.Text.Json; namespace Cloud.Services.Broker.Implement.Kafka @@ -20,29 +21,61 @@ namespace Cloud.Services.Broker.Implement.Kafka public IEnumerable? WaitMessages(string topic) where T : IBrokerResponse { - try - { - _consumer.Subscribe(topic); + List res = new(); + List partitions; - var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); - Console.WriteLine($"================ Received message: {consumeResult?.Message.Value}"); - if (consumeResult == null) - { - // No message received from Kafka within the specified timeout. - return default; - } + using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = _config["KAFKA_URL"] }).Build(); + var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20)); + var currentTopic = meta.Topics.SingleOrDefault(t => t.Topic == topic) + ?? throw new Exception($"Topic {topic} not found"); + partitions = currentTopic.Partitions; + + _consumer.Subscribe(topic); + foreach (var partition in partitions) + { + var topicPartition = new TopicPartition(topic, partition.PartitionId); + _consumer.Assign(topicPartition); + + T? message = _consume(); + if (message == null) return null; + res.Add(message); - return JsonSerializer.Deserialize>(consumeResult.Message.Value); + } + _consumer.Unassign(); + _consumer.Unsubscribe(); + return res; + } - } - catch (Exception ex) - { - throw; - } - finally - { - _consumer.Close(); - } + private T? _consume() where T : IBrokerResponse + { + var sw = new Stopwatch(); + sw.Start(); + try + { + while (true) + { + var consumeResult = _consumer.Consume(TimeSpan.FromMinutes(1)); + + if (consumeResult?.Message?.Value == null) + { + // Предел по времени + if (sw.Elapsed > TimeSpan.FromMinutes(1)) + { + return default; + } + continue; + } + + string jsonObj = consumeResult.Message.Value; + return JsonSerializer.Deserialize(jsonObj); + + } + } + catch (Exception ex) + { + _consumer.Close(); + throw; + } } public void ChangeBrokerIp(string ip) @@ -53,7 +86,7 @@ namespace Cloud.Services.Broker.Implement.Kafka GroupId = _config["Kafka:GroupId"], AutoOffsetReset = AutoOffsetReset.Earliest, }; - + _consumer?.Close(); _consumer = new ConsumerBuilder(consumerConfig).Build(); } } diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs index 2a0f8c8..6a916bb 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs @@ -12,7 +12,7 @@ namespace Cloud.Services.Broker.Implement.Kafka public KafkaProducer(IConfiguration configuration) { _config = configuration; - Console.WriteLine($"KafkaConsumer created. IP:" + _config["KAFKA_URL"]); + Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]); ChangeBrokerIp(_config["KAFKA_URL"]); } public async Task ProduceAsync(string topic, Command command)