From 9bbe7b8b53cce5becd44b9ca248811f780949867 Mon Sep 17 00:00:00 2001 From: m1aksim1 Date: Wed, 4 Dec 2024 00:28:01 +0400 Subject: [PATCH] kosheev_maksim_lab_4 is ready3 --- kosheev_maksim_lab_4/README.md | 89 +++++++++++++++++++ .../lesson1-hello-world/receive.py | 28 ++++++ .../lesson1-hello-world/send.py | 17 ++++ .../lesson2-work-queues/new_task.py | 24 +++++ .../lesson2-work-queues/worker.py | 27 ++++++ .../lesson3-publish-subscribe/emit_log.py | 15 ++++ .../lesson3-publish-subscribe/logs.txt | 0 .../lesson3-publish-subscribe/receive_logs.py | 26 ++++++ 8 files changed, 226 insertions(+) create mode 100644 kosheev_maksim_lab_4/lesson3-publish-subscribe/logs.txt diff --git a/kosheev_maksim_lab_4/README.md b/kosheev_maksim_lab_4/README.md index e69de29..c650e27 100644 --- a/kosheev_maksim_lab_4/README.md +++ b/kosheev_maksim_lab_4/README.md @@ -0,0 +1,89 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +## Задание + +* Установить брокер сообщений RabbitMQ. +* Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials, используя Python. +* Продемонстрировать работу брокера сообщений. + +## Работа программы + +Программа демонстрирует работу с брокером сообщений RabbitMQ, включая следующие этапы: + +1. **Урок 1: Hello World** + - Программа демонстрирует базовый обмен сообщениями между продюсером и потребителем через очередь. + +2. **Урок 2: Work Queues** + - Реализована система, где сообщения из одной очереди обрабатываются несколькими потребителями. + - Обеспечивается балансировка нагрузки между потребителями. + +3. **Урок 3: Publish/Subscribe** + - Используется exchange типа `fanout` для широковещательной рассылки сообщений. + - Сообщения из одного источника доставляются сразу нескольким потребителям. + +### Описание классов + +- **`emit_log.py`**: + - Отправляет сообщения в exchange `logs`. + - Использует тип exchange `fanout` для широковещательной передачи сообщений. + +- **`receive_logs.py`**: + - Получает и обрабатывает сообщения, отправленные через exchange `logs`. + - Каждое подключение создаёт временную очередь для получения сообщений. + +### Работа приложения + +1. **Логгирование**: + - Программа записывает логи в файл, используя один из потребителей. + +2. **Отображение сообщений на экране**: + - Сообщения выводятся в терминале через другого потребителя. + +## Примеры использования + +### Запуск продюсера + +```bash +python emit_log.py "Ваше сообщение" +``` + +### Запуск потребителя + +Для отображения сообщений в терминале: +```bash +python receive_logs.py +``` + +Для записи сообщений в файл: +```bash +python receive_logs.py > logs_from_rabbit.log +``` + +### Уроки + +#### Lesson 1: "Hello World" + +![](result.jpg ) + +#### Lesson 2: "Work Queues" + +![](result2.jpg ) + +#### Lesson 3: "Publish/Subscribe" + +![](result3.jpg) + + + + +## Как установить RabbitMQ + + ```bash + docker run -d --hostname rabbitmq --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management + ``` + + +## Видео демонстрации + +Демонстрацию работы программы можно посмотреть по ссылке: +[Ссылка на видео](https://disk.yandex.ru/i/bNA65J1D-7Kk6w) diff --git a/kosheev_maksim_lab_4/lesson1-hello-world/receive.py b/kosheev_maksim_lab_4/lesson1-hello-world/receive.py index e69de29..4387aaa 100644 --- a/kosheev_maksim_lab_4/lesson1-hello-world/receive.py +++ b/kosheev_maksim_lab_4/lesson1-hello-world/receive.py @@ -0,0 +1,28 @@ +import pika +import sys + +def main(): + # Устанавливаем соединение с RabbitMQ + connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost') + ) + channel = connection.channel() + + # Объявляем очередь + channel.queue_declare(queue='hello') + + # Обрабатываем сообщения из очереди + def callback(ch, method, properties, body): + print(f" [x] Received {body}") + + channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) + + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print('Interrupted') + sys.exit(0) diff --git a/kosheev_maksim_lab_4/lesson1-hello-world/send.py b/kosheev_maksim_lab_4/lesson1-hello-world/send.py index e69de29..d0670b4 100644 --- a/kosheev_maksim_lab_4/lesson1-hello-world/send.py +++ b/kosheev_maksim_lab_4/lesson1-hello-world/send.py @@ -0,0 +1,17 @@ +import pika + +# Устанавливаем соединение с RabbitMQ +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost') +) +channel = connection.channel() + +# Объявляем очередь +channel.queue_declare(queue='hello') + +# Отправляем сообщение +channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') +print(" [x] Sent 'Hello World!'") + +# Закрываем соединение +connection.close() diff --git a/kosheev_maksim_lab_4/lesson2-work-queues/new_task.py b/kosheev_maksim_lab_4/lesson2-work-queues/new_task.py index e69de29..a77f720 100644 --- a/kosheev_maksim_lab_4/lesson2-work-queues/new_task.py +++ b/kosheev_maksim_lab_4/lesson2-work-queues/new_task.py @@ -0,0 +1,24 @@ +import pika +import sys + +# Устанавливаем соединение с RabbitMQ +connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +# Объявляем очередь с параметром durability=True +channel.queue_declare(queue='task_queue', durable=True) + +# Формируем сообщение из аргументов командной строки +message = ' '.join(sys.argv[1:]) or "Hello World!" + +# Отправляем сообщение с установкой свойства persistence +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent + ) +) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/kosheev_maksim_lab_4/lesson2-work-queues/worker.py b/kosheev_maksim_lab_4/lesson2-work-queues/worker.py index e69de29..028d580 100644 --- a/kosheev_maksim_lab_4/lesson2-work-queues/worker.py +++ b/kosheev_maksim_lab_4/lesson2-work-queues/worker.py @@ -0,0 +1,27 @@ +import pika +import time + +# Устанавливаем соединение с RabbitMQ +connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +# Объявляем очередь с параметром durability=True +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + +# Обработка сообщения +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + time.sleep(body.count(b'.')) # Имитируем длительную задачу + print(" [x] Done") + # Подтверждаем обработку сообщения + ch.basic_ack(delivery_tag=method.delivery_tag) + +# Настраиваем prefetch_count=1 для справедливой диспетчеризации +channel.basic_qos(prefetch_count=1) + +# Указываем обработчик для очереди +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +# Запускаем процесс ожидания сообщений +channel.start_consuming() \ No newline at end of file diff --git a/kosheev_maksim_lab_4/lesson3-publish-subscribe/emit_log.py b/kosheev_maksim_lab_4/lesson3-publish-subscribe/emit_log.py index e69de29..f4c4012 100644 --- a/kosheev_maksim_lab_4/lesson3-publish-subscribe/emit_log.py +++ b/kosheev_maksim_lab_4/lesson3-publish-subscribe/emit_log.py @@ -0,0 +1,15 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +# Создание обменника +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +# Формирование сообщения +message = ' '.join(sys.argv[1:]) or "info: Hello World!" +channel.basic_publish(exchange='logs', routing_key='', body=message) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/kosheev_maksim_lab_4/lesson3-publish-subscribe/logs.txt b/kosheev_maksim_lab_4/lesson3-publish-subscribe/logs.txt new file mode 100644 index 0000000..e69de29 diff --git a/kosheev_maksim_lab_4/lesson3-publish-subscribe/receive_logs.py b/kosheev_maksim_lab_4/lesson3-publish-subscribe/receive_logs.py index e69de29..0b0d0d3 100644 --- a/kosheev_maksim_lab_4/lesson3-publish-subscribe/receive_logs.py +++ b/kosheev_maksim_lab_4/lesson3-publish-subscribe/receive_logs.py @@ -0,0 +1,26 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +# Создание обменника +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +# Создание временной очереди +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +# Привязка очереди к обменнику +channel.queue_bind(exchange='logs', queue=queue_name) + +print(' [*] Waiting for logs. To exit press CTRL+C') + +# Обработка входящих сообщений +def callback(ch, method, properties, body): + print(f" [x] {body}") + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + +channel.start_consuming() \ No newline at end of file