pupkov_alexey_lab4 is ready

This commit is contained in:
4uvirlo 2024-11-16 23:12:02 +03:00
parent 3b9698ac38
commit d90f748a00
17 changed files with 230 additions and 0 deletions

View File

@ -0,0 +1,25 @@
import pika
import time
# Настройка соединения
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди с фиксированным именем
queue_name = 'orders_slow'
channel.queue_declare(queue=queue_name)
# Binding очереди с exchange
channel.queue_bind(exchange='orders_exchange', queue=queue_name)
# Callback для обработки сообщений
def callback(ch, method, properties, body):
print(f"[Consumer 1] Получено: {body.decode()}")
time.sleep(3) # Эмуляция долгой обработки
print(f"[Consumer 1] Обработано: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[Consumer 1] Ожидание сообщений...")
channel.start_consuming()

View File

@ -0,0 +1,22 @@
import pika
# Настройка соединения
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание очереди с фиксированным именем
queue_name = 'orders_fast'
channel.queue_declare(queue=queue_name)
# Binding очереди с exchange
channel.queue_bind(exchange='orders_exchange', queue=queue_name)
# Callback для обработки сообщений
def callback(ch, method, properties, body):
print(f"[Consumer 2] Получено и обработано: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print("[Consumer 2] Ожидание сообщений...")
channel.start_consuming()

View File

@ -0,0 +1,22 @@
import pika
import time
import random
# Настройка соединения
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# Создание exchange типа fanout
channel.exchange_declare(exchange='orders_exchange', exchange_type='fanout')
# Генератор случайных заказов
orders = ["пицца Маргарита", "паста Карбонара", "тирамису"]
while True:
order = f"Поступил заказ: {random.choice(orders)}"
channel.basic_publish(exchange='orders_exchange', routing_key='', body=order)
print(f"[Publisher] Отправлено: {order}")
time.sleep(1)
# Закрытие соединения (в реальной реализации стоит предусмотреть graceful shutdown)
connection.close()

View File

@ -0,0 +1,48 @@
### Лабораторная работа №4 - Работа с брокером сообщений
#### Задание
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
#### Описание работы программы:
- Класс Publisher успешно осуществляет отправку сообщений своим клиентам.
- Класс Consumer1 принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
- Класс Consumer2 мгновенно принимает и обрабатывает сообщения.
#### Уроки
1. lesson_1
![lesson_1.jpg](lesson_1.jpg)
2. lesson_2
![lesson_2.jpg](lesson_2.jpg)
3. lesson_3
![lesson_3.jpg](lesson_3.jpg)
## Показания очереди orders_slow при одном запущенном экземпляре Consumer_1
![img.jpg](img.jpg)
## Показания очереди orders_fast
![img_1.jpg](img_1.jpg)
## Показания очереди orders_slow при двух запущенных экземплярах Consumer_1
![img_2.jpg](img_2.jpg)
## Показания очереди orders_slow при трех запущенных экземплярах Consumer_1
![img_4.jpg](img_4.jpg)
## Видео ВК
[Ссылка на демонстрацию работы программы](https://vk.com/video547368103_456239602?list=ln-pQzNJDPgjODeUIgNca)

BIN
pupkov_alexey_lab_4/img.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 50 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()