Лабораторная работа №1
Задание
Разработать многопоточное приложение с использованием Java Concurrency.
Необходимо:
- Разработать однопоточный вариант алгоритма и замерить время его работы.
- Разработать параллельный вариант алгоритма с использованием
ThreadPoolExecutorи замерить время его работы. - Разработать параллельный вариант алгоритма с использованием
ForkJoinPoolи замерить время его работы.
Вариант: Упорядочить строки матрицы по возрастанию суммы их элементов (18).
Описание
Данное приложение реализует алгоритм сортировки строк матрицы с использованием трех подходов:
-
Однопоточный (Single-threaded) — классический merge sort.
-
Многопоточный (ThreadPoolExecutor) — разбиение сортировки на несколько потоков с
ThreadPoolExecutor. -
Многопоточный (ForkJoinPool) — рекурсивная сортировка с
ForkJoinPool.
Приложение также позволяет измерять время выполнения каждого метода и проверять корректность сортировки.
Технологии
-
Java 17 — основной и единственный язык программирования.
-
JVM — среда выполнения.
-
ThreadPoolExecutor — пул потоков для многопоточной обработки.
-
ForkJoinPool — рекурсивная параллельная обработка.
-
LXC (Linux Containers) — среда выполнения контейнера.
Как запустить программу
На локальной машине, с которой будет происходить запуск приложения, должны быть установлены Java (17+ версии) и соответствующий ей пакет JDK.
После того, как вы скопировали проект в нужную вам директорию, перейдите в неё (команда cd) и выполните следующие шаги:
- Скомпилируйте файл .java при помощи команды javac
javac MatrixSorting.java - Запустите при помощи команды java
java MatrixSorting
Как работает программа
Программа создает матрицу размера 10000 x 10000, заполняет ее случайными числами и выполняет сортировку ее строк по сумме элементов.
В качестве алгоритма сортировки была выбрана сортировка слиянием. Алгоритм сортировки слиянием можно разделить на два основных этапа:
- Разделение. Массив рекурсивно делится на две равные (или почти равные) части до тех пор, пока каждый подмассив не будет состоять из одного элемента.
- Слияние. Подмассивы объединяются в отсортированном порядке, начиная с самых маленьких подмассивов.
Простейшая реализация однопоточного варианта алгоритма заключена собственно в этих двух методах:
private static void mergeSort(int[][] matrix, int left, int right) {
if (left < right) {
int mid = (left + right) / 2;
mergeSort(matrix, left, mid);
mergeSort(matrix, mid + 1, right);
merge(matrix, left, mid, right);
}
}
private static void merge(int[][] matrix, int left, int mid, int right) {
int[][] temp = new int[right - left + 1][];
int i = left, j = mid + 1, k = 0;
while (i <= mid && j <= right) {
if (rowSum(matrix[i]) <= rowSum(matrix[j])) {
temp[k++] = matrix[i++];
} else {
temp[k++] = matrix[j++];
}
}
while (i <= mid) temp[k++] = matrix[i++];
while (j <= right) temp[k++] = matrix[j++];
System.arraycopy(temp, 0, matrix, left, temp.length);
}
ThreadPoolExecutor
C ThreadPoolExecutor интереснее. Метод parallelMergeSortWithExecutor выполняет сортировку строк матрицы параллельно, разделяя матрицу на фрагменты и обрабатывая их в нескольких потоках (количество потоков будет зависить от количества доступных процессоров в системе). После сортировки частей выполняется итеративное слияние.
private static void parallelMergeSortWithExecutor(int[][] matrix) throws InterruptedException, ExecutionException {
int numThreads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
int chunkSize = matrix.length / numThreads;
Future<?>[] futures = new Future[numThreads];
for (int i = 0; i < numThreads; i++) {
int left = i * chunkSize;
int right = (i == numThreads - 1) ? matrix.length - 1 : (left + chunkSize - 1);
futures[i] = executor.submit(() -> mergeSort(matrix, left, right));
}
for (Future<?> future : futures) {
future.get();
}
for (int size = chunkSize; size < matrix.length; size *= 2) {
for (int left = 0; left < matrix.length - size; left += 2 * size) {
int mid = left + size - 1;
int right = Math.min(left + 2 * size - 1, matrix.length - 1);
merge(matrix, left, mid, right);
}
}
executor.shutdown();
}
ForkJoinPool
Класс ForkJoinPool реализует ForkJoinMergeSortTask многопоточную сортировку строк матрицы с использованием ForkJoinPool. Он расширяет RecursiveAction, что делает его частью параллельного механизма ForkJoin.
Как это работает? ForkJoinMergeSortTask разбивает задачу на подзадачи. Если размер диапазона строк меньше порогового значения (THRESHOLD = 100), выполняется обычная сортировка слиянием.
Если диапазон больше порога, задача разделяется на две подзадачи.
Каждая подзадача запускается параллельно с invokeAll(leftTask, rightTask).
После завершения подзадач выполняется слияние.
static class ForkJoinMergeSortTask extends RecursiveAction {
private static final int THRESHOLD = 100;
private final int[][] matrix;
private final int left, right;
ForkJoinMergeSortTask(int[][] matrix, int left, int right) {
this.matrix = matrix;
this.left = left;
this.right = right;
}
@Override
protected void compute() {
if (right - left < THRESHOLD) {
mergeSort(matrix, left, right);
} else {
int mid = (left + right) / 2;
ForkJoinMergeSortTask leftTask = new ForkJoinMergeSortTask(matrix, left, mid);
ForkJoinMergeSortTask rightTask = new ForkJoinMergeSortTask(matrix, mid + 1, right);
invokeAll(leftTask, rightTask);
merge(matrix, left, mid, right);
}
}
}
Тестирование
Время выполнения каждого метода фиксируется.
После каждой сортировки вызывается isSorted(), которая проверяет корректность сортировки.
Вывод:
Single-threaded execution time: 658 ms
Single-threaded sorted correctly: true
ThreadPoolExecutor execution time: 295 ms
ThreadPoolExecutor sorted correctly: true
ForkJoinPool execution time: 206 ms
ForkJoinPool sorted correctly: true
Single-threaded execution time: 669 ms
Single-threaded sorted correctly: true
ThreadPoolExecutor execution time: 295 ms
ThreadPoolExecutor sorted correctly: true
ForkJoinPool execution time: 179 ms
ForkJoinPool sorted correctly: true
Single-threaded execution time: 651 ms
Single-threaded sorted correctly: true
ThreadPoolExecutor execution time: 290 ms
ThreadPoolExecutor sorted correctly: true
ForkJoinPool execution time: 204 ms
ForkJoinPool sorted correctly: true
Выводы
-
Однопоточная сортировка выполняется медленнее по сравнению с многопоточными вариантами, но работает предсказуемо (можно не бкспокоиться о проблемах с синхронизацией), а ее довольно простая реализация делает однопоточный подход лучшим выбором для сортировки матриц малого размера.
-
ThreadPoolExecutor даёт ускорение за счёт распределения нагрузки. Количество потоков выбирается равным числу процессоров, что позволяет эффективно использовать многозадачность. Главный минус данного подхода состоит в возможных накладных расходах на управление потоками.
-
ForkJoinPool показывает лучшую производительность, особенно при больших массивах. Работа здесь делится на мелкие подзадачи (автоматическое распределение нагрузки), которые выполняются параллельно. При таком подходе мы получим все тот же минус в накладных расходах на создание и управление потоками, а также из-за рекурсивной структуры ForkJoinMergeSortTask будем иметь некоторые сложности с отладкой метода.
-
ThreadPoolExecutor и ForkJoinPool будут рациональным выбором в случаях, когда мы имеем матрицы довольно больших размеров, так как плюс многопоточности в виде скорости выполнения будет нивелировать минус в виде затрат на потоки.