Merge pull request 'savenkov_alexander_lab_4_ready' (#140) from savenkov_alexander_lab_4 into main

Reviewed-on: #140
This commit is contained in:
Alexey 2024-12-10 22:02:50 +04:00
commit b851fa7e05
19 changed files with 201 additions and 0 deletions

View File

@ -0,0 +1,3 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DiscordProjectSettings">
<option name="show" value="ASK" />
<option name="description" value="" />
</component>
</project>

View File

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (savenkov_alexander_lab_4) (2)" project-jdk-type="Python SDK" />
</project>

View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/savenkov_alexander_lab_4.iml" filepath="$PROJECT_DIR$/.idea/savenkov_alexander_lab_4.iml" />
</modules>
</component>
</project>

View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,69 @@
# Лабораторная работа №4 - Работа с брокером сообщений
Цель: изучение проектирования приложений при помощи брокера сообщений.
Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
Далее необходимо собрать и запустить приложения одновременно по одному экземпляру.
Сделать в отчёте вывод о скорости обработки consumer-ами событий от publisher-а. Для этого можно посмотреть заполненность созданных очередей. А для этого можно использовать скриншот из RabbitMQ Management UI.
Запустить несколько копий Consumer 1. Проверить заново заполненность очередей через UI.
# Publisher
<p>
<div>Код Publisher</div>
<img src="screens/img1.png" width="650" title="Код Publisher">
</p>
<p>
<div>Работа Publisher</div>
<img src="screens/img2.png" width="650" title="Работа Publisher">
</p>
# Consumer 1
<p>
<div>Код Consumer 1</div>
<img src="screens/img3.png" width="650" title="Код Consumer 1">
</p>
<p>
<div>Работа Consumer 1</div>
<img src="screens/img4.png" width="650" title="Работа Consumer 1">
</p>
# Consumer 2
<p>
<divКод Consumer 2</div>
<img src="screens/img5.png" width="650" title="Код Consumer 2">
</p>
<p>
<div>Работа Consumer 2</div>
<img src="screens/img6.png" width="650" title="Работа Consumer 2">
</p>
# RabbitMQ Management UI
<p>
<div>До запусков Consumer</div>
<img src="screens/img7.png" width="650" title="До запусков Consumer">
</p>
<p>
<div>После запуска Consumer 1</div>
<img src="screens/img8.png" width="650" title="После запуска Consumer 1">
</p>
<p>
<div>После запуска Consumer 2</div>
<img src="screens/img9.png" width="650" title="После запуска Consumer 2">
</p>
Вывод: по данным об очередях с RabbitMQ Management UI видно, что второй Consumer работает быстрее и ограничен лишь скоростью отправки сообщений, но если скорость отправки сообщений не будет ограничена, то возникает риск пропуска сообщений, а также такой метод сильнее нагружает систему и усложняет отслеживания работоспособности системы, что может привести к сбоям.
# Видео
Видео с разбором лабораторной работы - https://youtu.be/8GOG8MyPkO4

View File

@ -0,0 +1,34 @@
import pika
import time
def process_inventory_event(ch, method, properties, body):
decoded_message = body.decode('utf-8')
print(f" [x] Processing Inventory Event: {decoded_message}")
time.sleep(2)
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
# Устанавливаем соединение с сервером RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange с типом 'fanout'
channel.exchange_declare(exchange='events', exchange_type='fanout')
# Создаем очередь с уникальным именем
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Привязываем очередь к exchange
channel.queue_bind(exchange='events', queue=queue_name)
# Указываем, как обрабатывать сообщения при получении
channel.basic_consume(queue=queue_name, on_message_callback=process_inventory_event)
print(' [*] Waiting for Inventory Events. To exit press CTRL+C')
# Запускаем бесконечный цикл получения и обработки сообщений
channel.start_consuming()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,34 @@
import pika
import time
def process_order_event(ch, method, properties, body):
decoded_message = body.decode('utf-8')
print(f" [x] Processing Order Event: {decoded_message}")
# No delay for Consumer 2
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
def main():
# Устанавливаем соединение с сервером RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange с типом 'fanout'
channel.exchange_declare(exchange='events', exchange_type='fanout')
# Создаем очередь с уникальным именем
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Привязываем очередь к exchange
channel.queue_bind(exchange='events', queue=queue_name)
# Указываем, как обрабатывать сообщения при получении
channel.basic_consume(queue=queue_name, on_message_callback=process_order_event)
print(' [*] Waiting for Order Events. To exit press CTRL+C')
# Запускаем бесконечный цикл получения и обработки сообщений
channel.start_consuming()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,26 @@
import pika
import time
import random
def generate_order_event(channel):
order_id = random.randint(1, 1000)
event = f"Поступил новый заказ #{order_id}"
message = f"Событие: {event}"
channel.basic_publish(exchange='events', routing_key='', body=message)
print(f" [x] Sent: {message}")
def main():
# Устанавливаем соединение с сервером RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем exchange с типом 'fanout'
channel.exchange_declare(exchange='events', exchange_type='fanout')
# В бесконечном цикле генерируем и отправляем события в RabbitMQ
while True:
generate_order_event(channel)
time.sleep(1)
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 103 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 64 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 90 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB