38 lines
1.1 KiB
C#
38 lines
1.1 KiB
C#
using System.Text.Json;
|
|
using Cloud.Services.Broker.Support;
|
|
using Confluent.Kafka;
|
|
|
|
namespace Cloud.Services.Broker.Implement.Kafka
|
|
{
|
|
public class KafkaProducer : IBrokerProducer
|
|
{
|
|
private IProducer<string, string> _producer;
|
|
private readonly IConfiguration _config;
|
|
|
|
public KafkaProducer(IConfiguration configuration)
|
|
{
|
|
_config = configuration;
|
|
Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]);
|
|
ChangeBrokerIp(_config["KAFKA_URL"]);
|
|
}
|
|
public async Task ProduceAsync(string topic, Command command)
|
|
{
|
|
var commandSerialized = JsonSerializer.Serialize(command);
|
|
var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = commandSerialized };
|
|
|
|
//Produce the Message
|
|
await _producer.ProduceAsync(topic, message);
|
|
}
|
|
|
|
public void ChangeBrokerIp(string ip)
|
|
{
|
|
var producerConfig = new ProducerConfig
|
|
{
|
|
BootstrapServers = ip
|
|
};
|
|
|
|
//Build the Producer
|
|
_producer = new ProducerBuilder<string, string>(producerConfig).Build();
|
|
}
|
|
}
|
|
} |