Лабораторная №2
Разработка параллельного MPI приложения на языке Java. Необходимо разработать параллельный вариант алгоритма с применением MPI и замерить время его работы. В рамках работы программы должно быть две копии приложения, которые соединяются друг с другом по сети. Сообщения о статусе соединения (например, что соединение установлено) должны выводиться в консоль. Запуск каждого экземпляра MPI происходит в своём LXC контейнере. Такие сервисы, как Docker, использовать нельзя.
Вариант и задание
- Разделить элементы матрицы на среднее арифметическое всех ее элементов.
Как запустить лабораторную работу на Fedora и Debian
Обновление пакетов в контейнерах Fedora и Debian.
dnf update -y
apt update && apt upgrade -y
Установка на два контейнера одинаковой версии Java.
Найти доступные версии:
dnf search openjdk
Установить:
dnf install -y java-17-openjdk
Проверить версию:
java -version
Если нужно переключаться между версиями:
alternatives --config java
Для Debian:
apt search openjdk
apt install -y openjdk-17-jdk
java -version
update-alternatives --config java
Установка MPJ Express (одинакова для Fedora и Debian).
Скачать MPJ Express:
wget https://sourceforge.net/projects/mpjexpress/files/mpj-v0_44.tar.gz
Распаковать архив:
tar -xvzf mpj-v0_44.tar.gz
Переместить его в /opt:
mv mpj-v0_44 /opt/mpj
Ввести в консоль (три строчки одной командой):
export MPJ_HOME=/opt/mpj
export PATH=$PATH:$MPJ_HOME/bin
export CLASSPATH=$CLASSPATH:$MPJ_HOME/lib/mpj.jar
Добавить в конец ~/.bashrc эти три строчки:
nano ~/.bashrc
Применить изменения:
source ~/.bashrc
Проверить, что mpjrun.sh доступен:
mpjrun.sh
Тест (одинаков для Fedora и Debian).
Создать тестовый файл:
nano HelloMPI.java
Добавить код:
import mpi.MPI;
public class HelloMPI {
public static void main(String[] args) {
MPI.Init(args);
int rank = MPI.COMM_WORLD.Rank();
int size = MPI.COMM_WORLD.Size();
System.out.println("Hello from process " + rank + " out of " + size);
MPI.Finalize();
}
}
Сохранить файл: (Ctrl + X → Y → Enter)
Скомпилировать файл:
javac --release 17 -cp $MPJ_HOME/lib/mpj.jar HelloMPI.java
Запустить с двумя процессами (-np 2):
mpjrun.sh -np 2 HelloMPI
Ожидаемый вывод:
Hello from process 0 out of 2
Hello from process 1 out of 2
Установка соединения.
Если при подключении между контейнерами по ssh Permission denied, please try again (одинаково для Fedora и Debian): Открыть настройки SSH:
nano /etc/ssh/sshd_config
Убедиться, что включены следующие параметры:
PermitRootLogin yes
PasswordAuthentication yes
PubkeyAuthentication yes
Применить изменения, если есть:
systemctl restart sshd
Создать в контейнерах файл (nano ~/machines) со списком IP контейнеров внутри. Каждый IP с новой строки. Как это выглядит:
192.168.27.127
192.168.27.128
Настройка подключения ssh без пароля (на каждом контейнере). Сгенерировать SSH-ключи:
ssh-keygen -t rsa -b 4096
На все вопросы (3шт) просто нажать Enter. Это создаст пару ключей (приватный и публичный) в директории ~/.ssh/ (по умолчанию ~/.ssh/id_rsa и ~/.ssh/id_rsa.pub).
Копирование публичного ключа на удаленный сервер (remote_host — на адрес сервера):
ssh-copy-id username@remote_host
Надо будет ввести пароль от контейнера, к которому подключаемся.
Подключитесь к серверу (remote_host — на адрес сервера):
ssh username@remote_host
Выполнение программы.
На каждом контейнере:
Записать в файл .java код программы, например:
nano Program.java
Скомпилировать:
javac --release 17 -cp $MPJ_HOME/lib/mpj.jar Program.java
Если символы в комментариях искажаются или возникают ошибки при компиляции error: unmappable character (0xD0) for encoding US-ASCII:
Сначала убедитесь, что локаль en_US.UTF-8 доступна в системе. Выполните команду:
locale -a
Это покажет список всех доступных локалей. Если en_US.UTF-8 отсутствует в списке, её нужно сгенерировать.
Если локаль en_US.UTF-8 отсутствует, выполните следующие шаги: Откройте файл /etc/locale.gen в текстовом редакторе (например, nano):
nano /etc/locale.gen
Найдите строку, содержащую en_US.UTF-8, и раскомментируйте её (удалите символ # в начале строки):
en_US.UTF-8 UTF-8
Сохраните файл и выйдите из редактора (Ctrl + O, затем Ctrl + X).
Сгенерируйте локали:
locale-gen
Обновите настройки локали:
sudo update-locale LANG=en_US.UTF-8
Проверка текущей локали После выполнения вышеуказанных шагов проверьте текущую локаль:
locale
Убедитесь, что переменные LANG, LC_CTYPE и другие установлены в en_US.UTF-8. Например:
LANG=en_US.UTF-8
LC_CTYPE=en_US.UTF-8
Установка локали в текущей сессии Если вы хотите временно установить локаль для текущей сессии, выполните:
export LANG=en_US.UTF-8
export LC_ALL=en_US.UTF-8
Проверьте, что локаль установлена:
locale
Проверка после перезагрузки.
После настройки локали перезагрузите систему или выйдите и войдите заново:
reboot
После перезагрузки снова проверьте локаль:
locale
Запустить демонов (можно раздельно):
mpjdaemon -boot <ip контейнера 1> <ip контейнера 2>
Запуск программы:
mpjrun.sh -np 2 -dev niodev -machinesfile ~/machines DividingByAverageDistributed
Если MPJ Express не может разрешить имена хостов (fedora и debian) то на обоих контейнерах:
nano /etc/hosts
И добавьте, если отсутствуют строки:
192.168.27.127 fedora.pve fedora
192.168.27.128 debian.pve debian
Если ошибка java.lang.OutOfMemoryError: Java heap space - вашей программе не хватает оперативной памяти (RAM) для выполнения операций. Добавьте в команду -Xmx2G или более:
mpjrun.sh -np 2 -dev niodev -machinesfile ~/machines -Xmx2G DividingByAverageDistributed
Какие технологии использовались
Программа использует MPI для параллельных вычислений, язык Java и LXC-контейнеры.
Что она делает
Программа генерирует матрицу, распределяет её части между процессами, вычисляет локальные суммы, собирает результаты на главном процессе, где вычисляется среднее арифметическое. Далее среднее арифметическое распределяется на каждый процесс, который делит свою часть матрицы на это значение. В конце происходит восстановление целостности матрицы. Также на нескольких этапах программы производятся замеры времени их выполнения.
Листинг кода
import mpi.MPI;
import java.util.Random;
public class DividingByAverageDistributed {
public static void main(String[] args) {
// Инициализация MPI
MPI.Init(args);
// Получение ранга процесса и общего количества процессов
int rank = MPI.COMM_WORLD.Rank();
int size = MPI.COMM_WORLD.Size();
// Замер времени начала работы
long startTime = System.nanoTime();
// Сообщение о статусе соединения
System.out.println("[Container " + rank + "] Соединение установлено. Готов к работе.");
int arraySize = 8000;
double[][] array = null;
double[] flatArray = null;
// Генерация массива на главном процессе (rank == 0)
if (rank == 0) {
array = generateMatrix(arraySize);
flatArray = flattenMatrix(array);
System.out.println("[Container 0] Матрица сгенерирована. Отправка данных на другие контейнеры...");
// Вывод элемента [100][100] после генерации
System.out.println("[Container 0] array[100][100] до обработки: " + array[100][100]);
}
// Определение количества строк для каждого процесса
int rowsPerProcess = arraySize / size;
double[] localRows = new double[rowsPerProcess * arraySize];
// Замер времени перед распределением данных
long scatterStartTime = System.nanoTime();
// Распределение данных между процессами
MPI.COMM_WORLD.Scatter(
flatArray, 0, rowsPerProcess * arraySize, MPI.DOUBLE,
localRows, 0, rowsPerProcess * arraySize, MPI.DOUBLE,
0
);
// Замер времени после распределения данных
long scatterEndTime = System.nanoTime();
System.out.println("[Container " + rank + "] Данные получены. Время распределения: " + (scatterEndTime - scatterStartTime) / 1_000_000 + " мс");
// Сообщение о получении данных
System.out.println("[Container " + rank + "] Начинаю вычисления...");
// Замер времени перед вычислением локальной суммы
long computeStartTime = System.nanoTime();
// Вычисление локальной суммы
double localSum = 0;
for (double value : localRows) {
localSum += value;
}
// Замер времени после вычисления локальной суммы
long computeEndTime = System.nanoTime();
System.out.println("[Container " + rank + "] Локальная сумма вычислена. Время вычислений: " + (computeEndTime - computeStartTime) / 1_000_000 + " мс");
// Сбор локальных сумм на главном процессе
double[] globalSums = new double[size];
MPI.COMM_WORLD.Gather(new double[]{localSum}, 0, 1, MPI.DOUBLE, globalSums, 0, 1, MPI.DOUBLE, 0);
// Вычисление среднего арифметического на главном процессе
double average = 0;
if (rank == 0) {
double totalSum = 0;
for (double sum : globalSums) {
totalSum += sum;
}
average = totalSum / (arraySize * arraySize);
System.out.println("[Container 0] Среднее арифметическое: " + average);
}
// Распространение среднего значения на все процессы
MPI.COMM_WORLD.Bcast(new double[]{average}, 0, 1, MPI.DOUBLE, 0);
// Замер времени перед делением элементов массива
long normalizeStartTime = System.nanoTime();
// Деление элементов массива на среднее значение
for (int i = 0; i < localRows.length; i++) {
localRows[i] /= average;
}
// Замер времени после деления элементов массива
long normalizeEndTime = System.nanoTime();
System.out.println("[Container " + rank + "] Деление элементов завершено. Время деления: " + (normalizeEndTime - normalizeStartTime) / 1_000_000 + " мс");
// Сбор результатов на главном процессе
MPI.COMM_WORLD.Gather(
localRows, 0, rowsPerProcess * arraySize, MPI.DOUBLE,
flatArray, 0, rowsPerProcess * arraySize, MPI.DOUBLE,
0
);
// Восстановление массива на главном процессе
if (rank == 0) {
array = unflattenMatrix(flatArray, arraySize);
System.out.println("[Container 0] Обработка завершена.");
// Вывод элемента [100][100] после обработки
System.out.println("[Container 0] array[100][100] после обработки: " + array[100][100]);
}
// Замер времени окончания работы
long endTime = System.nanoTime();
// Сообщение о завершении работы
System.out.println("[Container " + rank + "] Работа завершена. Общее время выполнения: " + (endTime - startTime) / 1_000_000 + " мс");
// Завершение работы MPI
MPI.Finalize();
}
// Генерация матрицы
private static double[][] generateMatrix(int size) {
Random random = new Random();
double[][] matrix = new double[size][size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
matrix[i][j] = random.nextInt(101);
}
}
return matrix;
}
// Преобразование матрицы в одномерный массив
private static double[] flattenMatrix(double[][] matrix) {
int size = matrix.length;
double[] flatArray = new double[size * size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
flatArray[i * size + j] = matrix[i][j];
}
}
return flatArray;
}
// Преобразование одномерного массива в матрицу
private static double[][] unflattenMatrix(double[] flatArray, int size) {
double[][] matrix = new double[size][size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
matrix[i][j] = flatArray[i * size + j];
}
}
return matrix;
}
}
Результаты работы
Starting process <1> on <debian>
Starting process <0> on <fedora>
[Container 1] Соединение установлено. Готов к работе.
[Container 0] Соединение установлено. Готов к работе.
[Container 0] Матрица сгенерирована. Отправка данных на другие контейнеры...
[Container 0] array[100][100] до обработки: 3.0
[Container 0] Данные получены. Время распределения: 1236 мс
[Container 0] Начинаю вычисления...
[Container 0] Локальная сумма вычислена. Время вычислений: 64 мс
[Container 0] Среднее арифметическое: 50.001607140625
[Container 0] Деление элементов завершено. Время деления: 61 мс
[Container 1] Данные получены. Время распределения: 3296 мс
[Container 1] Начинаю вычисления...
[Container 1] Локальная сумма вычислена. Время вычислений: 58 мс
[Container 1] Деление элементов завершено. Время деления: 99 мс
[Container 1] Работа завершена. Общее время выполнения: 4204 мс
[Container 0] Обработка завершена.
[Container 0] array[100][100] после обработки: 0.05999807149323763
[Container 0] Работа завершена. Общее время выполнения: 7462 мс
Stopping Process <0> on <fedora>
Stopping Process <1> on <debian>
Проблемы, с которыми столкнулся (их решение описано в пункте о запуске программы):
- Несовместимость версий Java
- Компиляция на другой версии без явного указания
- Permission denied при подключении по shh
- Лишние Enter в файле machines
- Искажение комментариев
- Установка локали
- MPJ Express не может разрешить имена хостов
- Ошибка java.lang.OutOfMemoryError: Java heap space
Вывод
Используя MPI (MPJ Express) и контейнеры Fedora и Debian, через боль и мучения удалось реализовать решение задачи с помощью вычислений, распределённых по разным контейнерам. Решение масштабируется и показывает, как распределённые процессы могут совместно решать одну задачу, оптимизируя использование ресурсов.
Результаты измерений из первой лабораторной работы:
- Однопоточный алгоритм - 388 мс
- Алгоритм с ThreadPoolExecutor - 323 мс
- Алгоритм с ForkJoinPool - 379 мс
Можно сделать вывод, что на данных мощностях наиболее эффективными оказались алгоритмы, которые работают в несколько потоков в одной среде. Это связано с тем, что во второй лабораторной работе реализована передача информации из одного контейнера в другой, что значительно увеличивает время выполнения. Однако нужно заметить, что в первом случае мы можем пользоваться мощностями лишь одной среды, а во втором - двух (и более, если расширить масштаб). Это значит, что при подходе с технологией MPI мы можем добиться лучших результатов при увеличении входных данных и расширении масштаба проекта за счёт новых узлов.