This commit is contained in:
Евгений Сергеев 2024-01-22 00:58:41 +04:00
parent c87987d4f3
commit 0743ecc259
11 changed files with 166 additions and 0 deletions

View File

@ -0,0 +1,53 @@
# Лабораторная работа №4 - Работа с брокером сообщений
Цель: изучение проектирования приложений при помощи брокера сообщений
Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
Далее необходимо собрать и запустить приложения одновременно по одному экземпляру.
Сделать в отчёте вывод о скорости обработки consumer-ами событий от publisher-а. Для этого можно посмотреть заполненность созданных очередей.
А для этого можно использовать скриншот из RabbitMQ Management UI.
Запустить несколько копий Consumer 1. Проверить заново заполненность очередей через UI.
Скриншоты сообщений:
Publisher:
![Alt text](images/img_publisher.png)
Consumer 1:
![Alt text](images/img_slow.png)
Consumer 2:
![Alt text](images/img_fast.png)
# RabbitMQ Management UI
Без запущенных consumerов:
![Alt text](images/img_without_consumers.png)
Запустил Consumer 1:
![Alt text](images/img_with_consumer_slow.png)
Запустил Consumer 1 и 2:
![Alt text](images/img_with_consumers.png)
Запустил Consumer 1 и 2:
![Alt text](images/img_with_two_slow_consumers.png)
Вывод: по данным об очередях с RabbitMQ Management UI видно, что второй Consumer
работает быстрее и ограничен лишь скоростью отправки сообщений, но если скорость отправки сообщений не
будет ограничена, то возникает риск пропуска сообщений, а также такой метод сильнее нагружает систему и
усложняет отслеживания работоспособности системы, что может привести к сбоям. Также у брокера сообщений можно поставить
опцию автоматического ответа на сообщения, что ускоряет скорость работы, но увеличивает шанс потери сообщений.
Также два работающих Consumer 1 обрабатывают сообщения медленнее, чем один Consumer 2
# Видео
Видео с разбором лабораторной работы - https://youtu.be/8GOG8MyPkO4

View File

@ -0,0 +1,44 @@
import threading
import pika
import json
class LectureConsumer2:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout')
result = self.channel.queue_declare(queue='', exclusive=True)
self.queue_name = result.method.queue
print('fast + ' + self.queue_name)
self.channel.queue_bind(exchange='lecture_exchange', queue=self.queue_name)
def callback(self, ch, method, properties, body):
lecture_event = json.loads(body)
print(f" [x] Consumer_fast received and processed lecture event instantly: {lecture_event}")
def consume_lectures(self):
self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback)
print(' [*] Consumer_fast waiting for lecture events. To exit press CTRL+C')
self.channel.start_consuming()
if __name__ == '__main__':
lecture_consumer2 = LectureConsumer2()
try:
lecture_consumer2.consume_lectures()
except KeyboardInterrupt:
lecture_consumer2.connection.close()
# if __name__ == '__main__':
# lecture_consumer2 = LectureConsumer2()
# try:
# t1 = threading.Thread(target=lecture_consumer2.consume_lectures)
# t1.start()
# t1.join()
# except KeyboardInterrupt:
# lecture_consumer2.connection.close()

View File

@ -0,0 +1,38 @@
import pika
import json
import time
import threading
class LectureConsumer1:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout')
result = self.channel.queue_declare(queue='', exclusive=True)
self.queue_name = result.method.queue
print('slow + ' + self.queue_name)
self.channel.queue_bind(exchange='lecture_exchange', queue=self.queue_name)
def callback(self, ch, method, properties, body):
lecture_event = json.loads(body)
print(f" [x] Consumer_slow received lecture event: {lecture_event}")
time.sleep(3)
print(f" [x] Consumer_slow processed lecture event: {lecture_event}")
def consume_lectures(self):
self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback)
print(' [*] Consumer_slow waiting for lecture events. To exit press CTRL+C')
self.channel.start_consuming()
if __name__ == '__main__':
lecture_consumer1 = LectureConsumer1()
try:
t1 = threading.Thread(target=lecture_consumer1.consume_lectures)
t1.start()
t1.join()
except KeyboardInterrupt:
lecture_consumer1.connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 83 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 63 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 60 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

View File

@ -0,0 +1,31 @@
import pika
import time
import json
import random
class LecturePublisher:
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout')
def publish_lecture_event(self):
lecture_event = {
'event_type': 'new_lecture',
'lecture_id': random.randint(1, 100),
'topic': f'Lecture on Topic {random.randint(1, 10)}',
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
}
self.channel.basic_publish(exchange='lecture_exchange', routing_key='', body=json.dumps(lecture_event))
print(f" [x] Sent lecture event: {lecture_event}")
if __name__ == '__main__':
lecture_publisher = LecturePublisher()
try:
while True:
lecture_publisher.publish_lecture_event()
time.sleep(1)
except KeyboardInterrupt:
lecture_publisher.connection.close()