diff --git a/sergeev_evgenii_lab_4/README.md b/sergeev_evgenii_lab_4/README.md index e69de29..d809a7c 100644 --- a/sergeev_evgenii_lab_4/README.md +++ b/sergeev_evgenii_lab_4/README.md @@ -0,0 +1,53 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +Цель: изучение проектирования приложений при помощи брокера сообщений + +Задачи: + +Необходимо выбрать предметную область и разработать следующие приложения: + +Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт". +Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#). +Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1. +Далее необходимо собрать и запустить приложения одновременно по одному экземпляру. + +Сделать в отчёте вывод о скорости обработки consumer-ами событий от publisher-а. Для этого можно посмотреть заполненность созданных очередей. +А для этого можно использовать скриншот из RabbitMQ Management UI. + +Запустить несколько копий Consumer 1. Проверить заново заполненность очередей через UI. + +Скриншоты сообщений: + +Publisher: +![Alt text](images/img_publisher.png) + +Consumer 1: +![Alt text](images/img_slow.png) + +Consumer 2: +![Alt text](images/img_fast.png) + + +# RabbitMQ Management UI +Без запущенных consumerов: +![Alt text](images/img_without_consumers.png) + +Запустил Consumer 1: +![Alt text](images/img_with_consumer_slow.png) + +Запустил Consumer 1 и 2: +![Alt text](images/img_with_consumers.png) + +Запустил Consumer 1 и 2: +![Alt text](images/img_with_two_slow_consumers.png) + +Вывод: по данным об очередях с RabbitMQ Management UI видно, что второй Consumer +работает быстрее и ограничен лишь скоростью отправки сообщений, но если скорость отправки сообщений не +будет ограничена, то возникает риск пропуска сообщений, а также такой метод сильнее нагружает систему и +усложняет отслеживания работоспособности системы, что может привести к сбоям. Также у брокера сообщений можно поставить +опцию автоматического ответа на сообщения, что ускоряет скорость работы, но увеличивает шанс потери сообщений. +Также два работающих Consumer 1 обрабатывают сообщения медленнее, чем один Consumer 2 + +# Видео + +Видео с разбором лабораторной работы - https://youtu.be/8GOG8MyPkO4 \ No newline at end of file diff --git a/sergeev_evgenii_lab_4/consumer_fast.py b/sergeev_evgenii_lab_4/consumer_fast.py new file mode 100644 index 0000000..6200cca --- /dev/null +++ b/sergeev_evgenii_lab_4/consumer_fast.py @@ -0,0 +1,44 @@ +import threading + +import pika +import json + + +class LectureConsumer2: + def __init__(self): + self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + self.channel = self.connection.channel() + + self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout') + + result = self.channel.queue_declare(queue='', exclusive=True) + self.queue_name = result.method.queue + print('fast + ' + self.queue_name) + + self.channel.queue_bind(exchange='lecture_exchange', queue=self.queue_name) + + def callback(self, ch, method, properties, body): + lecture_event = json.loads(body) + print(f" [x] Consumer_fast received and processed lecture event instantly: {lecture_event}") + + def consume_lectures(self): + self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback) + print(' [*] Consumer_fast waiting for lecture events. To exit press CTRL+C') + self.channel.start_consuming() + + +if __name__ == '__main__': + lecture_consumer2 = LectureConsumer2() + try: + lecture_consumer2.consume_lectures() + except KeyboardInterrupt: + lecture_consumer2.connection.close() + +# if __name__ == '__main__': +# lecture_consumer2 = LectureConsumer2() +# try: +# t1 = threading.Thread(target=lecture_consumer2.consume_lectures) +# t1.start() +# t1.join() +# except KeyboardInterrupt: +# lecture_consumer2.connection.close() diff --git a/sergeev_evgenii_lab_4/consumer_slow.py b/sergeev_evgenii_lab_4/consumer_slow.py new file mode 100644 index 0000000..fb341d5 --- /dev/null +++ b/sergeev_evgenii_lab_4/consumer_slow.py @@ -0,0 +1,38 @@ +import pika +import json +import time +import threading + + +class LectureConsumer1: + def __init__(self): + self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout') + + result = self.channel.queue_declare(queue='', exclusive=True) + self.queue_name = result.method.queue + print('slow + ' + self.queue_name) + + self.channel.queue_bind(exchange='lecture_exchange', queue=self.queue_name) + + def callback(self, ch, method, properties, body): + lecture_event = json.loads(body) + print(f" [x] Consumer_slow received lecture event: {lecture_event}") + time.sleep(3) + print(f" [x] Consumer_slow processed lecture event: {lecture_event}") + + def consume_lectures(self): + self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback) + print(' [*] Consumer_slow waiting for lecture events. To exit press CTRL+C') + self.channel.start_consuming() + + +if __name__ == '__main__': + lecture_consumer1 = LectureConsumer1() + try: + t1 = threading.Thread(target=lecture_consumer1.consume_lectures) + t1.start() + t1.join() + except KeyboardInterrupt: + lecture_consumer1.connection.close() diff --git a/sergeev_evgenii_lab_4/images/img_fast.png b/sergeev_evgenii_lab_4/images/img_fast.png new file mode 100644 index 0000000..a5f1f78 Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_fast.png differ diff --git a/sergeev_evgenii_lab_4/images/img_publisher.png b/sergeev_evgenii_lab_4/images/img_publisher.png new file mode 100644 index 0000000..571e954 Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_publisher.png differ diff --git a/sergeev_evgenii_lab_4/images/img_slow.png b/sergeev_evgenii_lab_4/images/img_slow.png new file mode 100644 index 0000000..5d8c62e Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_slow.png differ diff --git a/sergeev_evgenii_lab_4/images/img_with_consumer_slow.png b/sergeev_evgenii_lab_4/images/img_with_consumer_slow.png new file mode 100644 index 0000000..6f21f0c Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_with_consumer_slow.png differ diff --git a/sergeev_evgenii_lab_4/images/img_with_consumers.png b/sergeev_evgenii_lab_4/images/img_with_consumers.png new file mode 100644 index 0000000..3faea87 Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_with_consumers.png differ diff --git a/sergeev_evgenii_lab_4/images/img_with_two_slow_consumers.png b/sergeev_evgenii_lab_4/images/img_with_two_slow_consumers.png new file mode 100644 index 0000000..3d972a9 Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_with_two_slow_consumers.png differ diff --git a/sergeev_evgenii_lab_4/images/img_without_consumers.png b/sergeev_evgenii_lab_4/images/img_without_consumers.png new file mode 100644 index 0000000..e6d2ef1 Binary files /dev/null and b/sergeev_evgenii_lab_4/images/img_without_consumers.png differ diff --git a/sergeev_evgenii_lab_4/publisher.py b/sergeev_evgenii_lab_4/publisher.py new file mode 100644 index 0000000..dc862f0 --- /dev/null +++ b/sergeev_evgenii_lab_4/publisher.py @@ -0,0 +1,31 @@ +import pika +import time +import json +import random + + +class LecturePublisher: + def __init__(self): + self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + self.channel = self.connection.channel() + self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout') + + def publish_lecture_event(self): + lecture_event = { + 'event_type': 'new_lecture', + 'lecture_id': random.randint(1, 100), + 'topic': f'Lecture on Topic {random.randint(1, 10)}', + 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') + } + self.channel.basic_publish(exchange='lecture_exchange', routing_key='', body=json.dumps(lecture_event)) + print(f" [x] Sent lecture event: {lecture_event}") + + +if __name__ == '__main__': + lecture_publisher = LecturePublisher() + try: + while True: + lecture_publisher.publish_lecture_event() + time.sleep(1) + except KeyboardInterrupt: + lecture_publisher.connection.close()