Files

Лабораторная работа №1

Задание

Разработать многопоточное приложение с использованием Java Concurrency.

Необходимо:

  1. Разработать однопоточный вариант алгоритма и замерить время его работы.
  2. Разработать параллельный вариант алгоритма с использованием ThreadPoolExecutor и замерить время его работы.
  3. Разработать параллельный вариант алгоритма с использованием ForkJoinPool и замерить время его работы.

Вариант: Упорядочить строки матрицы по возрастанию суммы их элементов (18).


Описание

Данное приложение реализует алгоритм сортировки строк матрицы с использованием трех подходов:

  • Однопоточный (Single-threaded) — классический merge sort.

  • Многопоточный (ThreadPoolExecutor) — разбиение сортировки на несколько потоков с ThreadPoolExecutor.

  • Многопоточный (ForkJoinPool) — рекурсивная сортировка с ForkJoinPool.

Приложение также позволяет измерять время выполнения каждого метода и проверять корректность сортировки.

Технологии

  • Java 17 — основной и единственный язык программирования.

  • JVM — среда выполнения.

  • ThreadPoolExecutor — пул потоков для многопоточной обработки.

  • ForkJoinPool — рекурсивная параллельная обработка.

  • LXC (Linux Containers) — среда выполнения контейнера.


Как запустить программу

На локальной машине, с которой будет происходить запуск приложения, должны быть установлены Java (17+ версии) и соответствующий ей пакет JDK.

После того, как вы скопировали проект в нужную вам директорию, перейдите в неё (команда cd) и выполните следующие шаги:

  • Скомпилируйте файл .java при помощи команды javac
    javac MatrixSorting.java
    
  • Запустите при помощи команды java
    java MatrixSorting
    

Как работает программа

Программа создает матрицу размера 10000 x 10000, заполняет ее случайными числами и выполняет сортировку ее строк по сумме элементов.

В качестве алгоритма сортировки была выбрана сортировка слиянием. Алгоритм сортировки слиянием можно разделить на два основных этапа:

  1. Разделение. Массив рекурсивно делится на две равные (или почти равные) части до тех пор, пока каждый подмассив не будет состоять из одного элемента.
  2. Слияние. Подмассивы объединяются в отсортированном порядке, начиная с самых маленьких подмассивов.

Простейшая реализация однопоточного варианта алгоритма заключена собственно в этих двух методах:

private static void mergeSort(int[][] matrix, int left, int right) {
  if (left < right) {
    int mid = (left + right) / 2;
    mergeSort(matrix, left, mid);
    mergeSort(matrix, mid + 1, right);
    merge(matrix, left, mid, right);
  }
}

private static void merge(int[][] matrix, int left, int mid, int right) {
  int[][] temp = new int[right - left + 1][];
  int i = left, j = mid + 1, k = 0;

  while (i <= mid && j <= right) {
    if (rowSum(matrix[i]) <= rowSum(matrix[j])) {
      temp[k++] = matrix[i++];
    } else {
      temp[k++] = matrix[j++];
    }
  }
  while (i <= mid) temp[k++] = matrix[i++];
  while (j <= right) temp[k++] = matrix[j++];

  System.arraycopy(temp, 0, matrix, left, temp.length);
}

ThreadPoolExecutor

C ThreadPoolExecutor интереснее. Метод parallelMergeSortWithExecutor выполняет сортировку строк матрицы параллельно, разделяя матрицу на фрагменты и обрабатывая их в нескольких потоках (количество потоков будет зависить от количества доступных процессоров в системе). После сортировки частей выполняется итеративное слияние.

private static void parallelMergeSortWithExecutor(int[][] matrix) throws InterruptedException, ExecutionException {
  int numThreads = Runtime.getRuntime().availableProcessors();
  ExecutorService executor = Executors.newFixedThreadPool(numThreads);
  int chunkSize = matrix.length / numThreads;
  Future<?>[] futures = new Future[numThreads];

  for (int i = 0; i < numThreads; i++) {
    int left = i * chunkSize;
    int right = (i == numThreads - 1) ? matrix.length - 1 : (left + chunkSize - 1);
    futures[i] = executor.submit(() -> mergeSort(matrix, left, right));
  }

  for (Future<?> future : futures) {
    future.get();
  }

  for (int size = chunkSize; size < matrix.length; size *= 2) {
    for (int left = 0; left < matrix.length - size; left += 2 * size) {
      int mid = left + size - 1;
      int right = Math.min(left + 2 * size - 1, matrix.length - 1);
      merge(matrix, left, mid, right);
    }
  }

  executor.shutdown();
}

ForkJoinPool

Класс ForkJoinPool реализует ForkJoinMergeSortTask многопоточную сортировку строк матрицы с использованием ForkJoinPool. Он расширяет RecursiveAction, что делает его частью параллельного механизма ForkJoin.

Как это работает? ForkJoinMergeSortTask разбивает задачу на подзадачи. Если размер диапазона строк меньше порогового значения (THRESHOLD = 100), выполняется обычная сортировка слиянием. Если диапазон больше порога, задача разделяется на две подзадачи. Каждая подзадача запускается параллельно с invokeAll(leftTask, rightTask). После завершения подзадач выполняется слияние.

static class ForkJoinMergeSortTask extends RecursiveAction {
  private static final int THRESHOLD = 100;
  private final int[][] matrix;
  private final int left, right;

  ForkJoinMergeSortTask(int[][] matrix, int left, int right) {
    this.matrix = matrix;
    this.left = left;
    this.right = right;
  }

  @Override
  protected void compute() {
    if (right - left < THRESHOLD) {
      mergeSort(matrix, left, right);
    } else {
      int mid = (left + right) / 2;
      ForkJoinMergeSortTask leftTask = new ForkJoinMergeSortTask(matrix, left, mid);
      ForkJoinMergeSortTask rightTask = new ForkJoinMergeSortTask(matrix, mid + 1, right);
      invokeAll(leftTask, rightTask);
      merge(matrix, left, mid, right);
    }
  }
}

Тестирование

Время выполнения каждого метода фиксируется. После каждой сортировки вызывается isSorted(), которая проверяет корректность сортировки.

Вывод:

Single-threaded execution time: 658 ms
Single-threaded sorted correctly: true

ThreadPoolExecutor execution time: 295 ms
ThreadPoolExecutor sorted correctly: true

ForkJoinPool execution time: 206 ms
ForkJoinPool sorted correctly: true
Single-threaded execution time: 669 ms
Single-threaded sorted correctly: true

ThreadPoolExecutor execution time: 295 ms
ThreadPoolExecutor sorted correctly: true

ForkJoinPool execution time: 179 ms
ForkJoinPool sorted correctly: true
Single-threaded execution time: 651 ms
Single-threaded sorted correctly: true

ThreadPoolExecutor execution time: 290 ms
ThreadPoolExecutor sorted correctly: true

ForkJoinPool execution time: 204 ms
ForkJoinPool sorted correctly: true

Выводы

  • Однопоточная сортировка выполняется медленнее по сравнению с многопоточными вариантами, но работает предсказуемо (можно не бкспокоиться о проблемах с синхронизацией), а ее довольно простая реализация делает однопоточный подход лучшим выбором для сортировки матриц малого размера.

  • ThreadPoolExecutor даёт ускорение за счёт распределения нагрузки. Количество потоков выбирается равным числу процессоров, что позволяет эффективно использовать многозадачность. Главный минус данного подхода состоит в возможных накладных расходах на управление потоками.

  • ForkJoinPool показывает лучшую производительность, особенно при больших массивах. Работа здесь делится на мелкие подзадачи (автоматическое распределение нагрузки), которые выполняются параллельно. При таком подходе мы получим все тот же минус в накладных расходах на создание и управление потоками, а также из-за рекурсивной структуры ForkJoinMergeSortTask будем иметь некоторые сложности с отладкой метода.

  • ThreadPoolExecutor и ForkJoinPool будут рациональным выбором в случаях, когда мы имеем матрицы довольно больших размеров, так как плюс многопоточности в виде скорости выполнения будет нивелировать минус в виде затрат на потоки.