gusev_vladislav_lab_4 #46

Merged
Alexey merged 2 commits from gusev_vladislav_lab_4 into main 2023-12-28 10:57:07 +04:00
10 changed files with 122 additions and 0 deletions

View File

@ -0,0 +1,37 @@
# Лабораторная работа №4 - Работа с брокером сообщений
# Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
# Как запустить
Командой "python 'названиеайла'" в текущей папке
# Работа программы
publisher:
![img.png](img.png)
consumer_2:
![img_1.png](img_1.png)
consumer_1 в одном экземпляре:
![img_2.png](img_2.png)
второй consumer_1:
![img_3.png](img_3.png)
наша первая очередь за 10 минут, на пике зафиксирован запуск второго экземпляра
![img_4.png](img_4.png)
вторая очередь, пик из-за того, что consumer_2 запустился, когда уже были какие-то сообщения в очереди
![img_5.png](img_5.png)
Можно сказать, что запуск второго экземпляра consumer_1 скорости уменьшил нагрузку на очередь.
Видео -> https://drive.google.com/file/d/1JDzdRgUFYzMBYBt0QA5Pho9yvOsu9Ecu/view?usp=sharing

View File

@ -0,0 +1,33 @@
import pika
import time
import threading
def process_message(ch, method, properties, body):
print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
time.sleep(2)
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 1 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue1'
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,27 @@
import pika
def process_message(ch, method, properties, body):
print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 2 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue2'
consume_messages(channel, queue_name, exchange_name)
connection.close()
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.3 KiB

View File

@ -0,0 +1,25 @@
import pika
import time
import random
def publish_event(channel, exchange_name):
events = ["Пришла фотография", "Пришло сообщение", "Необходимо обработать запрос"]
while True:
event = random.choice(events)
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
print(f"Отправлено событие: {event}")
time.sleep(3)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
publish_event(channel, exchange_name)
connection.close()
if __name__ == '__main__':
main()