potantsev_denis_lab_4 is ready

This commit is contained in:
2025-03-22 15:13:47 +04:00
parent c3d5ac3ba2
commit dcc14c8be1
2 changed files with 491 additions and 0 deletions

View File

@@ -0,0 +1,353 @@
# Лабораторная работа №4
## Задание
Разработать распределенное приложение с использованием платформы Apache Ignite.
**Необходимо:** разработать параллельный вариант алгоритма с применением подхода Grid
Gain и платформы Apache Ignite, замерить время его работы.
**Вариант:** упорядочить строки матрицы по возрастанию суммы их элементов (**18**).
---
## Описание
Данный проект представляет собой пример распределенной сортировки матрицы с использованием фреймворка **Apache Ignite**. *Основная задача* — распределить вычисление сумм строк матрицы между несколькими узлами кластера, а затем отсортировать матрицу по этим суммам.
### Технологии
- **Spring Boot** и **Maven** — инструменты для сборки и управления зависимостями.
- **LXC** — технология контейнеризации для запуска изолированных сред.
- **Java 8** — язык программирования (а также **JDK** под него).
- **Apache Ignite** — используется для распределенных вычислений и управления кластером.
---
## Что делает программа
### 1. Класс DistributedMatrixSorter
Это основной класс программы, который содержит точку входа main. Он отвечает за запуск Ignite, генерацию матрицы, распределение задач по вычислению сумм строк и сортировку матрицы.
***Методы класса:***
- `main(String[] args)`
- *Описание*: точка входа в программу.
- *Логика работы*:
- Замеряет время начала выполнения программы.
- Настраивает конфигурацию Ignite (используя IgniteConfiguration).
- Запускает Ignite и подключается к кластеру.
- Генерирует случайную матрицу.
- Распределяет задачи по вычислению сумм строк между узлами кластера.
- Сортирует матрицу по суммам строк.
- Выводит исходную и отсортированную матрицы.
- Замеряет время окончания выполнения программы и выводит общее время выполнения.
- `generateMatrix(int rows, int cols)`: генерирует случайную матрицу заданного размера.
- `waitForNodes(Ignite ignite, int expectedNodes)`
- *Описание*: Ожидает подключения всех узлов кластера.
- *Логика работы*:
- В цикле проверяет количество подключенных узлов.
- Если количество узлов меньше expectedNodes, программа ждет 1 секунду и проверяет снова.
- Как только все узлы подключены, выводит сообщение об успешном подключении.
```java
private static void waitForNodes(Ignite ignite, int expectedNodes) {
while (ignite.cluster().nodes().size() < expectedNodes) {
System.out.println("Ожидание подключения узлов... Текущее количество узлов: " + ignite.cluster().nodes().size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Все узлы подключены: " + ignite.cluster().nodes().size());
}
```
- `sortMatrixByRowSums(int[][] matrix, Map<Integer, Integer> rowSums)`: cортирует матрицу по суммам строк.
- `printMatrix(int[][] matrix)`: выводит матрицу на консоль.
### 2. Класс MatrixSumDistributor
Этот класс реализует интерфейс ComputeTask и отвечает за распределение задач по вычислению сумм строк между узлами кластера.
***Методы класса:***
- `map(List<ClusterNode> subgrid, int[][] matrix)`
- *Описание*: распределяет задачи по узлам кластера.
- *Логика работы*:
- Создает задачи (RowSumCalculator) для каждой строки матрицы.
- Распределяет задачи между узлами кластера, используя круг robin-алгоритм (каждая строка отправляется на следующий узел в списке).
- Возвращает карту, где ключ — задача, а значение — узел, на котором задача будет выполняться.
```java
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, int[][] matrix) {
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (int i = 0; i < matrix.length; i++) {
// Убедимся, что каждая строка рассчитывается только один раз
jobs.put(new RowSumCalculator(matrix[i], i), subgrid.get(i % subgrid.size()));
}
return jobs;
}
```
- `result(ComputeJobResult res, List<ComputeJobResult> received)`
- *Описание*: определяет политику обработки результатов задач.
- *Логика работы*: возвращает ComputeJobResultPolicy.WAIT, что означает, что программа будет ждать завершения всех задач перед переходом к следующему шагу.
- `reduce(List<ComputeJobResult> results)`
- *Описание*: собирает результаты выполнения задач.
- *Логика работы*:
- Преобразует результаты задач в карту, где ключ — индекс строки, а значение — сумма строки.
- Возвращает эту карту.
```java
@Override
public Map<Integer, Integer> reduce(List<ComputeJobResult> results) {
return results.stream()
.map(res -> (Map.Entry<Integer, Integer>) res.getData())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
```
### 3. Класс RowSumCalculator
Этот класс реализует интерфейс ComputeJob и отвечает за вычисление суммы элементов строки матрицы.
***Методы класса:***
- `execute()`
- *Описание*: выполняет задачу.
- *Логика работы*:
- Вычисляет сумму элементов строки.
- Выводит сообщение о том, какой узел вычислил сумму строки.
- Возвращает результат в виде Map.Entry, где ключ — индекс строки, а значение — сумма строки.
```java
@Override
public Object execute() {
int sum = Arrays.stream(row).sum();
System.out.println("Узел " + Ignition.ignite().cluster().localNode().id() +
" вычислил сумму строки " + rowIndex + ": " + sum);
return new AbstractMap.SimpleEntry<>(rowIndex, sum);
}
```
- `cancel()`
- *Описание*: метод для отмены задачи.
- *Логика работы*: в текущей реализации не выполняет никаких действий (не реализован).
---
## Как запустить программу
### 1. Создание проекта
В каждом контейнере создается **Maven-проект**:
```bash
mvn archetype:generate -DgroupId=com.example -DartifactId=matrix-sorting -Dversion=1.0 -Dpackage=com.example.matrixsorting -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
```
### 2. Настройка *pom.xml*:
Добавляем нужные зависимости и библиотеки в pom.xml для каждого проекта.
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>matrix-sorting</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Зависимость для Apache Ignite -->
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>2.8.0</version>
</dependency>
<!-- Зависимости для JUnit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Плагин для компиляции -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Плагин для создания JAR -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>DistributedMatrixSorter</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
JDK Java ставим 8-й версии, так как некоторые методы утратили долгосрочную поддержку или вовсе перестали существовать.
```bash
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export PATH=$JAVA_HOME/bin:$PATH
```
### 3. Настройка *default-config.xml*:
В папке с Ignite ищем конфигурационный файл *default-config.xml*. Этот XML-файл конфигурирует Apache Ignite для использования TCP-механизма обнаружения узлов (TcpDiscoverySpi) и поиска IP-адресов узлов в виртуальной машине (TcpDiscoveryVmIpFinder). Он указывает два узла с IP-адресами 192.168.17.117 и 192.168.17.118, которые будут использовать порты с 47500 по 47509 для соединения.
```xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>192.168.17.117:47500..47509</value>
<value>192.168.17.118:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
```
После запуска Ignite оба узла должны обнаружить друг друга и образовать кластер.
```bash
export IGNITE_HOME=/labs/Lab4/apache-ignite-2.17.0-bin
export PATH=$PATH:$IGNITE_HOME/bin
ignite.sh
```
### 4. Сборка проекта
В каждом контейнере нужно выполнить команду для сборки проекта:
```bash
mvn clean package
```
### 5. Запуск приложения
Запускаем каждый узел в соответствующем контейнере. Для этого прописываем команду *java* и указываем путь до сформированного jar-файла в папке target (по умолчанию).
```bash
java -jar target/matrix-sorting-1.0-SNAPSHOT.jar
```
---
## Результаты работы программы
**Главный узел (master node)**
```
[09:25:42] Ignite node started OK (id=9d004b19)
[09:25:42] Topology snapshot [ver=2, locNode=9d004b19, servers=2, clients=0, state=ACTIVE, CPUs=4, offheap=1.5GB, heap=1.7GB]
[09:25:42] ^-- Baseline [id=0, size=2, online=2, offline=0]
Узел запущен: 9d004b19-9288-4839-9a25-ec9df441348c
Все узлы подключены: 2
Исходная матрица:
[18, 69, 92, 52]
[93, 61, 74, 80]
[21, 50, 17, 4]
[2, 86, 32, 93]
[42, 76, 15, 89]
[19, 15, 64, 79]
[82, 37, 38, 42]
[26, 70, 19, 73]
Узел 9d004b19-9288-4839-9a25-ec9df441348c вычислил сумму строки 6: 199
Узел 9d004b19-9288-4839-9a25-ec9df441348c вычислил сумму строки 0: 231
Узел 9d004b19-9288-4839-9a25-ec9df441348c вычислил сумму строки 2: 92
Узел 9d004b19-9288-4839-9a25-ec9df441348c вычислил сумму строки 4: 222
Отсортированная матрица:
[21, 50, 17, 4]
[19, 15, 64, 79]
[26, 70, 19, 73]
[82, 37, 38, 42]
[2, 86, 32, 93]
[42, 76, 15, 89]
[18, 69, 92, 52]
[93, 61, 74, 80]
[09:25:42] Ignite node stopped OK [uptime=00:00:00.206]
Общее время выполнения: 11699 мс
```
**Узел работника (worker node)**
```
[09:25:28] Ignite node started OK (id=a41464a3)
[09:25:28] Topology snapshot [ver=1, locNode=a41464a3, servers=1, clients=0, state=ACTIVE, CPUs=2, offheap=0.77GB, heap=0.85GB]
[09:25:28] ^-- Baseline [id=0, size=1, online=1, offline=0]
Узел запущен: a41464a3-fdbe-48a4-aeac-2073a976e567
Ожидание подключения узлов... Текущее количество узлов: 1
Ожидание подключения узлов... Текущее количество узлов: 1
Ожидание подключения узлов... Текущее количество узлов: 1
[09:25:36] Joining node doesn't have encryption data [node=9d004b19-9288-4839-9a25-ec9df441348c]
Ожидание подключения узлов... Текущее количество узлов: 1
Ожидание подключения узлов... Текущее количество узлов: 1
Ожидание подключения узлов... Текущее количество узлов: 1
[09:25:41] Topology snapshot [ver=2, locNode=a41464a3, servers=2, clients=0, state=ACTIVE, CPUs=4, offheap=1.5GB, heap=1.7GB]
[09:25:41] ^-- Baseline [id=0, size=2, online=2, offline=0]
Узел a41464a3-fdbe-48a4-aeac-2073a976e567 вычислил сумму строки 1: 308
Узел a41464a3-fdbe-48a4-aeac-2073a976e567 вычислил сумму строки 5: 177
Узел a41464a3-fdbe-48a4-aeac-2073a976e567 вычислил сумму строки 7: 188
Узел a41464a3-fdbe-48a4-aeac-2073a976e567 вычислил сумму строки 3: 213
[09:25:42] Topology snapshot [ver=3, locNode=a41464a3, servers=1, clients=0, state=ACTIVE, CPUs=2, offheap=0.77GB, heap=0.85GB]
[09:25:42] ^-- Baseline [id=0, size=1, online=1, offline=0]
```
---
# Выводы
- Apache Ignite не лучшим образом подходит для простых или маломасштабных проектов по типу этого (сортировка строк в матрице). Накладные расходы, связанные с настройкой распределённой системы и управлением ею становятся существенным минусом, который заставляет смотреть в сторону более компактных и простых решений.
- По тестам можно заметить серьезную временную задержку при подключении узлов друг к другу. Это тоже, конечно, сказывается на общей эффективности созданной системы.
- Если сравнивать технологию Ignite и технологию MPI, также используемую для распределенных вычислений, MPI обеспечивает максимальную производительность и низкую задержку в распределённых вычислениях за счёт прямого обмена сообщениями, тогда как Ignite жертвует частью эффективности ради удобства и автоматического масштабирования.
- Так или иначе, выбор технологии для разработки распределенных систем — это вопрос довольно сложный и зависит от того, на что ориентирован пользователь и чем он готов жертвовать.

View File

@@ -0,0 +1,138 @@
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.compute.*;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import java.util.*;
import java.util.stream.Collectors;
public class DistributedMatrixSorter {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
IgniteConfiguration cfg = new IgniteConfiguration();
// Настройка обнаружения узлов
TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
ipFinder.setAddresses(Arrays.asList("192.168.17.117:47500", "192.168.17.118:47500"));
discoSpi.setIpFinder(ipFinder);
cfg.setDiscoverySpi(discoSpi);
try (Ignite ignite = Ignition.start(cfg)) {
System.out.println("Узел запущен: " + ignite.cluster().localNode().id());
// Ожидание подключения всех узлов
waitForNodes(ignite, 2);
// Генерация матрицы
int[][] matrix = generateMatrix(8, 4);
System.out.println("Исходная матрица:");
printMatrix(matrix);
// Распределение задач для вычисления сумм строк
Map<Integer, Integer> rowSums = ignite.compute().execute(new MatrixSumDistributor(), matrix);
// Сортировка матрицы по сумме строк
int[][] sortedMatrix = sortMatrixByRowSums(matrix, rowSums);
// Вывод отсортированной матрицы
System.out.println("Отсортированная матрица:");
printMatrix(sortedMatrix);
}
long endTime = System.currentTimeMillis();
System.out.println("Общее время выполнения: " + (endTime - startTime) + " мс");
}
// Генерация случайной матрицы
private static int[][] generateMatrix(int rows, int cols) {
Random random = new Random();
int[][] matrix = new int[rows][cols];
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
matrix[i][j] = random.nextInt(100);
}
}
return matrix;
}
// Ожидание подключения всех узлов
private static void waitForNodes(Ignite ignite, int expectedNodes) {
while (ignite.cluster().nodes().size() < expectedNodes) {
System.out.println("Ожидание подключения узлов... Текущее количество узлов: " + ignite.cluster().nodes().size());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Все узлы подключены: " + ignite.cluster().nodes().size());
}
// Сортировка матрицы по суммам строк
private static int[][] sortMatrixByRowSums(int[][] matrix, Map<Integer, Integer> rowSums) {
return Arrays.stream(matrix)
.sorted(Comparator.comparingInt(row -> rowSums.get(Arrays.asList(matrix).indexOf(row))))
.toArray(int[][]::new);
}
// Вывод матрицы
private static void printMatrix(int[][] matrix) {
for (int[] row : matrix) {
System.out.println(Arrays.toString(row));
}
}
private static class MatrixSumDistributor implements ComputeTask<int[][], Map<Integer, Integer>> {
@Override
public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, int[][] matrix) {
Map<ComputeJob, ClusterNode> jobs = new HashMap<>();
for (int i = 0; i < matrix.length; i++) {
// Убедимся, что каждая строка рассчитывается только один раз
jobs.put(new RowSumCalculator(matrix[i], i), subgrid.get(i % subgrid.size()));
}
return jobs;
}
@Override
public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> received) {
return ComputeJobResultPolicy.WAIT;
}
@Override
public Map<Integer, Integer> reduce(List<ComputeJobResult> results) {
return results.stream()
.map(res -> (Map.Entry<Integer, Integer>) res.getData())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
// Класс для вычисления суммы строки
private static class RowSumCalculator implements ComputeJob {
private final int[] row;
private final int rowIndex;
public RowSumCalculator(int[] row, int rowIndex) {
this.row = row;
this.rowIndex = rowIndex;
}
@Override
public Object execute() {
int sum = Arrays.stream(row).sum();
System.out.println("Узел " + Ignition.ignite().cluster().localNode().id() +
" вычислил сумму строки " + rowIndex + ": " + sum);
return new AbstractMap.SimpleEntry<>(rowIndex, sum);
}
@Override
public void cancel() {
// Метод отмены задачи (не реализован)
}
}
}