tabeev_alexander_lab_4 is ready
This commit is contained in:
29
tabeev_alexander_lab_4/mainLab/fast_consumer.py
Normal file
29
tabeev_alexander_lab_4/mainLab/fast_consumer.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import pika
|
||||
import json
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='conference', exchange_type='fanout')
|
||||
|
||||
queue_name = 'fast_queue'
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='conference', queue=queue_name)
|
||||
|
||||
print(f"Fast Consumer запущен. Очередь: {queue_name}")
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
data = json.loads(body)
|
||||
print(f"[FAST] Получено: {data['event']} (id: {data['id']})")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback)
|
||||
|
||||
print("Ожидание сообщений... (Ctrl+C для остановки)")
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
print("\nFast Consumer остановлен.")
|
||||
finally:
|
||||
connection.close()
|
||||
43
tabeev_alexander_lab_4/mainLab/publisher.py
Normal file
43
tabeev_alexander_lab_4/mainLab/publisher.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import pika
|
||||
import time
|
||||
import json
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='conference', exchange_type='fanout')
|
||||
|
||||
events = [
|
||||
"Новый участник зарегистрирован",
|
||||
"Статья подана на рецензию",
|
||||
"Рецензия готова",
|
||||
"Расписание обновлено"
|
||||
]
|
||||
|
||||
count = 0
|
||||
print("Publisher запущен")
|
||||
|
||||
try:
|
||||
while True:
|
||||
event = events[count % len(events)]
|
||||
message = json.dumps({
|
||||
"id": count,
|
||||
"event": event,
|
||||
"time": time.time()
|
||||
})
|
||||
|
||||
channel.basic_publish(
|
||||
exchange='conference',
|
||||
routing_key='',
|
||||
body=message
|
||||
)
|
||||
|
||||
print(f"Отправлено: {event}")
|
||||
count += 1
|
||||
time.sleep(1)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nPublisher остановлен")
|
||||
print(f"Всего отправлено: {count} сообщений")
|
||||
finally:
|
||||
connection.close()
|
||||
37
tabeev_alexander_lab_4/mainLab/slow_consumer.py
Normal file
37
tabeev_alexander_lab_4/mainLab/slow_consumer.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import pika
|
||||
import time
|
||||
import json
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='conference', exchange_type='fanout')
|
||||
|
||||
queue_name = 'slow_queue'
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='conference', queue=queue_name)
|
||||
|
||||
print(f"Slow Consumer запущен. Очередь: {queue_name}")
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
try:
|
||||
data = json.loads(body)
|
||||
print(f"[SLOW] Получено: {data['event']} (id: {data['id']})")
|
||||
print("Обработка 3 секунды...")
|
||||
time.sleep(3)
|
||||
print("Обработка завершена")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
except KeyboardInterrupt:
|
||||
print("\nПрервано во время обработки")
|
||||
raise
|
||||
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback)
|
||||
|
||||
print("Ожидание сообщений... (Ctrl+C для остановки)")
|
||||
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
print("\nSlow Consumer остановлен.")
|
||||
finally:
|
||||
connection.close()
|
||||
BIN
tabeev_alexander_lab_4/pictures/tutorial1RabbitMQ.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial1RabbitMQ.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 36 KiB |
BIN
tabeev_alexander_lab_4/pictures/tutorial1Terminal.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial1Terminal.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 39 KiB |
BIN
tabeev_alexander_lab_4/pictures/tutorial2RabbitMQ.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial2RabbitMQ.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 39 KiB |
BIN
tabeev_alexander_lab_4/pictures/tutorial2Terminal.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial2Terminal.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 39 KiB |
BIN
tabeev_alexander_lab_4/pictures/tutorial3RabbitMQ.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial3RabbitMQ.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 42 KiB |
BIN
tabeev_alexander_lab_4/pictures/tutorial3Terminal.png
Normal file
BIN
tabeev_alexander_lab_4/pictures/tutorial3Terminal.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 46 KiB |
33
tabeev_alexander_lab_4/readme.md
Normal file
33
tabeev_alexander_lab_4/readme.md
Normal file
@@ -0,0 +1,33 @@
|
||||
## Цель: изучение проектирования приложений при помощи брокера сообщений.
|
||||
|
||||
## Задачи:
|
||||
1. Установить брокер сообщений RabbitMQ.
|
||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||
3. Продемонстрировать работу брокера сообщений.
|
||||
|
||||
## Как запустить ЛР
|
||||
1. Установить rabbitmq-server-4.2.1 (при запуске попросит установить erlang будет всплывающее окно и там можно перейти на сайт и скачать)
|
||||
1.1 Включить веб-интерфейс, введя команду в командную строку rabbitmq-plugins enable rabbitmq_management
|
||||
1.2 Перезапустить службу RabbitMQ изначально Вы могли поставить галочку запустить службу, поэтому сначала Stop-Service RabbitMQ, а затем Start-Service RabbitMQ
|
||||
1.3 После этих шагов http://localhost:15672 станет доступен логин и пароль по умолчанию guest
|
||||
2. Установить Python
|
||||
3. Установить библиотеку pika (python -m pip install pika)
|
||||
4. Открыть три отдельных терминала: для publisher, fast_consumer и slow_consumer
|
||||
5. Перейти в директориию mainLab и запустить файл python publisher.py в первом терминале, остальные файлы запустить по аналогии, соответственно во втором и третьем терминале
|
||||
6. Чтобы отсановить приложение нажмите сочетание колавиш ctrl+c
|
||||
|
||||
## Используемые технологии
|
||||
1. RabbitMQ 4.2.1 - брокер сообщений
|
||||
2. Python 3.14 - ЯП для реализации приложений
|
||||
3. pika - клиентская библиотека для работы с RabbitMQ
|
||||
|
||||
### Что делает ЛР
|
||||
## Предметная область
|
||||
Система уведомлений научной конференции. Система отслеживает события конференции и распределяет их между различными обработчиками для параллельной обработки.
|
||||
Publisher создает exchange(точка маршрутизации сообщений) типа fanout для широковещательной рассылки с именем conference; каждую секунду генерирует собитие конференции; отправляет сообщения всем подключенным потребителям
|
||||
События которые генерирууются:
|
||||
1. Новый участник зарегистрирован
|
||||
2. Статья подана на рецензию
|
||||
3. Рецензия готова
|
||||
4. Расписание обновлено
|
||||
FastConsumer обрабатывает сообщения мгновенно, SlowConsumer обрабатывает сообщение за 3 сенкунды каждое и в очереди slow_queue накапливаются сообщения. Сообщения накапливаются в очереди медленного обработчика, демонстрируя асинхронную природу брокера сообщений. При запуске 3 копий Slow Consumer сообщения распределяются между ними, следовательно скорость обработки увеличивается в 3 раза и накопление очереди уменьшается.
|
||||
25
tabeev_alexander_lab_4/tutorial1/receive.py
Normal file
25
tabeev_alexander_lab_4/tutorial1/receive.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import pika, sys, os
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
||||
11
tabeev_alexander_lab_4/tutorial1/send.py
Normal file
11
tabeev_alexander_lab_4/tutorial1/send.py
Normal file
@@ -0,0 +1,11 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
||||
19
tabeev_alexander_lab_4/tutorial2/new_task.py
Normal file
19
tabeev_alexander_lab_4/tutorial2/new_task.py
Normal file
@@ -0,0 +1,19 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
||||
22
tabeev_alexander_lab_4/tutorial2/worker.py
Normal file
22
tabeev_alexander_lab_4/tutorial2/worker.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
||||
13
tabeev_alexander_lab_4/tutorial3/emit_log.py
Normal file
13
tabeev_alexander_lab_4/tutorial3/emit_log.py
Normal file
@@ -0,0 +1,13 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
||||
22
tabeev_alexander_lab_4/tutorial3/receive_logs.py
Normal file
22
tabeev_alexander_lab_4/tutorial3/receive_logs.py
Normal file
@@ -0,0 +1,22 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
||||
Reference in New Issue
Block a user