diff --git a/kozyrev_sergey_lab_4/README.md b/kozyrev_sergey_lab_4/README.md new file mode 100644 index 0000000..0602731 --- /dev/null +++ b/kozyrev_sergey_lab_4/README.md @@ -0,0 +1,90 @@ +# Лабораторная работа №4 — Работа с брокером сообщений +## 1. Цель работы +Изучение проектирования приложений при помощи брокера сообщений RabbitMQ. +## 2. Как запустить лабораторную работу +### Предварительные требования: +* Установленный RabbitMQ Server +* Python 3.9+ +* Библиотека pika +### Запуск Tutorial 1 +```bash +cd tut1 +python receive.py +python send.py +``` +![alt text](./screenshots/Tutorial1.png) + +### Запуск Tutorial 2 +```bash +cd tut2 +python worker.py +python new_task.py +``` +![alt text](./screenshots/Tutorial2.png) + +### Запуск Tutorial 3 +```bash +cd tut3 +python receive_logs.py +python emit_log.py +``` +![alt text](./screenshots/Tutorial3.png) + +### Запуск основного приложения +```bash +cd app +python consumer1_slow.py +python consumer2_fast.py +python publisher.py +python consumer1_slow.py +python consumer1_slow.py +``` + +## 3. Структура проекта +``` +tut1/ # Изучение туториала 1 +tut2/ # Изучение туториала 2 +tut3/ # Изучение туториала 3 +app/ # Создание приложений, обрабатывающих очередь сообщений с разной скоростью +README.md # Документация проекта +``` +### Описание сервисов +**tut1** — Базовая модель "производитель-потребитель". Демонстрирует отправку и получение простых сообщений через очередь. + +**tut2** — Модель распределения задач между несколькими worker'ами. Демонстрирует round-robin балансировку нагрузки и механизм подтверждения обработки сообщений (acknowledgment). + +**tut3** — Паттерн fanout exchange. Демонстрирует широковещательную рассылку сообщений всем подписчикам одновременно. + +**app** - Система обработки заказов интернет-магазина. Publisher генерирует события заказов каждую секунду. Consumer 1 обрабатывает заказы медленно (2-3 секунды), Consumer 2 — мгновенно. + +## 5. Результаты тестирования +### Сценарий 1: Один экземпляр каждого consumer +#### Параметры: +* Publisher: 1 сообщение/секунду +* Consumer 1 (slow): 2.5 секунды/сообщение +* Consumer 2 (fast): мгновенная обработка +#### Результат: +* Очередь order_processing_slow: накапливаются сообщения +* Очередь order_processing_fast: 0 сообщений (успевает обрабатывать) +#### Вывод: +Consumer 1 не справляется с нагрузкой, так как скорость обработки (0.4 сообщения/сек) меньше скорости поступления (1 сообщение/сек). +### Сценарий 2: Три экземпляра Consumer 1 +#### Параметры: +* Publisher: 1 сообщение/секунду +* 3× Consumer 1 (slow): 2.5 секунды/сообщение каждый +* Consumer 2 (fast): мгновенная обработка +#### Результат: +* Очередь order_processing_slow: Спустя некоторое время пустеет и не копит более 1-2х сообщений +* Сообщения равномерно распределяются между тремя consumers +RabbitMQ автоматически балансирует нагрузку (round-robin) +#### Вывод: +Три consumer справляются с нагрузкой. Суммарная скорость обработки (1.2 сообщения/сек) превышает скорость поступления (1 сообщение/сек). Демонстрируется горизонтальное масштабирование. + +## 6. Используемые технологии +1. **Python 3.9** — язык программирования +2. **RabbitMQ** — брокер сообщений +3. **Pika** — Python клиент для RabbitMQ +4. **RabbitMQ Management UI** — веб-интерфейс для мониторинга + +## 7. Видео +Ссылка на видео работы RabbitMQ: [Rutube](https://rutube.ru/video/private/d4dc5613005afbbe601862d54b248d36/?p=De8RTMImQZQtab3avfH2Zg) \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/app/consumer1_slow.py b/kozyrev_sergey_lab_4/app/consumer1_slow.py new file mode 100644 index 0000000..2a76f96 --- /dev/null +++ b/kozyrev_sergey_lab_4/app/consumer1_slow.py @@ -0,0 +1,58 @@ +import pika +import json +import time +import random + +def main(): + connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') + ) + channel = connection.channel() + + channel.exchange_declare(exchange='orders', exchange_type='fanout') + + queue_name = 'order_processing_slow' + channel.queue_declare(queue=queue_name, durable=True) + + channel.queue_bind(exchange='orders', queue=queue_name) + + print(f' [*] Consumer 1 (SLOW) started') + print(f' [*] Queue: {queue_name}') + print(f' [*] Processing time: 2-3 seconds per order') + print(' [*] Waiting for orders. To exit press CTRL+C') + + def callback(ch, method, properties, body): + try: + order = json.loads(body.decode()) + print(f" [x] Received order #{order['order_id']}: " + f"{order['product']} - ${order['amount']}") + print(f" Processing order #{order['order_id']}...") + + processing_time = random.uniform(2.0, 3.0) + time.sleep(processing_time) + + print(f" [✓] Order #{order['order_id']} processed successfully " + f"(took {processing_time:.1f}s)") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + except Exception as e: + print(f" [!] Error processing message: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + + channel.basic_qos(prefetch_count=1) + + channel.basic_consume( + queue=queue_name, + on_message_callback=callback + ) + + try: + channel.start_consuming() + except KeyboardInterrupt: + print('\n [*] Consumer 1 stopped') + finally: + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/app/consumer2_fast.py b/kozyrev_sergey_lab_4/app/consumer2_fast.py new file mode 100644 index 0000000..8bd5ec5 --- /dev/null +++ b/kozyrev_sergey_lab_4/app/consumer2_fast.py @@ -0,0 +1,51 @@ +import pika +import json + +def main(): + connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') + ) + channel = connection.channel() + + channel.exchange_declare(exchange='orders', exchange_type='fanout') + + queue_name = 'order_processing_fast' + channel.queue_declare(queue=queue_name, durable=True) + + channel.queue_bind(exchange='orders', queue=queue_name) + + print(f' [*] Consumer 2 (FAST) started') + print(f' [*] Queue: {queue_name}') + print(f' [*] Processing time: instant') + print(' [*] Waiting for orders. To exit press CTRL+C') + + def callback(ch, method, properties, body): + try: + order = json.loads(body.decode()) + print(f" [x] Received order #{order['order_id']}: " + f"{order['product']} - ${order['amount']}") + + print(f" [✓] Order #{order['order_id']} processed instantly") + + ch.basic_ack(delivery_tag=method.delivery_tag) + + except Exception as e: + print(f" [!] Error processing message: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) + + channel.basic_qos(prefetch_count=1) + + channel.basic_consume( + queue=queue_name, + on_message_callback=callback + ) + + try: + channel.start_consuming() + except KeyboardInterrupt: + print('\n [*] Consumer 2 stopped') + finally: + connection.close() + +if __name__ == '__main__': + main() diff --git a/kozyrev_sergey_lab_4/app/publisher.py b/kozyrev_sergey_lab_4/app/publisher.py new file mode 100644 index 0000000..5ca327c --- /dev/null +++ b/kozyrev_sergey_lab_4/app/publisher.py @@ -0,0 +1,53 @@ +import pika +import json +import time +from datetime import datetime +import random + +def main(): + connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') + ) + channel = connection.channel() + + channel.exchange_declare(exchange='orders', exchange_type='fanout') + + print(' [*] Publisher started. Publishing orders every second...') + print(' [*] Press CTRL+C to stop') + + order_id = 1 + + try: + while True: + order = { + 'order_id': order_id, + 'customer': f'Customer_{random.randint(1, 100)}', + 'product': random.choice([ + 'Laptop', 'Phone', 'Tablet', + 'Headphones', 'Monitor', 'Keyboard' + ]), + 'amount': round(random.uniform(10.0, 1000.0), 2), + 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') + } + + message = json.dumps(order, ensure_ascii=False) + + channel.basic_publish( + exchange='orders', + routing_key='', + body=message + ) + + print(f" [x] Sent order #{order_id}: {order['product']} " + f"for ${order['amount']} from {order['customer']}") + + order_id += 1 + time.sleep(1) + + except KeyboardInterrupt: + print('\n [*] Publisher stopped') + finally: + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/screenshots/Tutorial1.png b/kozyrev_sergey_lab_4/screenshots/Tutorial1.png new file mode 100644 index 0000000..0e8021a Binary files /dev/null and b/kozyrev_sergey_lab_4/screenshots/Tutorial1.png differ diff --git a/kozyrev_sergey_lab_4/screenshots/Tutorial2.png b/kozyrev_sergey_lab_4/screenshots/Tutorial2.png new file mode 100644 index 0000000..6ac89cc Binary files /dev/null and b/kozyrev_sergey_lab_4/screenshots/Tutorial2.png differ diff --git a/kozyrev_sergey_lab_4/screenshots/Tutorial3.png b/kozyrev_sergey_lab_4/screenshots/Tutorial3.png new file mode 100644 index 0000000..086e9a5 Binary files /dev/null and b/kozyrev_sergey_lab_4/screenshots/Tutorial3.png differ diff --git a/kozyrev_sergey_lab_4/tut1/receive.py b/kozyrev_sergey_lab_4/tut1/receive.py new file mode 100644 index 0000000..731c7b4 --- /dev/null +++ b/kozyrev_sergey_lab_4/tut1/receive.py @@ -0,0 +1,21 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + +channel.basic_consume( + queue='hello', + on_message_callback=callback, + auto_ack=True +) + +print(' [*] Waiting for messages. To exit press CTRL+C') + +channel.start_consuming() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/tut1/send.py b/kozyrev_sergey_lab_4/tut1/send.py new file mode 100644 index 0000000..87256b1 --- /dev/null +++ b/kozyrev_sergey_lab_4/tut1/send.py @@ -0,0 +1,18 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +channel.basic_publish( + exchange='', + routing_key='hello', + body='Hello World!' +) + +print(" [x] Sent 'Hello World!'") + +connection.close() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/tut2/new_task.py b/kozyrev_sergey_lab_4/tut2/new_task.py new file mode 100644 index 0000000..a5d3058 --- /dev/null +++ b/kozyrev_sergey_lab_4/tut2/new_task.py @@ -0,0 +1,23 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World...!" + +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent + ) +) + +print(f" [x] Sent '{message}'") +connection.close() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/tut2/worker.py b/kozyrev_sergey_lab_4/tut2/worker.py new file mode 100644 index 0000000..85c17f9 --- /dev/null +++ b/kozyrev_sergey_lab_4/tut2/worker.py @@ -0,0 +1,29 @@ +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +print(' [*] Waiting for messages. To exit press CTRL+C') + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + + time.sleep(body.count(b'.')) + + print(" [x] Done") + + ch.basic_ack(delivery_tag=method.delivery_tag) + +channel.basic_qos(prefetch_count=1) + +channel.basic_consume( + queue='task_queue', + on_message_callback=callback +) + +channel.start_consuming() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/tut3/emit_log.py b/kozyrev_sergey_lab_4/tut3/emit_log.py new file mode 100644 index 0000000..1aeb19b --- /dev/null +++ b/kozyrev_sergey_lab_4/tut3/emit_log.py @@ -0,0 +1,20 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +message = ' '.join(sys.argv[1:]) or "info: Hello World!" + +channel.basic_publish( + exchange='logs', + routing_key='', + body=message +) + +print(f" [x] Sent '{message}'") +connection.close() \ No newline at end of file diff --git a/kozyrev_sergey_lab_4/tut3/receive_logs.py b/kozyrev_sergey_lab_4/tut3/receive_logs.py new file mode 100644 index 0000000..3213809 --- /dev/null +++ b/kozyrev_sergey_lab_4/tut3/receive_logs.py @@ -0,0 +1,26 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +channel.queue_bind(exchange='logs', queue=queue_name) + +print(' [*] Waiting for logs. To exit press CTRL+C') + +def callback(ch, method, properties, body): + print(f" [x] {body.decode()}") + +channel.basic_consume( + queue=queue_name, + on_message_callback=callback, + auto_ack=True +) + +channel.start_consuming() \ No newline at end of file