import itertools from collections.abc import Callable from concurrent.futures import ThreadPoolExecutor from queue import Queue from typing import Tuple, List import numpy as np _SUPLYER_TYPE = Callable[[], int | float] | int | float _QUEUE_TYPE = Queue[Tuple[List[float | int], List[float | int], int]] class Matrix: def __init__(self, size: int, suplyer: _SUPLYER_TYPE = 0): self.__size = size self.__matrix = self._generate_matrix(suplyer) def _generate_matrix(self, suplyer: _SUPLYER_TYPE): if suplyer: match suplyer: case int() | float(): return [[suplyer for _ in range(self.__size)] for _ in range(self.__size)] case Callable(): return [[suplyer() for _ in range(self.__size)] for _ in range(self.__size)] return [[0 for _ in range(self.__size)] for _ in range(self.__size)] def from_flat(self, numbers: List[int | float]): if len(numbers) != self.__size ** 2: raise Exception(f"Invalid matrix size {self.__size} ^ 2 != {len(numbers)}") x, y = 0, 0 for number in numbers: self.__matrix[y][x] = number x += 1 if x >= self.__size: x = 0 y += 1 @property def rows(self): return self.__matrix @property def columns(self): return [[self.__matrix[i][j] for i in range(self.__size)] for j in range(self.__size)] @property def size(self): return self.__size @staticmethod def random(*, size: int): import random return Matrix(size=size, suplyer=random.random) def to_numpy(self): return np.array(self.__matrix) def __eq__(self, other): return (isinstance(other, Matrix) and self.__size == other.__size) def __str__(self): return f"Matrix {self.__size}x{self.__size} \n" + "\n".join([str( " ".join([f"{round(element, 3):<10}" for element in row]) ) for row in self.__matrix]) def __iter__(self): return iter(self.__matrix) def __getitem__(self, index): return self.__matrix[index] def __mul__(self, other): match other: case Matrix(): return mul_matrixs(self, other) case tuple(): other_matrix, count_threads = other return mul_matrixs(self, other_matrix, count_threads) return None def toLU(self, threads: int = 1): L = Matrix(size=self.__size, suplyer=lambda: 0) U = Matrix(size=self.__size, suplyer=lambda: 0) n = self.__size for i in range(n): for k in range(i, n): sum_upper = sum(L[i][j] * U[j][k] for j in range(i)) U[i][k] = self.__matrix[i][k] - sum_upper L[i][i] = 1 for k in range(i + 1, n): sum_lower = sum(L[k][j] * U[j][i] for j in range(i)) L[k][i] = (self.__matrix[k][i] - sum_lower) / U[i][i] return L, U def triangle_det(self): det = 1 for i in range(self.__size): det *= self.__matrix[i][i] return det def det(self, threads=1) -> float: return determinant(self.__matrix, threads) def minor(matrix, i, j): return [row[:j] + row[j + 1:] for row in (matrix[:i] + matrix[i + 1:])] def determinant(matrix: list, threads=1) -> float: if len(matrix) == 1: return matrix[0][0] elif len(matrix) == 2: return matrix[0][0] * matrix[1][1] - matrix[0][1] * matrix[1][0] det = 0 futures = [] with ThreadPoolExecutor(max_workers=threads) as executor: for col in range(len(matrix)): cofactor = (-1) ** col * matrix[0][col] futures.append(executor.submit(lambda m, cof: cof * determinant(m), minor(matrix, 0, col), cofactor)) for future in futures: det += future.result() return det def mul_row_and_column_in_thread(queue: _QUEUE_TYPE) -> list[tuple[int | float, int]]: result = [] while queue.qsize(): local_result = 0 row, column, place = queue.get() for k in range(len(row)): local_result += row[k] * column[k] result.append((local_result, place)) return result def mul_matrixs(m1: Matrix, m2: Matrix, threads: int = 0): if m1.size != m2.size: return None if threads == 0: threads = 1 result = Matrix(size=m1.size, suplyer=0) thread_queues = [Queue() for _ in range(threads)] thread_iterator = 0 for row_m1, column_m2 in itertools.product(m1.rows, m2.columns): thread_queues[thread_iterator].put((row_m1, column_m2, thread_iterator)) thread_iterator += 1 if thread_iterator >= threads: thread_iterator = 0 with ThreadPoolExecutor(max_workers=threads) as executor: flat = [] for item in executor.map(mul_row_and_column_in_thread, thread_queues): flat += item flat.sort(key=lambda x: x[1]) result.from_flat([*map(lambda x: x[0], flat)]) return result