Merge pull request 'polevoy_sergey_lab_4' () from polevoy_sergey_lab_4 into main

Reviewed-on: 
This commit is contained in:
Alexey 2024-12-15 13:23:01 +04:00
commit 4b1c9666a0
16 changed files with 236 additions and 0 deletions

@ -0,0 +1,28 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1: получено сообщение. {body.decode()}')
time.sleep(3)
print('Consumer 1 закончил обработку')
ch.basic_ack(method.delivery_tag)
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare(queue='consumer1_queue')
channel.queue_bind(exchange='ml_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback)
print('Ожидание сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

@ -0,0 +1,25 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2: получено сообщение. {body.decode()}')
print('Consumer 2 закончил обработку')
ch.basic_ack(method.delivery_tag)
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare(queue='consumer2_queue')
channel.queue_bind(exchange='ml_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback)
print('Ожидание сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

@ -0,0 +1,25 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.exchange_declare(exchange='ml_events', exchange_type='fanout')
events = [
"Пользователь начал обучение модели",
"Модель закончила обучение",
"Пользователь запросил создание чекпоинта для модели",
"Пользователь запустил модель для infere"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='ml_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

@ -0,0 +1,9 @@
services:
rabbitmq:
image: rabbitmq:3-management
environment:
RABBITMQ_DEFAULT_USER: superuser
RABBITMQ_DEFAULT_PASS: superpassword
ports:
- "5672:5672"
- "15672:15672"

Binary file not shown.

After

(image error) Size: 36 KiB

Binary file not shown.

After

(image error) Size: 33 KiB

Binary file not shown.

After

(image error) Size: 16 KiB

Binary file not shown.

After

(image error) Size: 20 KiB

Binary file not shown.

After

(image error) Size: 14 KiB

@ -0,0 +1,33 @@
# Лабораторная работа №4
## Полевой Сергей ПИбд-42
### Предметная область:
Платформа машинного обучения
### Результаты выполнения туториалов:
- Первый туториал:
![alt text](images/tutorial1.png)
- Второй туториал:
![alt text](images/tutorial2.png)
- Третий туториал:
![alt text](images/tutorial3.png)
### Данные из RabbitMQ:
![alt text](images/rmq1queue.png)
![alt text](images/rmq2queue.png)
### Вывод:
Исходя из графиков и логики работы потребителей следует:
1) Первый потребитель не успевает обрабатывать сообщения из первой очереди, поэтому сообщения постепенно накапливаются, что, тем не менее, можно компенсировать запуском нескольких потребителей, которые будут быстрее обрабатывать сообщения из очереди
2) Второй потребитель полностью успевает за скоростью отправки сообщений и его очередь всегда остаётся без простаивающих сообщений
#### Демонстрация работы доступна по [ссылке](https://disk.yandex.ru/i/QwOYlRQDQKvNnA)

@ -0,0 +1,27 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

@ -0,0 +1,13 @@
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare('hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello world!')
print(" [x] Sent 'Hello world!'")
connection.close()

@ -0,0 +1,21 @@
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

@ -0,0 +1,21 @@
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

@ -0,0 +1,14 @@
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

@ -0,0 +1,20 @@
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672, credentials=pika.PlainCredentials("superuser", "superpassword")))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()