kosheev_maksim_lab_4 is ready3
This commit is contained in:
parent
0add49368f
commit
9bbe7b8b53
@ -0,0 +1,89 @@
|
|||||||
|
# Лабораторная работа №4 - Работа с брокером сообщений
|
||||||
|
|
||||||
|
## Задание
|
||||||
|
|
||||||
|
* Установить брокер сообщений RabbitMQ.
|
||||||
|
* Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials, используя Python.
|
||||||
|
* Продемонстрировать работу брокера сообщений.
|
||||||
|
|
||||||
|
## Работа программы
|
||||||
|
|
||||||
|
Программа демонстрирует работу с брокером сообщений RabbitMQ, включая следующие этапы:
|
||||||
|
|
||||||
|
1. **Урок 1: Hello World**
|
||||||
|
- Программа демонстрирует базовый обмен сообщениями между продюсером и потребителем через очередь.
|
||||||
|
|
||||||
|
2. **Урок 2: Work Queues**
|
||||||
|
- Реализована система, где сообщения из одной очереди обрабатываются несколькими потребителями.
|
||||||
|
- Обеспечивается балансировка нагрузки между потребителями.
|
||||||
|
|
||||||
|
3. **Урок 3: Publish/Subscribe**
|
||||||
|
- Используется exchange типа `fanout` для широковещательной рассылки сообщений.
|
||||||
|
- Сообщения из одного источника доставляются сразу нескольким потребителям.
|
||||||
|
|
||||||
|
### Описание классов
|
||||||
|
|
||||||
|
- **`emit_log.py`**:
|
||||||
|
- Отправляет сообщения в exchange `logs`.
|
||||||
|
- Использует тип exchange `fanout` для широковещательной передачи сообщений.
|
||||||
|
|
||||||
|
- **`receive_logs.py`**:
|
||||||
|
- Получает и обрабатывает сообщения, отправленные через exchange `logs`.
|
||||||
|
- Каждое подключение создаёт временную очередь для получения сообщений.
|
||||||
|
|
||||||
|
### Работа приложения
|
||||||
|
|
||||||
|
1. **Логгирование**:
|
||||||
|
- Программа записывает логи в файл, используя один из потребителей.
|
||||||
|
|
||||||
|
2. **Отображение сообщений на экране**:
|
||||||
|
- Сообщения выводятся в терминале через другого потребителя.
|
||||||
|
|
||||||
|
## Примеры использования
|
||||||
|
|
||||||
|
### Запуск продюсера
|
||||||
|
|
||||||
|
```bash
|
||||||
|
python emit_log.py "Ваше сообщение"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Запуск потребителя
|
||||||
|
|
||||||
|
Для отображения сообщений в терминале:
|
||||||
|
```bash
|
||||||
|
python receive_logs.py
|
||||||
|
```
|
||||||
|
|
||||||
|
Для записи сообщений в файл:
|
||||||
|
```bash
|
||||||
|
python receive_logs.py > logs_from_rabbit.log
|
||||||
|
```
|
||||||
|
|
||||||
|
### Уроки
|
||||||
|
|
||||||
|
#### Lesson 1: "Hello World"
|
||||||
|
|
||||||
|
![](result.jpg )
|
||||||
|
|
||||||
|
#### Lesson 2: "Work Queues"
|
||||||
|
|
||||||
|
![](result2.jpg )
|
||||||
|
|
||||||
|
#### Lesson 3: "Publish/Subscribe"
|
||||||
|
|
||||||
|
![](result3.jpg)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## Как установить RabbitMQ
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run -d --hostname rabbitmq --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
## Видео демонстрации
|
||||||
|
|
||||||
|
Демонстрацию работы программы можно посмотреть по ссылке:
|
||||||
|
[Ссылка на видео](https://disk.yandex.ru/i/bNA65J1D-7Kk6w)
|
@ -0,0 +1,28 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
def main():
|
||||||
|
# Устанавливаем соединение с RabbitMQ
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost')
|
||||||
|
)
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Объявляем очередь
|
||||||
|
channel.queue_declare(queue='hello')
|
||||||
|
|
||||||
|
# Обрабатываем сообщения из очереди
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] Received {body}")
|
||||||
|
|
||||||
|
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||||
|
|
||||||
|
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print('Interrupted')
|
||||||
|
sys.exit(0)
|
@ -0,0 +1,17 @@
|
|||||||
|
import pika
|
||||||
|
|
||||||
|
# Устанавливаем соединение с RabbitMQ
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost')
|
||||||
|
)
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Объявляем очередь
|
||||||
|
channel.queue_declare(queue='hello')
|
||||||
|
|
||||||
|
# Отправляем сообщение
|
||||||
|
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||||
|
print(" [x] Sent 'Hello World!'")
|
||||||
|
|
||||||
|
# Закрываем соединение
|
||||||
|
connection.close()
|
@ -0,0 +1,24 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Устанавливаем соединение с RabbitMQ
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Объявляем очередь с параметром durability=True
|
||||||
|
channel.queue_declare(queue='task_queue', durable=True)
|
||||||
|
|
||||||
|
# Формируем сообщение из аргументов командной строки
|
||||||
|
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||||
|
|
||||||
|
# Отправляем сообщение с установкой свойства persistence
|
||||||
|
channel.basic_publish(
|
||||||
|
exchange='',
|
||||||
|
routing_key='task_queue',
|
||||||
|
body=message,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=pika.DeliveryMode.Persistent
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print(f" [x] Sent {message}")
|
||||||
|
connection.close()
|
@ -0,0 +1,27 @@
|
|||||||
|
import pika
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Устанавливаем соединение с RabbitMQ
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Объявляем очередь с параметром durability=True
|
||||||
|
channel.queue_declare(queue='task_queue', durable=True)
|
||||||
|
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
|
||||||
|
# Обработка сообщения
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] Received {body.decode()}")
|
||||||
|
time.sleep(body.count(b'.')) # Имитируем длительную задачу
|
||||||
|
print(" [x] Done")
|
||||||
|
# Подтверждаем обработку сообщения
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
# Настраиваем prefetch_count=1 для справедливой диспетчеризации
|
||||||
|
channel.basic_qos(prefetch_count=1)
|
||||||
|
|
||||||
|
# Указываем обработчик для очереди
|
||||||
|
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||||
|
|
||||||
|
# Запускаем процесс ожидания сообщений
|
||||||
|
channel.start_consuming()
|
@ -0,0 +1,15 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Создание обменника
|
||||||
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||||
|
|
||||||
|
# Формирование сообщения
|
||||||
|
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||||
|
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||||
|
print(f" [x] Sent {message}")
|
||||||
|
connection.close()
|
@ -0,0 +1,26 @@
|
|||||||
|
import pika
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
# Создание обменника
|
||||||
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||||
|
|
||||||
|
# Создание временной очереди
|
||||||
|
result = channel.queue_declare(queue='', exclusive=True)
|
||||||
|
queue_name = result.method.queue
|
||||||
|
|
||||||
|
# Привязка очереди к обменнику
|
||||||
|
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||||
|
|
||||||
|
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||||
|
|
||||||
|
# Обработка входящих сообщений
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] {body}")
|
||||||
|
|
||||||
|
channel.basic_consume(
|
||||||
|
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||||
|
|
||||||
|
channel.start_consuming()
|
Loading…
Reference in New Issue
Block a user