Merge pull request 'shadaev_anton_lab_4' (#84) from shadaev_anton_lab_4 into main

Reviewed-on: http://student.git.athene.tech/Alexey/DAS_2023_1/pulls/84
This commit is contained in:
Alexey 2023-12-28 10:43:58 +04:00
commit aeee4e3f7e
26 changed files with 351 additions and 0 deletions

View File

@ -0,0 +1,37 @@
# Лабораторная работа №4 - Работа с брокером сообщений
# Задачи:
Необходимо выбрать предметную область и разработать следующие приложения:
1) Publisher. Программа, которая создаёт один exchange с типом fanout. Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области. Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
2) Consumer 1. Программа, которая создаёт под себя отдельную не анонимную (!) очередь (queue) (то есть имя queue НЕ пустая строка), создаёт binding на exchange и начинает принимать сообщения (consume). Программа должна обрабатывать сообщения 2-3 секунды. Можно реализовать через обычный Thread.Sleep (для C#).
3) Consumer 2. Аналогично Consumer 1, только сообщения необходимо обрабатывать моментально. Только имя очереди должно отличаться от Consumer 1.
# Как запустить
Запустить `python 'названиеайла'`
# Работа программы
**publisher:**
![img.png](screenshots/img.png)
**consumer_1:**
![img_1.png](screenshots/img_1.png)
**consumer_2:**
![img_2.png](screenshots/img_2.png)
**1-я очередь**
![img_3.png](screenshots/img_3.png)
**2-я очередь**
![img_4.png](screenshots/img_4.png)
Видео: https://youtu.be/3pxgHmm3i0Q

View File

@ -0,0 +1,37 @@
import pika
import time
import threading
def process_message(ch, method, properties, body):
print(f"Получена вакансия (Consumer 1): {body.decode('utf-8')}")
time.sleep(2)
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 1 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue1'
consumer_thread = threading.Thread(target=consume_messages, args=(channel, queue_name, exchange_name))
consumer_thread.start()
consumer_thread.join()
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,31 @@
import pika
def process_message(ch, method, properties, body):
print(f"Получена вакансия (Consumer 2): {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
def consume_messages(channel, queue_name, exchange_name):
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange=exchange_name, queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=process_message)
print("Consumer 2 начал прослушивание сообщений...")
channel.start_consuming()
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
queue_name = 'queue2'
consume_messages(channel, queue_name, exchange_name)
connection.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,28 @@
import pika
import time
import random
def publish_event(channel, exchange_name):
events = ["Пришла страница", "Пришла вакансия", "Необходимо обработать запрос"]
while True:
event = random.choice(events)
channel.basic_publish(exchange=exchange_name, routing_key='', body=event)
print(f"Отправлено событие: {event}")
time.sleep(3)
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
exchange_name = 'logs'
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
publish_event(channel, exchange_name)
connection.close()
if __name__ == '__main__':
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 95 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 151 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 162 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 85 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

View File

@ -0,0 +1,34 @@
# Лабораторная работа №5 - Параллельное умножение матриц
Цель:
1. Реализовать алгоритм последовательного умножения матриц
2. Реализовать алгоритм параллельного умножения матриц
## Запуск
Запускаем скрипт `main.py`, вывод будет консольным.
## Код:
Последовательное умножение:
![img.png](screenshots/img.png)
Параллельное умножение:
![img_2.png](screenshots/img_2.png)
![img_3.png](screenshots/img_3.png)
Тесты:
![img_4.png](screenshots/img_4.png)
# Работа программы
Вывод:
![img_5.png](screenshots/img_5.png)
Таким образом, параллельное умножение дало преимущество при переменожении матриц большей размерности,
а на матрицах меньших размерностей не давало никаких преимуществ, а даже, наоборот, проигрывало последовательному перемножению.
Это связано с тем, что больше времени ушло на переключение между ядрами процессора.
Видео: https://youtu.be/f4ayPI423n0

BIN
shadaev_anton_lab_5/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

View File

@ -0,0 +1,73 @@
import numpy as np
import time
import multiprocessing
def sequential_matrix_multiply(matrix_a, matrix_b):
result = np.zeros((len(matrix_a), len(matrix_b[0])))
for i in range(len(matrix_a)):
for j in range(len(matrix_b[0])):
for k in range(len(matrix_b)):
result[i][j] += matrix_a[i][k] * matrix_b[k][j]
return result
def parallel_matrix_multiply_worker(args):
matrix_a, matrix_b, row_start, row_end, result = args
local_result = np.zeros((row_end - row_start, len(matrix_b[0])))
for i in range(row_start, row_end):
for j in range(len(matrix_b[0])):
for k in range(len(matrix_b)):
local_result[i - row_start][j] += matrix_a[i][k] * matrix_b[k][j]
result.extend(local_result)
def parallel_matrix_multiply(matrix_a, matrix_b, num_processes=2):
num_rows_a = len(matrix_a)
chunk_size = num_rows_a // num_processes
processes = []
manager = multiprocessing.Manager()
result = manager.list()
for i in range(num_processes):
row_start = i * chunk_size
row_end = (i + 1) * chunk_size if i < num_processes - 1 else num_rows_a
process_args = (matrix_a, matrix_b, row_start, row_end, result)
process = multiprocessing.Process(target=parallel_matrix_multiply_worker, args=(process_args,))
processes.append(process)
for process in processes:
process.start()
for process in processes:
process.join()
return np.vstack(result)
def run_test(matrix_size, num_processes=2):
matrix_a = np.random.rand(matrix_size, matrix_size)
matrix_b = np.random.rand(matrix_size, matrix_size)
start_time = time.time()
result_sequential = sequential_matrix_multiply(matrix_a, matrix_b)
sequential_time = time.time() - start_time
print(f"Последовательноe умножение заняло ({matrix_size}x{matrix_size}): {sequential_time} секунд")
start_time = time.time()
result_parallel = parallel_matrix_multiply(matrix_a, matrix_b, num_processes)
parallel_time = time.time() - start_time
print(
f"Параллельное умножение матрицы ({matrix_size}x{matrix_size}) с {num_processes} потоками заняло: {parallel_time} секунд")
print("========================================")
# Тесты для матриц размером 100x100, 300x300 и 500x500 с разным числом процессов
# Бенчмарки для матриц размером 100, 300, 500 строк
if __name__ == '__main__':
run_test(100, num_processes=2)
run_test(100, num_processes=4)
run_test(300, num_processes=2)
run_test(300, num_processes=4)
run_test(500, num_processes=2)
run_test(500, num_processes=4)

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 55 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 100 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 114 KiB

View File

@ -0,0 +1,27 @@
# Лабораторная работа №6
Цель:
1. Реализовать нахождение детерминанта квадратной матрицы.
## Запуск
Запускаем скрипт `main.py`, вывод будет консольным.
## Код:
![img.png](img.png)
`determinant_block`: Эта функция принимает квадратный блок матрицы и использует функцию np.linalg.det из библиотеки NumPy для вычисления его детерминанта.
![img_1.png](img_1.png)
`determinant_parallel`: Эта функция осуществляет разбиение исходной матрицы на квадратные блоки, а затем применяет библиотеку multiprocessing для параллельного вычисления детерминантов блоков. Путем создания пула процессов, она разделяет матрицу на блоки и передает каждый блок в пул процессов для параллельного вычисления детерминанта. После этого результаты собираются и перемножаются для получения общего детерминанта матрицы.
![img_2.png](img_2.png)
В блоке `if __name__ == "__main__":` генерируется случайная матрица указанного размера, после чего функция determinant_parallel вызывается с различным числом процессов (num_processes). Замеряется время выполнения для каждого размера матрицы и каждого количества процессов. Также проводится измерение времени выполнения для последовательного (однопоточного) вычисления детерминанта с использованием функции determinant_block.
# Работа программы
Вывод:
![img.png](screenshots/img.png)
Из полученных результатов видно, что реализованный алгоритм не проявляет эффективность при использовании различного числа потоков для указанных размеров матрицы. Вероятно, это связано с процессом разбиения квадратной матрицы на более мелкие блоки во время вычислений.

BIN
shadaev_anton_lab_6/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 79 KiB

View File

@ -0,0 +1,40 @@
import numpy as np
from multiprocessing import Pool
import time
def determinant_block(matrix_block):
return np.linalg.det(matrix_block)
def determinant_parallel(matrix, num_processes):
size = matrix.shape[0]
step = size // num_processes
pool = Pool(processes=num_processes)
blocks = []
for i in range(0, size, step):
blocks.append(matrix[i:i+step, i:i+step])
dets = pool.map(determinant_block, blocks)
return np.product(dets)
if __name__ == "__main__":
sizes = [100, 300, 500]
processes = [2, 4, 8]
for size in sizes:
matrix = np.random.rand(size, size)
for p in processes:
start = time.time()
det = determinant_parallel(matrix, p)
end = time.time()
print(f"{size}x{size} матрица с {p} процессами заняла {end - start:.5f} сек")
start = time.time()
det_seq = determinant_block(matrix)
end = time.time()
print(f"{size}x{size} умн. последовательно заняло {end - start:.5f} сек")
print("=======================================")

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

View File

@ -0,0 +1,21 @@
# Лабораторная работа № 7. Балансировка нагрузки в распределённых системах при помощи открытых технологий на примерах
# Задание
Написать небольшое эссе (буквально несколько абзацев) своими словами (пожалуйста не пользуйтесь гуглом :). А помогут Вам в этом вопросы из списка:
1) Какие алгоритмы и методы используются для балансировки нагрузки?
2) Какие открытые технологии существуют для балансировки нагрузки?
3) Как осуществляется балансировка нагрузки на базах данных?
4) Реверс-прокси как один из элементов балансировки нагрузки.
# Эссе
Балансировка нагрузки представляет собой стратегию распределения трафика или задач равномерно между ресурсами, с целью предотвращения перегрузок и обеспечения оптимального использования вычислительных ресурсов, таких как компьютеры, серверы и др.
Существует несколько методов и алгоритмов для осуществления балансировки. Например, метод Round Robin (RR) просто распределяет нагрузку по кругу, в порядке очереди. Взвешенный Round Robin добавляет веса, чтобы более мощные ресурсы получали больше задач.
В контексте балансировки нагрузки мы изучили веб-сервер Nginx, который отличается простотой использования и эффективностью. С его помощью можно настроить распределение запросов для оптимального использования ресурсов.
В области баз данных балансировка нагрузки имеет большое значение. Это позволяет распределить данные между серверами, что приводит к улучшению производительности и масштабируемости.
Реверс-прокси, установленный перед серверами, фильтрует входящий трафик. Он способен распределять запросы, скрывать реальные серверы и даже выполнять кэширование данных.

View File

@ -0,0 +1,23 @@
# Лабораторная работа № 8. Как Вы поняли, что называется распределенной системой и как она устроена?
# Задание
Написать небольшое эссе (буквально несколько абзацев) своими словами (пожалуйста не пользуйтесь гуглом :) ) на тему "Устройство распределенных систем". А помогут Вам в этом вопросы из списка:
1) Зачем сложные системы (например, социальная сеть ВКонтакте) пишутся в "распределенном" стиле, где каждое отдельное приложение (или сервис) функционально выполняет только ограниченный спектр задач?
2) Для чего были созданы системы оркестрации приложений? Каким образом они упрощают / усложняют разработку и сопровождение распределенных систем?
3) Для чего нужны очереди обработки сообщений и что может подразумеваться под сообщениями?
4) Какие преимущества и недостатки распределенных приложений существуют на Ваш взгляд?
5) Целесообразно ли в сложную распределенную систему внедрять параллельные вычисления? Приведите примеры, когда это действительно нужно, а когда нет.
# Эссе
Сложные системы, такие как социальная сеть ВКонтакте, используют распределенный стиль для эффективного распределения нагрузки и обеспечения масштабируемости. Каждое отдельное приложение или сервис выполняет ограниченный спектр задач, что позволяет легче масштабировать и поддерживать систему.
Системы оркестрации приложений созданы для управления и координирования распределенными сервисами. Они упрощают разработку, позволяя легко масштабировать и изменять состав сервисов. Однако, вместе с тем, они могут усложнять разработку и сопровождение из-за необходимости управления конфигурациями и обработки сложных сценариев взаимодействия.
Очереди обработки сообщений необходимы для асинхронного взаимодействия между компонентами распределенной системы. Они позволяют разделить производителей и потребителей сообщений, обеспечивая гибкость и отказоустойчивость. Под сообщениями может подразумеваться разнообразная информация, например, запросы на выполнение задач или данные для обмена между сервисами.
Распределенные приложения обладают преимуществами, такими как масштабируемость и отказоустойчивость, но также сопряжены с недостатками. Преимущества включают возможность эффективного использования ресурсов и обработку больших объемов данных. Однако, недостатки могут включать сложность разработки, поддержки и управления консистентностью данных.
Внедрение параллельных вычислений в сложную распределенную систему целесообразно в зависимости от конкретных требований. Например, параллельные вычисления могут быть полезны при обработке больших объемов данных или выполнении вычислительно сложных задач. Однако, это может быть излишним, если система имеет небольшую нагрузку или задачи не требуют параллельной обработки.