Compare commits
No commits in common. "775bd417497a46348e57ad6358687ad0303c4ba5" and "f2adafe2e10a6fad6681397a5af2b6411694a11a" have entirely different histories.
775bd41749
...
f2adafe2e1
@ -1,37 +0,0 @@
|
|||||||
# Лабораторная работа №4 - Работа с брокером сообщений
|
|
||||||
|
|
||||||
# Задачи:
|
|
||||||
|
|
||||||
Необходимо выбрать предметную область и разработать следующие приложения:
|
|
||||||
|
|
||||||
1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
|
|
||||||
2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
|
|
||||||
3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
|
|
||||||
|
|
||||||
|
|
||||||
# Как запустить
|
|
||||||
Командой "python 'название_файла'" в текущей папке
|
|
||||||
|
|
||||||
# Работа программы
|
|
||||||
|
|
||||||
publisher:
|
|
||||||

|
|
||||||
|
|
||||||
consumer_2:
|
|
||||||

|
|
||||||
|
|
||||||
consumer_1 в одном экземпляре:
|
|
||||||

|
|
||||||
|
|
||||||
второй consumer_1:
|
|
||||||

|
|
||||||
|
|
||||||
наша первая очередь за 10 минут, на пике зафиксирован запуск второго экземпляра
|
|
||||||

|
|
||||||
|
|
||||||
вторая очередь, пик из-за того, что consumer_2 запустился, когда уже были какие-то сообщения в очереди
|
|
||||||

|
|
||||||
|
|
||||||
Можно сказать, что запуск второго экземпляра consumer_1 скорости уменьшил нагрузку на очередь.
|
|
||||||
|
|
||||||
Видео -> https://drive.google.com/file/d/1JDzdRgUFYzMBYBt0QA5Pho9yvOsu9Ecu/view?usp=sharing
|
|
@ -1,33 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
import threading
|
|
||||||
|
|
||||||
def process_message(ch, method, properties, body):
|
|
||||||
print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
|
|
||||||
time.sleep(2)
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
def consume_messages(channel, queue_name, exchange_name):
|
|
||||||
channel.queue_declare(queue=queue_name)
|
|
||||||
channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
|
||||||
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
|
|
||||||
|
|
||||||
print("Consumer 1 начал прослушивание сообщений...")
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
exchange_name = 'logs'
|
|
||||||
queue_name = 'queue1'
|
|
||||||
|
|
||||||
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
|
|
||||||
consumer_thread.start()
|
|
||||||
|
|
||||||
consumer_thread.join()
|
|
||||||
|
|
||||||
connection.close()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -1,27 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
def process_message(ch, method, properties, body):
|
|
||||||
print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
def consume_messages(channel, queue_name, exchange_name):
|
|
||||||
channel.queue_declare(queue=queue_name)
|
|
||||||
channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
|
||||||
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
|
|
||||||
|
|
||||||
print("Consumer 2 начал прослушивание сообщений...")
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
exchange_name = 'logs'
|
|
||||||
queue_name = 'queue2'
|
|
||||||
|
|
||||||
consume_messages(channel, queue_name, exchange_name)
|
|
||||||
|
|
||||||
connection.close()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
Binary file not shown.
Before Width: | Height: | Size: 36 KiB |
Binary file not shown.
Before Width: | Height: | Size: 30 KiB |
Binary file not shown.
Before Width: | Height: | Size: 35 KiB |
Binary file not shown.
Before Width: | Height: | Size: 19 KiB |
Binary file not shown.
Before Width: | Height: | Size: 24 KiB |
Binary file not shown.
Before Width: | Height: | Size: 7.3 KiB |
@ -1,25 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
|
|
||||||
def publish_event(channel, exchange_name):
|
|
||||||
events = ["Пришла фотография", "Пришло сообщение", "Необходимо обработать запрос"]
|
|
||||||
while True:
|
|
||||||
event = random.choice(events)
|
|
||||||
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
|
|
||||||
print(f"Отправлено событие: {event}")
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
exchange_name = 'logs'
|
|
||||||
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
|
|
||||||
|
|
||||||
publish_event(channel, exchange_name)
|
|
||||||
|
|
||||||
connection.close()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
Loading…
x
Reference in New Issue
Block a user