kurbanova_alina_lab_4 is ready

This commit is contained in:
Алина Курбанова
2025-11-16 14:06:59 +04:00
parent 3ab10ab9d2
commit 31e6bcd880
20 changed files with 425 additions and 0 deletions

View File

@@ -0,0 +1,50 @@
# Лабораторная работа №4 - Работа с брокером сообщений
**Цель**: изучение проектирования приложений при помощи брокера сообщений.
**Задачи**:
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из [RabbitMQ Tutorials](https://www.rabbitmq.com/getstarted.html) на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
**Предметная область**: заказ пиццы.
**Как запустить лабораторную работу**:
1) Установить и запустить RabbitMQ.
2) Установить зависимости Python с помощью команды pip install pika.
3) Запустить в 3-х разных терминалах:
Терминал 1 - python publisher.py (Издатель)
Терминал 2 - python consumer_1.py (Медленный обработчик)
Терминал 3 - python consumer_2.py (Быстрый обработчик)
4) Остановка работы - Ctrl+C.
**Что она делает**
Данное приложение реализует систему обработки заказов пиццы с использованием асинхронной архитектуры на основе RabbitMQ. Система состоит из трех основных компонентов: издателя (Publisher) и двух типов потребителей (Consumer). Издатель каждую секунду генерирует новые заказы пиццы с различными параметрами, такими как тип пиццы, имя клиента и статус заказа, и публикует эти события в exchange типа fanout. Exchange типа fanout обеспечивает доставку каждого сообщения во все привязанные к нему очереди одновременно.
Первый потребитель (Consumer 1) создает собственную именованную очередь и обрабатывает сообщения с задержкой 2-3 секунды, имитируя медленный процесс обработки заказа. Второй потребитель (Consumer 2) также создает свою очередь с другим именем, но обрабатывает сообщения мгновенно, демонстрируя быструю обработку. Оба потребителя используют механизм подтверждения обработки сообщений, что гарантирует надежную доставку.
**Какие технологии использовали**:
Python + pika библиотека
RabbitMQ - брокер сообщений
AMQP протокол
Fanout Exchange - рассылка всем очередям
Message Queues - асинхронная обработка
Publisher-Subscriber паттерн
Работает на localhost, порт по умолчанию 5672.
**Пример работы**:
![alt text](pizza_order/image_2.png)
![alt text](pizza_order/image.png)
**Вывод о скорости обработки _consumer_-ами событий от _publisher_-а**:
RabbitMQ автоматически распределяет сообщения между всеми Consumer'ами одной очереди. Добавление большего количества Consumer'ов позволяет обрабатывать сообщения быстрее. При достаточном количестве Consumer'ов очередь перестает расти.
**Ссылка на видео**:
https://vkvideo.ru/video-233171473_456239021

Binary file not shown.

After

Width:  |  Height:  |  Size: 457 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 494 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 643 KiB

View File

@@ -0,0 +1,94 @@
import pika
import json
import time
import random
from datetime import datetime
class PizzaConsumer1:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
self.channel = self.connection.channel()
self.processed_count = 0
self.start_time = datetime.now()
# Создаем exchange (на случай если publisher еще не запущен)
self.channel.exchange_declare(
exchange='pizza_orders',
exchange_type='fanout'
)
# Создаем не анонимную очередь
self.queue_name = "slow_processing_queue"
self.channel.queue_declare(
queue=self.queue_name,
durable=False,
exclusive=False,
auto_delete=False
)
# Создаем binding на exchange
self.channel.queue_bind(
exchange='pizza_orders',
queue=self.queue_name
)
def process_order_slowly(self, order_event):
processing_time = random.uniform(2.0, 3.0) # 2-3 секунды
print(f"[{datetime.now().strftime('%H:%M:%S')}]Начата обработка заказа"
f"#{order_event['order_id']} {order_event['pizza_type']} "
f"для {
order_event['customer_name']} (время:{processing_time:.1f})")
time.sleep(processing_time)
print(f"[{datetime.now().strftime(
'%H:%M:%S')}] ✓ Заказ #{order_event['order_id']} обработан")
def print_statistics(self):
elapsed = datetime.now() - self.start_time
rate = self.processed_count / (elapsed.total_seconds() / 60)
print("=== Статистика Consumer 1 ===")
print(f"Обработано заказов: {self.processed_count}")
print(f"Время работы: {elapsed}")
print(f"Скорость: {rate:.2f} заказов/мин")
print("==============================")
def callback(self, ch, method, properties, body):
try:
order_event = json.loads(body.decode())
self.process_order_slowly(order_event)
self.processed_count += 1
# Выводим статистику каждые 5 сообщений
if self.processed_count % 5 == 0:
self.print_statistics()
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Ошибка обработки: {e}")
def run(self):
print(f"Consumer 1 запущен. Очередь: {self.queue_name}")
print("Обработка сообщений занимает 2-3 секунды...")
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self.callback,
auto_ack=False
)
try:
self.channel.start_consuming()
except KeyboardInterrupt:
print("\nОстановка Consumer 1...")
finally:
self.connection.close()
if __name__ == "__main__":
consumer = PizzaConsumer1()
consumer.run()

View File

@@ -0,0 +1,86 @@
import pika
import json
from datetime import datetime
class PizzaConsumer2:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
self.channel = self.connection.channel()
self.processed_count = 0
self.start_time = datetime.now()
# Создаем exchange (на случай если publisher еще не запущен)
self.channel.exchange_declare(
exchange='pizza_orders',
exchange_type='fanout'
)
# Создаем не анонимную очередь с другим именем
self.queue_name = "fast_processing_queue"
self.channel.queue_declare(
queue=self.queue_name,
durable=False,
exclusive=False,
auto_delete=False
)
# Создаем binding на exchange
self.channel.queue_bind(
exchange='pizza_orders',
queue=self.queue_name
)
def process_order_instantly(self, order_event):
print(f"[{datetime.now().strftime(
'%H:%M:%S')}]✓ Заказ #{order_event['order_id']} "
f"{order_event['pizza_type']} для {
order_event['customer_name']} "
f"обработан мгновенно")
def print_statistics(self):
elapsed = datetime.now() - self.start_time
rate = self.processed_count / (elapsed.total_seconds() / 60)
print("=== Статистика Consumer 2 ===")
print(f"Обработано заказов: {self.processed_count}")
print(f"Время работы: {elapsed}")
print(f"Скорость: {rate:.2f} заказов/мин")
print("==============================")
def callback(self, ch, method, properties, body):
try:
order_event = json.loads(body.decode())
self.process_order_instantly(order_event)
self.processed_count += 1
# Выводим статистику каждые 10 сообщений
if self.processed_count % 10 == 0:
self.print_statistics()
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Ошибка обработки: {e}")
def run(self):
print(f"Consumer 2 запущен. Очередь: {self.queue_name}")
print("Обработка сообщений происходит мгновенно...")
self.channel.basic_consume(
queue=self.queue_name,
on_message_callback=self.callback,
auto_ack=False
)
try:
self.channel.start_consuming()
except KeyboardInterrupt:
print("\nОстановка Consumer 2...")
finally:
self.connection.close()
if __name__ == "__main__":
consumer = PizzaConsumer2()
consumer.run()

Binary file not shown.

After

Width:  |  Height:  |  Size: 291 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 615 KiB

View File

@@ -0,0 +1,68 @@
import pika
import json
import time
import random
from datetime import datetime
class PizzaPublisher:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost')
)
self.channel = self.connection.channel()
self.order_id = 1
# Создаем exchange типа fanout
self.channel.exchange_declare(
exchange='pizza_orders',
exchange_type='fanout'
)
self.pizza_types = ["Маргарита", "Пепперони", "Гавайская",
"Четыре сыра", "Мясная"]
self.customer_names = ["Иван", "Мария", "Алексей",
"Екатерина", "Дмитрий"]
self.statuses = ["новый", "принят", "в процессе", "готовится"]
def generate_order_event(self):
order_event = {
"event_type": "новый заказ",
"order_id": self.order_id,
"customer_name": random.choice(self.customer_names),
"pizza_type": random.choice(self.pizza_types),
"timestamp": datetime.now().isoformat(),
"status": random.choice(self.statuses)
}
self.order_id += 1
return order_event
def publish_message(self):
order_event = self.generate_order_event()
message = json.dumps(order_event)
self.channel.basic_publish(
exchange='pizza_orders',
routing_key='',
body=message
)
print(f"[{datetime.now().strftime('%H:%M:%S')}] Отправлено событие: "
f"Заказ #{order_event['order_id']} {order_event['pizza_type']} "
f"для {order_event['customer_name']}")
def run(self):
print("Publisher запущен. Нажмите Ctrl+C для остановки.")
try:
while True:
self.publish_message()
time.sleep(1)
except KeyboardInterrupt:
print("\nОстановка Publisher...")
finally:
self.connection.close()
if __name__ == "__main__":
publisher = PizzaPublisher()
publisher.run()

Binary file not shown.

After

Width:  |  Height:  |  Size: 281 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 460 KiB

View File

@@ -0,0 +1,32 @@
#!/usr/bin/env python
import pika
import os
import sys
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 190 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 498 KiB

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello world!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 435 KiB

View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()