using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Channels; using System.Threading.Tasks; namespace PublishSubscribe { public class ExchangeReceiver: IDisposable { private readonly ConnectionFactory _connectionFactory; private readonly IConnection _connection; private readonly IModel _channel; public Dictionary> Queues { get; private set; } = new(); public ExchangeReceiver(string brockerHost, string brockerUsername, string brockerPassword) { _connectionFactory = new ConnectionFactory() { HostName = brockerHost, UserName = brockerUsername, Password = brockerPassword }; _connection = _connectionFactory.CreateConnection(); _channel = _connection.CreateModel(); } public bool SubscribeTo(string exchange, Action handler, string? queueName = null) { try { if (!Queues.ContainsKey(exchange)) { _channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout); Queues.Add(exchange, new HashSet()); } if (queueName != null) _channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); queueName = queueName ?? _channel.QueueDeclare().QueueName; _channel.QueueBind(queue: queueName, exchange: exchange, routingKey: string.Empty); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { try { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); handler(message); _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } catch (Exception ex) { Console.WriteLine(ex.Message); } }; _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Queues[exchange].Add(queueName); return true; } catch(Exception ex) { Console.WriteLine(ex.Message); } return false; } ~ExchangeReceiver() => Dispose(); public void Dispose() { _connection.Dispose(); _channel.Dispose(); } } }