Lab finished

This commit is contained in:
2025-10-17 16:48:10 +04:00
parent 7731f1a120
commit 62117ab2dd
13 changed files with 389 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
# Лабораторная работа №4 — Работа с брокером сообщений
## 1. Цель работы
Изучение проектирования приложений при помощи брокера сообщений RabbitMQ.
## 2. Как запустить лабораторную работу
### Предварительные требования:
* Установленный RabbitMQ Server
* Python 3.9+
* Библиотека pika
### Запуск Tutorial 1
```bash
cd tut1
python receive.py
python send.py
```
![alt text](./screenshots/Tutorial1.png)
### Запуск Tutorial 2
```bash
cd tut2
python worker.py
python new_task.py
```
![alt text](./screenshots/Tutorial2.png)
### Запуск Tutorial 3
```bash
cd tut3
python receive_logs.py
python emit_log.py
```
![alt text](./screenshots/Tutorial3.png)
### Запуск основного приложения
```bash
cd app
python consumer1_slow.py
python consumer2_fast.py
python publisher.py
python consumer1_slow.py
python consumer1_slow.py
```
## 3. Структура проекта
```
tut1/ # Изучение туториала 1
tut2/ # Изучение туториала 2
tut3/ # Изучение туториала 3
app/ # Создание приложений, обрабатывающих очередь сообщений с разной скоростью
README.md # Документация проекта
```
### Описание сервисов
**tut1** — Базовая модель "производитель-потребитель". Демонстрирует отправку и получение простых сообщений через очередь.
**tut2** — Модель распределения задач между несколькими worker'ами. Демонстрирует round-robin балансировку нагрузки и механизм подтверждения обработки сообщений (acknowledgment).
**tut3** — Паттерн fanout exchange. Демонстрирует широковещательную рассылку сообщений всем подписчикам одновременно.
**app** - Система обработки заказов интернет-магазина. Publisher генерирует события заказов каждую секунду. Consumer 1 обрабатывает заказы медленно (2-3 секунды), Consumer 2 — мгновенно.
## 5. Результаты тестирования
### Сценарий 1: Один экземпляр каждого consumer
#### Параметры:
* Publisher: 1 сообщение/секунду
* Consumer 1 (slow): 2.5 секунды/сообщение
* Consumer 2 (fast): мгновенная обработка
#### Результат:
* Очередь order_processing_slow: накапливаются сообщения
* Очередь order_processing_fast: 0 сообщений (успевает обрабатывать)
#### Вывод:
Consumer 1 не справляется с нагрузкой, так как скорость обработки (0.4 сообщения/сек) меньше скорости поступления (1 сообщение/сек).
### Сценарий 2: Три экземпляра Consumer 1
#### Параметры:
* Publisher: 1 сообщение/секунду
* 3× Consumer 1 (slow): 2.5 секунды/сообщение каждый
* Consumer 2 (fast): мгновенная обработка
#### Результат:
* Очередь order_processing_slow: Спустя некоторое время пустеет и не копит более 1-2х сообщений
* Сообщения равномерно распределяются между тремя consumers
RabbitMQ автоматически балансирует нагрузку (round-robin)
#### Вывод:
Три consumer справляются с нагрузкой. Суммарная скорость обработки (1.2 сообщения/сек) превышает скорость поступления (1 сообщение/сек). Демонстрируется горизонтальное масштабирование.
## 6. Используемые технологии
1. **Python 3.9** — язык программирования
2. **RabbitMQ** — брокер сообщений
3. **Pika** — Python клиент для RabbitMQ
4. **RabbitMQ Management UI** — веб-интерфейс для мониторинга
## 7. Видео
Ссылка на видео работы RabbitMQ: [Rutube](https://rutube.ru/video/private/d4dc5613005afbbe601862d54b248d36/?p=De8RTMImQZQtab3avfH2Zg)

View File

@@ -0,0 +1,58 @@
import pika
import json
import time
import random
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='fanout')
queue_name = 'order_processing_slow'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='orders', queue=queue_name)
print(f' [*] Consumer 1 (SLOW) started')
print(f' [*] Queue: {queue_name}')
print(f' [*] Processing time: 2-3 seconds per order')
print(' [*] Waiting for orders. To exit press CTRL+C')
def callback(ch, method, properties, body):
try:
order = json.loads(body.decode())
print(f" [x] Received order #{order['order_id']}: "
f"{order['product']} - ${order['amount']}")
print(f" Processing order #{order['order_id']}...")
processing_time = random.uniform(2.0, 3.0)
time.sleep(processing_time)
print(f" [✓] Order #{order['order_id']} processed successfully "
f"(took {processing_time:.1f}s)")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f" [!] Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback
)
try:
channel.start_consuming()
except KeyboardInterrupt:
print('\n [*] Consumer 1 stopped')
finally:
connection.close()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,51 @@
import pika
import json
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='fanout')
queue_name = 'order_processing_fast'
channel.queue_declare(queue=queue_name, durable=True)
channel.queue_bind(exchange='orders', queue=queue_name)
print(f' [*] Consumer 2 (FAST) started')
print(f' [*] Queue: {queue_name}')
print(f' [*] Processing time: instant')
print(' [*] Waiting for orders. To exit press CTRL+C')
def callback(ch, method, properties, body):
try:
order = json.loads(body.decode())
print(f" [x] Received order #{order['order_id']}: "
f"{order['product']} - ${order['amount']}")
print(f" [✓] Order #{order['order_id']} processed instantly")
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f" [!] Error processing message: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue=queue_name,
on_message_callback=callback
)
try:
channel.start_consuming()
except KeyboardInterrupt:
print('\n [*] Consumer 2 stopped')
finally:
connection.close()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,53 @@
import pika
import json
import time
from datetime import datetime
import random
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='fanout')
print(' [*] Publisher started. Publishing orders every second...')
print(' [*] Press CTRL+C to stop')
order_id = 1
try:
while True:
order = {
'order_id': order_id,
'customer': f'Customer_{random.randint(1, 100)}',
'product': random.choice([
'Laptop', 'Phone', 'Tablet',
'Headphones', 'Monitor', 'Keyboard'
]),
'amount': round(random.uniform(10.0, 1000.0), 2),
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
message = json.dumps(order, ensure_ascii=False)
channel.basic_publish(
exchange='orders',
routing_key='',
body=message
)
print(f" [x] Sent order #{order_id}: {order['product']} "
f"for ${order['amount']} from {order['customer']}")
order_id += 1
time.sleep(1)
except KeyboardInterrupt:
print('\n [*] Publisher stopped')
finally:
connection.close()
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 45 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

View File

@@ -0,0 +1,21 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
channel.basic_consume(
queue='hello',
on_message_callback=callback,
auto_ack=True
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

View File

@@ -0,0 +1,18 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!'
)
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@@ -0,0 +1,23 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World...!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
)
)
print(f" [x] Sent '{message}'")
connection.close()

View File

@@ -0,0 +1,29 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='task_queue',
on_message_callback=callback
)
channel.start_consuming()

View File

@@ -0,0 +1,20 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(
exchange='logs',
routing_key='',
body=message
)
print(f" [x] Sent '{message}'")
connection.close()

View File

@@ -0,0 +1,26 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body.decode()}")
channel.basic_consume(
queue=queue_name,
on_message_callback=callback,
auto_ack=True
)
channel.start_consuming()