basharin_sevastyan_lab_4 is ready #28
BIN
basharin_sevastyan_lab_4/PC1C1.png
Normal file
BIN
basharin_sevastyan_lab_4/PC1C1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 275 KiB |
BIN
basharin_sevastyan_lab_4/PC1C2.png
Normal file
BIN
basharin_sevastyan_lab_4/PC1C2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 270 KiB |
76
basharin_sevastyan_lab_4/README.md
Normal file
76
basharin_sevastyan_lab_4/README.md
Normal file
@ -0,0 +1,76 @@
|
||||
# Лабораторная работа 3. Вариант 5.
|
||||
|
||||
### Как запустить
|
||||
В директории с файлами выполнить следующие команды:
|
||||
+ Запустить Publisher `python publisher.py`.
|
||||
+ Запустить Consumer 1 `python consumer1.py`.
|
||||
+ Запустить Consumer 2 `python consumer2.py`.
|
||||
|
||||
### Описание работы
|
||||
Есть отправитель `publisher` и два слушателя `consumer1` и `consumer2`
|
||||
|
||||
`publisher`, согласно заданию, отправляет сообщения раз в 3 секунды.
|
||||
```python
|
||||
import time
|
||||
import random
|
||||
|
||||
def publish_event(channel, exchange_name):
|
||||
events = ["пришёл заказ", "сообщение от пользователя", "необходимо создать отчёт"]
|
||||
while True:
|
||||
event = random.choice(events)
|
||||
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
|
||||
print(f"Отправлено событие: {event}")
|
||||
time.sleep(3) # Задержка отправки
|
||||
```
|
||||
|
||||
`consumer1` прослушивает очередь `queue1` раз в 2 секунды.
|
||||
```python
|
||||
import time
|
||||
|
||||
def process_message(ch, method, properties, body): # Функция получения сообщений
|
||||
print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
|
||||
time.sleep(2) # Задержка прослушиваения
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
```
|
||||
Созданная очередь `queue1`
|
||||
![](queue1.png "")
|
||||
|
||||
`consumer2` прослушивает очередь без задержек.
|
||||
```python
|
||||
def process_message(ch, method, properties, body): # Функция получения сообщений
|
||||
print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
def consume_messages(channel, queue_name, exchange_name): # Функция прослушки очереди
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
|
||||
|
||||
print("Consumer 2 начал прослушивание сообщений...")
|
||||
channel.start_consuming()
|
||||
```
|
||||
Созданная очередь `queue2`
|
||||
![](queue2.png "")
|
||||
|
||||
Скриншот из `RabbitMQ Management UI`.
|
||||
![](queues.png "")
|
||||
|
||||
### Анализ результатов
|
||||
Сначала разберем первую ситуацию, когда запущены `consumer1` и `consumer2`.
|
||||
![](PC1C2.png "")
|
||||
Тогда график очереди `queue1` будет таким:
|
||||
![](queue1.png "")
|
||||
Если мы параллельно запустим ещё один процесс `consumer1`, то с заданными нами параметрами ситуация изменится слабо,
|
||||
тк в момент обновления `RabbitMQ Management UI` в очереди будет как минимум одно сообщение, но сообщений будет получено
|
||||
больше. Если уменьшить время отправки сообщений, то можно будет отследить изменения в очереди `queue1`.
|
||||
|
||||
Терминалы:
|
||||
![](PC1C1.png "")
|
||||
|
||||
`RabbitMQ Management UI`:
|
||||
![](queue1_c1c1.png "")
|
||||
В данном эксперименте трудно установить, какой из вариантов лучше, но можно установить, что `consumer2` будет показывать
|
||||
ту же эффективность, что и связка `consumer1` + `consumer1` и то, что два `consumer1` будут эффективнее разгружать очередь, чем один `consumer1`.
|
||||
|
||||
### Видео
|
||||
https://youtu.be/GoXtPGZe9jY
|
33
basharin_sevastyan_lab_4/consumer1.py
Normal file
33
basharin_sevastyan_lab_4/consumer1.py
Normal file
@ -0,0 +1,33 @@
|
||||
import pika
|
||||
import time
|
||||
import threading
|
||||
|
||||
def process_message(ch, method, properties, body):
|
||||
print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
|
||||
time.sleep(2)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
def consume_messages(channel, queue_name, exchange_name):
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
|
||||
|
||||
print("Consumer 1 начал прослушивание сообщений...")
|
||||
channel.start_consuming()
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
queue_name = 'queue1'
|
||||
|
||||
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
|
||||
consumer_thread.start()
|
||||
|
||||
consumer_thread.join()
|
||||
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
27
basharin_sevastyan_lab_4/consumer2.py
Normal file
27
basharin_sevastyan_lab_4/consumer2.py
Normal file
@ -0,0 +1,27 @@
|
||||
import pika
|
||||
|
||||
def process_message(ch, method, properties, body):
|
||||
print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
def consume_messages(channel, queue_name, exchange_name):
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
|
||||
|
||||
print("Consumer 2 начал прослушивание сообщений...")
|
||||
channel.start_consuming()
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
queue_name = 'queue2'
|
||||
|
||||
consume_messages(channel, queue_name, exchange_name)
|
||||
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
25
basharin_sevastyan_lab_4/publisher.py
Normal file
25
basharin_sevastyan_lab_4/publisher.py
Normal file
@ -0,0 +1,25 @@
|
||||
import pika
|
||||
import time
|
||||
import random
|
||||
|
||||
def publish_event(channel, exchange_name):
|
||||
events = ["пришёл заказ", "сообщение от пользователя", "необходимо создать отчёт"]
|
||||
while True:
|
||||
event = random.choice(events)
|
||||
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
|
||||
print(f"Отправлено событие: {event}")
|
||||
time.sleep(3)
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
|
||||
|
||||
publish_event(channel, exchange_name)
|
||||
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
BIN
basharin_sevastyan_lab_4/queue1.png
Normal file
BIN
basharin_sevastyan_lab_4/queue1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 58 KiB |
BIN
basharin_sevastyan_lab_4/queue1_c1c1.png
Normal file
BIN
basharin_sevastyan_lab_4/queue1_c1c1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 34 KiB |
BIN
basharin_sevastyan_lab_4/queue2.png
Normal file
BIN
basharin_sevastyan_lab_4/queue2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 69 KiB |
BIN
basharin_sevastyan_lab_4/queues.png
Normal file
BIN
basharin_sevastyan_lab_4/queues.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 21 KiB |
Loading…
Reference in New Issue
Block a user