DAS_2023_1/basharin_sevastyan_lab_4
2023-12-01 22:33:40 +04:00
..
consumer1.py basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
consumer2.py basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
PC1C1.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
PC1C2.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
publisher.py basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
queue1_c1c1.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
queue1.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
queue2.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
queues.png basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00
README.md basharin_sevastyan_lab_4 is ready 2023-12-01 22:33:40 +04:00

Лабораторная работа 3. Вариант 5.

Как запустить

В директории с файлами выполнить следующие команды:

  • Запустить Publisher python publisher.py.
  • Запустить Consumer 1 python consumer1.py.
  • Запустить Consumer 2 python consumer2.py.

Описание работы

Есть отправитель publisher и два слушателя consumer1 и consumer2

publisher, согласно заданию, отправляет сообщения раз в 3 секунды.

import time
import random

def publish_event(channel, exchange_name):
    events = ["пришёл заказ", "сообщение от пользователя", "необходимо создать отчёт"]
    while True:
        event = random.choice(events)
        channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
        print(f"Отправлено событие: {event}")
        time.sleep(3) # Задержка отправки

consumer1 прослушивает очередь queue1 раз в 2 секунды.

import time

def process_message(ch, method, properties, body): # Функция получения сообщений
    print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
    time.sleep(2) # Задержка прослушиваения
    ch.basic_ack(delivery_tag=method.delivery_tag)

Созданная очередь queue1

consumer2 прослушивает очередь без задержек.

def process_message(ch, method, properties, body): # Функция получения сообщений
    print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_messages(channel, queue_name, exchange_name): # Функция прослушки очереди
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(exchange=exchange_name, queue=queue_name)
    channel.basic_consume(queue=queue_name, on_message_callback=process_message)

    print("Consumer 2 начал прослушивание сообщений...")
    channel.start_consuming()

Созданная очередь queue2

Скриншот из RabbitMQ Management UI.

Анализ результатов

Сначала разберем первую ситуацию, когда запущены consumer1 и consumer2. Тогда график очереди queue1 будет таким: Если мы параллельно запустим ещё один процесс consumer1, то с заданными нами параметрами ситуация изменится слабо, тк в момент обновления RabbitMQ Management UI в очереди будет как минимум одно сообщение, но сообщений будет получено больше. Если уменьшить время отправки сообщений, то можно будет отследить изменения в очереди queue1.

Терминалы:

RabbitMQ Management UI: В данном эксперименте трудно установить, какой из вариантов лучше, но можно установить, что consumer2 будет показывать ту же эффективность, что и связка consumer1 + consumer1 и то, что два consumer1 будут эффективнее разгружать очередь, чем один consumer1.

Видео

https://youtu.be/GoXtPGZe9jY