Лабораторная работа №2
Описание
Программа реализует параллельную обработку данных с использованием MPI (MPJ Express для Java) и демонстрирует распределенные вычисления в кластере контейнеров (Proxmox LXC). Основная цель — вычисление суммы произведений строк квадратной матрицы, распределенной между несколькими процессами.
Используемые технологии
- Proxmox с LXC-контейнерами
- Java и библиотека mpjexpress для реализации MPI
- SSH без пароля для взаимодействия между контейнерами
Подготовка окружения (Установка)
Установка Java и MPJ Express на всех LXC-контейнерах
apt install default-jdk
apt install default-jre
-
Скачать и установить MPJ Express:
wget https://sourceforge.net/projects/mpjexpress/files/latest/download -O mpjexpress.tar.gz unzip mpjexpress.tar.gz mv mpjexpress /opt/mpj -
Добавить переменные среды в
~/.bashrc:export MPJ_HOME=/opt/mpj export PATH=$PATH:$MPJ_HOME/bin export CLASSPATH=$CLASSPATH:$MPJ_HOME/lib/mpj.jar -
Запустить демоны MPJ:
mpjdaemon -boot
Настройка SSH без пароля между контейнерами
ssh-keygen -t rsa
ssh-copy-id user@node2
Настройка /etc/hosts на всех контейнерах
Пример:
192.168.9.101 cont1
192.168.9.102 CT102
Компиляция и запуск
- Компилируем файл, если нет готового:
javac App.java
- Кладем
App.classи создаем файлmach, содержащий IP/имена всех узлов:
cont1
CT102
- Запускаем демоны:
mpjboot mach
или вручную на каждом узле:
mpjdaemon -boot
- Запускаем программу:
mpjrun.sh -np 4 -dev niodev -machinesfile mach App -- 4
Пояснение параметров:
-np 4— количество процессов (должно быть ≥2)-- 4— размер матрицы (например, 4x4)
Особенности реализации
- MPI реализован через MPJ Express.
- Используется Scatterv для распределения строк матрицы между процессами.
- Каждый процесс считает произведение элементов строки и отправляет результат обратно с помощью Gatherv.
- Процесс с
rank 0собирает результаты, считает общую сумму и время выполнения. - Используются настроенные типы данных MPI (массивы фиксированной длины).
- Ввод размера матрицы происходит через аргументы командной строки.
public static void main(String[] args) {
MPI.Init(args);
int threadCount = MPI.COMM_WORLD.Size();
int rank = MPI.COMM_WORLD.Rank();
int[] matr = new int[0];
int size = 0;
long startTime = System.nanoTime();
if (rank == 0){ // Генерации матрицы в 0 процесса
Scanner s = new Scanner(System.in);
Random r = new Random();
size = Integer.parseInt(args[args.length - 1]);
System.out.println("Matrix size: " + size);
matr = new int[size * size];
for (int i = 0; i < size * size; i++) {
matr[i] = r.nextInt(10) + 1;
}
}
int[] bfSize = {size};
// Раскидывание размера матрицы по всем процесса
MPI.COMM_WORLD.Bcast(bfSize,0, 1,MPI.INT, 0);
size = bfSize[0];
if (size == 0){
return;
}
// Новый тип данных, чтобы кидать массив
Datatype dta = Datatype.Contiguous(size, MPI.INT);
dta.Commit();
// Распредение сколько каждому процессу строк оттдать
int[] sendCount = new int[threadCount];
for (int i = 1; i < threadCount; i++) {
sendCount[i] = size / (threadCount - 1);
};
for (int i = 0; i < size % (threadCount - 1); i++) {
sendCount[threadCount - 1 - i] += 1;
}
// Смещение для каждого из потоков(откуда считывать)
int[] displs = new int[threadCount];
displs[0] = 0;
for (int i = 1; i < threadCount; i++) {
displs[i] = displs[i - 1] + sendCount[i - 1];
}
int[] recBuf = new int[size * sendCount[rank]];
if (rank == 0) {
System.out.println("Start scatterv");
}
// Расспределение на процессы
MPI.COMM_WORLD.Scatterv(matr, 0, sendCount, displs, dta,
recBuf, 0, sendCount[rank], dta, 0);
System.out.println("Rank " + rank + " : " + print(recBuf));
// Расслыка процессами количества принимаемого на каждый процесс
MPI.COMM_WORLD.Bcast(sendCount, 0, sendCount.length, MPI.INT, 0);
int[] sendBuf = new int[sendCount[rank]];
if (rank != 0) { // Вычисление кому какую порцию данных дать(именно выбрать какую строчку и сколько на каждый процесс)
for (int i = 0; i < sendCount[rank]; i++) {
int sum = 1;
for (int j = 0; j < recBuf.length / sendCount[rank]; j++) {
sum *= recBuf[i * recBuf.length / sendCount[rank] + j];
}
sendBuf[i] = sum;
}
}
System.out.println("Out rank " + rank + " : " + print(sendBuf));
// Расрапдедение индексов смещения на каждый процесс
MPI.COMM_WORLD.Bcast(displs, 0, displs.length, MPI.INT, 0);
int[] recBuf2 = new int[size];
// Сбор данных на 0 процесс
MPI.COMM_WORLD.Gatherv(sendBuf, 0, sendCount[rank], MPI.INT,
recBuf2,0, sendCount, displs, MPI.INT, 0);
// Результируюший подсчет
if (rank == 0){
int resSum = Arrays.stream(recBuf2).sum();
System.out.println("Res sum: " + resSum);
System.out.println("Res time: " + (System.nanoTime() - startTime));
}
}
Пример вывода
MPJ Express (0.44) is started in the cluster configuration with niodev
Starting process <1> on <cont1>
Starting process <0> on <cont1>
Starting process <2> on <CT102>
Starting process <3> on <CT102>
Matrix size: 4
Start scatterv
Rank 0 :
Out rank 0 :
Rank 3 : 2 3 1 10 10 10 2 10
Out rank 3 : 60 2000
Rank 1 : 10 2 2 3
Out rank 1 : 120
Stopping Process <1> on <cont1>
Rank 2 : 2 2 3 1
Out rank 2 : 12
Stopping Process <2> on <CT102>
Stopping Process <3> on <CT102>
Res sum: 2192
Res time: 168002875
Stopping Process <0> on <cont1>
Вывод
В рамках лабораторной работы была реализована система распределенных вычислений с использованием MPJ Express. Программа успешно демонстрирует работу с MPI в кластере контейнеров, включая передачу данных, выполнение расчетов и сбор результатов. Получен опыт настройки окружения и взаимодействия контейнеров через SSH.
В сравнении с классической реализации потоков, можно отметить что облегчается сам процесс написания кода, ведь надо написать всего лишь один экзепляр и не задумываться над распределением задач. Но более затратно по времени развренуть и настроить общение между узлами.