diff --git a/minhasapov_ruslan_lab_4/README.md b/minhasapov_ruslan_lab_4/README.md new file mode 100644 index 0000000..3ea4fa3 --- /dev/null +++ b/minhasapov_ruslan_lab_4/README.md @@ -0,0 +1,44 @@ +# Лабораторная работа №4 +#### ПИбд-42. Минхасапов Руслан. + +--- + +#### Туториал + +- Первый +![Скриншот 1](./pictures/tutor_1.png) + +- Второй +![Скриншот 2](./pictures/tutor_2.png) + +- Третий +![Скриншот 3](./pictures/tutor_3.png) + +--- + +#### Выполнение лабораторной работы + +Выбранная предметная область: Мониторинг серверов + +Сначала запустим продюсера и 1/2 консьюмера. + +Результат: +![Скриншот 4](./pictures/lab_con1&2.png) + +Вывод: +Так как скорость поступления сообщений в очередь первого консьюмера больше, чем скорость его обработки сообщений - очередь довольно быстро переполняется. +Второй же консьюмер моментально обрабатывает входящие сообщения, из-за чего его очередь никогда не заполняется. + +Теперь запустим сначала один экземпляр первого консьюмера, а потом несколько, чтобы они разгрузили накопившуюся очередь. + +Результат: +![Скриншот 5](./pictures/lab_con1&many.png) + +Вывод: +Накопившаяся очередь постепенно разгружается благодаря нескольким одновременно запущенным экземплярам первого консьюмера. + +--- + +#### Демонстрация работы + +Видео по [ссылке](https://disk.yandex.ru/i/zlf_9UFCmSEQgQ) \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/labwork/consumer_1.py b/minhasapov_ruslan_lab_4/labwork/consumer_1.py new file mode 100644 index 0000000..228e9d0 --- /dev/null +++ b/minhasapov_ruslan_lab_4/labwork/consumer_1.py @@ -0,0 +1,28 @@ +import pika +import time + + +def callback(ch, method, properties, body): + print(f"[x] Consumer 1 received: {body.decode()}") + + time.sleep(3) + + print(f"[x] Consumer 1 processed:: {body.decode()}") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def consume(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672)) + + channel = connection.channel() + + channel.queue_declare(queue='consumer_1_queue') + channel.queue_bind(exchange='server_monitoring', queue='consumer_1_queue') + + channel.basic_consume(queue='consumer_1_queue', on_message_callback=callback) + + print(' [*] Consumer 1 waiting for messages. To exit press CTRL+C') + channel.start_consuming() + +if __name__ == "__main__": + consume() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/labwork/consumer_2.py b/minhasapov_ruslan_lab_4/labwork/consumer_2.py new file mode 100644 index 0000000..a911b82 --- /dev/null +++ b/minhasapov_ruslan_lab_4/labwork/consumer_2.py @@ -0,0 +1,26 @@ +import pika + + +def callback(ch, method, properties, body): + print(f"[x] Consumer 2 received: {body.decode()}") + + print(f"[x] Consumer 2 processed:: {body.decode()}") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +def consume(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672)) + + channel = connection.channel() + + channel.queue_declare(queue='consumer_2_queue') + channel.queue_bind(exchange='server_monitoring', queue='consumer_2_queue') + + channel.basic_consume(queue='consumer_2_queue', on_message_callback=callback) + + print(' [*] Consumer 2 waiting for messages. To exit press CTRL+C') + channel.start_consuming() + + +if __name__ == "__main__": + consume() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/labwork/publisher.py b/minhasapov_ruslan_lab_4/labwork/publisher.py new file mode 100644 index 0000000..2ead1d9 --- /dev/null +++ b/minhasapov_ruslan_lab_4/labwork/publisher.py @@ -0,0 +1,28 @@ +import pika +import time +import random + +def generage_message_metrics(): + return f" | CPU: {random.randint(0, 100)}% | RAM: {random.randint(0, 100)}% | CD: {random.randint(0, 100)}% |" + +def produce(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672)) + channel = connection.channel() + + channel.exchange_declare(exchange='server_monitoring', exchange_type='fanout') + + messages = [ + "Server | ID: 1", + "Server | ID: 2", + "Server | ID: 3", + ] + + while True: + for message in messages: + message_with_metric = message + generage_message_metrics() + channel.basic_publish(exchange='server_monitoring', routing_key='', body=message_with_metric) + print(f"[x] Sent: {message_with_metric}") + time.sleep(1) + +if __name__ == "__main__": + produce() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/pictures/lab_con1&2.png b/minhasapov_ruslan_lab_4/pictures/lab_con1&2.png new file mode 100644 index 0000000..98a402a Binary files /dev/null and b/minhasapov_ruslan_lab_4/pictures/lab_con1&2.png differ diff --git a/minhasapov_ruslan_lab_4/pictures/lab_con1&many.png b/minhasapov_ruslan_lab_4/pictures/lab_con1&many.png new file mode 100644 index 0000000..a48a7a6 Binary files /dev/null and b/minhasapov_ruslan_lab_4/pictures/lab_con1&many.png differ diff --git a/minhasapov_ruslan_lab_4/pictures/tutor_1.png b/minhasapov_ruslan_lab_4/pictures/tutor_1.png new file mode 100644 index 0000000..53c289b Binary files /dev/null and b/minhasapov_ruslan_lab_4/pictures/tutor_1.png differ diff --git a/minhasapov_ruslan_lab_4/pictures/tutor_2.png b/minhasapov_ruslan_lab_4/pictures/tutor_2.png new file mode 100644 index 0000000..a5209fd Binary files /dev/null and b/minhasapov_ruslan_lab_4/pictures/tutor_2.png differ diff --git a/minhasapov_ruslan_lab_4/pictures/tutor_3.png b/minhasapov_ruslan_lab_4/pictures/tutor_3.png new file mode 100644 index 0000000..787f3b5 Binary files /dev/null and b/minhasapov_ruslan_lab_4/pictures/tutor_3.png differ diff --git a/minhasapov_ruslan_lab_4/tutor_1/receiver.py b/minhasapov_ruslan_lab_4/tutor_1/receiver.py new file mode 100644 index 0000000..eb1efa9 --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_1/receiver.py @@ -0,0 +1,28 @@ +import os +import sys +import pika + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) + channel = connection.channel() + + channel.queue_declare(queue='hello') + + def callback(ch, method, properties, body): + print(f" [x] Received {body}") + + channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) + + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print('Interrupted') + try: + sys.exit(0) + except SystemExit: + os._exit(0) \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/tutor_1/send.py b/minhasapov_ruslan_lab_4/tutor_1/send.py new file mode 100644 index 0000000..21fb9c2 --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_1/send.py @@ -0,0 +1,13 @@ +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +channel.basic_publish(exchange='', + routing_key='hello', + body='Hello World!') +print(" [x] Sent 'Hello World!'") + +connection.close() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/tutor_2/new_task.py b/minhasapov_ruslan_lab_4/tutor_2/new_task.py new file mode 100644 index 0000000..a2444e0 --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_2/new_task.py @@ -0,0 +1,19 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent + )) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/tutor_2/worker.py b/minhasapov_ruslan_lab_4/tutor_2/worker.py new file mode 100644 index 0000000..8f8ab64 --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_2/worker.py @@ -0,0 +1,22 @@ +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + time.sleep(body.count(b'.')) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +channel.start_consuming() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/tutor_3/emit_log.py b/minhasapov_ruslan_lab_4/tutor_3/emit_log.py new file mode 100644 index 0000000..45b6989 --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_3/emit_log.py @@ -0,0 +1,13 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +message = ' '.join(sys.argv[1:]) or "info: Hello World!" +channel.basic_publish(exchange='logs', routing_key='', body=message) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/minhasapov_ruslan_lab_4/tutor_3/receive_logs.py b/minhasapov_ruslan_lab_4/tutor_3/receive_logs.py new file mode 100644 index 0000000..60d881d --- /dev/null +++ b/minhasapov_ruslan_lab_4/tutor_3/receive_logs.py @@ -0,0 +1,22 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +channel.queue_bind(exchange='logs', queue=queue_name) + +print(' [*] Waiting for logs. To exit press CTRL+C') + +def callback(ch, method, properties, body): + print(f" [x] {body}") + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + +channel.start_consuming() \ No newline at end of file