Лабораторная №4
Разработка распределенного приложения с использованием платформы Apache Ignite. Необходимо разработать параллельный вариант алгоритма с применением подхода Grid Gain и платформа Apache Ignite, замерить время его работы.
Вариант и задание
22 Упорядочить столбцы матрицы по возрастанию первых элементов.
Что программа делает
- MatrixSortingTask распределяет работу по сортировке столбцов между узлами.
- Каждый узел выполняет сортировку своих столбцов с помощью ColumnSortJob.
- После завершения сортировки результаты агрегации собираются и объединяются в итоговую отсортированную матрицу.
Таким образом, задача сортировки матрицы с помощью распределённой обработки на кластере Apache Ignite.
Описание кода
MatrixSortingTask (Задача сортировки матрицы)
Этот класс отвечает за распределение работы по сортировке матрицы на несколько узлов Apache Ignite и последующую агрегацию результатов.
Основные моменты:
-
Распределение работы:
- Задача делит столбцы матрицы между несколькими узлами кластера для параллельной сортировки.
- Каждый узел сортирует определённые столбцы матрицы.
-
Реализация метода
map:- Метод
mapделит работу на несколько частей (по столбцам). - Для каждого узла создаётся задание (задача сортировки столбцов), которое будет обрабатываться этим узлом.
- Метод
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> nodes, int[][] matrix) {
int cols = matrix[0].length;
int nodeCount = nodes.size();
int chunkSize = (int) Math.ceil((double) cols / nodeCount);
Map<ComputeJob, ClusterNode> map = new HashMap<>();
// Разбиваем работу на части по столбцам
for (int i = 0; i < nodeCount; i++) {
int startCol = i * chunkSize;
int endCol = Math.min(startCol + chunkSize, cols);
int[][] subMatrix = extractColumns(matrix, startCol, endCol);
map.put(new ColumnSortJob(subMatrix, startCol), nodes.get(i));
}
return map;
}
map:- Вычисляется количество узлов и делится матрица на части (столбцы), которые будут обработаны каждым узлом.
- Создаётся задание
ColumnSortJobдля каждого узла с подматрицей столбцов и индексом начала сортируемых столбцов.
- Агрегация результатов:
- После того как каждый узел завершит свою работу (сортировку столбцов), результаты собираются в одном методе
reduce. - Все отсортированные части матрицы объединяются в итоговую отсортированную матрицу.
- После того как каждый узел завершит свою работу (сортировку столбцов), результаты собираются в одном методе
@Override
public int[][] reduce(List<ComputeJobResult> results) {
int[][] sortedMatrix = new int[results.get(0).getData().length][];
int currentCol = 0;
for (ComputeJobResult result : results) {
int[][] sortedChunk = (int[][]) result.getData();
mergeColumns(sortedMatrix, sortedChunk, currentCol);
currentCol += sortedChunk[0].length;
}
return sortedMatrix;
}
reduce:- Объединяет отсортированные столбцы из разных узлов в единую матрицу.
- Метод
mergeColumnsкопирует данные из отсортированных частей в итоговую матрицу.
- Дополнительные методы:
extractColumns: Извлекает подматрицу столбцов для конкретного узла.mergeColumns: Объединяет отсортированные столбцы в итоговую матрицу.
private int[][] extractColumns(int[][] matrix, int startCol, int endCol) {
// Извлекает столбцы с startCol по endCol
}
private void mergeColumns(int[][] target, int[][] sortedChunk, int startCol) {
// Объединяет отсортированные столбцы с основной матрицей
}
ColumnSortJob (Задача сортировки столбцов)
Этот класс представляет собой задачу для одного узла кластера, которая выполняет сортировку столбцов подматрицы.
Основные моменты:
- Сортировка столбцов:
- Каждый узел получает подматрицу и сортирует столбцы этой подматрицы.
- Для сортировки используется массив индексов, который определяет порядок столбцов после сортировки.
@Override
public Object execute() {
Integer[] indices = new Integer[cols];
for (int i = 0; i < cols; i++) indices[i] = i;
// Сортируем столбцы по значениям первого ряда
Arrays.sort(indices, Comparator.comparingInt(a -> matrix[0][a]));
int[][] sortedMatrix = new int[rows][cols];
for (int i = 0; i < cols; i++) {
for (int j = 0; j < rows; j++) {
sortedMatrix[j][i] = matrix[j][indices[i]];
}
}
return sortedMatrix;
}
execute:- Столбцы подматрицы сортируются с использованием
Arrays.sortпо первому элементу каждого столбца. - Индексы столбцов в массиве
indicesопределяют новый порядок столбцов после сортировки. - Возвращается отсортированная матрица.
- Столбцы подматрицы сортируются с использованием
- Отмена задачи:
- Метод
cancelвызывается, если задача была прервана, но в данном примере он просто выводит сообщение.
- Метод
@Override
public void cancel() {
System.out.println("ColumnSortJob cancelled.");
}
- Конструктор:
- При создании объекта передаются подматрица и индекс начала столбцов, чтобы корректно обработать каждый сегмент матрицы.
public ColumnSortJob(int[][] matrix, int startCol) {
this.matrix = matrix;
this.startCol = startCol;
}
Как запустить
Для успешной работы потребуется конфигурация ignite.xml. Она должна выглядеть примерно так
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Включаем или отключаем загрузку классов между узлами -->
<property name="peerClassLoadingEnabled" value="true"/>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<!-- Список адресов для обнаружения узлов -->
<property name="addresses">
<list>
<value>IP_CONTAINER1:port</value>
<value>IP_CONTAINER2:port</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
Основные свойства:
- peerClassLoadingEnabled — разрешает или запрещает загрузку классов между узлами кластера.
- discoverySpi — настройка компонента для поиска и подключения других узлов.
- TcpDiscoverySpi — механизм для обнаружения узлов по TCP.
- ipFinder — хранит список адресов, используемых для поиска узлов.
- TcpDiscoveryVmIpFinder — класс, который указывает список IP-адресов для поиска узлов.
- addresses — конкретные адреса, которые используются для подключения узлов к кластеру.
Необходимые зависимости
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.17.0</version>
</dependency>
<!-- Apache Ignite Spring -->
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>2.17.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Плагин для сборки Fat JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>main.java.com.example.MainTask</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
После сборки в папке target (по умолчанию такая) должен появится .jar файл с необходимыми классами Его уже и нужно будет запустить на компьютере
У меня этот файл называется savinov_roman_lab_4-1.0-SNAPSHOT.jar
На втором контейнере я установил платформу apache ignite и запустил её. (предварительно нужно изменить default-config.xml)
cd workingDirectory
java -jar savinov_roman_lab_4-1.0-SNAPSHOT.jar
Выходное значение для матрицы размером 3000 на 3000
Главная нода
[14:54:37] Ignite node started OK (id=4c43f1a7)
[14:54:37] Topology snapshot [ver=8, locNode=4c43f1a7, servers=2, clients=0, state=ACTIVE, CPUs=4, offheap=1.5GB, heap=1.9GB]
[14:54:37] ^-- Baseline [id=0, size=2, online=2, offline=0]
Ignite started successfully.
Random matrix generated.
Original Matrix:
18 39 26 96 45 81 87 91 6 19
5 98 93 68 14 16 41 81 6 46
37 48 30 49 72 21 69 24 38 7
95 41 76 83 94 53 66 99 39 47
95 13 70 92 49 45 75 0 58 81
82 43 50 1 66 40 55 66 94 54
92 95 71 72 23 58 89 18 49 75
77 36 96 49 26 20 0 63 97 24
2 23 50 89 14 46 39 14 57 16
31 75 88 94 28 46 44 40 92 74
Mapping jobs to nodes. Total nodes: 2
Sorting columns starting from index 1500
Sorting complete for columns starting at index 1500
Reducing results from jobs. Number of results: 2
Sorted Matrix:
0 0 0 0 0 0 0 0 0 0
2 62 56 61 71 93 18 39 55 49
48 22 46 38 47 69 27 23 37 87
8 54 90 39 29 0 6 54 94 60
61 7 43 39 37 13 15 85 35 79
71 12 19 12 40 50 74 63 54 31
47 20 68 45 43 32 56 59 50 2
36 15 60 12 67 52 88 57 54 8
52 40 54 17 67 80 5 27 45 18
30 23 18 78 98 49 45 83 5 43
Time taken: 1275 ms
[14:54:38] Ignite node stopped OK [uptime=00:00:01.806]
Нода работник
[14:54:36] ^-- Baseline [id=0, size=2, online=2, offline=0]
Sorting columns starting from index 0
Sorting complete for columns starting at index 0
[14:54:38] Topology snapshot [ver=9, locNode=8e88275c, servers=1, clients=0, state=ACTIVE, CPUs=2, offheap=0.77GB, heap=0.96GB]
[14:54:38] ^-- Baseline [id=0, size=1, online=1, offline=0]
[14:55:00]
[14:55:00] ^-- sysMemPlc region [type=internal, persistence=false, lazyAlloc=false,
[14:55:00] ... initCfg=40MB, maxCfg=100MB, usedRam=0MB, freeRam=99.21%, allocRam=40MB]
[14:55:00] ^-- default region [type=default, persistence=false, lazyAlloc=true,
[14:55:00] ... initCfg=256MB, maxCfg=783MB, usedRam=0MB, freeRam=100%, allocRam=0MB]
[14:55:00] ^-- volatileDsMemPlc region [type=user, persistence=false, lazyAlloc=true,
[14:55:00] ... initCfg=40MB, maxCfg=100MB, usedRam=0MB, freeRam=100%, allocRam=0MB]
[14:55:00]
[14:55:00]
[14:55:00] Data storage metrics for local node (to disable set 'metricsLogFrequency' to 0)
[14:55:00] ^-- Off-heap memory [used=0MB, free=99.92%, allocated=40MB]
[14:55:00] ^-- Page memory [pages=200]
[14:56:00]
[14:56:00] ^-- sysMemPlc region [type=internal, persistence=false, lazyAlloc=false,
[14:56:00] ... initCfg=40MB, maxCfg=100MB, usedRam=0MB, freeRam=99.21%, allocRam=40MB]
[14:56:00] ^-- default region [type=default, persistence=false, lazyAlloc=true,
[14:56:00] ... initCfg=256MB, maxCfg=783MB, usedRam=0MB, freeRam=100%, allocRam=0MB]
[14:56:00] ^-- volatileDsMemPlc region [type=user, persistence=false, lazyAlloc=true,
[14:56:00] ... initCfg=40MB, maxCfg=100MB, usedRam=0MB, freeRam=100%, allocRam=0MB]
[14:56:00]
Вывод
Контейнеры смогли эффективно взаимодействовать между собой благодаря использованию механизма Peer Class Loader в Apache Ignite, который обеспечивал обмен данными и слаженную работу распределённой системы. Однако эффективность сортировки данных зависела от качества сети, поскольку передача данных по сети добавляла издержки, что могло замедлить процесс обработки, особенно при большом количестве узлов.