diff --git a/arutunyan_dmitry_lab_4/README.md b/arutunyan_dmitry_lab_4/README.md new file mode 100644 index 0000000..ca400ca --- /dev/null +++ b/arutunyan_dmitry_lab_4/README.md @@ -0,0 +1,159 @@ + +## Лабораторная работа 4. Вариант 4. +### Задание +Изучить проектирования приложений при помощи брокера сообщений. + +- Установить брокер сообщений `RabbitMQ`, +- Пройти первые 3 урока из `RabbitMQ Tutorials`, +- Продемонстрировать работу брокера сообщений. + +### Как запустить +Для запуска программы необходимо с помощью командной строки в корневой директории файлов прокета прописать: +``` +python publisher.py +python consumer1.py +python consumer2.py +``` +Результат работы программы будет выведен в консоль. + +### Используемые технологии + +- Брокер сообщений `RabbitMQ` - программная система, реализующая протокол AMQP (Advanced Message Queuing Protocol), который представляет собой стандартный протокол обмена сообщениями между приложениями. `RabbitMQ` работает на основе модели "производитель-потребитель" (producer-consumer), где приложения, называемые "производителями", создают и отправляют сообщения в очередь, а другие приложения, называемые "потребителями", получают и обрабатывают эти сообщения из очереди. RabbitMQ обеспечивает надежную доставку сообщений, сохраняя их в очереди до тех пор, пока они не будут получены и обработаны потребителями. +- Библиотека `pika`, обеспечивающая полную поддержку протокола AMQP (Advanced Message Queuing Protocol), который является стандартом для обмена сообщениями в системах очередей сообщений. Благодаря этой библиотеке возможно создание и настройка связи между компонентами системы, обмен сообщениями и управление очередями, используя простой и понятный API. + +### Описание работы +#### Выполнение RabbitMQ Tutorials +##### Урок "Hello World!" +В данном уроке рассматриваются две небольшие программы на Python; производитель (отправитель), который отправляет одно сообщение, и потребитель (получатель), который получает сообщения "Привет, мир" и распечатывает их. + +![](diag1.png "") + +Результат выполнения программ: +![](prog1.png "") + +##### Урок "Work queues" +В данном уроке рассматривается создание рабочей очереди, которая будет использоваться для распределения трудоемких задач между несколькими рабочими. + +![](diag2.png "") + +Результат выполнения программ: +![](prog2.png "") + +##### Урок "Publish/Subscribe" +В данном уроке рассматривается создание простой системы ведения журнала, состоящей из двух программ - первая будет отправлять сообщения журнала, а вторая будет получать и распечатывать их. Доставка сообщений будет производиться для нескольких потребителей. + +Таким образом, в данном уроке рассматривается реализация шаблока "публикация / подписка". + +![](diag3.png "") + +Результат выполнения программ: +![](prog3.png "") + +#### Выполнение демонстрационного приложения + +В качестве предметной области была выбрана тематика курсовой работы "Суши-бар". + +В таком случае, `publisher` раз в секунду отправляет сообщение по состоянию элементов заказа, на которое `consumer1` и `consumer2` необходимо отреагировать. + +```python +logs = ["get address", "get order", "get pavement", "order done"] + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') +print(' [*] Started. To exit press CTRL+C') + +while 1: + log = random.choice(logs) + channel.basic_publish(exchange=exchange_name, routing_key='', body=log) + print(f" [x] Published: {log}") + time.sleep(1) +``` + +Данные сообщение транслируются на обе очереди подписчиков: `slow-queue` и `fast-queue`. `consumer1` принимает сообшения в очередь `slow-queue` и реагирует на них (обрабатывает) в течении 2-3 секунд. + +```python +def message_manager(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + + def callback(ch, method, properties, body): + task = body.decode() + print(f" [x] Received : {task}") + time.sleep(random.randint(2, 3)) + if task == "get address": + print(" [x] Address set") + elif task == "get order": + print(" [x] Order sent to preparation") + elif task == "get pavement": + print(" [x] Bank account checked") + else: + print(" [x] Order sent to a delivery") + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=queue_name, on_message_callback=callback) + print("[*] Waiting for messages. To exit press CTRL+C") + channel.start_consuming() + + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +queue_name = 'slow-queue' + +consumer_thread = threading.Thread(target=message_manager, args=(channel, queue_name, exchange_name)) +consumer_thread.start() +consumer_thread.join() +``` + +`consumer2` принимает сообшения в очередь `fast-queue` и реагирует на них (обрабатывает) без задержек. + +```python +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +queue_name = 'fast-queue' + +channel.queue_declare(queue=queue_name) +channel.queue_bind(exchange=exchange_name, queue=queue_name) + + +def callback(ch, method, properties, body): + task = body.decode() + print(f" [x] Received : {task}") + if task == "get address": + print(" [x] Address set") + elif task == "get order": + print(" [x] Order sent to preparation") + elif task == "get pavement": + print(" [x] Bank account checked") + else: + print(" [x] Order sent to a delivery") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_consume(queue=queue_name, on_message_callback=callback) +print("[*] Waiting for messages. To exit press CTRL+C") +channel.start_consuming() +``` + +#### Работа программы. Обзор скорости обработки сообщений + +В первом случае, запустим по одному экземпляру каждого приложения. Проверим состояния очередей каждого из consumer: + +![](rep1.png "") + +Согласно данныи `RabbitMQ`, задачи в очереди `slow-queue` исполнителя `consumer1` накапливаются линейно, поскольку данный исполнитель обрабатывает задачи с задержкой. Задачи в очереди `fast-queue` исполнителя `consumer2` не накапливаются, т.к. данный исполнитель выполняет задачи мгновенно. + +Во втором случае, запустим исполнитель `consumer1` в 3х экземплярах и проверим состояние очереди `slow-queue`: + +![](rep2.png "") + +Как можно заметить, в данной конфигурации удалось добиться постоянного количества задач в очереди `slow-queue` - 2-3 задачи. Это обусловлено тем, что принимаемые `consumer1` задачи равномерно распределяются по 3м исполнителям и успевают обрабатываться ими до накопления задач от `publisher`. + +### Вывод +Таким образом, `RabbitMQ` может иметь применение в распределённых системах в качестве узла обмена сообщениями между микросервисами для распределения и адресации задач между ними, а также обмеспечивает управление данными процессами. + +### Видео +https://youtu.be/T0lIQIHTenY \ No newline at end of file diff --git a/arutunyan_dmitry_lab_4/diag1.png b/arutunyan_dmitry_lab_4/diag1.png new file mode 100644 index 0000000..88eb316 Binary files /dev/null and b/arutunyan_dmitry_lab_4/diag1.png differ diff --git a/arutunyan_dmitry_lab_4/diag2.png b/arutunyan_dmitry_lab_4/diag2.png new file mode 100644 index 0000000..4369981 Binary files /dev/null and b/arutunyan_dmitry_lab_4/diag2.png differ diff --git a/arutunyan_dmitry_lab_4/diag3.png b/arutunyan_dmitry_lab_4/diag3.png new file mode 100644 index 0000000..108734f Binary files /dev/null and b/arutunyan_dmitry_lab_4/diag3.png differ diff --git a/arutunyan_dmitry_lab_4/hello-world/receive.py b/arutunyan_dmitry_lab_4/hello-world/receive.py new file mode 100644 index 0000000..7fca03d --- /dev/null +++ b/arutunyan_dmitry_lab_4/hello-world/receive.py @@ -0,0 +1,27 @@ +import pika, sys, os + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + + def callback(ch, method, properties, body): + print(f" [x] Sent {body}") + + channel.basic_consume(queue='hello', + auto_ack=True, + on_message_callback=callback) + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print('Interrupted') + try: + sys.exit(0) + except SystemExit: + os._exit(0) diff --git a/arutunyan_dmitry_lab_4/hello-world/send.py b/arutunyan_dmitry_lab_4/hello-world/send.py new file mode 100644 index 0000000..431fadf --- /dev/null +++ b/arutunyan_dmitry_lab_4/hello-world/send.py @@ -0,0 +1,10 @@ +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +channel.queue_declare(queue='hello') +channel.basic_publish(exchange='', + routing_key='hello', + body='Hello World!') +print(" [x] Sent 'Hello World!'") +connection.close() diff --git a/arutunyan_dmitry_lab_4/prog1.png b/arutunyan_dmitry_lab_4/prog1.png new file mode 100644 index 0000000..522b294 Binary files /dev/null and b/arutunyan_dmitry_lab_4/prog1.png differ diff --git a/arutunyan_dmitry_lab_4/prog2.png b/arutunyan_dmitry_lab_4/prog2.png new file mode 100644 index 0000000..2e3b0c4 Binary files /dev/null and b/arutunyan_dmitry_lab_4/prog2.png differ diff --git a/arutunyan_dmitry_lab_4/prog3.png b/arutunyan_dmitry_lab_4/prog3.png new file mode 100644 index 0000000..4e10c31 Binary files /dev/null and b/arutunyan_dmitry_lab_4/prog3.png differ diff --git a/arutunyan_dmitry_lab_4/publish-subscribe/emit_log.py b/arutunyan_dmitry_lab_4/publish-subscribe/emit_log.py new file mode 100644 index 0000000..1f56fd5 --- /dev/null +++ b/arutunyan_dmitry_lab_4/publish-subscribe/emit_log.py @@ -0,0 +1,13 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +message = ' '.join(sys.argv[1:]) or "info: Hello World!" +channel.basic_publish(exchange='logs', routing_key='', body=message) +print(f" [x] Sent {message}") +connection.close() diff --git a/arutunyan_dmitry_lab_4/publish-subscribe/receive_logs.py b/arutunyan_dmitry_lab_4/publish-subscribe/receive_logs.py new file mode 100644 index 0000000..b6c29d1 --- /dev/null +++ b/arutunyan_dmitry_lab_4/publish-subscribe/receive_logs.py @@ -0,0 +1,24 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +channel.queue_bind(exchange='logs', queue=queue_name) + +print(' [*] Waiting for logs. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(f" [x] {body}") + + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + +channel.start_consuming() diff --git a/arutunyan_dmitry_lab_4/rep1.png b/arutunyan_dmitry_lab_4/rep1.png new file mode 100644 index 0000000..9214f20 Binary files /dev/null and b/arutunyan_dmitry_lab_4/rep1.png differ diff --git a/arutunyan_dmitry_lab_4/rep2.png b/arutunyan_dmitry_lab_4/rep2.png new file mode 100644 index 0000000..a73a886 Binary files /dev/null and b/arutunyan_dmitry_lab_4/rep2.png differ diff --git a/arutunyan_dmitry_lab_4/sushi-bar/consumer1.py b/arutunyan_dmitry_lab_4/sushi-bar/consumer1.py new file mode 100644 index 0000000..754e36a --- /dev/null +++ b/arutunyan_dmitry_lab_4/sushi-bar/consumer1.py @@ -0,0 +1,34 @@ +import pika, time, threading, random + + +def message_manager(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + + def callback(ch, method, properties, body): + task = body.decode() + print(f" [x] Received : {task}") + time.sleep(random.randint(2, 3)) + if task == "get address": + print(" [x] Address set") + elif task == "get order": + print(" [x] Order sent to preparation") + elif task == "get pavement": + print(" [x] Bank account checked") + else: + print(" [x] Order sent to a delivery") + ch.basic_ack(delivery_tag=method.delivery_tag) + + channel.basic_consume(queue=queue_name, on_message_callback=callback) + print("[*] Waiting for messages. To exit press CTRL+C") + channel.start_consuming() + + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +queue_name = 'slow-queue' + +consumer_thread = threading.Thread(target=message_manager, args=(channel, queue_name, exchange_name)) +consumer_thread.start() +consumer_thread.join() diff --git a/arutunyan_dmitry_lab_4/sushi-bar/consumer2.py b/arutunyan_dmitry_lab_4/sushi-bar/consumer2.py new file mode 100644 index 0000000..ec7ee5a --- /dev/null +++ b/arutunyan_dmitry_lab_4/sushi-bar/consumer2.py @@ -0,0 +1,28 @@ +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +queue_name = 'fast-queue' + +channel.queue_declare(queue=queue_name) +channel.queue_bind(exchange=exchange_name, queue=queue_name) + + +def callback(ch, method, properties, body): + task = body.decode() + print(f" [x] Received : {task}") + if task == "get address": + print(" [x] Address set") + elif task == "get order": + print(" [x] Order sent to preparation") + elif task == "get pavement": + print(" [x] Bank account checked") + else: + print(" [x] Order sent to a delivery") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_consume(queue=queue_name, on_message_callback=callback) +print("[*] Waiting for messages. To exit press CTRL+C") +channel.start_consuming() diff --git a/arutunyan_dmitry_lab_4/sushi-bar/publisher.py b/arutunyan_dmitry_lab_4/sushi-bar/publisher.py new file mode 100644 index 0000000..f14222a --- /dev/null +++ b/arutunyan_dmitry_lab_4/sushi-bar/publisher.py @@ -0,0 +1,18 @@ +import pika, time, random +logs = ["get address", "get order", "get pavement", "order done"] + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() +exchange_name = 'logs' +channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') +print(' [*] Started. To exit press CTRL+C') + +while 1: + log = random.choice(logs) + channel.basic_publish(exchange=exchange_name, routing_key='', body=log) + print(f" [x] Published: {log}") + time.sleep(1) + + + + diff --git a/arutunyan_dmitry_lab_4/work-queues/new-task.py b/arutunyan_dmitry_lab_4/work-queues/new-task.py new file mode 100644 index 0000000..3503d55 --- /dev/null +++ b/arutunyan_dmitry_lab_4/work-queues/new-task.py @@ -0,0 +1,20 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent + )) +print(f" [x] Sent {message}") +connection.close() + diff --git a/arutunyan_dmitry_lab_4/work-queues/worker.py b/arutunyan_dmitry_lab_4/work-queues/worker.py new file mode 100644 index 0000000..deeec14 --- /dev/null +++ b/arutunyan_dmitry_lab_4/work-queues/worker.py @@ -0,0 +1,22 @@ +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + time.sleep(body.count(b'.')) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +channel.start_consuming()