88 lines
2.9 KiB
C#
88 lines
2.9 KiB
C#
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Text;
|
|
using System.Threading.Channels;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace PublishSubscribe
|
|
{
|
|
public class ExchangeReceiver: IDisposable
|
|
{
|
|
private readonly ConnectionFactory _connectionFactory;
|
|
private readonly IConnection _connection;
|
|
private readonly IModel _channel;
|
|
|
|
public Dictionary<string, HashSet<string>> Queues { get; private set; } = new();
|
|
|
|
public ExchangeReceiver(string brockerHost, string brockerUsername, string brockerPassword)
|
|
{
|
|
_connectionFactory = new ConnectionFactory() { HostName = brockerHost, UserName = brockerUsername, Password = brockerPassword };
|
|
_connection = _connectionFactory.CreateConnection();
|
|
_channel = _connection.CreateModel();
|
|
}
|
|
|
|
public bool SubscribeTo(string exchange, Action<string> handler, string? queueName = null)
|
|
{
|
|
try
|
|
{
|
|
if (!Queues.ContainsKey(exchange))
|
|
{
|
|
_channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
|
|
Queues.Add(exchange, new HashSet<string>());
|
|
}
|
|
if (queueName != null)
|
|
_channel.QueueDeclare(queue: queueName,
|
|
durable: true,
|
|
exclusive: false,
|
|
autoDelete: false,
|
|
arguments: null);
|
|
|
|
queueName = queueName ?? _channel.QueueDeclare().QueueName;
|
|
|
|
|
|
_channel.QueueBind(queue: queueName,
|
|
exchange: exchange,
|
|
routingKey: string.Empty);
|
|
|
|
var consumer = new EventingBasicConsumer(_channel);
|
|
consumer.Received += (model, ea) =>
|
|
{
|
|
try
|
|
{
|
|
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
|
|
handler(message);
|
|
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Console.WriteLine(ex.Message);
|
|
}
|
|
|
|
};
|
|
_channel.BasicConsume(queue: queueName,
|
|
autoAck: false,
|
|
consumer: consumer);
|
|
|
|
Queues[exchange].Add(queueName);
|
|
return true;
|
|
}
|
|
catch(Exception ex)
|
|
{
|
|
Console.WriteLine(ex.Message);
|
|
}
|
|
return false;
|
|
}
|
|
|
|
~ExchangeReceiver() => Dispose();
|
|
|
|
public void Dispose()
|
|
{
|
|
_connection.Dispose();
|
|
_channel.Dispose();
|
|
}
|
|
}
|
|
}
|