1 Лабораторная работа
Задание
Разработка многопоточного приложения с использованием Java Concurrency согласно варианту задания.
Необходимо:
- Разработать однопоточный вариант алгоритма и замерить время его работы.
- Разработать параллельный вариант алгоритма с использованием
ThreadPoolExecutorи замерить время его работы. - Разработать параллельный вариант алгоритма с использованием
ForkJoinPoolи замерить время его работы.
Вариант: "Определить минимальный элемент матрицы ниже главной диагонали"
Описание
Это программа для сортировки матрицы по убыванию значений в первом столбце. Программа поддерживает три режима сортировки:
- Однопоточная сортировка.
- Многопоточная сортировка с использованием ThreadPoolExecutor.
- Многопоточная сортировка с использованием ForkJoinPool.
Программа генерирует случайную матрицу и сохраняет её в файл, а после сортировки сохраняет отсортированную матрицу в другой файл.
Подготовка
Однопоточная сортировка
private static void singleThreadSort(int[][] matrix) {
Arrays.sort(matrix, Comparator.comparingInt(a -> -a[0]));
}
Параллельная сортировка с использованием ThreadPoolExecutor
private static void multiThreadSort(int[][] matrix) throws InterruptedException {
int processors = Runtime.getRuntime().availableProcessors();
int chunkSize = matrix.length / processors;
ExecutorService executor = Executors.newFixedThreadPool(processors);
Future<?>[] futures = new Future[processors];
for (int i = 0; i < processors; i++) {
int start = i * chunkSize;
int end = (i == processors - 1) ? matrix.length : start + chunkSize;
futures[i] = executor.submit(() -> {
Arrays.sort(matrix, start, end, Comparator.comparingInt(a -> -a[0]));
});
}
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException("Sorting error", e);
}
}
executor.shutdown();
}
-
Определение количества потоков: Количество потоков равно количеству доступных процессоров
(Runtime.getRuntime().availableProcessors()). -
Разделение матрицы на части: Матрица делится на processors частей, где каждая часть содержит примерно одинаковое количество строк.
-
Создание и запуск задач: Для каждой части создается задача, которая сортирует эту часть с помощью
Arrays.sort. -
Задачи выполняются параллельно в пуле потоков
(ExecutorService). -
Ожидание завершения всех задач: Основной поток ждет завершения всех задач с помощью
future.get(). -
Завершение работы пула потоков: После завершения всех задач пул потоков закрывается
(executor.shutdown()).
Параллельная сортировка с использованием ForkJoinPool
private static void forkJoinSort(int[][] matrix) {
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new ForkJoinTask(matrix));
pool.shutdown();
}
static class ForkJoinTask extends RecursiveAction {
private static final int THRESHOLD = 500;
private final int[][] matrix;
ForkJoinTask(int[][] matrix) {
this.matrix = matrix;
}
@Override
protected void compute() {
if (matrix.length <= THRESHOLD) {
Arrays.sort(matrix, Comparator.comparingInt(a -> -a[0]));
} else {
int mid = matrix.length / 2;
int[][] left = Arrays.copyOfRange(matrix, 0, mid);
int[][] right = Arrays.copyOfRange(matrix, mid, matrix.length);
ForkJoinTask leftTask = new ForkJoinTask(left);
ForkJoinTask rightTask = new ForkJoinTask(right);
invokeAll(leftTask, rightTask);
System.arraycopy(left, 0, matrix, 0, left.length);
System.arraycopy(right, 0, matrix, left.length, right.length);
}
}
}
-
Создание пула потоков: Создается пул потоков
ForkJoinPool, который автоматически управляет потоками и задачами. -
Запуск задачи: В пул передается задача
ForkJoinTask, которая начинает выполнение. -
Рекурсивное разделение задачи: Если размер матрицы больше порога
(THRESHOLD), матрица разделяется на левую и праву часть, каждая подзадача выполняется рекурсивно. -
Базовый случай: Если размер матрицы меньше или равен порогу
(THRESHOLD), выполняется сортировка с помощьюArrays.sort. -
Объединение результатов: После завершения сортировки левой и правой частей результаты объединяются в исходную матрицу с помощью
System.arraycopy. -
Завершение работы пула потоков:
-
После завершения всех задач пул потоков закрывается
(pool.shutdown()).
Как запустить
javac Matrix.java
java Matrix
Используемые технологии
Программа использует Java Concurrency, включая классы и интерфейсы:
ExecutorService,ThreadPoolExecutor,ForkJoinPool,Future,CompletableFuture,RecursiveAction— для организации многопоточности.
Результаты работы алгоритма
Пример 1
Входные данные
65 649 455 884 387
174 805 983 575 957
597 789 601 674 916
689 703 885 353 412
Входные данные
689 703 885 353 412
597 789 601 674 916
174 805 983 575 957
65 649 455 884 387
Анализ замеров программы на разных объёмах данных
Матрица 100x100
Single-threaded algorithm: 0.106388 ms
Multi-threaded algorithm (ThreadPoolExecutor): 32.085907 ms
Multi-threaded algorithm (ForkJoinPool): 17.146648 ms
Матрица 10000x1000
Single-threaded algorithm: 92.179132 ms
Multi-threaded algorithm (ThreadPoolExecutor): 54.21498 ms
Multi-threaded algorithm (ForkJoinPool): 45.481057 ms
Матрица 50000x1000
Single-threaded algorithm: 379.713324 ms
Multi-threaded algorithm (ThreadPoolExecutor): 130.736923 ms
Multi-threaded algorithm (ForkJoinPool): 71.736064 ms
Вывод
- Можно сказать, что на маленьких объемах многопоточность негативно сказывается на эффективности, на размере 10000x100 уже многопоточность оптимизирует задачу, ThreadPool и ForkJoin примерно на одинаковом уровне. На размере 50000x1000 ForkJoin лидирует над ThreadPool. Можно сделать вывод, что ForkJoin в целом работает лучше, чем ThreadPool, но наверное не всегда можно сделать рекурсивное разделение, как в нашем варианте.