antonov_dmitry_lab4 #34

Merged
Alexey merged 3 commits from antonov_dmitry_lab4 into main 2023-12-05 22:29:55 +04:00
5 changed files with 141 additions and 0 deletions
Showing only changes of commit 60ff69f12d - Show all commits

View File

@ -0,0 +1,58 @@
# Лабораторная работа №4 - Работа с брокером сообщений
Изучение проектирования приложений при помощи брокера сообщений.
# Задачи
Необходимо выбрать предметную область и разработать следующие приложения:
* Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
* Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
* Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
Далее необходимо собрать и запустить приложения одновременно по одному экземпляру.
Сделать в отчёте вывод о скорости обработки consumer-ами событий от publisher-а. Для этого можно посмотреть заполненность созданных очередей. А для этого можно использовать скриншот из RabbitMQ Management UI.
Запустить несколько копий Consumer 1. Проверить заново заполненность очередей через UI.
# Запуск
Командой в консоли проекта "docker-compose up -d"
# Описание работы:
Развернули два приложения
Сервисы используем из предыдущей работы
Предметная область - врачи и пациенты
1. Сервис с врачами:
- доступ на http://localhost:5000/
2. Сервис с пациентами:
- доступ на http://localhost:5001/
Сервисы связываются друг с другом через ссылку и библиотеку requests
Flask-приложение с RabbitMQ, использующего библиотеку pika для publisher и Celery для consumers.
Приложение Flask (app.py), издателя (publisher.py) и двух потребителей (consumer.py).
Потребители реализованы как задачи Celery. Можно запускать приложение Flask
и обоих потребителей отдельно в разных терминалах.
Consumer ы будут прослушивать сообщения, опубликованные publisher ом.
<p>
<div>App</div>
<img src="screens/img1.png" width="650" title="App">
</p>
<p>
<div>Consumer</div>
<img src="screens/img2.png" width="650" title="Consumer">
</p>
<p>
<div>Publisher</div>
<img src="screens/img3.png" width="650" title="Publisher">
</p>
<p>
<div>RabbitMQ</div>
<img src="screens/img4.png" width="650" title="RabbitMQ">
</p>
# Ссылка на видео
https://disk.yandex.ru/i/3o4aLuqp1EpbJg

View File

@ -0,0 +1,6 @@
version: "2.1"
services:
rabbitmq:
image: rabbitmq:3.10.7-management
ports:
- 15672:15672

View File

@ -0,0 +1,21 @@
from celery import Celery
from flask import Flask, render_template
app = Flask(__name__)
# Celery конфигурация
app.config['CELERY_BROKER_URL'] = 'pyamqp://guest:guest@localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'rpc://'
# Создаем инстанс Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
@app.route('/')
def index():
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)

View File

@ -0,0 +1,33 @@
from celery import Celery
import time
import pika
app = Celery('consumer', broker='pyamqp://guest:guest@localhost//')
@app.task
def process_messages(queue_name, delay):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Объявляем очередь с именем
channel.queue_declare(queue=queue_name)
# Привязываем очередь к 'logs' exchange
channel.queue_bind(exchange='logs', queue=queue_name)
def callback(ch, method, properties, body):
print(f" [x] Получено сообщение: {body}")
time.sleep(delay)
print(f" [x] Обработано сообщение: {body}")
# Устанавливаем consumer а чтобы получать сообщения из очереди
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(f' [*] Ожидаем сообщения от {queue_name}')
channel.start_consuming()
if __name__ == '__main__':
process_messages.delay('consumer1_queue', 2) # Queue для Consumer 1 с задержкой 2 с
process_messages.delay('consumer2_queue', 0) # Queue для Consumer 2 без задержки

View File

@ -0,0 +1,23 @@
import pika
import time
connection_params = pika.ConnectionParameters(
host='localhost', # RabbitMQ server hostname
port=15672, # RabbitMQ server port
credentials=pika.PlainCredentials('guest', 'guest') # credentials
)
connection = pika.BlockingConnection(connection_params)
channel = connection.channel()
# Объявляем exchange с именем 'logs' типа 'fanout'
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Отдаем сообщение в 'logs' exchange каждую секунду
while True:
message = "Пациент прибыл" # Сообщение
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Отправлено сообщение: {message}")
time.sleep(1)
connection.close()