Compare commits

..

2 Commits

Author SHA1 Message Date
17594169bf Merge pull request 'lazarev_andrey_lab_4' (#141) from lazarev_andrey_lab_4 into main
Reviewed-on: #141
2024-11-25 21:19:34 +04:00
08b8a6f487 lazarev_andrey_lab_4 2024-11-04 02:01:55 +04:00
18 changed files with 300 additions and 0 deletions

View File

@ -0,0 +1,51 @@
# Лабораторная работа №4
## Результаты уроков
### Урок №1
![](tutorial_1/tutorial_1.png "")
### Урок №2
![](tutorial_2/tutorial_2.png "")
### Урок №3
![](tutorial_3/tutorial_3.png "")
## Описание самостоятельного проекта
Проект разворачивает 3 программы в отдельных контейнерах с использованием Docker Compose:
1. **publisher** - издатель, отправляющий случайные сообщения из списка раз в секунду;
2. **consumer_1** - потребитель, обрабатывающий сообщения с задержкой в 5 секунд;
3. **consumer_2** - потребитель, обрабатывающий сообщения без задержки;
### Отправляемые сообщения
- Новая публикация
- Добавлен автор
- Ошибка!!!
- чел..
###### *Комментарии указаны в файлах
## Анализ очередей
### publish_queue_fast
![](RabbitMQ_app/report_fast.png "")
- Очередь без задержек не содержит в себе сообщений, тем самым график держится на нуле.
- Скорость прихода сообщений - 1 секунда, а после моментальная обработка.
### publish_queue_slow
![](RabbitMQ_app/report_slow.png "")
- Очередь с задержкой копит сообщения - график растет.
- Скорость прихода сообщений - 1 секунда, скорость обработки 0.2 сообщения в секунду, что и приводит к накоплению сообщений.
## Видеодемонстрация работоспособности
[Демонстрация работы сервиса](https://files.ulstu.ru/s/mbXq3CMgafMMDH2)

View File

@ -0,0 +1,16 @@
# Используем официальный образ Python 3.10 на основе slim, который является легковесной версией
FROM python:3.10-slim
# Устанавливаем рабочую директорию внутри контейнера
WORKDIR /app
# Устанавливаем библиотеку pika для взаимодействия с RabbitMQ
RUN pip install pika
# Копируем содержимое текущей директории на хосте в директорию /app внутри контейнера
COPY . /app/
# Указываем команду, которая будет выполняться при запуске контейнера
CMD ["python", "publisher.py"]

View File

@ -0,0 +1,27 @@
import pika
import time
def callback(ch, method, properties, body):
print(f" [Consumer 1] {body.decode('utf-8')}")
time.sleep(5)
ch.basic_ack(delivery_tag=method.delivery_tag)
#подключение к брокеру
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
#определение обмена
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
#определение новой очереди и привязка к обмену
queue_name = "publish_logs_slow"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='publish_logs', queue=queue_name)
print(' [*] Consumer 1 working. To exit press CTRL+C')
#обозначение функции обработки сообщений
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
#начало обработки
channel.start_consuming()

View File

@ -0,0 +1,25 @@
import pika
def callback(ch, method, properties, body):
print(f" [Consumer 2] {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
#подключение к брокеру
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
#определение обмена
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
#определение новой очереди и привязка к обмену
queue_name = "publish_queue_fast"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='publish_logs', queue=queue_name)
print(' [*] Consumer 2 working. To exit press CTRL+C')
#обозначение функции обработки сообщений
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
#начало обработки
channel.start_consuming()

View File

@ -0,0 +1,44 @@
services:
rabbitmq: # Сервис для RabbitMQ (брокера сообщений)
image: rabbitmq:3-management # Используемый образ RabbitMQ с веб-интерфейсом управления
ports:
- "5672:5672" # Порт для подключения клиентов к RabbitMQ
- "15672:15672" # Порт для доступа к веб-интерфейсу управления
environment: # Переменные окружения для настройки RabbitMQ
- RABBITMQ_DEFAULT_USER=guest # Имя пользователя по умолчанию
- RABBITMQ_DEFAULT_PASS=guest # Пароль пользователя по умолчанию
healthcheck:
test: ["CMD", "rabbitmqctl", "status"] # Проверка работоспособности RabbitMQ
interval: 10s # Интервал между проверками (10 секунд)
timeout: 5s # Тайм-аут для проверки (5 секунд)
retries: 5 # Количество попыток при неудачных проверках
publisher: # Сервис для запуска издателя (publisher)
build:
context: . # Использует текущую директорию для сборки Docker-образа
command: python publisher.py # Команда для запуска издателя
environment:
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
depends_on:
rabbitmq:
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
consumer_1: # Сервис для запуска первого потребителя (consumer_1)
build:
context: . # Использует текущую директорию для сборки Docker-образа
command: python consumer_1.py # Команда для запуска первого потребителя
environment:
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
depends_on:
rabbitmq:
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
consumer_2: # Сервис для запуска второго потребителя (consumer_2)
build:
context: . # Использует текущую директорию для сборки Docker-образа
command: python consumer_2.py # Команда для запуска второго потребителя
environment:
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
depends_on:
rabbitmq:
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском

View File

@ -0,0 +1,25 @@
import pika
import random
import time
#подключение к брокеру
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
#определение обмена
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
#список сообщений
messages = [
"Новая публикация",
"Добавлен автор",
"Ошибка!!!",
"чел.."
]
#цикл для бесконечной отправки сообщений консюмерам
while True:
message = random.choice(messages)
channel.basic_publish(exchange='publish_logs', routing_key='', body=message)
print(f" [x] Sent {message}")
time.sleep(1)

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 68 KiB

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 92 KiB