Merge pull request 'lobashov_ivan_lab_4 is ready' (#439) from lobashov_ivan_lab_4 into main

Reviewed-on: #439
This commit was merged in pull request #439.
This commit is contained in:
2025-12-08 23:07:58 +04:00
30 changed files with 483 additions and 0 deletions

View File

@@ -0,0 +1,55 @@
# Лабораторная работа 4
**Тема**: Параллельная обработка сообщений с использованием RabbitMQ
## Предметная область
Система мониторинга автомобилей, отслеживающая различные события транспортных средств.
## Использованные технологии
- Java 17 язык, на котором были пройдены туториалы и выполнено основное задание;
- RabbitMQ - используемый брокер сообщений.
# Что делает программа
## Publisher
Генерирует каждую секунду случайное событие по предметной области и публикует его в обменник RabbitMQ.
## Consumer 1
Моделирует медленного обработчика событий, с задержкой в 2 или 3 секунды (случайно).
## Consumer 2
Моделирует быстрый обработчик событий с мгновенной обработкой сообщений.
1. CarEventPublisher
* Генерирует события автомобилей (превышение скорости, низкий уровень топлива, перегрев двигателя и др.)
* Каждую секунду публикует сообщение, которое отправляется всем подписчикам через exchange типа fanout
* Отправляет данные о марке автомобиля, скорости, уровне топлива и типе события
2. CarAnalyticsConsumer (Consumer 1)
* Получает все события и анализирует их для статистики и отчетности
* Симулирует глубокий анализ, делая паузу 2-3 секунды
* Выводит информацию о начале и завершении анализа
* Создает очередь car_queue_slow
3. CarMonitoringConsumer (Consumer 2)
* Получает те же события, но для реального мониторинга и оповещений
* Не совершает паузы и мгновенно обрабатывает критические события
* Выводит информацию о быстрой обработке для систем оповещения
* Создает очередь car_queue_fast
## Выводы о скорости работы потребителей
Скорости работы двух потребителей НЕ хватает, чтобы полностью обрабатывать 1 Publisher:
* CarMonitoringConsumer (быстрый) успевает обрабатывать все сообщения мгновенно (скриншот fast)
* CarAnalyticsConsumer (медленный) не успевает за потоком сообщений, сообщения накапливаются в очереди (скриншот slow)
При запуске 3 копий потребителя CarAnalyticsConsumer (скриншот "3 экземпляра slow"):
* Обработка ускорилась, но сообщения продолжали копиться в очереди (скриншот результат теста)
* CarMonitoringConsumer продолжал мгновенно обрабатывать все поступающие сообщения
* Демонстрируется необходимость правильного балансирования нагрузки в распределенных системах
# Ссылка на видео:
[Видео](https://rutube.ru/video/private/7eb1204158459ece3e8767202a255bbf/?p=lsYn9Mc77yd0_Vcyl-fkpw)

View File

@@ -0,0 +1,86 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-four-lab</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-four-lab</name>
<url/>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.27.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.17</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.14.1</version>
<configuration>
<release>17</release>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<version>3.5.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.3.1</version>
<configuration>
<propertiesEncoding>${project.build.sourceEncoding}</propertiesEncoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.6.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>ReceiveLogs</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Binary file not shown.

After

Width:  |  Height:  |  Size: 186 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 177 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

View File

@@ -0,0 +1,39 @@
package com.rabbit_mq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class CarConsumerFast {
private static final String EXCHANGE_NAME = "car_events";
private static final String QUEUE_NAME = "car_queue_fast";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Fast Car Consumer waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [Fast] Received: " + message);
// Мгновенная обработка
System.out.println(" [Fast] Processed instantly - event logged to system.");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}

View File

@@ -0,0 +1,48 @@
package com.rabbit_mq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.Random;
public class CarConsumerSlow {
private static final String EXCHANGE_NAME = "car_events";
private static final String QUEUE_NAME = "car_queue_slow";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Slow Car Consumer waiting for messages. To exit press CTRL+C");
Random random = new Random();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [Slow] Received: " + message);
// Обработка занимает 2-3 секунды
int delaySeconds = random.nextBoolean() ? 2 : 3;
try {
System.out.println(" [Slow] Processing (will take " + delaySeconds + "s)...");
Thread.sleep(delaySeconds * 1000L);
} catch (InterruptedException ignored) {}
System.out.println(" [Slow] Done processing after " + delaySeconds + "s.");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}

View File

@@ -0,0 +1,60 @@
package com.rabbit_mq.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.Random;
public class CarEventPublisher {
private static final String EXCHANGE_NAME = "car_events";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
Random random = new Random();
String[] carModels = {"Toyota Camry", "Honda Civic", "Ford Focus", "BMW X5", "Mercedes C-Class"};
String[] eventTypes = {
"Превышение скорости",
"Низкий уровень топлива",
"Перегрев двигателя",
"Ремень безопасности не пристегнут",
"Низкое давление в шинах",
"Неисправность ABS",
"Требуется техобслуживание",
"Резкое торможение"
};
System.out.println(" [*] Car Event Publisher started. Sending events every second...");
int messageCount = 0;
while (messageCount < 100) {
String carModel = carModels[random.nextInt(carModels.length)];
String eventType = eventTypes[random.nextInt(eventTypes.length)];
String carId = "CAR" + (1000 + random.nextInt(9000));
int speed = 30 + random.nextInt(150);
double fuelLevel = random.nextDouble() * 100;
String message = String.format("[%s] Машина: %s (%s) | Событие: %s | Скорость: %d км/ч | Топливо: %.1f%%",
LocalDateTime.now(), carModel, carId, eventType, speed, fuelLevel);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent: " + message);
Thread.sleep(1000);
messageCount++;
}
System.out.println(" [*] Publisher finished after 100 messages");
}
}
}

View File

@@ -0,0 +1,28 @@
package com.rabbit_mq.tutorials.tutorial1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

View File

@@ -0,0 +1,21 @@
package com.rabbit_mq.tutorials.tutorial1;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}

View File

@@ -0,0 +1,28 @@
package com.rabbit_mq.tutorials.tutorial2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

View File

@@ -0,0 +1,48 @@
package com.rabbit_mq.tutorials.tutorial2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

View File

@@ -0,0 +1,26 @@
package com.rabbit_mq.tutorials.tutorial3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

View File

@@ -0,0 +1,26 @@
package com.rabbit_mq.tutorials.tutorial3;
import com.rabbitmq.client.*;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

View File

@@ -0,0 +1,9 @@
com\rabbit_mq\tutorials\tutorial1\Recv.class
com\rabbit_mq\tutorials\tutorial2\Worker.class
com\rabbit_mq\demo\CarConsumerFast.class
com\rabbit_mq\demo\CarEventPublisher.class
com\rabbit_mq\tutorials\tutorial1\Send.class
com\rabbit_mq\tutorials\tutorial3\EmitLog.class
com\rabbit_mq\demo\CarConsumerSlow.class
com\rabbit_mq\tutorials\tutorial2\NewTask.class
com\rabbit_mq\tutorials\tutorial3\ReceiveLogs.class

View File

@@ -0,0 +1,9 @@
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\demo\CarConsumerFast.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\demo\CarConsumerSlow.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\demo\CarEventPublisher.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial1\Recv.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial1\Send.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial2\NewTask.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial2\Worker.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial3\EmitLog.java
C:\Users\goldfest\Desktop\4lab rvip\DAS_2025_1\lobashov_ivan_lab_4\src\main\java\com\rabbit_mq\tutorials\tutorial3\ReceiveLogs.java