This commit is contained in:
a-shdv 2023-12-23 20:50:41 +04:00
parent 1d7145da66
commit 3a1de5ead0
10 changed files with 133 additions and 0 deletions

View File

@ -0,0 +1,37 @@
# Лабораторная работа №4 - Работа с брокером сообщений
# Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
# Как запустить
Запустить `python 'названиеайла'`
# Работа программы
**publisher:**
![img.png](screenshots/img.png)
**consumer_1:**
![img_1.png](screenshots/img_1.png)
**consumer_2:**
![img_2.png](screenshots/img_2.png)
**1-я очередь**
![img_3.png](screenshots/img_3.png)
**2-я очередь**
![img_4.png](screenshots/img_4.png)
Видео: https://youtu.be/3pxgHmm3i0Q

View File

@ -0,0 +1,37 @@
import pika
import time
import threading
def process_message(ch, method, properties, body):
print(f"Получена вакансия (Consumer 1): {body.decode('utf-8')}")
time.sleep(2)
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 1 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue1'
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,31 @@
import pika
def process_message(ch, method, properties, body):
print(f"Получена вакансия (Consumer 2): {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 2 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue2'
consume_messages(channel, queue_name, exchange_name)
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,28 @@
import pika
import time
import random
def publish_event(channel, exchange_name):
events = ["Пришла страница", "Пришла вакансия", "Необходимо обработать запрос"]
while True:
event = random.choice(events)
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
print(f"Отправлено событие: {event}")
time.sleep(3)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
publish_event(channel, exchange_name)
connection.close()
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 95 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

BIN
shadaev_anton_lab_5/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB