using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

namespace ConsumerSimple
{
    /// <summary>
    /// Consumes UTF-8 text messages from fanout exchanges over a single
    /// RabbitMQ connection and channel that are opened in the constructor.
    /// </summary>
    public class Receiver : IDisposable
    {
        private readonly ConnectionFactory _connectionFactory;
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private bool _disposed;

        /// <summary>
        /// Maps each exchange name successfully subscribed to via
        /// <see cref="SubscribeTo"/> to the set of queue names consumed for it.
        /// </summary>
        public Dictionary<string, HashSet<string>> Queues { get; private set; } = new();

        /// <summary>
        /// Opens a connection and a channel to the broker.
        /// </summary>
        /// <param name="brockerHost">Broker host name.</param>
        /// <param name="brockerUsername">User name used to authenticate.</param>
        /// <param name="brockerPassword">Password used to authenticate.</param>
        public Receiver(string brockerHost, string brockerUsername, string brockerPassword)
        {
            _connectionFactory = new ConnectionFactory()
            {
                HostName = brockerHost,
                UserName = brockerUsername,
                Password = brockerPassword
            };

            _connection = _connectionFactory.CreateConnection();
            try
            {
                _channel = _connection.CreateModel();
            }
            catch
            {
                // Do not leak the open connection when the channel cannot be created.
                _connection.Dispose();
                throw;
            }
        }

        /// <summary>
        /// Declares <paramref name="exchange"/> as a fanout exchange (first time only),
        /// binds a queue to it and starts consuming with manual acknowledgements.
        /// </summary>
        /// <param name="exchange">Name of the fanout exchange to subscribe to.</param>
        /// <param name="handler">
        /// Callback invoked with each message body decoded as UTF-8. The message is
        /// acknowledged only after the callback returns; if it throws, the exception
        /// message is written to the console and the message is left unacknowledged.
        /// </param>
        /// <param name="queueName">
        /// Queue to consume from. When given, it is declared durable, non-exclusive and
        /// not auto-deleted; when <c>null</c>, a queue named by the broker is declared.
        /// </param>
        /// <returns>
        /// <c>true</c> when the subscription was set up; <c>false</c> when any step
        /// failed (the exception message is written to the console).
        /// </returns>
        public bool SubscribeTo(string exchange, Action<string> handler, string? queueName = null)
        {
            try
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(Receiver));
                }

                // Fail before touching the broker instead of on the first delivery.
                if (handler is null)
                {
                    throw new ArgumentNullException(nameof(handler));
                }

                if (!Queues.TryGetValue(exchange, out var boundQueues))
                {
                    // Declare the exchange the caller asked for, not a fixed name.
                    _channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Fanout);
                    boundQueues = new HashSet<string>();
                    Queues.Add(exchange, boundQueues);
                }

                if (queueName is null)
                {
                    queueName = _channel.QueueDeclare().QueueName;
                }
                else
                {
                    _channel.QueueDeclare(
                        queue: queueName,
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
                }

                _channel.QueueBind(queue: queueName, exchange: exchange, routingKey: string.Empty);

                var consumer = new EventingBasicConsumer(_channel);
                consumer.Received += (model, ea) =>
                {
                    try
                    {
                        // Copy the body: the buffer is only read inside this callback.
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        handler(message);
                        _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    }
                    catch (Exception ex)
                    {
                        // Best effort: report and keep consuming; the message stays unacked.
                        Console.WriteLine(ex.Message);
                    }
                };

                _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
                boundQueues.Add(queueName);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            return false;
        }

        /// <summary>
        /// Disposes the channel and then the connection. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            try
            {
                // The channel belongs to the connection, so release it first.
                _channel.Dispose();
            }
            finally
            {
                _connection.Dispose();
            }

            // No finalizer is declared here; this only matters for derived types that add one.
            GC.SuppressFinalize(this);
        }
    }
}