arutunyan_dmitry_lab_4 is ready

This commit is contained in:
Arutunyan-Dmitry 2024-01-17 22:26:54 +04:00
parent 7de577eadc
commit 0c8752a817
18 changed files with 355 additions and 0 deletions

View File

@ -0,0 +1,159 @@
## Лабораторная работа 4. Вариант 4.
### Задание
Изучить проектирования приложений при помощи брокера сообщений.
- Установить брокер сообщений `RabbitMQ`,
- Пройти первые 3 урока из `RabbitMQ Tutorials`,
- Продемонстрировать работу брокера сообщений.
### Как запустить
Для запуска программы необходимо с помощью командной строки в корневой директории файлов прокета прописать:
```
python publisher.py
python consumer1.py
python consumer2.py
```
Результат работы программы будет выведен в консоль.
### Используемые технологии
- Брокер сообщений `RabbitMQ` - программная система, реализующая протокол AMQP (Advanced Message Queuing Protocol), который представляет собой стандартный протокол обмена сообщениями между приложениями. `RabbitMQ` работает на основе модели "производитель-потребитель" (producer-consumer), где приложения, называемые "производителями", создают и отправляют сообщения в очередь, а другие приложения, называемые "потребителями", получают и обрабатывают эти сообщения из очереди. RabbitMQ обеспечивает надежную доставку сообщений, сохраняя их в очереди до тех пор, пока они не будут получены и обработаны потребителями.
- Библиотека `pika`, обеспечивающая полную поддержку протокола AMQP (Advanced Message Queuing Protocol), который является стандартом для обмена сообщениями в системах очередей сообщений. Благодаря этой библиотеке возможно создание и настройка связи между компонентами системы, обмен сообщениями и управление очередями, используя простой и понятный API.
### Описание работы
#### Выполнение RabbitMQ Tutorials
##### Урок "Hello World!"
В данном уроке рассматриваются две небольшие программы на Python; производитель (отправитель), который отправляет одно сообщение, и потребитель (получатель), который получает сообщения "Привет, мир" и распечатывает их.
![](diag1.png "")
Результат выполнения программ:
![](prog1.png "")
##### Урок "Work queues"
В данном уроке рассматривается создание рабочей очереди, которая будет использоваться для распределения трудоемких задач между несколькими рабочими.
![](diag2.png "")
Результат выполнения программ:
![](prog2.png "")
##### Урок "Publish/Subscribe"
В данном уроке рассматривается создание простой системы ведения журнала, состоящей из двух программ - первая будет отправлять сообщения журнала, а вторая будет получать и распечатывать их. Доставка сообщений будет производиться для нескольких потребителей.
Таким образом, в данном уроке рассматривается реализация шаблока "публикация / подписка".
![](diag3.png "")
Результат выполнения программ:
![](prog3.png "")
#### Выполнение демонстрационного приложения
В качестве предметной области была выбрана тематика курсовой работы "Суши-бар".
В таком случае, `publisher` раз в секунду отправляет сообщение по состоянию элементов заказа, на которое `consumer1` и `consumer2` необходимо отреагировать.
```python
logs = ["get address", "get order", "get pavement", "order done"]
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
print(' [*] Started. To exit press CTRL+C')
while 1:
log = random.choice(logs)
channel.basic_publish(exchange=exchange_name, routing_key='', body=log)
print(f" [x] Published: {log}")
time.sleep(1)
```
Данные сообщение транслируются на обе очереди подписчиков: `slow-queue` и `fast-queue`. `consumer1` принимает сообшения в очередь `slow-queue` и реагирует на них (обрабатывает) в течении 2-3 секунд.
```python
def message_manager(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
task = body.decode()
print(f" [x] Received : {task}")
time.sleep(random.randint(2, 3))
if task == "get address":
print(" [x] Address set")
elif task == "get order":
print(" [x] Order sent to preparation")
elif task == "get pavement":
print(" [x] Bank account checked")
else:
print(" [x] Order sent to a delivery")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'slow-queue'
consumer_thread = threading.Thread(target=message_manager, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()
```
`consumer2` принимает сообшения в очередь `fast-queue` и реагирует на них (обрабатывает) без задержек.
```python
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'fast-queue'
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
task = body.decode()
print(f" [x] Received : {task}")
if task == "get address":
print(" [x] Address set")
elif task == "get order":
print(" [x] Order sent to preparation")
elif task == "get pavement":
print(" [x] Bank account checked")
else:
print(" [x] Order sent to a delivery")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
```
#### Работа программы. Обзор скорости обработки сообщений
В первом случае, запустим по одному экземпляру каждого приложения. Проверим состояния очередей каждого из consumer:
![](rep1.png "")
Согласно данныи `RabbitMQ`, задачи в очереди `slow-queue` исполнителя `consumer1` накапливаются линейно, поскольку данный исполнитель обрабатывает задачи с задержкой. Задачи в очереди `fast-queue` исполнителя `consumer2` не накапливаются, т.к. данный исполнитель выполняет задачи мгновенно.
Во втором случае, запустим исполнитель `consumer1` в 3х экземплярах и проверим состояние очереди `slow-queue`:
![](rep2.png "")
Как можно заметить, в данной конфигурации удалось добиться постоянного количества задач в очереди `slow-queue` - 2-3 задачи. Это обусловлено тем, что принимаемые `consumer1` задачи равномерно распределяются по 3м исполнителям и успевают обрабатываться ими до накопления задач от `publisher`.
### Вывод
Таким образом, `RabbitMQ` может иметь применение в распределённых системах в качестве узла обмена сообщениями между микросервисами для распределения и адресации задач между ними, а также обмеспечивает управление данными процессами.
### Видео
https://youtu.be/T0lIQIHTenY

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

View File

@ -0,0 +1,27 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Sent {body}")
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,10 @@
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,24 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 184 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 87 KiB

View File

@ -0,0 +1,34 @@
import pika, time, threading, random
def message_manager(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
task = body.decode()
print(f" [x] Received : {task}")
time.sleep(random.randint(2, 3))
if task == "get address":
print(" [x] Address set")
elif task == "get order":
print(" [x] Order sent to preparation")
elif task == "get pavement":
print(" [x] Bank account checked")
else:
print(" [x] Order sent to a delivery")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'slow-queue'
consumer_thread = threading.Thread(target=message_manager, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()

View File

@ -0,0 +1,28 @@
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'fast-queue'
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
def callback(ch, method, properties, body):
task = body.decode()
print(f" [x] Received : {task}")
if task == "get address":
print(" [x] Address set")
elif task == "get order":
print(" [x] Order sent to preparation")
elif task == "get pavement":
print(" [x] Bank account checked")
else:
print(" [x] Order sent to a delivery")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()

View File

@ -0,0 +1,18 @@
import pika, time, random
logs = ["get address", "get order", "get pavement", "order done"]
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
print(' [*] Started. To exit press CTRL+C')
while 1:
log = random.choice(logs)
channel.basic_publish(exchange=exchange_name, routing_key='', body=log)
print(f" [x] Published: {log}")
time.sleep(1)

View File

@ -0,0 +1,20 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()