diff --git a/basharin_sevastyan_lab_4/PC1C1.png b/basharin_sevastyan_lab_4/PC1C1.png new file mode 100644 index 0000000..7875dda Binary files /dev/null and b/basharin_sevastyan_lab_4/PC1C1.png differ diff --git a/basharin_sevastyan_lab_4/PC1C2.png b/basharin_sevastyan_lab_4/PC1C2.png new file mode 100644 index 0000000..f8f3e14 Binary files /dev/null and b/basharin_sevastyan_lab_4/PC1C2.png differ diff --git a/basharin_sevastyan_lab_4/README.md b/basharin_sevastyan_lab_4/README.md new file mode 100644 index 0000000..1897f12 --- /dev/null +++ b/basharin_sevastyan_lab_4/README.md @@ -0,0 +1,76 @@ +# Лабораторная работа 3. Вариант 5. + +### Как запустить +В директории с файлами выполнить следующие команды: ++ Запустить Publisher `python publisher.py`. ++ Запустить Consumer 1 `python consumer1.py`. ++ Запустить Consumer 2 `python consumer2.py`. + +### Описание работы +Есть отправитель `publisher` и два слушателя `consumer1` и `consumer2` + +`publisher`, согласно заданию, отправляет сообщения раз в 3 секунды. +```python +import time +import random + +def publish_event(channel, exchange_name): + events = ["пришёл заказ", "сообщение от пользователя", "необходимо создать отчёт"] + while True: + event = random.choice(events) + channel.basic_publish(exchange=exchange_name, routing_key='', body=event) + print(f"Отправлено событие: {event}") + time.sleep(3) # Задержка отправки +``` + +`consumer1` прослушивает очередь `queue1` раз в 2 секунды. +```python +import time + +def process_message(ch, method, properties, body): # Функция получения сообщений + print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}") + time.sleep(2) # Задержка прослушиваения + ch.basic_ack(delivery_tag=method.delivery_tag) +``` +Созданная очередь `queue1` +![](queue1.png "") + +`consumer2` прослушивает очередь без задержек. +```python +def process_message(ch, method, properties, body): # Функция получения сообщений + print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}") + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): # Функция прослушки очереди + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 2 начал прослушивание сообщений...") + channel.start_consuming() +``` +Созданная очередь `queue2` +![](queue2.png "") + +Скриншот из `RabbitMQ Management UI`. +![](queues.png "") + +### Анализ результатов +Сначала разберем первую ситуацию, когда запущены `consumer1` и `consumer2`. +![](PC1C2.png "") +Тогда график очереди `queue1` будет таким: +![](queue1.png "") +Если мы параллельно запустим ещё один процесс `consumer1`, то с заданными нами параметрами ситуация изменится слабо, +тк в момент обновления `RabbitMQ Management UI` в очереди будет как минимум одно сообщение, но сообщений будет получено +больше. Если уменьшить время отправки сообщений, то можно будет отследить изменения в очереди `queue1`. + +Терминалы: +![](PC1C1.png "") + +`RabbitMQ Management UI`: +![](queue1_c1c1.png "") +В данном эксперименте трудно установить, какой из вариантов лучше, но можно установить, что `consumer2` будет показывать +ту же эффективность, что и связка `consumer1` + `consumer1` и то, что два `consumer1` будут эффективнее разгружать очередь, чем один `consumer1`. + +### Видео +https://youtu.be/GoXtPGZe9jY \ No newline at end of file diff --git a/basharin_sevastyan_lab_4/consumer1.py b/basharin_sevastyan_lab_4/consumer1.py new file mode 100644 index 0000000..60d6ff4 --- /dev/null +++ b/basharin_sevastyan_lab_4/consumer1.py @@ -0,0 +1,33 @@ +import pika +import time +import threading + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}") + time.sleep(2) + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 1 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue1' + + consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name)) + consumer_thread.start() + + consumer_thread.join() + + connection.close() + +if __name__ == '__main__': + main() diff --git a/basharin_sevastyan_lab_4/consumer2.py b/basharin_sevastyan_lab_4/consumer2.py new file mode 100644 index 0000000..319f768 --- /dev/null +++ b/basharin_sevastyan_lab_4/consumer2.py @@ -0,0 +1,27 @@ +import pika + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}") + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 2 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue2' + + consume_messages(channel, queue_name, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() diff --git a/basharin_sevastyan_lab_4/publisher.py b/basharin_sevastyan_lab_4/publisher.py new file mode 100644 index 0000000..5135e16 --- /dev/null +++ b/basharin_sevastyan_lab_4/publisher.py @@ -0,0 +1,25 @@ +import pika +import time +import random + +def publish_event(channel, exchange_name): + events = ["пришёл заказ", "сообщение от пользователя", "необходимо создать отчёт"] + while True: + event = random.choice(events) + channel.basic_publish(exchange=exchange_name, routing_key='', body=event) + print(f"Отправлено событие: {event}") + time.sleep(3) + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + + publish_event(channel, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() diff --git a/basharin_sevastyan_lab_4/queue1.png b/basharin_sevastyan_lab_4/queue1.png new file mode 100644 index 0000000..a316362 Binary files /dev/null and b/basharin_sevastyan_lab_4/queue1.png differ diff --git a/basharin_sevastyan_lab_4/queue1_c1c1.png b/basharin_sevastyan_lab_4/queue1_c1c1.png new file mode 100644 index 0000000..5768a71 Binary files /dev/null and b/basharin_sevastyan_lab_4/queue1_c1c1.png differ diff --git a/basharin_sevastyan_lab_4/queue2.png b/basharin_sevastyan_lab_4/queue2.png new file mode 100644 index 0000000..40732f8 Binary files /dev/null and b/basharin_sevastyan_lab_4/queue2.png differ diff --git a/basharin_sevastyan_lab_4/queues.png b/basharin_sevastyan_lab_4/queues.png new file mode 100644 index 0000000..364735f Binary files /dev/null and b/basharin_sevastyan_lab_4/queues.png differ