using System.Text.Json;
using Cloud.Services.Broker.Support;
using Confluent.Kafka;

namespace Cloud.Services.Broker.Implement.Kafka
{
    /// <summary>
    /// <see cref="IBrokerProducer"/> implementation that publishes commands to Kafka
    /// as JSON strings, keyed by a freshly generated GUID.
    /// </summary>
    /// <remarks>
    /// The instance owns the underlying Confluent producer and releases it either when
    /// the broker address is changed via <see cref="ChangeBrokerIp"/> or when the
    /// instance is disposed.
    /// </remarks>
    public class KafkaProducer : IBrokerProducer, IDisposable
    {
        // Upper bound on how long we wait for queued messages before releasing a producer.
        private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(5);

        // Both key (GUID string) and value (JSON string) are strings.
        private IProducer<string, string> _producer;
        private readonly IConfiguration _config;
        private bool _disposed;

        /// <summary>
        /// Creates the producer and connects it to the broker configured under <c>KAFKA_URL</c>.
        /// </summary>
        /// <param name="configuration">Configuration source that provides <c>KAFKA_URL</c>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="configuration"/> is null.</exception>
        public KafkaProducer(IConfiguration configuration)
        {
            _config = configuration ?? throw new ArgumentNullException(nameof(configuration));

            var brokerUrl = _config["KAFKA_URL"];
            Console.WriteLine($"KafkaProducer created. IP:{brokerUrl}");
            ChangeBrokerIp(brokerUrl);
        }

        /// <summary>
        /// Serializes <paramref name="command"/> to JSON and publishes it to <paramref name="topic"/>.
        /// </summary>
        /// <param name="topic">Name of the Kafka topic to publish to.</param>
        /// <param name="command">Command to serialize with <see cref="JsonSerializer"/>.</param>
        /// <returns>A task that completes once the underlying produce call has completed.</returns>
        public async Task ProduceAsync(string topic, Command command)
        {
            var commandSerialized = JsonSerializer.Serialize(command);

            var message = new Message<string, string>
            {
                // A new random key per message.
                Key = Guid.NewGuid().ToString(),
                Value = commandSerialized
            };

            // Produce the message.
            await _producer.ProduceAsync(topic, message);
        }

        /// <summary>
        /// Rebuilds the underlying producer so that it targets the given bootstrap servers.
        /// </summary>
        /// <param name="ip">Value assigned to <c>BootstrapServers</c> of the producer configuration.</param>
        public void ChangeBrokerIp(string ip)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = ip
            };

            // Build the replacement first so a failing build leaves the current producer untouched.
            var replacement = new ProducerBuilder<string, string>(producerConfig).Build();

            var previous = _producer;
            _producer = replacement;

            // The previous producer was leaked before; release it now that it is no longer referenced.
            Release(previous);
        }

        /// <summary>
        /// Flushes and releases the underlying Kafka producer. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;
            Release(_producer);
            GC.SuppressFinalize(this);
        }

        // Flushes pending messages (bounded by FlushTimeout) and disposes the producer, if any.
        private static void Release(IProducer<string, string> producer)
        {
            if (producer is null)
            {
                return;
            }

            producer.Flush(FlushTimeout);
            producer.Dispose();
        }
    }
}