From 60ff69f12db2267bcd34fbbbf8c9153a29e259f1 Mon Sep 17 00:00:00 2001 From: DmitriyAntonov Date: Mon, 4 Dec 2023 20:01:33 +0400 Subject: [PATCH] good start --- antonov_dmitry_lab4/README.md | 58 +++++++++++++++++++++++++ antonov_dmitry_lab4/docker-compose.yml | 6 +++ antonov_dmitry_lab4/rabbit/app.py | 21 +++++++++ antonov_dmitry_lab4/rabbit/consumer.py | 33 ++++++++++++++ antonov_dmitry_lab4/rabbit/publisher.py | 23 ++++++++++ 5 files changed, 141 insertions(+) create mode 100644 antonov_dmitry_lab4/README.md create mode 100644 antonov_dmitry_lab4/docker-compose.yml create mode 100644 antonov_dmitry_lab4/rabbit/app.py create mode 100644 antonov_dmitry_lab4/rabbit/consumer.py create mode 100644 antonov_dmitry_lab4/rabbit/publisher.py diff --git a/antonov_dmitry_lab4/README.md b/antonov_dmitry_lab4/README.md new file mode 100644 index 0000000..146a6b0 --- /dev/null +++ b/antonov_dmitry_lab4/README.md @@ -0,0 +1,58 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +Изучение проектирования приложений при помощи брокера сообщений. + +# Задачи + +Необходимо выбрать предметную область и разработать следующие приложения: +* Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт". +* Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#). +* Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1. +Далее необходимо собрать и запустить приложения одновременно по одному экземпляру. + +Сделать в отчёте вывод о скорости обработки consumer-ами событий от publisher-а. Для этого можно посмотреть заполненность созданных очередей. А для этого можно использовать скриншот из RabbitMQ Management UI. + +Запустить несколько копий Consumer 1. Проверить заново заполненность очередей через UI. + +# Запуск + +Командой в консоли проекта "docker-compose up -d" + +# Описание работы: +Развернули два приложения +Сервисы используем из предыдущей работы +Предметная область - врачи и пациенты + +1. Сервис с врачами: +- доступ на http://localhost:5000/ + +2. Сервис с пациентами: +- доступ на http://localhost:5001/ + +Сервисы связываются друг с другом через ссылку и библиотеку requests + +Flask-приложение с RabbitMQ, использующего библиотеку pika для publisher и Celery для consumers. +Приложение Flask (app.py), издателя (publisher.py) и двух потребителей (consumer.py). +Потребители реализованы как задачи Celery. Можно запускать приложение Flask +и обоих потребителей отдельно в разных терминалах. +Consumer ы будут прослушивать сообщения, опубликованные publisher ом. +

+

App
+ +

+

+

Consumer
+ +

+

+

Publisher
+ +

+

+

RabbitMQ
+ +

+ +# Ссылка на видео +https://disk.yandex.ru/i/3o4aLuqp1EpbJg + diff --git a/antonov_dmitry_lab4/docker-compose.yml b/antonov_dmitry_lab4/docker-compose.yml new file mode 100644 index 0000000..0dd0527 --- /dev/null +++ b/antonov_dmitry_lab4/docker-compose.yml @@ -0,0 +1,6 @@ +version: "2.1" +services: + rabbitmq: + image: rabbitmq:3.10.7-management + ports: + - 15672:15672 \ No newline at end of file diff --git a/antonov_dmitry_lab4/rabbit/app.py b/antonov_dmitry_lab4/rabbit/app.py new file mode 100644 index 0000000..48d5e59 --- /dev/null +++ b/antonov_dmitry_lab4/rabbit/app.py @@ -0,0 +1,21 @@ +from celery import Celery +from flask import Flask, render_template + +app = Flask(__name__) + +# Celery конфигурация +app.config['CELERY_BROKER_URL'] = 'pyamqp://guest:guest@localhost//' +app.config['CELERY_RESULT_BACKEND'] = 'rpc://' + +# Создаем инстанс Celery +celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) +celery.conf.update(app.config) + + +@app.route('/') +def index(): + return render_template('index.html') + + +if __name__ == '__main__': + app.run(debug=True) diff --git a/antonov_dmitry_lab4/rabbit/consumer.py b/antonov_dmitry_lab4/rabbit/consumer.py new file mode 100644 index 0000000..315e23e --- /dev/null +++ b/antonov_dmitry_lab4/rabbit/consumer.py @@ -0,0 +1,33 @@ +from celery import Celery +import time +import pika + +app = Celery('consumer', broker='pyamqp://guest:guest@localhost//') + + +@app.task +def process_messages(queue_name, delay): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + # Объявляем очередь с именем + channel.queue_declare(queue=queue_name) + + # Привязываем очередь к 'logs' exchange + channel.queue_bind(exchange='logs', queue=queue_name) + + def callback(ch, method, properties, body): + print(f" [x] Получено сообщение: {body}") + time.sleep(delay) + print(f" [x] Обработано сообщение: {body}") + + # Устанавливаем consumer а чтобы получать сообщения из очереди + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) + + print(f' [*] Ожидаем сообщения от {queue_name}') + channel.start_consuming() + + +if __name__ == '__main__': + process_messages.delay('consumer1_queue', 2) # Queue для Consumer 1 с задержкой 2 с + process_messages.delay('consumer2_queue', 0) # Queue для Consumer 2 без задержки diff --git a/antonov_dmitry_lab4/rabbit/publisher.py b/antonov_dmitry_lab4/rabbit/publisher.py new file mode 100644 index 0000000..1b419cd --- /dev/null +++ b/antonov_dmitry_lab4/rabbit/publisher.py @@ -0,0 +1,23 @@ +import pika +import time + +connection_params = pika.ConnectionParameters( + host='localhost', # RabbitMQ server hostname + port=15672, # RabbitMQ server port + credentials=pika.PlainCredentials('guest', 'guest') # credentials +) + +connection = pika.BlockingConnection(connection_params) +channel = connection.channel() + +# Объявляем exchange с именем 'logs' типа 'fanout' +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +# Отдаем сообщение в 'logs' exchange каждую секунду +while True: + message = "Пациент прибыл" # Сообщение + channel.basic_publish(exchange='logs', routing_key='', body=message) + print(f" [x] Отправлено сообщение: {message}") + time.sleep(1) + +connection.close()