forked from Alexey/DAS_2024_1
Merge pull request 'lazarev_andrey_lab_4' (#141) from lazarev_andrey_lab_4 into main
Reviewed-on: Alexey/DAS_2024_1#141
This commit is contained in:
commit
17594169bf
51
lazarev_andrey_lab_4/README.md
Normal file
51
lazarev_andrey_lab_4/README.md
Normal file
@ -0,0 +1,51 @@
|
||||
# Лабораторная работа №4
|
||||
|
||||
## Результаты уроков
|
||||
|
||||
### Урок №1
|
||||
|
||||
![](tutorial_1/tutorial_1.png "")
|
||||
|
||||
### Урок №2
|
||||
|
||||
![](tutorial_2/tutorial_2.png "")
|
||||
|
||||
### Урок №3
|
||||
|
||||
![](tutorial_3/tutorial_3.png "")
|
||||
|
||||
## Описание самостоятельного проекта
|
||||
|
||||
Проект разворачивает 3 программы в отдельных контейнерах с использованием Docker Compose:
|
||||
1. **publisher** - издатель, отправляющий случайные сообщения из списка раз в секунду;
|
||||
2. **consumer_1** - потребитель, обрабатывающий сообщения с задержкой в 5 секунд;
|
||||
3. **consumer_2** - потребитель, обрабатывающий сообщения без задержки;
|
||||
|
||||
### Отправляемые сообщения
|
||||
- Новая публикация
|
||||
- Добавлен автор
|
||||
- Ошибка!!!
|
||||
- чел..
|
||||
|
||||
###### *Комментарии указаны в файлах
|
||||
|
||||
## Анализ очередей
|
||||
|
||||
### publish_queue_fast
|
||||
|
||||
![](RabbitMQ_app/report_fast.png "")
|
||||
|
||||
- Очередь без задержек не содержит в себе сообщений, тем самым график держится на нуле.
|
||||
- Скорость прихода сообщений - 1 секунда, а после моментальная обработка.
|
||||
|
||||
### publish_queue_slow
|
||||
|
||||
![](RabbitMQ_app/report_slow.png "")
|
||||
|
||||
- Очередь с задержкой копит сообщения - график растет.
|
||||
- Скорость прихода сообщений - 1 секунда, скорость обработки 0.2 сообщения в секунду, что и приводит к накоплению сообщений.
|
||||
|
||||
## Видеодемонстрация работоспособности
|
||||
|
||||
[Демонстрация работы сервиса](https://files.ulstu.ru/s/mbXq3CMgafMMDH2)
|
||||
|
16
lazarev_andrey_lab_4/RabbitMQ_app/Dockerfile
Normal file
16
lazarev_andrey_lab_4/RabbitMQ_app/Dockerfile
Normal file
@ -0,0 +1,16 @@
|
||||
# Используем официальный образ Python 3.10 на основе slim, который является легковесной версией
|
||||
FROM python:3.10-slim
|
||||
|
||||
# Устанавливаем рабочую директорию внутри контейнера
|
||||
WORKDIR /app
|
||||
|
||||
# Устанавливаем библиотеку pika для взаимодействия с RabbitMQ
|
||||
RUN pip install pika
|
||||
|
||||
# Копируем содержимое текущей директории на хосте в директорию /app внутри контейнера
|
||||
COPY . /app/
|
||||
|
||||
# Указываем команду, которая будет выполняться при запуске контейнера
|
||||
CMD ["python", "publisher.py"]
|
||||
|
||||
|
27
lazarev_andrey_lab_4/RabbitMQ_app/consumer_1.py
Normal file
27
lazarev_andrey_lab_4/RabbitMQ_app/consumer_1.py
Normal file
@ -0,0 +1,27 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [Consumer 1] {body.decode('utf-8')}")
|
||||
time.sleep(5)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
#подключение к брокеру
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
|
||||
#определение обмена
|
||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
||||
|
||||
#определение новой очереди и привязка к обмену
|
||||
queue_name = "publish_logs_slow"
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='publish_logs', queue=queue_name)
|
||||
|
||||
print(' [*] Consumer 1 working. To exit press CTRL+C')
|
||||
|
||||
#обозначение функции обработки сообщений
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
||||
|
||||
#начало обработки
|
||||
channel.start_consuming()
|
25
lazarev_andrey_lab_4/RabbitMQ_app/consumer_2.py
Normal file
25
lazarev_andrey_lab_4/RabbitMQ_app/consumer_2.py
Normal file
@ -0,0 +1,25 @@
|
||||
import pika
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [Consumer 2] {body.decode('utf-8')}")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
#подключение к брокеру
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
|
||||
#определение обмена
|
||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
||||
|
||||
#определение новой очереди и привязка к обмену
|
||||
queue_name = "publish_queue_fast"
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='publish_logs', queue=queue_name)
|
||||
|
||||
print(' [*] Consumer 2 working. To exit press CTRL+C')
|
||||
|
||||
#обозначение функции обработки сообщений
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
||||
|
||||
#начало обработки
|
||||
channel.start_consuming()
|
44
lazarev_andrey_lab_4/RabbitMQ_app/docker-compose.yml
Normal file
44
lazarev_andrey_lab_4/RabbitMQ_app/docker-compose.yml
Normal file
@ -0,0 +1,44 @@
|
||||
services:
|
||||
rabbitmq: # Сервис для RabbitMQ (брокера сообщений)
|
||||
image: rabbitmq:3-management # Используемый образ RabbitMQ с веб-интерфейсом управления
|
||||
ports:
|
||||
- "5672:5672" # Порт для подключения клиентов к RabbitMQ
|
||||
- "15672:15672" # Порт для доступа к веб-интерфейсу управления
|
||||
environment: # Переменные окружения для настройки RabbitMQ
|
||||
- RABBITMQ_DEFAULT_USER=guest # Имя пользователя по умолчанию
|
||||
- RABBITMQ_DEFAULT_PASS=guest # Пароль пользователя по умолчанию
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmqctl", "status"] # Проверка работоспособности RabbitMQ
|
||||
interval: 10s # Интервал между проверками (10 секунд)
|
||||
timeout: 5s # Тайм-аут для проверки (5 секунд)
|
||||
retries: 5 # Количество попыток при неудачных проверках
|
||||
|
||||
publisher: # Сервис для запуска издателя (publisher)
|
||||
build:
|
||||
context: . # Использует текущую директорию для сборки Docker-образа
|
||||
command: python publisher.py # Команда для запуска издателя
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
||||
|
||||
consumer_1: # Сервис для запуска первого потребителя (consumer_1)
|
||||
build:
|
||||
context: . # Использует текущую директорию для сборки Docker-образа
|
||||
command: python consumer_1.py # Команда для запуска первого потребителя
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
||||
|
||||
consumer_2: # Сервис для запуска второго потребителя (consumer_2)
|
||||
build:
|
||||
context: . # Использует текущую директорию для сборки Docker-образа
|
||||
command: python consumer_2.py # Команда для запуска второго потребителя
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
25
lazarev_andrey_lab_4/RabbitMQ_app/publisher.py
Normal file
25
lazarev_andrey_lab_4/RabbitMQ_app/publisher.py
Normal file
@ -0,0 +1,25 @@
|
||||
import pika
|
||||
import random
|
||||
import time
|
||||
|
||||
#подключение к брокеру
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
|
||||
#определение обмена
|
||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
||||
|
||||
#список сообщений
|
||||
messages = [
|
||||
"Новая публикация",
|
||||
"Добавлен автор",
|
||||
"Ошибка!!!",
|
||||
"чел.."
|
||||
]
|
||||
|
||||
#цикл для бесконечной отправки сообщений консюмерам
|
||||
while True:
|
||||
message = random.choice(messages)
|
||||
channel.basic_publish(exchange='publish_logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
time.sleep(1)
|
BIN
lazarev_andrey_lab_4/RabbitMQ_app/report_fast.png
Normal file
BIN
lazarev_andrey_lab_4/RabbitMQ_app/report_fast.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 39 KiB |
BIN
lazarev_andrey_lab_4/RabbitMQ_app/report_slow.png
Normal file
BIN
lazarev_andrey_lab_4/RabbitMQ_app/report_slow.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 42 KiB |
25
lazarev_andrey_lab_4/tutorial_1/receive.py
Normal file
25
lazarev_andrey_lab_4/tutorial_1/receive.py
Normal file
@ -0,0 +1,25 @@
|
||||
import pika, sys, os
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
11
lazarev_andrey_lab_4/tutorial_1/send.py
Normal file
11
lazarev_andrey_lab_4/tutorial_1/send.py
Normal file
@ -0,0 +1,11 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
BIN
lazarev_andrey_lab_4/tutorial_1/tutorial_1.png
Normal file
BIN
lazarev_andrey_lab_4/tutorial_1/tutorial_1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 14 KiB |
19
lazarev_andrey_lab_4/tutorial_2/new_task.py
Normal file
19
lazarev_andrey_lab_4/tutorial_2/new_task.py
Normal file
@ -0,0 +1,19 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
BIN
lazarev_andrey_lab_4/tutorial_2/tutorial_2.png
Normal file
BIN
lazarev_andrey_lab_4/tutorial_2/tutorial_2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 68 KiB |
22
lazarev_andrey_lab_4/tutorial_2/worker.py
Normal file
22
lazarev_andrey_lab_4/tutorial_2/worker.py
Normal file
@ -0,0 +1,22 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
13
lazarev_andrey_lab_4/tutorial_3/emit_log.py
Normal file
13
lazarev_andrey_lab_4/tutorial_3/emit_log.py
Normal file
@ -0,0 +1,13 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
0
lazarev_andrey_lab_4/tutorial_3/receive_logs.log
Normal file
0
lazarev_andrey_lab_4/tutorial_3/receive_logs.log
Normal file
22
lazarev_andrey_lab_4/tutorial_3/receive_logs.py
Normal file
22
lazarev_andrey_lab_4/tutorial_3/receive_logs.py
Normal file
@ -0,0 +1,22 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
BIN
lazarev_andrey_lab_4/tutorial_3/tutorial_3.png
Normal file
BIN
lazarev_andrey_lab_4/tutorial_3/tutorial_3.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 92 KiB |
Loading…
Reference in New Issue
Block a user