Merge pull request 'PIbd-21_Bartasova_K.A._LabWork02' (#81) from Ksenia/SSPR_25:bartasova_ksenia_lab_2 into main

в следующих работах ожидаю сравнение с прошлыми реализациями - сравнение времени работы как минимум. чем больше метрик, тем лучше
Reviewed-on: sevastyan_b/SSPR_25#81
This commit is contained in:
2025-03-29 00:31:39 +04:00
5 changed files with 366 additions and 0 deletions

View File

@@ -0,0 +1,147 @@
package org.example.sspr2;
import mpi.MPI;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
public class App {
public static void main(String[] args) throws mpi.MPIException, IOException {
MPI.Init(args);
int rank = MPI.COMM_WORLD.Rank();
int size = MPI.COMM_WORLD.Size();
int matrixSize = 0;
double[][] matrix = null;
FileInteraction file = new FileInteraction();
double sred = 0;
// Процесс 0 генерирует матрицу
if (rank == 0) {
matrixSize = Integer.parseInt(args[args.length - 1]);
Matrix matrixInteraction = new Matrix();
matrix = matrixInteraction.GenerateMassive(matrixSize); // Генерация матрицы
// Вычисление среднего значения
for (var i : matrix) {
for (var j : i) {
sred += j;
}
}
// Отправка sred процессу с rank == 1
double[] sredBuffer = new double[]{sred};
MPI.COMM_WORLD.Send(sredBuffer, 0, 1, MPI.DOUBLE, 1, 0);
}
// Получение sred процессом с rank == 1
if (rank == 1) {
double[] sredBuffer = new double[1];
MPI.COMM_WORLD.Recv(sredBuffer, 0, 1, MPI.DOUBLE, 0, 0);
sred = sredBuffer[0];
System.out.println("Received sred: " + sred);
}
// Передача matrixSize всем процессам
int[] sizeBuffer = new int[]{matrixSize};
MPI.COMM_WORLD.Bcast(sizeBuffer, 0, 1, MPI.INT, 0);
matrixSize = sizeBuffer[0];
// Каждый процесс выделяет память под матрицу
if (rank != 0) {
matrix = new double[matrixSize][matrixSize];
}
// Определяем размер подмассивов
int chunkSize = matrixSize / 2; // Убираем добавление остатка
double[][] im1 = new double[chunkSize][matrixSize];
double[][] im2 = new double[matrixSize - chunkSize][matrixSize];
// Процесс 0 разрезает матрицу и отправляет часть процессу 1
if (rank == 0) {
// Проверка размера matrix перед копированием
if (matrix.length < chunkSize) {
throw new IllegalStateException("Matrix size is smaller than chunkSize");
}
System.arraycopy(matrix, 0, im1, 0, chunkSize);
System.arraycopy(matrix, chunkSize, im2, 0, matrixSize - chunkSize);
for (int i = 0; i < matrixSize - chunkSize; i++) {
MPI.COMM_WORLD.Send(im2[i], 0, matrixSize, MPI.DOUBLE, 1, 0);
}
} else if (rank == 1) {
for (int i = 0; i < matrixSize - chunkSize; i++) {
MPI.COMM_WORLD.Recv(im2[i], 0, matrixSize, MPI.DOUBLE, 0, 0);
}
}
// Обработка данных
if (rank == 0) {
im1 = DoAlgorithm(im1, sred);
} else if (rank == 1) {
im2 = DoAlgorithm(im2, sred);
}
// Отправка обратно в процесс 0
if (rank == 1) {
for (int i = 0; i < matrixSize - chunkSize; i++) {
MPI.COMM_WORLD.Send(im2[i], 0, matrixSize, MPI.DOUBLE, 0, 0);
}
} else if (rank == 0) {
long startTime = System.nanoTime();
for (int i = 0; i < matrixSize - chunkSize; i++) {
MPI.COMM_WORLD.Recv(im2[i], 0, matrixSize, MPI.DOUBLE, 1, 0);
}
// Собираем окончательный массив
for (int i = 0; i < chunkSize; i++) {
matrix[i] = im1[i];
}
for (int i = 0; i < matrixSize - chunkSize; i++) {
matrix[chunkSize + i] = im2[i];
}
file.WriteToFile("output.txt", matrix);
long endTime = System.nanoTime();
long duration = (endTime - startTime);
System.out.println("Matrix processing completed.");
System.out.println("Time taken: " + duration + " ns");
}
MPI.Finalize();
}
private static double[][] DoAlgorithm(double[][] massive, double finalSred) {
Matrix matrixInteraction = new Matrix();
int chunkSize = massive.length / 4;
double[][][] chunks = new double[4][chunkSize][];
// Разделяем массив на части
for (int i = 0; i < 4; i++) {
chunks[i] = Arrays.copyOfRange(massive, i * chunkSize, (i + 1) * chunkSize);
}
CompletableFuture<double[][]> future1 = CompletableFuture.supplyAsync(() -> matrixInteraction.DevideBySred(chunks[0], finalSred));
CompletableFuture<double[][]> future2 = CompletableFuture.supplyAsync(() -> matrixInteraction.DevideBySred(chunks[1], finalSred));
CompletableFuture<double[][]> future3 = CompletableFuture.supplyAsync(() -> matrixInteraction.DevideBySred(chunks[2], finalSred));
CompletableFuture<double[][]> future4 = CompletableFuture.supplyAsync(() -> matrixInteraction.DevideBySred(chunks[3], finalSred));
CompletableFuture.allOf(future1, future2, future3, future4).join();
// Собираем отсортированные части обратно в один массив
double[][] result = new double[massive.length][];
System.arraycopy(future1.join(), 0, result, 0, chunkSize);
System.arraycopy(future2.join(), 0, result, chunkSize, chunkSize);
System.arraycopy(future3.join(), 0, result, 2 * chunkSize, chunkSize);
System.arraycopy(future4.join(), 0, result, 3 * chunkSize, chunkSize);
return result;
}
}

View File

@@ -0,0 +1,132 @@
package org.example.sspr2;
import java.io.*;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class FileInteraction {
public Path currentPath;
private Lock lock = new ReentrantLock();
public FileInteraction() {
currentPath = Paths.get("").toAbsolutePath();
}
public void WriteToFile(String fileName, double[][][] chucks, double[][] massive) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(() -> {
lock.lock();
WriteToFile(fileName);
Path filePath = Paths.get(Paths.get("").toAbsolutePath() + File.separator + fileName);
File file = new File(filePath.toString());
try (Writer writer = new BufferedWriter(new FileWriter(file))) {
for (var i : chucks) {
for (var j : i) {
for (var q : j) {
writer.write(Double.toString(q) + " ");
}
writer.write('\n');
}
}
for (var i : massive) {
for (var j : i) {
writer.write(Double.toString(j) + " ");
}
writer.write('\n');
}
} catch (IOException e) {
throw new RuntimeException(e);
}
finally {
lock.unlock();
}
});
executorService.shutdown();
}
public void WriteToFile(String fileName, double[][] massive) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(() -> {
lock.lock();
WriteToFile(fileName);
Path filePath = Paths.get(Paths.get("").toAbsolutePath() + File.separator + fileName);
File file = new File(filePath.toString());
try (Writer writer = new BufferedWriter(new FileWriter(file))) {
for (var i : massive) {
for (var j : i) {
writer.write(Double.toString(j) + " ");
}
writer.write('\n');
}
} catch (IOException e) {
throw new RuntimeException(e);
}
finally {
lock.unlock();
}
});
executorService.shutdown();
}
public void WriteToFile(String fileName, String data) {
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.submit(() -> {
lock.lock();
WriteToFile(fileName);
Path filePath = Paths.get(Paths.get("").toAbsolutePath() + File.separator + fileName);
File file = new File(filePath.toString());
try (Writer writer = new BufferedWriter(new FileWriter(file))) {
writer.write(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
finally {
lock.unlock();
}
});
executorService.shutdown();
}
public void WriteToFile(String fileName) {
try {
GenerateFile(fileName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private void GenerateFile(String fileName) throws IOException {
Path filePath = Paths.get(Paths.get("").toAbsolutePath() + File.separator + fileName);
if (Files.exists(filePath)) {
Files.deleteIfExists(filePath);
}
File file = new File(filePath.toString());
try {
file.createNewFile();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,26 @@
package org.example.sspr2;
import java.util.Random;
public class Matrix {
public double[][] GenerateMassive(int n) {
double[][] matrix = new double[n][n];
Random random = new Random();
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
matrix[i][j] = random.nextDouble(10);
}
}
return matrix;
}
public double[][] DevideBySred(double[][] massive, double sred) {
double[][] result = new double[massive.length][massive[0].length];
for (int i = 0; i < massive.length; i++) {
for (int j = 0; j < massive[i].length; j++) {
result[i][j] = massive[i][j] / sred; // Делим каждый элемент на среднее значение
}
}
return result;
}
}

View File

@@ -0,0 +1,7 @@
package org.example.sspr2;
public class Program {
public static void main(String[] args) {
System.out.println("+");
}
}

View File

@@ -0,0 +1,54 @@
# Лабораторная работа №2
Разработка параллельного MPI приложения на языке Java.
Необходимо разработать параллельный вариант алгоритма с применением MPI и замерить время его работы. В рамках работы программы должно быть две копии приложения, которые соединяются друг с другом по сети. Сообщения о статусе соединения (например, что соединение установлено) должны выводиться в консоль.
Запуск каждого экземпляра MPI происходит в своём LXC контейнере. Такие сервисы, как Docker, использовать нельзя.
## Вариант 3
Разделить элементы матрицы на среднее арифметическое всех ее элементов.
### Как запустить лабораторную работу:
Команда запускает MPJ Daemon:
>mpjdaemon -boot
MPJ Express запускает вашу программу App на двух процессах (-np 2), программа будет распределена в файле machines. Вводится размер входных данных 1000:
>mpjrun.sh -np 2 -dev niodev -machinesfile machines App -- 1000
### Какие технологии использовали:
Используется библиотека MPJ Express, которая используется для написания параллельных программ, способные выполняться на нескольких процессах или узлах, обмениваясь данными через сообщения.
### Как работает программа:
1. Программа инициализирует MPI. Это обязательный шаг для работы с MPI:
>MPI.Init(args);
2. rank — идентификатор текущего процесса (0, 1); size — общее количество процессов:
>int rank = MPI.COMM_WORLD.Rank();
>int size = MPI.COMM_WORLD.Size();
3. Процесс с рангом 0 генерирует матрицу размером matrixSize x matrixSize.
4. Процесс 0 вычисляет среднее значение всех элементов матрицы.
5. Процесс 0 отправляет значение sred процессу 1 с использованием MPI-функций Send и Recv.
6. Процесс 0 разделяет матрицу на две части: im1 — первая часть (остается у процесса 0); im2 — вторая часть (отправляется процессу 1). Процесс 1 получает свою часть матрицы.
7. Каждый процесс обрабатывает свою часть матрицы с помощью метода DoAlgorithm. Внутри DoAlgorithm матрица делится на 4 части, и каждая часть обрабатывается асинхронно с использованием CompletableFuture.
>if (rank == 0) {
> im1 = DoAlgorithm(im1, sred);
>} else if (rank == 1) {
> im2 = DoAlgorithm(im2, sred);
>}
8. Процесс 1 отправляет обработанную часть матрицы обратно процессу 0.
9. Процесс 0 собирает обработанные части матрицы в одну.
10. Все процессы завершают работу с MPI:
>MPI.Finalize();
### Тесты:
Starting process <0> on < kseniya >
Starting process <1> on < ksenia >
Received sred: 4.997216944501581
Matrix processing completed.
Time taken: 89550398 ns
Stopping Process <0> on < kseniya >
Stopping Process <1> on < ksenia >
### Вывод:
Программа демонстрирует эффективное использование MPI и параллельных вычислений для обработки матриц. Она успешно распределяет задачи между процессами, обрабатывает данные асинхронно. Но MPI требует ручного управления процессами.