diff --git a/gusev_vladislav_lab_4/README.md b/gusev_vladislav_lab_4/README.md new file mode 100644 index 0000000..8e95bf7 --- /dev/null +++ b/gusev_vladislav_lab_4/README.md @@ -0,0 +1,37 @@ +# Лабораторная работа №4 - Работа с брокером сообщений + +# Задачи: + +Необходимо выбрать предметную область и разработать следующие приложения: + +1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт". +2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#). +3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1. + + +# Как запустить +Командой "python 'название_файла'" в текущей папке + +# Работа программы + +publisher: +![img.png](img.png) + +consumer_2: +![img_1.png](img_1.png) + +consumer_1 в одном экземпляре: +![img_2.png](img_2.png) + +второй consumer_1: +![img_3.png](img_3.png) + +наша первая очередь за 10 минут, на пике зафиксирован запуск второго экземпляра +![img_4.png](img_4.png) + +вторая очередь, пик из-за того, что consumer_2 запустился, когда уже были какие-то сообщения в очереди +![img_5.png](img_5.png) + +Можно сказать, что запуск второго экземпляра consumer_1 скорости уменьшил нагрузку на очередь. + +Видео -> https://drive.google.com/file/d/1JDzdRgUFYzMBYBt0QA5Pho9yvOsu9Ecu/view?usp=sharing \ No newline at end of file diff --git a/gusev_vladislav_lab_4/consumer_1.py b/gusev_vladislav_lab_4/consumer_1.py new file mode 100644 index 0000000..a56c2dc --- /dev/null +++ b/gusev_vladislav_lab_4/consumer_1.py @@ -0,0 +1,33 @@ +import pika +import time +import threading + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}") + time.sleep(2) + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 1 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue1' + + consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name)) + consumer_thread.start() + + consumer_thread.join() + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/gusev_vladislav_lab_4/consumer_2.py b/gusev_vladislav_lab_4/consumer_2.py new file mode 100644 index 0000000..66e9119 --- /dev/null +++ b/gusev_vladislav_lab_4/consumer_2.py @@ -0,0 +1,27 @@ +import pika + +def process_message(ch, method, properties, body): + print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}") + ch.basic_ack(delivery_tag=method.delivery_tag) + +def consume_messages(channel, queue_name, exchange_name): + channel.queue_declare(queue=queue_name) + channel.queue_bind(exchange=exchange_name, queue=queue_name) + channel.basic_consume(queue=queue_name, on_message_callback=process_message) + + print("Consumer 2 начал прослушивание сообщений...") + channel.start_consuming() + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + queue_name = 'queue2' + + consume_messages(channel, queue_name, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/gusev_vladislav_lab_4/img.png b/gusev_vladislav_lab_4/img.png new file mode 100644 index 0000000..ff50dc3 Binary files /dev/null and b/gusev_vladislav_lab_4/img.png differ diff --git a/gusev_vladislav_lab_4/img_1.png b/gusev_vladislav_lab_4/img_1.png new file mode 100644 index 0000000..1aa5ca2 Binary files /dev/null and b/gusev_vladislav_lab_4/img_1.png differ diff --git a/gusev_vladislav_lab_4/img_2.png b/gusev_vladislav_lab_4/img_2.png new file mode 100644 index 0000000..006ee61 Binary files /dev/null and b/gusev_vladislav_lab_4/img_2.png differ diff --git a/gusev_vladislav_lab_4/img_3.png b/gusev_vladislav_lab_4/img_3.png new file mode 100644 index 0000000..06cfbc1 Binary files /dev/null and b/gusev_vladislav_lab_4/img_3.png differ diff --git a/gusev_vladislav_lab_4/img_4.png b/gusev_vladislav_lab_4/img_4.png new file mode 100644 index 0000000..2ef9afa Binary files /dev/null and b/gusev_vladislav_lab_4/img_4.png differ diff --git a/gusev_vladislav_lab_4/img_5.png b/gusev_vladislav_lab_4/img_5.png new file mode 100644 index 0000000..2317214 Binary files /dev/null and b/gusev_vladislav_lab_4/img_5.png differ diff --git a/gusev_vladislav_lab_4/publisher.py b/gusev_vladislav_lab_4/publisher.py new file mode 100644 index 0000000..ac5b20a --- /dev/null +++ b/gusev_vladislav_lab_4/publisher.py @@ -0,0 +1,25 @@ +import pika +import time +import random + +def publish_event(channel, exchange_name): + events = ["Пришла фотография", "Пришло сообщение", "Необходимо обработать запрос"] + while True: + event = random.choice(events) + channel.basic_publish(exchange=exchange_name, routing_key='', body=event) + print(f"Отправлено событие: {event}") + time.sleep(3) + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) + channel = connection.channel() + + exchange_name = 'logs' + channel.exchange_declare(exchange=exchange_name, exchange_type='fanout') + + publish_event(channel, exchange_name) + + connection.close() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/gusev_vladislav_lab_6/gusev_vladislav_lab_6.py b/gusev_vladislav_lab_6/gusev_vladislav_lab_6.py new file mode 100644 index 0000000..e20d2f7 --- /dev/null +++ b/gusev_vladislav_lab_6/gusev_vladislav_lab_6.py @@ -0,0 +1,49 @@ +import time +import random +from scipy.linalg import det +from multiprocessing import Pool + +def sequential_matrix_determinant(matrix): + return det(matrix) + +def parallel_matrix_determinant_worker(args): + matrix, row_start, row_end = args + submatrix = [row[:min(row_end - row_start, len(matrix[0]))] for row in matrix[row_start:row_end]] + return det(submatrix) + +def parallel_matrix_determinant(matrix, num_processes=2): + num_rows = len(matrix) + chunk_size = num_rows // num_processes + pool = Pool(processes=num_processes) + results = pool.map(parallel_matrix_determinant_worker, [(matrix, i * chunk_size, (i + 1) * chunk_size) for i in range(num_processes)]) + pool.close() + pool.join() + return results[0] + +def run_determinant_test(matrix_size, num_processes=2): + matrix = [[random.random() for _ in range(matrix_size)] for _ in range(matrix_size)] + + start_time = time.time() + result_sequential = sequential_matrix_determinant(matrix) + sequential_time = time.time() - start_time + print(f"Sequential determinant calculation time ({matrix_size}x{matrix_size}): {sequential_time} seconds") + print(f"Sequential determinant result: {result_sequential}") + + start_time = time.time() + result_parallel = parallel_matrix_determinant(matrix, num_processes) + parallel_time = time.time() - start_time + print(f"Parallel determinant calculation time ({matrix_size}x{matrix_size}) with {num_processes} processes: {parallel_time} seconds") + print(f"Parallel determinant result: {result_parallel}") + + +if __name__ == '__main__': + # Тесты для квадратных матриц размером 100x100, 300x300 и 500x500 с разным числом процессов + run_determinant_test(100, num_processes=2) + run_determinant_test(100, num_processes=4) + run_determinant_test(300, num_processes=2) + run_determinant_test(300, num_processes=4) + run_determinant_test(500, num_processes=2) + run_determinant_test(500, num_processes=4) + + +