Merge pull request 'borschevskaya_anna_lab_4 is ready' (#50) from borschevskaya_anna_lab_4 into main

Reviewed-on: Alexey/DAS_2024_1#50
This commit is contained in:
Alexey 2024-10-14 11:03:34 +04:00
commit ea8da8c665
33 changed files with 838 additions and 0 deletions

View File

@ -0,0 +1,50 @@
# Отчет. Лабораторная работа 4
## Описание
В ходе лабораторной работы были изучены главы туториала о работе с RabbitMQ. Результат выполнения заданий каждой главы
отражен на скриншотах в папке /images:
- Tutorial-Task1.png
![Tutorial-Task1](images/Tutorial-Task1.png)
- Tutorial-Task2.png
![Tutorial-Task2](images/Tutorial-Task2.png)
- Tutorial-Task3.png
![Tutorial-Task3](images/Tutorial-Task3.png)
Задание из 3-ей главы туториала было расширено условиями, которые были поставлены в задании к данной лабораторной работе.
Для демонстрации работы сервисов посредством ассинхронного общения через брокер сообщений RabbitMQ была выбрана
предметная область "Обработка заказов".
Сервис-издатель "Publisher" публикует в очередь сообщений событие поступления заказа с некоторым номером.
Сервисы-подписчики обрабатывают сообщения о заказах, при этом подписчики обрабатывают сообщение по-разному. Один вид
подписчика обрабатывает с задержкой в несколько секунд, другой - "мгновенно", они получают одни и те жа сообщения,
но соединены с разными очередями.
В качестве эксперимента изначально были запущены по одному экземпляру каждого вида.
На изображении Consumer2.png представлена работа мгновенно обрабатывающего подписчика. Он справляется с нагрузкой,
так как размер очереди не растет.
![Consumer 2](images/Consumer2.png)
На изображении Consumer1.png представлена работа подписчика, обрабатывающего сообщения с задержкой. Как мы видим,
в очереди накапливаются сообщения в состоянии 'Ready' - эти сообщения готовы для того, чтобы быть доставленными подписчикам.
Сервис не справляется с нагрузкой, так как отправляются сообщения быстрее, чем обрабатываются.
![Consumer 1](images/Consumer1.png)
Для того, чтобы обеспечить равную скорость отправки и обработки, увеличиваем количество экземпляров-подписчиков данного типа до трех.
На изображении видно, что теперь длина очереди не растет и система справляется с поступающими сообщениями. Также скорость "publish" и
"consumer ack" стали равны.
![Consumer 1](images/Consumer1-scaling.png)
## Как запустить
Для того, чтобы запустить сервисы, необходимо выполнить следующие действия:
1. Установить и запустить Docker Engine или Docker Desktop
2. Через консоль перейти в папку, в которой расположен файл docker-compose.yml
3. Выполнить команду для запуска брокера сообщений rabbitmq:
```
docker compose up rabbit -d
```
4. Выполнить команду для запуска остальных контейнеров:
```
docker compose up -d
```
Такой порядок запуска важен для того, чтобы брокер сообщений успел полностью запуститься
и произвести действия для того, чтобы быть готовым принимать соединения от сервисов. Потому что указания depends_on не хватает
для отслеживания завершения всех необходимых подготовительных процессов брокера.
## Видео-отчет
Работоспособность лабораторной работы можно оценить в следующем [видео](https://disk.yandex.ru/i/G0vsfp7vwazYHw).

View File

@ -0,0 +1,23 @@
# Используем образ Maven для сборки
FROM maven:3.8-eclipse-temurin-21-alpine AS build
# Устанавливаем рабочую директорию
WORKDIR /app
# Копируем остальные исходные файлы
COPY pom.xml .
COPY src src
# Собираем весь проект
RUN mvn clean package -DskipTests
RUN mvn dependency:copy-dependencies
# Используем официальный образ JDK для запуска собранного jar-файла
FROM eclipse-temurin:21-jdk-alpine
# Копируем jar-файл из предыдущего этапа
COPY --from=build /app/target/*.jar /app.jar
COPY --from=build /app/target/dependency /
# Указываем команду для запуска приложения
CMD ["java", "-jar", "app.jar"]

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.somecompany</groupId>
<artifactId>consumer-app</artifactId>
<version>1.0.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,15 @@
package ru.somecompany;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import ru.somecompany.config.property.RabbitProperties;
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitProperties.class)
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}

View File

@ -0,0 +1,45 @@
package ru.somecompany.config;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ru.somecompany.config.property.RabbitProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Configuration
@RequiredArgsConstructor
public class ConnectionFactoryConfig {
private final RabbitProperties rabbitProperties;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitProperties.getHost());
factory.setPort(rabbitProperties.getPort());
return factory;
}
@Bean
public Connection connection(ConnectionFactory connectionFactory) throws IOException, TimeoutException {
return connectionFactory.newConnection();
}
@Bean
public Channel channel(Connection connection) throws IOException {
var exchange = rabbitProperties.getExchange();
var queue = rabbitProperties.getQueue();
var channel = connection.createChannel();
channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
channel.queueDeclare(queue, true, false, true, null);
channel.queueBind(queue, exchange, "");
return channel;
}
}

View File

@ -0,0 +1,19 @@
package ru.somecompany.config.property;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "app.rabbit-properties")
public class RabbitProperties {
private String host;
private Integer port;
private Integer delay;
private String queue;
private String exchange;
}

View File

@ -0,0 +1,76 @@
package ru.somecompany.consumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import ru.somecompany.config.property.RabbitProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
@RequiredArgsConstructor
public class Consumer {
private final RabbitProperties rabbitProperties;
private final Connection connection;
private final Channel channel;
@PostConstruct
public void consume() {
try {
channel.basicQos(1);
channel.basicConsume(rabbitProperties.getQueue(), false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
var delay = rabbitProperties.getDelay();
try {
doWork(delay);
} finally {
System.out.println(" [x] Processed '" + message + "'");
channel.basicAck(deliveryTag, false);
}
}
});
} catch (Exception exception) {
log.error("Error while set up connection with rabbit", exception);
}
}
private static void doWork(Integer delay) {
if (delay > 0) {
try {
Thread.sleep(delay);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
@PreDestroy
public void cleanUp() throws Exception {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
}
}

View File

@ -0,0 +1,10 @@
server:
port: ${SERVER_PORT:8081}
app:
rabbit-properties:
host: ${RABBIT_HOST:localhost}
port: ${RABBIT_PORT:5672}
delay: ${PROCESS_DELAY:0}
queue: ${QUEUE_NAME:queue-1}
exchange: ${EXCHANGE_NAME:order-events}

View File

@ -0,0 +1,79 @@
services:
rabbit:
container_name: rabbit
image: rabbitmq:3-management
ports:
- "15672:15672"
- "5672:5672"
- "5671:5671"
networks:
- local
publisher:
build: ./publisher-app
container_name: publisher
depends_on:
- rabbit
environment:
RABBIT_HOST: rabbit
RABBIT_PORT: 5672
networks:
- local
consumer-1:
build: ./consumer-app
container_name: consumer-1
depends_on:
- rabbit
- publisher
environment:
RABBIT_HOST: rabbit
RABBIT_PORT: 5672
PROCESS_DELAY: 3000
QUEUE_NAME: queue1
EXCHANGE_NAME: order-events
networks:
- local
consumer-2:
build: ./consumer-app
container_name: consumer-2
depends_on:
- rabbit
- publisher
environment:
RABBIT_HOST: rabbit
RABBIT_PORT: 5672
PROCESS_DELAY: 0
QUEUE_NAME: queue2
EXCHANGE_NAME: order-events
networks:
- local
consumer-12:
build: ./consumer-app
container_name: consumer-12
depends_on:
- rabbit
- publisher
environment:
RABBIT_HOST: rabbit
RABBIT_PORT: 5672
PROCESS_DELAY: 3000
QUEUE_NAME: queue1
EXCHANGE_NAME: order-events
networks:
- local
consumer-13:
build: ./consumer-app
container_name: consumer-13
depends_on:
- rabbit
- publisher
environment:
RABBIT_HOST: rabbit
RABBIT_PORT: 5672
PROCESS_DELAY: 3000
QUEUE_NAME: queue1
EXCHANGE_NAME: order-events
networks:
- local
networks:
local:

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.somecompany</groupId>
<artifactId>helloworld-tutorial</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,28 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Main {
private static final String QUEUE_NAME = "hello-world";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {
var sender = new Sender(channel);
var receiver = new Receiver(channel);
} catch (Exception e) {
System.out.println(" [*] Error in Hello-World");
}
}
}

View File

@ -0,0 +1,20 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
public class Receiver {
private static final String QUEUE_NAME = "hello-world";
public Receiver(Channel channel) throws IOException {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

View File

@ -0,0 +1,18 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class Sender {
private static final String QUEUE_NAME = "hello-world";
public Sender(Channel channel) throws IOException {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}

View File

@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 101 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 848 KiB

View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

View File

@ -0,0 +1,21 @@
# Используем образ Maven для сборки
FROM maven:3.8-eclipse-temurin-21-alpine AS build
# Устанавливаем рабочую директорию
WORKDIR /app
# Копируем остальные исходные файлы
COPY pom.xml .
COPY src src
# Собираем весь проект
RUN mvn clean package -DskipTests
# Используем официальный образ JDK для запуска собранного jar-файла
FROM eclipse-temurin:21-jdk-alpine
# Копируем jar-файл из предыдущего этапа
COPY --from=build /app/target/*.jar /app.jar
# Указываем команду для запуска приложения
CMD ["java", "-jar", "app.jar"]

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.somecompany</groupId>
<artifactId>publisher-app</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,17 @@
package ru.somecompany;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
import org.springframework.scheduling.annotation.EnableScheduling;
import ru.somecompany.config.property.RabbitProperties;
@EnableScheduling
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = RabbitProperties.class)
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}

View File

@ -0,0 +1,40 @@
package ru.somecompany.config;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import ru.somecompany.config.property.RabbitProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@Configuration
@RequiredArgsConstructor
public class ConnectionFactoryConfig {
private final RabbitProperties rabbitProperties;
@Bean
public ConnectionFactory connectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitProperties.getHost());
factory.setPort(rabbitProperties.getPort());
return factory;
}
@Bean
public Connection connection(ConnectionFactory connectionFactory) throws IOException, TimeoutException {
return connectionFactory.newConnection();
}
@Bean
public Channel channel(Connection connection) throws IOException {
var channel = connection.createChannel();
channel.exchangeDeclare(rabbitProperties.getExchange(), BuiltinExchangeType.FANOUT);
return channel;
}
}

View File

@ -0,0 +1,15 @@
package ru.somecompany.config.property;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "app.rabbit-properties")
public class RabbitProperties {
private String host;
private Integer port;
private String exchange;
}

View File

@ -0,0 +1,41 @@
package ru.somecompany.scheduler;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.RequiredArgsConstructor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@Service
@RequiredArgsConstructor
public class SenderScheduler {
private static final String EXCHANGE_NAME = "order-events";
private static final String MESSAGE = "Поступил заказ №%d";
private Integer index = 0;
private final ConnectionFactory connectionFactory;
private final Connection connection;
private final Channel channel;
@Scheduled(cron = "*/1 * * * * *")
public void sendMessage() {
try {
var message = String.format(MESSAGE, index);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
index++;
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
System.out.println(" [x] Error while send message");
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,8 @@
server:
port: ${SERVER_PORT:8080}
app:
rabbit-properties:
host: ${RABBIT_HOST:localhost}
port: ${RABBIT_PORT:5672}
exchange: ${EXCHANGE_NAME:order-events}

View File

@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>ru.somecompany</groupId>
<artifactId>workqueue-tutorial</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,23 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Main {
public static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {
var sender = new Sender(channel);
sender.send("Work Queue message");
var receiver = new Receiver(channel);
} catch (Exception e) {
System.out.println(" [*] Error in Work-Queue: " + e.getMessage());
}
}
}

View File

@ -0,0 +1,44 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static ru.somecompany.Main.QUEUE_NAME;
public class Receiver {
public Receiver(Channel channel) throws IOException {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

View File

@ -0,0 +1,29 @@
package ru.somecompany;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import static ru.somecompany.Main.QUEUE_NAME;
public class Sender {
private Channel channel;
public Sender(Channel channel) throws IOException {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
this.channel = channel;
}
public void send(String msg) throws IOException {
String message = String.join(" ", msg);
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}

View File

@ -0,0 +1,8 @@
# Root logger option
log4j.rootLogger=INFO, stdout
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n