diff --git a/shadaev_anton_lab_4/README.MD b/shadaev_anton_lab_4/README.MD new file mode 100644 index 0000000..6fee2b4 --- /dev/null +++ b/shadaev_anton_lab_4/README.MD @@ -0,0 +1,37 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +# Задачи: + +Необходимо выбрать предметную область и разработать следующие приложения: + +1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт". +2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#). +3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1. + + +# Как запустить +Запустить `python 'название_файла'` + +# Работа программы + +**publisher:** + +![img.png](screenshots/img.png) + +**consumer_1:** + +![img_1.png](screenshots/img_1.png) + +**consumer_2:** + +![img_2.png](screenshots/img_2.png) + +**1-я очередь** + +![img_3.png](screenshots/img_3.png) + +**2-я очередь** + +![img_4.png](screenshots/img_4.png) + +Видео: https://youtu.be/3pxgHmm3i0Q \ No newline at end of file diff --git a/shadaev_anton_lab_4/consumer_1.py b/shadaev_anton_lab_4/consumer_1.py new file mode 100644 index 0000000..358a570 --- /dev/null +++ b/shadaev_anton_lab_4/consumer_1.py @@ -0,0 +1,37 @@ +import pika +import time +import threading + + +def process_message(ch, method, properties, body): + print(f"Получена вакансия (Consumer 1): {body.decode('utf-8')}") + time.sleep(2) + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 1 начал прослушивание сообщений...") + channel.start_consuming() + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue1' + + consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name)) + consumer_thread.start() + + consumer_thread.join() + + connection.close() + + +if __name__ == '__main__': + main() diff --git a/shadaev_anton_lab_4/consumer_2.py b/shadaev_anton_lab_4/consumer_2.py new file mode 100644 index 0000000..da1aacd --- /dev/null +++ b/shadaev_anton_lab_4/consumer_2.py @@ -0,0 +1,31 @@ +import pika + + +def process_message(ch, method, properties, body): + print(f"Получена вакансия (Consumer 2): {body.decode('utf-8')}") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 2 начал прослушивание сообщений...") + channel.start_consuming() + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue2' + + consume_messages(channel, queue_name, exchange_name) + + connection.close() + + +if __name__ == '__main__': + main() diff --git a/shadaev_anton_lab_4/publisher.py b/shadaev_anton_lab_4/publisher.py new file mode 100644 index 0000000..380126f --- /dev/null +++ b/shadaev_anton_lab_4/publisher.py @@ -0,0 +1,28 @@ +import pika +import time +import random + + +def publish_event(channel, exchange_name): + events = ["Пришла страница", "Пришла вакансия", "Необходимо обработать запрос"] + while True: + event = random.choice(events) + channel.basic_publish(exchange=exchange_name, routing_key='', body=event) + print(f"Отправлено событие: {event}") + time.sleep(3) + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + + publish_event(channel, exchange_name) + + connection.close() + + +if __name__ == '__main__': + main() diff --git a/shadaev_anton_lab_4/screenshots/img.png b/shadaev_anton_lab_4/screenshots/img.png new file mode 100644 index 0000000..a95678d Binary files /dev/null and b/shadaev_anton_lab_4/screenshots/img.png differ diff --git a/shadaev_anton_lab_4/screenshots/img_1.png b/shadaev_anton_lab_4/screenshots/img_1.png new file mode 100644 index 0000000..793f9a7 Binary files /dev/null and b/shadaev_anton_lab_4/screenshots/img_1.png differ diff --git a/shadaev_anton_lab_4/screenshots/img_2.png b/shadaev_anton_lab_4/screenshots/img_2.png new file mode 100644 index 0000000..71215e1 Binary files /dev/null and b/shadaev_anton_lab_4/screenshots/img_2.png differ diff --git a/shadaev_anton_lab_4/screenshots/img_3.png b/shadaev_anton_lab_4/screenshots/img_3.png new file mode 100644 index 0000000..5c78365 Binary files /dev/null and b/shadaev_anton_lab_4/screenshots/img_3.png differ diff --git a/shadaev_anton_lab_4/screenshots/img_4.png b/shadaev_anton_lab_4/screenshots/img_4.png new file mode 100644 index 0000000..8150974 Binary files /dev/null and b/shadaev_anton_lab_4/screenshots/img_4.png differ diff --git a/shadaev_anton_lab_5/img.png b/shadaev_anton_lab_5/img.png new file mode 100644 index 0000000..66d8fcf Binary files /dev/null and b/shadaev_anton_lab_5/img.png differ