Compare commits
No commits in common. "877d246bb5a21650e7a533e2b5f235567abde35b" and "3b9698ac38a9ac2a5f0e2c1efa0fd543cea6df83" have entirely different histories.
877d246bb5
...
3b9698ac38
@ -1,44 +0,0 @@
|
|||||||
# Лабораторная работа №4
|
|
||||||
#### ПИбд-42. Минхасапов Руслан.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
#### Туториал
|
|
||||||
|
|
||||||
- Первый
|
|
||||||
![Скриншот 1](./pictures/tutor_1.png)
|
|
||||||
|
|
||||||
- Второй
|
|
||||||
![Скриншот 2](./pictures/tutor_2.png)
|
|
||||||
|
|
||||||
- Третий
|
|
||||||
![Скриншот 3](./pictures/tutor_3.png)
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
#### Выполнение лабораторной работы
|
|
||||||
|
|
||||||
Выбранная предметная область: Мониторинг серверов
|
|
||||||
|
|
||||||
Сначала запустим продюсера и 1/2 консьюмера.
|
|
||||||
|
|
||||||
Результат:
|
|
||||||
![Скриншот 4](./pictures/lab_con1&2.png)
|
|
||||||
|
|
||||||
Вывод:
|
|
||||||
Так как скорость поступления сообщений в очередь первого консьюмера больше, чем скорость его обработки сообщений - очередь довольно быстро переполняется.
|
|
||||||
Второй же консьюмер моментально обрабатывает входящие сообщения, из-за чего его очередь никогда не заполняется.
|
|
||||||
|
|
||||||
Теперь запустим сначала один экземпляр первого консьюмера, а потом несколько, чтобы они разгрузили накопившуюся очередь.
|
|
||||||
|
|
||||||
Результат:
|
|
||||||
![Скриншот 5](./pictures/lab_con1&many.png)
|
|
||||||
|
|
||||||
Вывод:
|
|
||||||
Накопившаяся очередь постепенно разгружается благодаря нескольким одновременно запущенным экземплярам первого консьюмера.
|
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
#### Демонстрация работы
|
|
||||||
|
|
||||||
Видео по [ссылке](https://disk.yandex.ru/i/zlf_9UFCmSEQgQ)
|
|
@ -1,28 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f"[x] Consumer 1 received: {body.decode()}")
|
|
||||||
|
|
||||||
time.sleep(3)
|
|
||||||
|
|
||||||
print(f"[x] Consumer 1 processed:: {body.decode()}")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
|
|
||||||
def consume():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
|
|
||||||
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='consumer_1_queue')
|
|
||||||
channel.queue_bind(exchange='server_monitoring', queue='consumer_1_queue')
|
|
||||||
|
|
||||||
channel.basic_consume(queue='consumer_1_queue', on_message_callback=callback)
|
|
||||||
|
|
||||||
print(' [*] Consumer 1 waiting for messages. To exit press CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
consume()
|
|
@ -1,26 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f"[x] Consumer 2 received: {body.decode()}")
|
|
||||||
|
|
||||||
print(f"[x] Consumer 2 processed:: {body.decode()}")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
|
|
||||||
def consume():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
|
|
||||||
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='consumer_2_queue')
|
|
||||||
channel.queue_bind(exchange='server_monitoring', queue='consumer_2_queue')
|
|
||||||
|
|
||||||
channel.basic_consume(queue='consumer_2_queue', on_message_callback=callback)
|
|
||||||
|
|
||||||
print(' [*] Consumer 2 waiting for messages. To exit press CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
consume()
|
|
@ -1,28 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
import random
|
|
||||||
|
|
||||||
def generage_message_metrics():
|
|
||||||
return f" | CPU: {random.randint(0, 100)}% | RAM: {random.randint(0, 100)}% | CD: {random.randint(0, 100)}% |"
|
|
||||||
|
|
||||||
def produce():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='server_monitoring', exchange_type='fanout')
|
|
||||||
|
|
||||||
messages = [
|
|
||||||
"Server | ID: 1",
|
|
||||||
"Server | ID: 2",
|
|
||||||
"Server | ID: 3",
|
|
||||||
]
|
|
||||||
|
|
||||||
while True:
|
|
||||||
for message in messages:
|
|
||||||
message_with_metric = message + generage_message_metrics()
|
|
||||||
channel.basic_publish(exchange='server_monitoring', routing_key='', body=message_with_metric)
|
|
||||||
print(f"[x] Sent: {message_with_metric}")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
produce()
|
|
Binary file not shown.
Before Width: | Height: | Size: 92 KiB |
Binary file not shown.
Before Width: | Height: | Size: 75 KiB |
Binary file not shown.
Before Width: | Height: | Size: 49 KiB |
Binary file not shown.
Before Width: | Height: | Size: 44 KiB |
Binary file not shown.
Before Width: | Height: | Size: 38 KiB |
@ -1,28 +0,0 @@
|
|||||||
import os
|
|
||||||
import sys
|
|
||||||
import pika
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
try:
|
|
||||||
main()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print('Interrupted')
|
|
||||||
try:
|
|
||||||
sys.exit(0)
|
|
||||||
except SystemExit:
|
|
||||||
os._exit(0)
|
|
@ -1,13 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
channel.basic_publish(exchange='',
|
|
||||||
routing_key='hello',
|
|
||||||
body='Hello World!')
|
|
||||||
print(" [x] Sent 'Hello World!'")
|
|
||||||
|
|
||||||
connection.close()
|
|
@ -1,19 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
|
||||||
channel.basic_publish(
|
|
||||||
exchange='',
|
|
||||||
routing_key='task_queue',
|
|
||||||
body=message,
|
|
||||||
properties=pika.BasicProperties(
|
|
||||||
delivery_mode=pika.DeliveryMode.Persistent
|
|
||||||
))
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
@ -1,22 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body.decode()}")
|
|
||||||
time.sleep(body.count(b'.'))
|
|
||||||
print(" [x] Done")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
|
|
||||||
channel.basic_qos(prefetch_count=1)
|
|
||||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
@ -1,13 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
|
||||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
@ -1,22 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
result = channel.queue_declare(queue='', exclusive=True)
|
|
||||||
queue_name = result.method.queue
|
|
||||||
|
|
||||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
|
||||||
|
|
||||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(
|
|
||||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
Loading…
Reference in New Issue
Block a user