dev #9

Merged
mfnefd merged 77 commits from dev into main 2024-12-25 23:49:45 +04:00
3 changed files with 56 additions and 23 deletions
Showing only changes of commit 7b751b7072 - Show all commits

View File

@ -20,7 +20,7 @@ using Cloud.Middlewares;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
// Add services to the container. // Add services to the container.
builder.Services.AddTransient<IBrokerService, KafkaService>(); builder.Services.AddSingleton<IBrokerService, KafkaService>();
builder.Services.AddTransient<IGreenhouseService, GreenhouseService>(); builder.Services.AddTransient<IGreenhouseService, GreenhouseService>();
//Redis configuration //Redis configuration

View File

@ -1,6 +1,7 @@
using Cloud.Services.Broker.Support; using Cloud.Services.Broker.Support;
using Confluent.Kafka; using Confluent.Kafka;
using System.Diagnostics;
using System.Text.Json; using System.Text.Json;
namespace Cloud.Services.Broker.Implement.Kafka namespace Cloud.Services.Broker.Implement.Kafka
@ -20,28 +21,60 @@ namespace Cloud.Services.Broker.Implement.Kafka
public IEnumerable<T>? WaitMessages<T>(string topic) public IEnumerable<T>? WaitMessages<T>(string topic)
where T : IBrokerResponse where T : IBrokerResponse
{ {
try List<T> res = new();
{ List<PartitionMetadata> partitions;
_consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = _config["KAFKA_URL"] }).Build();
Console.WriteLine($"================ Received message: {consumeResult?.Message.Value}"); var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
if (consumeResult == null) var currentTopic = meta.Topics.SingleOrDefault(t => t.Topic == topic)
?? throw new Exception($"Topic {topic} not found");
partitions = currentTopic.Partitions;
_consumer.Subscribe(topic);
foreach (var partition in partitions)
{ {
// No message received from Kafka within the specified timeout. var topicPartition = new TopicPartition(topic, partition.PartitionId);
return default; _consumer.Assign(topicPartition);
T? message = _consume<T>();
if (message == null) return null;
res.Add(message);
}
_consumer.Unassign();
_consumer.Unsubscribe();
return res;
} }
return JsonSerializer.Deserialize<IEnumerable<T>>(consumeResult.Message.Value); private T? _consume<T>() where T : IBrokerResponse
{
var sw = new Stopwatch();
sw.Start();
try
{
while (true)
{
var consumeResult = _consumer.Consume(TimeSpan.FromMinutes(1));
if (consumeResult?.Message?.Value == null)
{
// Предел по времени
if (sw.Elapsed > TimeSpan.FromMinutes(1))
{
return default;
}
continue;
}
string jsonObj = consumeResult.Message.Value;
return JsonSerializer.Deserialize<T>(jsonObj);
}
} }
catch (Exception ex) catch (Exception ex)
{
throw;
}
finally
{ {
_consumer.Close(); _consumer.Close();
throw;
} }
} }
@ -53,7 +86,7 @@ namespace Cloud.Services.Broker.Implement.Kafka
GroupId = _config["Kafka:GroupId"], GroupId = _config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest, AutoOffsetReset = AutoOffsetReset.Earliest,
}; };
_consumer?.Close();
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build(); _consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
} }
} }

View File

@ -12,7 +12,7 @@ namespace Cloud.Services.Broker.Implement.Kafka
public KafkaProducer(IConfiguration configuration) public KafkaProducer(IConfiguration configuration)
{ {
_config = configuration; _config = configuration;
Console.WriteLine($"KafkaConsumer created. IP:" + _config["KAFKA_URL"]); Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]);
ChangeBrokerIp(_config["KAFKA_URL"]); ChangeBrokerIp(_config["KAFKA_URL"]);
} }
public async Task ProduceAsync(string topic, Command command) public async Task ProduceAsync(string topic, Command command)