Merge pull request 'presnyakova_victoria_lab4' (#135) from presnyakova_victoria_lab4 into main

Reviewed-on: #135
This commit is contained in:
Alexey 2024-11-25 21:13:18 +04:00
commit 65de84d901
18 changed files with 258 additions and 0 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 88 KiB

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -0,0 +1,19 @@
from flask import Flask, render_template, request
from publisher import publish_messages
app = Flask(__name__)
@app.route('/')
def index():
return "это publisher"
@app.route('/publish', methods=['GET'])
def publish():
publish_messages()
return 'Начало детекции дефектов'
if __name__ == '__main__':
app.run(debug=True)

View File

@ -0,0 +1,27 @@
import pika
def callback(ch, method, properties, body):
print("Скрипт 1 обрабатывает дефект '{}'".format(body.decode()))
def consume_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
channel.queue_declare(queue='example_queue')
channel.queue_bind(exchange=exchange_name, queue='example_queue')
channel.basic_consume(queue='example_queue', on_message_callback=callback, auto_ack=True)
print('Скрипт 1 ожидает дефектов')
try:
channel.start_consuming()
except KeyboardInterrupt:
print('Прервано. Останавливаем...')
channel.stop_consuming()
connection.close()
if __name__ == '__main__':
consume_messages()

View File

@ -0,0 +1,27 @@
import pika
import time
def callback(ch, method, properties, body):
print("Скрипт 2 обрабатывает дефект '{}'".format(body.decode()))
time.sleep(2)
def consume_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
channel.queue_declare(queue='example_queue_2')
channel.queue_bind(exchange=exchange_name, queue='example_queue_2')
channel.basic_consume(queue='example_queue_2', on_message_callback=callback, auto_ack=True)
print('Скрипт 2 ожидает дефектов')
try:
channel.start_consuming()
except KeyboardInterrupt:
print('Прервано. Останавливаем...')
channel.stop_consuming()
connection.close()
if __name__ == '__main__':
consume_messages()

View File

@ -0,0 +1,19 @@
import pika
import time
def publish_messages():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
channel.queue_declare(queue='example_queue')
# Отдаем сообщение в 'logs' exchange каждую секунду
while True:
message = "Дефект обнаружен" # Сообщение
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f"Отправлено сообщение: {message}")
time.sleep(1)
print("Отправлено '{}'".format(message))
connection.close()

View File

@ -0,0 +1,54 @@
# Лабораторная работа №4 - Работа с брокером сообщений
Lesson 1:
![img.png](lessons/lesson_1/img.png) ![img.png](lessons/lesson_1/img_1.png)
Lesson 2:
![img.png](lessons/lesson_2/img.png) ![img.png](lessons/lesson_2/img_1.png)
Lesson 3:
![img.png](lessons/lesson_3/img.png) ![img.png](lessons/lesson_3/img_1.png)
# Задачи
Необходимо выбрать предметную область и разработать следующие приложения:
1. **Publisher**.
Программа, которая создаёт один **exchange** с типом _fanout_.
Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области.
Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
2. **Consumer 1**.
Программа, которая создаёт под себя отдельную не анонимную (!) очередь (**queue**) (то есть имя queue НЕ пустая строка), создаёт **binding** на **exchange** и начинает принимать сообщения (_consume_).
Программа должна обрабатывать сообщения 2-3 секунды.
Можно реализовать через обычный _Thread.Sleep_ (для C#).
3. **Consumer 2**.
Аналогично _Consumer 1_, только сообщения необходимо обрабатывать моментально.
Только имя очереди должно отличаться от _Consumer 1_.
Далее необходимо собрать и запустить приложения одновременно по одному экземпляру.
# Запуск
Проект запускается в ide просто по нажатию у питон файла на функцию мейн.
Нужно последовательно запустить функцию мейн у файлов app.py, consumer1.py, consumer2.py.
Очередь сообщений запускается такой командой
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# Описание работы:
Развернули два приложения
Сервисы используем из предыдущей работы
Предметная область - Скрипты и дефекты
1. Consumer 1 - Скрипт 1:
2. Consumer 2 - Скрипт 2:
Оба скрипта обрабатывают дефекты.
Flask-приложение с RabbitMQ, использующего библиотеку pika для publisher и для consumers.
Надо обязательно вызвать метод publish, иначе обработка дефектов не начнется.
Приложение Flask (app.py), для источника дефектов (publisher) и двух скриптов-потребителей (consumer1.py и consumer2.py).
Запускаем приложение Flask и обоих потребителей запускаем отдельно в разных терминалах.
Consumer ы будут прослушивать сообщения, опубликованные publisher ом.
![img.png](screens/img.png)
# Ссылка на видео
https://drive.google.com/file/d/1P8kKSZAre24I04DXL1WqkPtdL6RnDOPA/view?usp=sharing

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB