diff --git a/gusev_vladislav_lab_4/README.md b/gusev_vladislav_lab_4/README.md new file mode 100644 index 0000000..8e95bf7 --- /dev/null +++ b/gusev_vladislav_lab_4/README.md @@ -0,0 +1,37 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +# Задачи: + +Необходимо выбрать предметную область и разработать следующие приложения: + +1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт". +2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#). +3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1. + + +# Как запустить +Командой "python 'название_файла'" в текущей папке + +# Работа программы + +publisher: +![img.png](img.png) + +consumer_2: +![img_1.png](img_1.png) + +consumer_1 в одном экземпляре: +![img_2.png](img_2.png) + +второй consumer_1: +![img_3.png](img_3.png) + +наша первая очередь за 10 минут, на пике зафиксирован запуск второго экземпляра +![img_4.png](img_4.png) + +вторая очередь, пик из-за того, что consumer_2 запустился, когда уже были какие-то сообщения в очереди +![img_5.png](img_5.png) + +Можно сказать, что запуск второго экземпляра consumer_1 скорости уменьшил нагрузку на очередь. + +Видео -> https://drive.google.com/file/d/1JDzdRgUFYzMBYBt0QA5Pho9yvOsu9Ecu/view?usp=sharing \ No newline at end of file diff --git a/gusev_vladislav_lab_4/consumer_1.py b/gusev_vladislav_lab_4/consumer_1.py new file mode 100644 index 0000000..a56c2dc --- /dev/null +++ b/gusev_vladislav_lab_4/consumer_1.py @@ -0,0 +1,33 @@ +import pika +import time +import threading + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}") + time.sleep(2) + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 1 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue1' + + consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name)) + consumer_thread.start() + + consumer_thread.join() + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/gusev_vladislav_lab_4/consumer_2.py b/gusev_vladislav_lab_4/consumer_2.py new file mode 100644 index 0000000..66e9119 --- /dev/null +++ b/gusev_vladislav_lab_4/consumer_2.py @@ -0,0 +1,27 @@ +import pika + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}") + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 2 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue2' + + consume_messages(channel, queue_name, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/gusev_vladislav_lab_4/img.png b/gusev_vladislav_lab_4/img.png new file mode 100644 index 0000000..ff50dc3 Binary files /dev/null and b/gusev_vladislav_lab_4/img.png differ diff --git a/gusev_vladislav_lab_4/img_1.png b/gusev_vladislav_lab_4/img_1.png new file mode 100644 index 0000000..1aa5ca2 Binary files /dev/null and b/gusev_vladislav_lab_4/img_1.png differ diff --git a/gusev_vladislav_lab_4/img_2.png b/gusev_vladislav_lab_4/img_2.png new file mode 100644 index 0000000..006ee61 Binary files /dev/null and b/gusev_vladislav_lab_4/img_2.png differ diff --git a/gusev_vladislav_lab_4/img_3.png b/gusev_vladislav_lab_4/img_3.png new file mode 100644 index 0000000..06cfbc1 Binary files /dev/null and b/gusev_vladislav_lab_4/img_3.png differ diff --git a/gusev_vladislav_lab_4/img_4.png b/gusev_vladislav_lab_4/img_4.png new file mode 100644 index 0000000..2ef9afa Binary files /dev/null and b/gusev_vladislav_lab_4/img_4.png differ diff --git a/gusev_vladislav_lab_4/img_5.png b/gusev_vladislav_lab_4/img_5.png new file mode 100644 index 0000000..2317214 Binary files /dev/null and b/gusev_vladislav_lab_4/img_5.png differ diff --git a/gusev_vladislav_lab_4/publisher.py b/gusev_vladislav_lab_4/publisher.py new file mode 100644 index 0000000..ac5b20a --- /dev/null +++ b/gusev_vladislav_lab_4/publisher.py @@ -0,0 +1,25 @@ +import pika +import time +import random + +def publish_event(channel, exchange_name): + events = ["Пришла фотография", "Пришло сообщение", "Необходимо обработать запрос"] + while True: + event = random.choice(events) + channel.basic_publish(exchange=exchange_name, routing_key='', body=event) + print(f"Отправлено событие: {event}") + time.sleep(3) + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + + publish_event(channel, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file