Compare commits
No commits in common. "17594169bf8930e6f60edf3584795f9c755d513b" and "e065588c61ed9a1f566acc017d925eb5af2771e9" have entirely different histories.
17594169bf
...
e065588c61
@ -1,51 +0,0 @@
|
|||||||
# Лабораторная работа №4
|
|
||||||
|
|
||||||
## Результаты уроков
|
|
||||||
|
|
||||||
### Урок №1
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
### Урок №2
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
### Урок №3
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
## Описание самостоятельного проекта
|
|
||||||
|
|
||||||
Проект разворачивает 3 программы в отдельных контейнерах с использованием Docker Compose:
|
|
||||||
1. **publisher** - издатель, отправляющий случайные сообщения из списка раз в секунду;
|
|
||||||
2. **consumer_1** - потребитель, обрабатывающий сообщения с задержкой в 5 секунд;
|
|
||||||
3. **consumer_2** - потребитель, обрабатывающий сообщения без задержки;
|
|
||||||
|
|
||||||
### Отправляемые сообщения
|
|
||||||
- Новая публикация
|
|
||||||
- Добавлен автор
|
|
||||||
- Ошибка!!!
|
|
||||||
- чел..
|
|
||||||
|
|
||||||
###### *Комментарии указаны в файлах
|
|
||||||
|
|
||||||
## Анализ очередей
|
|
||||||
|
|
||||||
### publish_queue_fast
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
- Очередь без задержек не содержит в себе сообщений, тем самым график держится на нуле.
|
|
||||||
- Скорость прихода сообщений - 1 секунда, а после моментальная обработка.
|
|
||||||
|
|
||||||
### publish_queue_slow
|
|
||||||
|
|
||||||

|
|
||||||
|
|
||||||
- Очередь с задержкой копит сообщения - график растет.
|
|
||||||
- Скорость прихода сообщений - 1 секунда, скорость обработки 0.2 сообщения в секунду, что и приводит к накоплению сообщений.
|
|
||||||
|
|
||||||
## Видеодемонстрация работоспособности
|
|
||||||
|
|
||||||
[Демонстрация работы сервиса](https://files.ulstu.ru/s/mbXq3CMgafMMDH2)
|
|
||||||
|
|
@ -1,16 +0,0 @@
|
|||||||
# Используем официальный образ Python 3.10 на основе slim, который является легковесной версией
|
|
||||||
FROM python:3.10-slim
|
|
||||||
|
|
||||||
# Устанавливаем рабочую директорию внутри контейнера
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Устанавливаем библиотеку pika для взаимодействия с RabbitMQ
|
|
||||||
RUN pip install pika
|
|
||||||
|
|
||||||
# Копируем содержимое текущей директории на хосте в директорию /app внутри контейнера
|
|
||||||
COPY . /app/
|
|
||||||
|
|
||||||
# Указываем команду, которая будет выполняться при запуске контейнера
|
|
||||||
CMD ["python", "publisher.py"]
|
|
||||||
|
|
||||||
|
|
@ -1,27 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [Consumer 1] {body.decode('utf-8')}")
|
|
||||||
time.sleep(5)
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
#подключение к брокеру
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
#определение обмена
|
|
||||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
#определение новой очереди и привязка к обмену
|
|
||||||
queue_name = "publish_logs_slow"
|
|
||||||
channel.queue_declare(queue=queue_name)
|
|
||||||
channel.queue_bind(exchange='publish_logs', queue=queue_name)
|
|
||||||
|
|
||||||
print(' [*] Consumer 1 working. To exit press CTRL+C')
|
|
||||||
|
|
||||||
#обозначение функции обработки сообщений
|
|
||||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
|
||||||
|
|
||||||
#начало обработки
|
|
||||||
channel.start_consuming()
|
|
@ -1,25 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [Consumer 2] {body.decode('utf-8')}")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
#подключение к брокеру
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
#определение обмена
|
|
||||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
#определение новой очереди и привязка к обмену
|
|
||||||
queue_name = "publish_queue_fast"
|
|
||||||
channel.queue_declare(queue=queue_name)
|
|
||||||
channel.queue_bind(exchange='publish_logs', queue=queue_name)
|
|
||||||
|
|
||||||
print(' [*] Consumer 2 working. To exit press CTRL+C')
|
|
||||||
|
|
||||||
#обозначение функции обработки сообщений
|
|
||||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
|
||||||
|
|
||||||
#начало обработки
|
|
||||||
channel.start_consuming()
|
|
@ -1,44 +0,0 @@
|
|||||||
services:
|
|
||||||
rabbitmq: # Сервис для RabbitMQ (брокера сообщений)
|
|
||||||
image: rabbitmq:3-management # Используемый образ RabbitMQ с веб-интерфейсом управления
|
|
||||||
ports:
|
|
||||||
- "5672:5672" # Порт для подключения клиентов к RabbitMQ
|
|
||||||
- "15672:15672" # Порт для доступа к веб-интерфейсу управления
|
|
||||||
environment: # Переменные окружения для настройки RabbitMQ
|
|
||||||
- RABBITMQ_DEFAULT_USER=guest # Имя пользователя по умолчанию
|
|
||||||
- RABBITMQ_DEFAULT_PASS=guest # Пароль пользователя по умолчанию
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "rabbitmqctl", "status"] # Проверка работоспособности RabbitMQ
|
|
||||||
interval: 10s # Интервал между проверками (10 секунд)
|
|
||||||
timeout: 5s # Тайм-аут для проверки (5 секунд)
|
|
||||||
retries: 5 # Количество попыток при неудачных проверках
|
|
||||||
|
|
||||||
publisher: # Сервис для запуска издателя (publisher)
|
|
||||||
build:
|
|
||||||
context: . # Использует текущую директорию для сборки Docker-образа
|
|
||||||
command: python publisher.py # Команда для запуска издателя
|
|
||||||
environment:
|
|
||||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
|
||||||
|
|
||||||
consumer_1: # Сервис для запуска первого потребителя (consumer_1)
|
|
||||||
build:
|
|
||||||
context: . # Использует текущую директорию для сборки Docker-образа
|
|
||||||
command: python consumer_1.py # Команда для запуска первого потребителя
|
|
||||||
environment:
|
|
||||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
|
||||||
|
|
||||||
consumer_2: # Сервис для запуска второго потребителя (consumer_2)
|
|
||||||
build:
|
|
||||||
context: . # Использует текущую директорию для сборки Docker-образа
|
|
||||||
command: python consumer_2.py # Команда для запуска второго потребителя
|
|
||||||
environment:
|
|
||||||
- PYTHONUNBUFFERED=1 # Отключение буферизации вывода Python
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy # Ожидает готовности RabbitMQ перед запуском
|
|
@ -1,25 +0,0 @@
|
|||||||
import pika
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
|
|
||||||
#подключение к брокеру
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
#определение обмена
|
|
||||||
channel.exchange_declare(exchange='publish_logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
#список сообщений
|
|
||||||
messages = [
|
|
||||||
"Новая публикация",
|
|
||||||
"Добавлен автор",
|
|
||||||
"Ошибка!!!",
|
|
||||||
"чел.."
|
|
||||||
]
|
|
||||||
|
|
||||||
#цикл для бесконечной отправки сообщений консюмерам
|
|
||||||
while True:
|
|
||||||
message = random.choice(messages)
|
|
||||||
channel.basic_publish(exchange='publish_logs', routing_key='', body=message)
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
time.sleep(1)
|
|
Binary file not shown.
Before Width: | Height: | Size: 39 KiB |
Binary file not shown.
Before Width: | Height: | Size: 42 KiB |
@ -1,25 +0,0 @@
|
|||||||
import pika, sys, os
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
try:
|
|
||||||
main()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print('Interrupted')
|
|
||||||
try:
|
|
||||||
sys.exit(0)
|
|
||||||
except SystemExit:
|
|
||||||
os._exit(0)
|
|
@ -1,11 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
|
||||||
print(" [x] Sent 'Hello World!'")
|
|
||||||
connection.close()
|
|
Binary file not shown.
Before Width: | Height: | Size: 14 KiB |
@ -1,19 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
|
||||||
channel.basic_publish(
|
|
||||||
exchange='',
|
|
||||||
routing_key='task_queue',
|
|
||||||
body=message,
|
|
||||||
properties=pika.BasicProperties(
|
|
||||||
delivery_mode=pika.DeliveryMode.Persistent
|
|
||||||
))
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
Binary file not shown.
Before Width: | Height: | Size: 68 KiB |
@ -1,22 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body.decode()}")
|
|
||||||
time.sleep(body.count(b'.'))
|
|
||||||
print(" [x] Done")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
|
|
||||||
channel.basic_qos(prefetch_count=1)
|
|
||||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
@ -1,13 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
|
||||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
@ -1,22 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
result = channel.queue_declare(queue='', exclusive=True)
|
|
||||||
queue_name = result.method.queue
|
|
||||||
|
|
||||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
|
||||||
|
|
||||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(
|
|
||||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
Binary file not shown.
Before Width: | Height: | Size: 92 KiB |
Loading…
x
Reference in New Issue
Block a user