diff --git a/bazunov_andrew_lab_5/README.md b/bazunov_andrew_lab_5/README.md new file mode 100644 index 0000000..f462f2b --- /dev/null +++ b/bazunov_andrew_lab_5/README.md @@ -0,0 +1,95 @@ +# Распределенные вычисления и приложения Л5 + +## _Автор Базунов Андрей Игревич ПИбд-42_ + +--- +> ### Задание +> - Кратко: реализовать умножение двух больших квадратных матриц. +> - Подробно: в лабораторной работе требуется сделать два алгоритма: обычный и параллельный (задание со * - реализовать + это в рамках одного алгоритма). В параллельном алгоритме предусмотреть ручное задание количества потоков (число + потоков = 1 как раз и реализует задание со *), каждый из которых будет выполнять умножение элементов матрицы в + рамках своей зоны ответственности. +--- + +## Алгоритм умножения: + +### Функция умножения строки на столбец + +
+Код + +```python +def mul_row_and_column_in_thread(queue: _QUEUE_TYPE) -> list[tuple[int | float, int]]: + result = [] + while queue.qsize(): + local_result = 0 + row, column, place = queue.get() + for k in range(len(row)): + local_result += row[k] * column[k] + result.append((local_result, place)) + + return result +``` + +
+ +### Функция распределения вычислений между процессами и сборки результатов в матрицу + +
+Код + +```python +def mul_matrixs(m1: Matrix, m2: Matrix, threads: int = 0): + if m1.size != m2.size: + return None + + if threads == 0: + threads = 1 + + result = Matrix(size=m1.size, suplyer=0) + + thread_queues = [Queue() for _ in range(threads)] + thread_iterator = 0 + + for row_m1, column_m2 in itertools.product(m1.rows, m2.columns): + thread_queues[thread_iterator].put((row_m1, column_m2, thread_iterator)) + thread_iterator += 1 + if thread_iterator >= threads: + thread_iterator = 0 + + with ThreadPoolExecutor(max_workers=threads) as executor: + flat = [] + + for item in executor.map(mul_row_and_column_in_thread, thread_queues): + flat += item + + flat.sort(key=lambda x: x[1]) + result.from_flat([*map(lambda x: x[0], flat)]) + + return result +``` + +
+ +| Размер матрицы | 1 Поток (сек) | 5 Потоков (сек) | 20 Потоков (сек) | +|----------------|---------------|-----------------|------------------| +| 50x50 | 0.00654 | 0.00666 | 0.00685 | +| 100x100 | 0.03809 | 0.03753 | 0.03796 | +| 150x150 | 0.11277 | 0.11239 | 0.11342 | +| 200x200 | 0.24218 | 0.2474 | 0.25167 | +| 250x250 | 0.45891 | 0.46111 | 0.46475 | +| 300x300 | 0.80544 | 0.81466 | 0.8198 | +| 350x350 | 1.28701 | 1.30309 | 1.32854 | +| 400x400 | 1.93252 | 1.97905 | 2.00756 | +| 450x450 | 2.81152 | 2.88893 | 2.94218 | +| 500x500 | 3.87053 | 4.02238 | 4.03649 | +| 550x550 | 5.26902 | 5.43371 | 6.10302 | +| 600x600 | 6.82189 | 7.09791 | 7.17916 | +| 650x650 | 8.80005 | 9.15325 | 11.26757 | +| 700x700 | 11.04467 | 11.50241 | 12.2389 | +| 750x750 | 13.50451 | 14.23333 | 18.4895 | +| 800x800 | 16.5745 | 17.30878 | 20.61224 | +| 850x850 | 19.99281 | 23.19894 | 30.13554 | +| 900x900 | 23.48408 | 24.84928 | 31.74338 | +| 950x950 | 27.84541 | 29.19429 | 41.40166 | +| 1000x1000 | 32.5547 | 34.0334 | 44.55267 | \ No newline at end of file diff --git a/bazunov_andrew_lab_5/main.py b/bazunov_andrew_lab_5/main.py new file mode 100644 index 0000000..9dbb3aa --- /dev/null +++ b/bazunov_andrew_lab_5/main.py @@ -0,0 +1,27 @@ +import time +from collections.abc import Callable + +from matrix import Matrix +from random import random + +_THREADS = 20 + + +def measure_time(func: Callable, *args) -> float: + t1 = time.process_time() + func(*args) + t2 = time.process_time() + return round(t2 - t1, 5) + + +tests = [50 * i for i in range(1, 21)] + +for test in tests: + mt1 = Matrix(size=test, suplyer=random) + mt2 = Matrix(size=test, suplyer=random) + + t1 = measure_time(lambda: mt1 * mt2) + t5 = measure_time(lambda: mt1 * (mt2, 5)) + t20 = measure_time(lambda: mt1 * (mt2, 20)) + + print(f"|{f'{test}x{test}':<16}|{t1:^11}|{t5:^11}|{t20:^12}|") diff --git a/bazunov_andrew_lab_5/matrix.py b/bazunov_andrew_lab_5/matrix.py new file mode 100644 index 0000000..a368bb9 --- /dev/null +++ b/bazunov_andrew_lab_5/matrix.py @@ -0,0 +1,121 @@ +import itertools +from collections.abc import Callable +from concurrent.futures import ThreadPoolExecutor +from queue import Queue +from typing import Tuple, List +import numpy as np + +_SUPLYER_TYPE = Callable[[], int | float] | int | float +_QUEUE_TYPE = Queue[Tuple[List[float | int], List[float | int], int]] + + +class Matrix: + def __init__(self, size: int, suplyer: _SUPLYER_TYPE = 0): + self.__size = size + self.__matrix = self._generate_matrix(suplyer) + + def _generate_matrix(self, suplyer: _SUPLYER_TYPE): + if suplyer: + match suplyer: + case int() | float(): + return [[suplyer for _ in range(self.__size)] for _ in range(self.__size)] + case Callable(): + return [[suplyer() for _ in range(self.__size)] for _ in range(self.__size)] + return [[0 for _ in range(self.__size)] for _ in range(self.__size)] + + def from_flat(self, numbers: List[int | float]): + if len(numbers) != self.__size ** 2: + raise Exception(f"Invalid matrix size {self.__size} ^ 2 != {len(numbers)}") + x, y = 0, 0 + for number in numbers: + self.__matrix[y][x] = number + x += 1 + if x >= self.__size: + x = 0 + y += 1 + + @property + def rows(self): + return self.__matrix + + @property + def columns(self): + return [[self.__matrix[i][j] for i in range(self.__size)] for j in range(self.__size)] + + @property + def size(self): + return self.__size + + @staticmethod + def random(*, size: int): + import random + return Matrix(size=size, suplyer=random.random) + + def to_numpy(self): + return np.array(self.__matrix) + + def __eq__(self, other): + return (isinstance(other, Matrix) + and self.__size == other.__size) + + def __str__(self): + return f"Matrix {self.__size}x{self.__size} \n" + "\n".join([str( + " ".join([f"{element:.5f}" for element in row]) + ) for row in self.__matrix]) + + def __iter__(self): + return iter(self.__matrix) + + def __getitem__(self, index): + return self.__matrix[index] + + def __mul__(self, other): + match other: + case Matrix(): + return mul_matrixs(self, other) + case tuple(): + other_matrix, count_threads = other + return mul_matrixs(self, other_matrix, count_threads) + return None + + +def mul_row_and_column_in_thread(queue: _QUEUE_TYPE) -> list[tuple[int | float, int]]: + result = [] + while queue.qsize(): + local_result = 0 + row, column, place = queue.get() + for k in range(len(row)): + local_result += row[k] * column[k] + result.append((local_result, place)) + + return result + + +def mul_matrixs(m1: Matrix, m2: Matrix, threads: int = 0): + if m1.size != m2.size: + return None + + if threads == 0: + threads = 1 + + result = Matrix(size=m1.size, suplyer=0) + + thread_queues = [Queue() for _ in range(threads)] + thread_iterator = 0 + + for row_m1, column_m2 in itertools.product(m1.rows, m2.columns): + thread_queues[thread_iterator].put((row_m1, column_m2, thread_iterator)) + thread_iterator += 1 + if thread_iterator >= threads: + thread_iterator = 0 + + with ThreadPoolExecutor(max_workers=threads) as executor: + flat = [] + + for item in executor.map(mul_row_and_column_in_thread, thread_queues): + flat += item + + flat.sort(key=lambda x: x[1]) + result.from_flat([*map(lambda x: x[0], flat)]) + + return result