Files
SSPR_25/lyakhov_timofey_lab_2
2025-03-24 10:30:03 +04:00
..
2025-03-02 16:51:39 +04:00
2025-03-24 10:30:03 +04:00

Лабораторная работа №2

Описание

Программа реализует параллельную обработку данных с использованием MPI (MPJ Express для Java) и демонстрирует распределенные вычисления в кластере контейнеров (Proxmox LXC). Основная цель — вычисление суммы произведений строк квадратной матрицы, распределенной между несколькими процессами.

Используемые технологии

  • Proxmox с LXC-контейнерами
  • Java и библиотека mpjexpress для реализации MPI
  • SSH без пароля для взаимодействия между контейнерами

Подготовка окружения (Установка)

Установка Java и MPJ Express на всех LXC-контейнерах

apt install default-jdk
apt install default-jre
  1. Скачать и установить MPJ Express:

    wget https://sourceforge.net/projects/mpjexpress/files/latest/download -O mpjexpress.tar.gz
    unzip mpjexpress.tar.gz
    mv mpjexpress /opt/mpj
    
  2. Добавить переменные среды в ~/.bashrc:

    export MPJ_HOME=/opt/mpj
    export PATH=$PATH:$MPJ_HOME/bin
    export CLASSPATH=$CLASSPATH:$MPJ_HOME/lib/mpj.jar
    
  3. Запустить демоны MPJ:

    mpjdaemon -boot
    

Настройка SSH без пароля между контейнерами

ssh-keygen -t rsa
ssh-copy-id user@node2

Настройка /etc/hosts на всех контейнерах

Пример:

192.168.9.101 cont1
192.168.9.102 CT102

Компиляция и запуск

  1. Компилируем файл, если нет готового:
javac App.java
  1. Кладем App.class и создаем файл mach, содержащий IP/имена всех узлов:
cont1
CT102
  1. Запускаем демоны:
mpjboot mach

или вручную на каждом узле:

mpjdaemon -boot
  1. Запускаем программу:
mpjrun.sh -np 4 -dev niodev -machinesfile mach App -- 4

Пояснение параметров:

  • -np 4 — количество процессов (должно быть ≥2)
  • -- 4 — размер матрицы (например, 4x4)

Особенности реализации

  • MPI реализован через MPJ Express.
  • Используется Scatterv для распределения строк матрицы между процессами.
  • Каждый процесс считает произведение элементов строки и отправляет результат обратно с помощью Gatherv.
  • Процесс с rank 0 собирает результаты, считает общую сумму и время выполнения.
  • Используются настроенные типы данных MPI (массивы фиксированной длины).
  • Ввод размера матрицы происходит через аргументы командной строки.
 public static void main(String[] args) {
        MPI.Init(args);
        int threadCount = MPI.COMM_WORLD.Size();
        int rank = MPI.COMM_WORLD.Rank();

        int[] matr = new int[0];
        int size = 0;
        long startTime = System.nanoTime();
        if (rank == 0){ // Генерации матрицы в 0 процесса
            Scanner s = new Scanner(System.in);
            Random r = new Random();
            size = Integer.parseInt(args[args.length - 1]);
            System.out.println("Matrix size: " + size);
            matr = new int[size * size];
            for (int i = 0; i < size * size; i++) {
                matr[i] = r.nextInt(10) + 1;
            }
        }
        int[] bfSize = {size};
        // Раскидывание размера матрицы по всем процесса
        MPI.COMM_WORLD.Bcast(bfSize,0, 1,MPI.INT, 0); 
        size = bfSize[0];
        if (size == 0){
            return;
        }
        // Новый тип данных, чтобы кидать массив 
        Datatype dta = Datatype.Contiguous(size, MPI.INT);
        dta.Commit();

        // Распредение сколько каждому процессу строк оттдать
        int[] sendCount = new int[threadCount];
        for (int i = 1; i < threadCount; i++) {
            sendCount[i] = size / (threadCount - 1);
        };
        for (int i = 0; i < size % (threadCount - 1); i++) {
            sendCount[threadCount - 1 - i] += 1;
        }
        // Смещение для каждого из потоков(откуда считывать)
        int[] displs = new int[threadCount];
        displs[0] = 0;
        for (int i = 1; i < threadCount; i++) {
            displs[i] = displs[i - 1] + sendCount[i - 1];
        }
        
        int[] recBuf = new int[size * sendCount[rank]];
        if (rank == 0) {
            System.out.println("Start scatterv");
        }
        // Расспределение на процессы
        MPI.COMM_WORLD.Scatterv(matr, 0, sendCount, displs, dta,
                recBuf, 0, sendCount[rank], dta, 0);
        System.out.println("Rank " + rank + " : " + print(recBuf));
        // Расслыка процессами количества принимаемого на каждый процесс
        MPI.COMM_WORLD.Bcast(sendCount, 0, sendCount.length, MPI.INT, 0);
        int[] sendBuf = new int[sendCount[rank]];
        if (rank != 0) { // Вычисление кому какую порцию данных дать(именно выбрать какую строчку и сколько на каждый процесс)
            for (int i = 0; i < sendCount[rank]; i++) {
                int sum = 1;
                for (int j = 0; j < recBuf.length / sendCount[rank]; j++) {
                    sum *= recBuf[i * recBuf.length / sendCount[rank] + j];
                }
                sendBuf[i] = sum;
            }
        }
        System.out.println("Out rank " + rank + " : " + print(sendBuf));
        // Расрапдедение индексов смещения на каждый процесс
        MPI.COMM_WORLD.Bcast(displs, 0, displs.length, MPI.INT, 0);
        int[] recBuf2 = new int[size];
        // Сбор данных на 0 процесс
                MPI.COMM_WORLD.Gatherv(sendBuf, 0, sendCount[rank], MPI.INT,
                        recBuf2,0, sendCount, displs, MPI.INT, 0);
   // Результируюший подсчет   
        if (rank == 0){
            int resSum = Arrays.stream(recBuf2).sum();
            System.out.println("Res sum: " + resSum);
            System.out.println("Res time: " + (System.nanoTime() - startTime));
        }
    }

Пример вывода

MPJ Express (0.44) is started in the cluster configuration with niodev
Starting process <1> on <cont1>
Starting process <0> on <cont1>
Starting process <2> on <CT102>
Starting process <3> on <CT102>
Matrix size: 4
Start scatterv
Rank 0 :
Out rank 0 :
Rank 3 : 2 3 1 10 10 10 2 10
Out rank 3 : 60 2000
Rank 1 : 10 2 2 3
Out rank 1 : 120
Stopping Process <1> on <cont1>
Rank 2 : 2 2 3 1
Out rank 2 : 12
Stopping Process <2> on <CT102>
Stopping Process <3> on <CT102>
Res sum: 2192
Res time: 168002875
Stopping Process <0> on <cont1>

Вывод

В рамках лабораторной работы была реализована система распределенных вычислений с использованием MPJ Express. Программа успешно демонстрирует работу с MPI в кластере контейнеров, включая передачу данных, выполнение расчетов и сбор результатов. Получен опыт настройки окружения и взаимодействия контейнеров через SSH.

В сравнении с классической реализации потоков, можно отметить что облегчается сам процесс написания кода, ведь надо написать всего лишь один экзепляр и не задумываться над распределением задач. Но более затратно по времени развренуть и настроить общение между узлами.