del: сервисы брокера
они уже есь
This commit is contained in:
parent
9b770d131a
commit
fed96d5b86
@ -1,39 +0,0 @@
|
||||
namespace Cloud.Services
|
||||
{
|
||||
public class BackgroundWorkerService : BackgroundService
|
||||
{
|
||||
public readonly ILogger<BackgroundWorkerService> _logger;
|
||||
private readonly ConsumerService _consumerService;
|
||||
|
||||
public BackgroundWorkerService(ILogger<BackgroundWorkerService> logger, ConsumerService consumer)
|
||||
{
|
||||
_logger = logger;
|
||||
_consumerService = consumer;
|
||||
}
|
||||
|
||||
//Backghround Service, This will run continuously
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
//_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now);
|
||||
|
||||
string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message
|
||||
|
||||
//After Consume the Order Request Can process the order
|
||||
if (!string.IsNullOrEmpty(request))
|
||||
_logger.LogInformation("Valves-Heaters Request : {value}", request);
|
||||
|
||||
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError($"BackgroundWorkerService - Exception {ex}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
using Confluent.Kafka;
|
||||
|
||||
namespace Cloud.Services
|
||||
{
|
||||
public class ConsumerService
|
||||
{
|
||||
private IConsumer<string, string> _consumer;
|
||||
private ConsumerConfig consumerConfig;
|
||||
public ConsumerService(IConfiguration configuration)
|
||||
{
|
||||
consumerConfig = new ConsumerConfig
|
||||
{
|
||||
BootstrapServers = configuration["Kafka:BootstrapServers"],
|
||||
GroupId = configuration["Kafka:GroupId"],
|
||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
||||
};
|
||||
|
||||
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
|
||||
}
|
||||
|
||||
//Consume Method
|
||||
public async Task<string>WaitMessage(string topic)
|
||||
{
|
||||
try
|
||||
{
|
||||
_consumer.Subscribe(topic);
|
||||
|
||||
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
|
||||
|
||||
if (consumeResult != null)
|
||||
{
|
||||
return consumeResult.Message.Value;
|
||||
}
|
||||
else
|
||||
{
|
||||
//No message received from Kafka within the specified timeout.
|
||||
}
|
||||
return "";
|
||||
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return "";
|
||||
}
|
||||
finally
|
||||
{
|
||||
_consumer.Close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
using Confluent.Kafka;
|
||||
|
||||
namespace Cloud.Services
|
||||
{
|
||||
public class ProducerService
|
||||
{
|
||||
private readonly IProducer<string, string> _producer;
|
||||
|
||||
public ProducerService(IConfiguration configuration)
|
||||
{
|
||||
var producerConfig = new ProducerConfig
|
||||
{
|
||||
BootstrapServers = configuration["Kafka:BootstrapServers"]
|
||||
};
|
||||
|
||||
//Build the Producer
|
||||
_producer = new ProducerBuilder<string, string>(producerConfig).Build();
|
||||
}
|
||||
|
||||
//Method for Produce the Message to Kafka Topic
|
||||
public async Task ProduceMessageAsync(string topic, string value)
|
||||
{
|
||||
var kafkaMessage = new Message<string, string>
|
||||
{
|
||||
Key = Guid.NewGuid().ToString(),
|
||||
Value = value
|
||||
};
|
||||
|
||||
//Produce the Message
|
||||
await _producer.ProduceAsync(topic, kafkaMessage);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user