gusev_vladislav_lab_4 is ready

This commit is contained in:
vladg 2023-12-13 15:01:25 +04:00
parent f2adafe2e1
commit b75a601ab9
11 changed files with 171 additions and 0 deletions

View File

@ -0,0 +1,37 @@
# Лабораторная работа №4 - Работа с брокером сообщений
# Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
# Как запустить
Командой "python 'названиеайла'" в текущей папке
# Работа программы
publisher:
![img.png](img.png)
consumer_2:
![img_1.png](img_1.png)
consumer_1 в одном экземпляре:
![img_2.png](img_2.png)
второй consumer_1:
![img_3.png](img_3.png)
наша первая очередь за 10 минут, на пике зафиксирован запуск второго экземпляра
![img_4.png](img_4.png)
вторая очередь, пик из-за того, что consumer_2 запустился, когда уже были какие-то сообщения в очереди
![img_5.png](img_5.png)
Можно сказать, что запуск второго экземпляра consumer_1 скорости уменьшил нагрузку на очередь.
Видео -> https://drive.google.com/file/d/1JDzdRgUFYzMBYBt0QA5Pho9yvOsu9Ecu/view?usp=sharing

View File

@ -0,0 +1,33 @@
import pika
import time
import threading
def process_message(ch, method, properties, body):
print(f"Получено сообщение (Consumer 1): {body.decode('utf-8')}")
time.sleep(2)
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 1 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue1'
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,27 @@
import pika
def process_message(ch, method, properties, body):
print(f"Получено сообщение (Consumer 2): {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 2 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue2'
consume_messages(channel, queue_name, exchange_name)
connection.close()
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 30 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.3 KiB

View File

@ -0,0 +1,25 @@
import pika
import time
import random
def publish_event(channel, exchange_name):
events = ["Пришла фотография", "Пришло сообщение", "Необходимо обработать запрос"]
while True:
event = random.choice(events)
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
print(f"Отправлено событие: {event}")
time.sleep(3)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
publish_event(channel, exchange_name)
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,49 @@
import time
import random
from scipy.linalg import det
from multiprocessing import Pool
def sequential_matrix_determinant(matrix):
return det(matrix)
def parallel_matrix_determinant_worker(args):
matrix, row_start, row_end = args
submatrix = [row[:min(row_end - row_start, len(matrix[0]))] for row in matrix[row_start:row_end]]
return det(submatrix)
def parallel_matrix_determinant(matrix, num_processes=2):
num_rows = len(matrix)
chunk_size = num_rows // num_processes
pool = Pool(processes=num_processes)
results = pool.map(parallel_matrix_determinant_worker, [(matrix, i * chunk_size, (i + 1) * chunk_size) for i in range(num_processes)])
pool.close()
pool.join()
return results[0]
def run_determinant_test(matrix_size, num_processes=2):
matrix = [[random.random() for _ in range(matrix_size)] for _ in range(matrix_size)]
start_time = time.time()
result_sequential = sequential_matrix_determinant(matrix)
sequential_time = time.time() - start_time
print(f"Sequential determinant calculation time ({matrix_size}x{matrix_size}): {sequential_time} seconds")
print(f"Sequential determinant result: {result_sequential}")
start_time = time.time()
result_parallel = parallel_matrix_determinant(matrix, num_processes)
parallel_time = time.time() - start_time
print(f"Parallel determinant calculation time ({matrix_size}x{matrix_size}) with {num_processes} processes: {parallel_time} seconds")
print(f"Parallel determinant result: {result_parallel}")
if __name__ == '__main__':
# Тесты для квадратных матриц размером 100x100, 300x300 и 500x500 с разным числом процессов
run_determinant_test(100, num_processes=2)
run_determinant_test(100, num_processes=4)
run_determinant_test(300, num_processes=2)
run_determinant_test(300, num_processes=4)
run_determinant_test(500, num_processes=2)
run_determinant_test(500, num_processes=4)