forked from Alexey/DAS_2024_1
complete lab
This commit is contained in:
parent
3b9698ac38
commit
4b86dfd750
95
bazunov_andrew_lab_5/README.md
Normal file
95
bazunov_andrew_lab_5/README.md
Normal file
@ -0,0 +1,95 @@
|
|||||||
|
# Распределенные вычисления и приложения Л5
|
||||||
|
|
||||||
|
## _Автор Базунов Андрей Игревич ПИбд-42_
|
||||||
|
|
||||||
|
---
|
||||||
|
> ### Задание
|
||||||
|
> - Кратко: реализовать умножение двух больших квадратных матриц.
|
||||||
|
> - Подробно: в лабораторной работе требуется сделать два алгоритма: обычный и параллельный (задание со * - реализовать
|
||||||
|
это в рамках одного алгоритма). В параллельном алгоритме предусмотреть ручное задание количества потоков (число
|
||||||
|
потоков = 1 как раз и реализует задание со *), каждый из которых будет выполнять умножение элементов матрицы в
|
||||||
|
рамках своей зоны ответственности.
|
||||||
|
---
|
||||||
|
|
||||||
|
## Алгоритм умножения:
|
||||||
|
|
||||||
|
### Функция умножения строки на столбец
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>Код</summary>
|
||||||
|
|
||||||
|
```python
|
||||||
|
def mul_row_and_column_in_thread(queue: _QUEUE_TYPE) -> list[tuple[int | float, int]]:
|
||||||
|
result = []
|
||||||
|
while queue.qsize():
|
||||||
|
local_result = 0
|
||||||
|
row, column, place = queue.get()
|
||||||
|
for k in range(len(row)):
|
||||||
|
local_result += row[k] * column[k]
|
||||||
|
result.append((local_result, place))
|
||||||
|
|
||||||
|
return result
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
### Функция распределения вычислений между процессами и сборки результатов в матрицу
|
||||||
|
|
||||||
|
<details>
|
||||||
|
<summary>Код</summary>
|
||||||
|
|
||||||
|
```python
|
||||||
|
def mul_matrixs(m1: Matrix, m2: Matrix, threads: int = 0):
|
||||||
|
if m1.size != m2.size:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if threads == 0:
|
||||||
|
threads = 1
|
||||||
|
|
||||||
|
result = Matrix(size=m1.size, suplyer=0)
|
||||||
|
|
||||||
|
thread_queues = [Queue() for _ in range(threads)]
|
||||||
|
thread_iterator = 0
|
||||||
|
|
||||||
|
for row_m1, column_m2 in itertools.product(m1.rows, m2.columns):
|
||||||
|
thread_queues[thread_iterator].put((row_m1, column_m2, thread_iterator))
|
||||||
|
thread_iterator += 1
|
||||||
|
if thread_iterator >= threads:
|
||||||
|
thread_iterator = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=threads) as executor:
|
||||||
|
flat = []
|
||||||
|
|
||||||
|
for item in executor.map(mul_row_and_column_in_thread, thread_queues):
|
||||||
|
flat += item
|
||||||
|
|
||||||
|
flat.sort(key=lambda x: x[1])
|
||||||
|
result.from_flat([*map(lambda x: x[0], flat)])
|
||||||
|
|
||||||
|
return result
|
||||||
|
```
|
||||||
|
|
||||||
|
</details>
|
||||||
|
|
||||||
|
| Размер матрицы | 1 Поток (сек) | 5 Потоков (сек) | 20 Потоков (сек) |
|
||||||
|
|----------------|---------------|-----------------|------------------|
|
||||||
|
| 50x50 | 0.00654 | 0.00666 | 0.00685 |
|
||||||
|
| 100x100 | 0.03809 | 0.03753 | 0.03796 |
|
||||||
|
| 150x150 | 0.11277 | 0.11239 | 0.11342 |
|
||||||
|
| 200x200 | 0.24218 | 0.2474 | 0.25167 |
|
||||||
|
| 250x250 | 0.45891 | 0.46111 | 0.46475 |
|
||||||
|
| 300x300 | 0.80544 | 0.81466 | 0.8198 |
|
||||||
|
| 350x350 | 1.28701 | 1.30309 | 1.32854 |
|
||||||
|
| 400x400 | 1.93252 | 1.97905 | 2.00756 |
|
||||||
|
| 450x450 | 2.81152 | 2.88893 | 2.94218 |
|
||||||
|
| 500x500 | 3.87053 | 4.02238 | 4.03649 |
|
||||||
|
| 550x550 | 5.26902 | 5.43371 | 6.10302 |
|
||||||
|
| 600x600 | 6.82189 | 7.09791 | 7.17916 |
|
||||||
|
| 650x650 | 8.80005 | 9.15325 | 11.26757 |
|
||||||
|
| 700x700 | 11.04467 | 11.50241 | 12.2389 |
|
||||||
|
| 750x750 | 13.50451 | 14.23333 | 18.4895 |
|
||||||
|
| 800x800 | 16.5745 | 17.30878 | 20.61224 |
|
||||||
|
| 850x850 | 19.99281 | 23.19894 | 30.13554 |
|
||||||
|
| 900x900 | 23.48408 | 24.84928 | 31.74338 |
|
||||||
|
| 950x950 | 27.84541 | 29.19429 | 41.40166 |
|
||||||
|
| 1000x1000 | 32.5547 | 34.0334 | 44.55267 |
|
27
bazunov_andrew_lab_5/main.py
Normal file
27
bazunov_andrew_lab_5/main.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
import time
|
||||||
|
from collections.abc import Callable
|
||||||
|
|
||||||
|
from matrix import Matrix
|
||||||
|
from random import random
|
||||||
|
|
||||||
|
_THREADS = 20
|
||||||
|
|
||||||
|
|
||||||
|
def measure_time(func: Callable, *args) -> float:
|
||||||
|
t1 = time.process_time()
|
||||||
|
func(*args)
|
||||||
|
t2 = time.process_time()
|
||||||
|
return round(t2 - t1, 5)
|
||||||
|
|
||||||
|
|
||||||
|
tests = [50 * i for i in range(1, 21)]
|
||||||
|
|
||||||
|
for test in tests:
|
||||||
|
mt1 = Matrix(size=test, suplyer=random)
|
||||||
|
mt2 = Matrix(size=test, suplyer=random)
|
||||||
|
|
||||||
|
t1 = measure_time(lambda: mt1 * mt2)
|
||||||
|
t5 = measure_time(lambda: mt1 * (mt2, 5))
|
||||||
|
t20 = measure_time(lambda: mt1 * (mt2, 20))
|
||||||
|
|
||||||
|
print(f"|{f'{test}x{test}':<16}|{t1:^11}|{t5:^11}|{t20:^12}|")
|
121
bazunov_andrew_lab_5/matrix.py
Normal file
121
bazunov_andrew_lab_5/matrix.py
Normal file
@ -0,0 +1,121 @@
|
|||||||
|
import itertools
|
||||||
|
from collections.abc import Callable
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from queue import Queue
|
||||||
|
from typing import Tuple, List
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
_SUPLYER_TYPE = Callable[[], int | float] | int | float
|
||||||
|
_QUEUE_TYPE = Queue[Tuple[List[float | int], List[float | int], int]]
|
||||||
|
|
||||||
|
|
||||||
|
class Matrix:
|
||||||
|
def __init__(self, size: int, suplyer: _SUPLYER_TYPE = 0):
|
||||||
|
self.__size = size
|
||||||
|
self.__matrix = self._generate_matrix(suplyer)
|
||||||
|
|
||||||
|
def _generate_matrix(self, suplyer: _SUPLYER_TYPE):
|
||||||
|
if suplyer:
|
||||||
|
match suplyer:
|
||||||
|
case int() | float():
|
||||||
|
return [[suplyer for _ in range(self.__size)] for _ in range(self.__size)]
|
||||||
|
case Callable():
|
||||||
|
return [[suplyer() for _ in range(self.__size)] for _ in range(self.__size)]
|
||||||
|
return [[0 for _ in range(self.__size)] for _ in range(self.__size)]
|
||||||
|
|
||||||
|
def from_flat(self, numbers: List[int | float]):
|
||||||
|
if len(numbers) != self.__size ** 2:
|
||||||
|
raise Exception(f"Invalid matrix size {self.__size} ^ 2 != {len(numbers)}")
|
||||||
|
x, y = 0, 0
|
||||||
|
for number in numbers:
|
||||||
|
self.__matrix[y][x] = number
|
||||||
|
x += 1
|
||||||
|
if x >= self.__size:
|
||||||
|
x = 0
|
||||||
|
y += 1
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rows(self):
|
||||||
|
return self.__matrix
|
||||||
|
|
||||||
|
@property
|
||||||
|
def columns(self):
|
||||||
|
return [[self.__matrix[i][j] for i in range(self.__size)] for j in range(self.__size)]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def size(self):
|
||||||
|
return self.__size
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def random(*, size: int):
|
||||||
|
import random
|
||||||
|
return Matrix(size=size, suplyer=random.random)
|
||||||
|
|
||||||
|
def to_numpy(self):
|
||||||
|
return np.array(self.__matrix)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
return (isinstance(other, Matrix)
|
||||||
|
and self.__size == other.__size)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"Matrix {self.__size}x{self.__size} \n" + "\n".join([str(
|
||||||
|
" ".join([f"{element:.5f}" for element in row])
|
||||||
|
) for row in self.__matrix])
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.__matrix)
|
||||||
|
|
||||||
|
def __getitem__(self, index):
|
||||||
|
return self.__matrix[index]
|
||||||
|
|
||||||
|
def __mul__(self, other):
|
||||||
|
match other:
|
||||||
|
case Matrix():
|
||||||
|
return mul_matrixs(self, other)
|
||||||
|
case tuple():
|
||||||
|
other_matrix, count_threads = other
|
||||||
|
return mul_matrixs(self, other_matrix, count_threads)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def mul_row_and_column_in_thread(queue: _QUEUE_TYPE) -> list[tuple[int | float, int]]:
|
||||||
|
result = []
|
||||||
|
while queue.qsize():
|
||||||
|
local_result = 0
|
||||||
|
row, column, place = queue.get()
|
||||||
|
for k in range(len(row)):
|
||||||
|
local_result += row[k] * column[k]
|
||||||
|
result.append((local_result, place))
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def mul_matrixs(m1: Matrix, m2: Matrix, threads: int = 0):
|
||||||
|
if m1.size != m2.size:
|
||||||
|
return None
|
||||||
|
|
||||||
|
if threads == 0:
|
||||||
|
threads = 1
|
||||||
|
|
||||||
|
result = Matrix(size=m1.size, suplyer=0)
|
||||||
|
|
||||||
|
thread_queues = [Queue() for _ in range(threads)]
|
||||||
|
thread_iterator = 0
|
||||||
|
|
||||||
|
for row_m1, column_m2 in itertools.product(m1.rows, m2.columns):
|
||||||
|
thread_queues[thread_iterator].put((row_m1, column_m2, thread_iterator))
|
||||||
|
thread_iterator += 1
|
||||||
|
if thread_iterator >= threads:
|
||||||
|
thread_iterator = 0
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(max_workers=threads) as executor:
|
||||||
|
flat = []
|
||||||
|
|
||||||
|
for item in executor.map(mul_row_and_column_in_thread, thread_queues):
|
||||||
|
flat += item
|
||||||
|
|
||||||
|
flat.sort(key=lambda x: x[1])
|
||||||
|
result.from_flat([*map(lambda x: x[0], flat)])
|
||||||
|
|
||||||
|
return result
|
Loading…
Reference in New Issue
Block a user