diff --git a/borschevskaya_anna_lab_4/README.md b/borschevskaya_anna_lab_4/README.md new file mode 100644 index 0000000..ce7c98a --- /dev/null +++ b/borschevskaya_anna_lab_4/README.md @@ -0,0 +1,50 @@ +# Отчет. Лабораторная работа 4 + +## Описание +В ходе лабораторной работы были изучены главы туториала о работе с RabbitMQ. Результат выполнения заданий каждой главы +отражен на скриншотах в папке /images: +- Tutorial-Task1.png +![Tutorial-Task1](images/Tutorial-Task1.png) +- Tutorial-Task2.png +![Tutorial-Task2](images/Tutorial-Task2.png) +- Tutorial-Task3.png +![Tutorial-Task3](images/Tutorial-Task3.png) + +Задание из 3-ей главы туториала было расширено условиями, которые были поставлены в задании к данной лабораторной работе. +Для демонстрации работы сервисов посредством ассинхронного общения через брокер сообщений RabbitMQ была выбрана +предметная область "Обработка заказов". + +Сервис-издатель "Publisher" публикует в очередь сообщений событие поступления заказа с некоторым номером. + +Сервисы-подписчики обрабатывают сообщения о заказах, при этом подписчики обрабатывают сообщение по-разному. Один вид +подписчика обрабатывает с задержкой в несколько секунд, другой - "мгновенно", они получают одни и те жа сообщения, +но соединены с разными очередями. +В качестве эксперимента изначально были запущены по одному экземпляру каждого вида. +На изображении Consumer2.png представлена работа мгновенно обрабатывающего подписчика. Он справляется с нагрузкой, +так как размер очереди не растет. +![Consumer 2](images/Consumer2.png) +На изображении Consumer1.png представлена работа подписчика, обрабатывающего сообщения с задержкой. Как мы видим, +в очереди накапливаются сообщения в состоянии 'Ready' - эти сообщения готовы для того, чтобы быть доставленными подписчикам. +Сервис не справляется с нагрузкой, так как отправляются сообщения быстрее, чем обрабатываются. +![Consumer 1](images/Consumer1.png) +Для того, чтобы обеспечить равную скорость отправки и обработки, увеличиваем количество экземпляров-подписчиков данного типа до трех. +На изображении видно, что теперь длина очереди не растет и система справляется с поступающими сообщениями. Также скорость "publish" и +"consumer ack" стали равны. +![Consumer 1](images/Consumer1-scaling.png) +## Как запустить +Для того, чтобы запустить сервисы, необходимо выполнить следующие действия: +1. Установить и запустить Docker Engine или Docker Desktop +2. Через консоль перейти в папку, в которой расположен файл docker-compose.yml +3. Выполнить команду для запуска брокера сообщений rabbitmq: +``` +docker compose up rabbit -d +``` +4. Выполнить команду для запуска остальных контейнеров: +``` +docker compose up -d +``` +Такой порядок запуска важен для того, чтобы брокер сообщений успел полностью запуститься +и произвести действия для того, чтобы быть готовым принимать соединения от сервисов. Потому что указания depends_on не хватает +для отслеживания завершения всех необходимых подготовительных процессов брокера. +## Видео-отчет +Работоспособность лабораторной работы можно оценить в следующем [видео](https://disk.yandex.ru/i/G0vsfp7vwazYHw). \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/consumer-app/Dockerfile b/borschevskaya_anna_lab_4/consumer-app/Dockerfile new file mode 100644 index 0000000..30fc690 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/Dockerfile @@ -0,0 +1,23 @@ +# Используем образ Maven для сборки +FROM maven:3.8-eclipse-temurin-21-alpine AS build + +# Устанавливаем рабочую директорию +WORKDIR /app + +# Копируем остальные исходные файлы +COPY pom.xml . +COPY src src + +# Собираем весь проект +RUN mvn clean package -DskipTests +RUN mvn dependency:copy-dependencies + +# Используем официальный образ JDK для запуска собранного jar-файла +FROM eclipse-temurin:21-jdk-alpine + +# Копируем jar-файл из предыдущего этапа +COPY --from=build /app/target/*.jar /app.jar +COPY --from=build /app/target/dependency / + +# Указываем команду для запуска приложения +CMD ["java", "-jar", "app.jar"] diff --git a/borschevskaya_anna_lab_4/consumer-app/pom.xml b/borschevskaya_anna_lab_4/consumer-app/pom.xml new file mode 100644 index 0000000..a8ee175 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + ru.somecompany + consumer-app + 1.0.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 3.2.3 + + + + + 21 + 21 + UTF-8 + + + + + org.springframework.boot + spring-boot-starter-web + + + com.rabbitmq + amqp-client + 5.22.0 + + + org.projectlombok + lombok + 1.18.30 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/Main.java b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/Main.java new file mode 100644 index 0000000..c541a18 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/Main.java @@ -0,0 +1,15 @@ +package ru.somecompany; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import ru.somecompany.config.property.RabbitProperties; + +@SpringBootApplication +@ConfigurationPropertiesScan(basePackageClasses = RabbitProperties.class) +public class Main { + + public static void main(String[] args) { + SpringApplication.run(Main.class, args); + } +} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java new file mode 100644 index 0000000..3dadb49 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java @@ -0,0 +1,45 @@ +package ru.somecompany.config; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import ru.somecompany.config.property.RabbitProperties; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Configuration +@RequiredArgsConstructor +public class ConnectionFactoryConfig { + + private final RabbitProperties rabbitProperties; + + @Bean + public ConnectionFactory connectionFactory() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(rabbitProperties.getHost()); + factory.setPort(rabbitProperties.getPort()); + return factory; + } + + @Bean + public Connection connection(ConnectionFactory connectionFactory) throws IOException, TimeoutException { + return connectionFactory.newConnection(); + } + + @Bean + public Channel channel(Connection connection) throws IOException { + var exchange = rabbitProperties.getExchange(); + var queue = rabbitProperties.getQueue(); + var channel = connection.createChannel(); + + channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT); + channel.queueDeclare(queue, true, false, true, null); + channel.queueBind(queue, exchange, ""); + return channel; + } +} diff --git a/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java new file mode 100644 index 0000000..25f73f7 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java @@ -0,0 +1,19 @@ +package ru.somecompany.config.property; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Data +@ConfigurationProperties(prefix = "app.rabbit-properties") +public class RabbitProperties { + + private String host; + + private Integer port; + + private Integer delay; + + private String queue; + + private String exchange; +} diff --git a/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/consumer/Consumer.java b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/consumer/Consumer.java new file mode 100644 index 0000000..4f0d5b6 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/src/main/java/ru/somecompany/consumer/Consumer.java @@ -0,0 +1,76 @@ +package ru.somecompany.consumer; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import ru.somecompany.config.property.RabbitProperties; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +@Slf4j +@Component +@RequiredArgsConstructor +public class Consumer { + + private final RabbitProperties rabbitProperties; + + private final Connection connection; + private final Channel channel; + + @PostConstruct + public void consume() { + try { + channel.basicQos(1); + channel.basicConsume(rabbitProperties.getQueue(), false, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, + Envelope envelope, + AMQP.BasicProperties properties, + byte[] body) throws IOException { + long deliveryTag = envelope.getDeliveryTag(); + + String message = new String(body, StandardCharsets.UTF_8); + System.out.println(" [x] Received '" + message + "'"); + + var delay = rabbitProperties.getDelay(); + try { + doWork(delay); + } finally { + System.out.println(" [x] Processed '" + message + "'"); + channel.basicAck(deliveryTag, false); + } + } + }); + } catch (Exception exception) { + log.error("Error while set up connection with rabbit", exception); + } + } + + private static void doWork(Integer delay) { + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + + @PreDestroy + public void cleanUp() throws Exception { + if (channel != null) { + channel.close(); + } + if (connection != null) { + connection.close(); + } + } +} diff --git a/borschevskaya_anna_lab_4/consumer-app/src/main/resources/application.yml b/borschevskaya_anna_lab_4/consumer-app/src/main/resources/application.yml new file mode 100644 index 0000000..a7cded6 --- /dev/null +++ b/borschevskaya_anna_lab_4/consumer-app/src/main/resources/application.yml @@ -0,0 +1,10 @@ +server: + port: ${SERVER_PORT:8081} + +app: + rabbit-properties: + host: ${RABBIT_HOST:localhost} + port: ${RABBIT_PORT:5672} + delay: ${PROCESS_DELAY:0} + queue: ${QUEUE_NAME:queue-1} + exchange: ${EXCHANGE_NAME:order-events} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/docker-compose.yml b/borschevskaya_anna_lab_4/docker-compose.yml new file mode 100644 index 0000000..a59cc51 --- /dev/null +++ b/borschevskaya_anna_lab_4/docker-compose.yml @@ -0,0 +1,79 @@ +services: + rabbit: + container_name: rabbit + image: rabbitmq:3-management + ports: + - "15672:15672" + - "5672:5672" + - "5671:5671" + networks: + - local + publisher: + build: ./publisher-app + container_name: publisher + depends_on: + - rabbit + environment: + RABBIT_HOST: rabbit + RABBIT_PORT: 5672 + networks: + - local + consumer-1: + build: ./consumer-app + container_name: consumer-1 + depends_on: + - rabbit + - publisher + environment: + RABBIT_HOST: rabbit + RABBIT_PORT: 5672 + PROCESS_DELAY: 3000 + QUEUE_NAME: queue1 + EXCHANGE_NAME: order-events + networks: + - local + consumer-2: + build: ./consumer-app + container_name: consumer-2 + depends_on: + - rabbit + - publisher + environment: + RABBIT_HOST: rabbit + RABBIT_PORT: 5672 + PROCESS_DELAY: 0 + QUEUE_NAME: queue2 + EXCHANGE_NAME: order-events + networks: + - local + consumer-12: + build: ./consumer-app + container_name: consumer-12 + depends_on: + - rabbit + - publisher + environment: + RABBIT_HOST: rabbit + RABBIT_PORT: 5672 + PROCESS_DELAY: 3000 + QUEUE_NAME: queue1 + EXCHANGE_NAME: order-events + networks: + - local + consumer-13: + build: ./consumer-app + container_name: consumer-13 + depends_on: + - rabbit + - publisher + environment: + RABBIT_HOST: rabbit + RABBIT_PORT: 5672 + PROCESS_DELAY: 3000 + QUEUE_NAME: queue1 + EXCHANGE_NAME: order-events + networks: + - local + +networks: + local: \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/helloworld-tutorial/pom.xml b/borschevskaya_anna_lab_4/helloworld-tutorial/pom.xml new file mode 100644 index 0000000..2a85c45 --- /dev/null +++ b/borschevskaya_anna_lab_4/helloworld-tutorial/pom.xml @@ -0,0 +1,34 @@ + + + 4.0.0 + + ru.somecompany + helloworld-tutorial + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + + + + + com.rabbitmq + amqp-client + 5.22.0 + + + org.slf4j + slf4j-api + 1.7.5 + + + org.slf4j + slf4j-log4j12 + 1.7.5 + + + \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Main.java b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Main.java new file mode 100644 index 0000000..13f9f54 --- /dev/null +++ b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Main.java @@ -0,0 +1,28 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + +public class Main { + + private static final String QUEUE_NAME = "hello-world"; + + public static void main(String[] args) { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try(Connection connection = factory.newConnection(); + Channel channel = connection.createChannel();) { + var sender = new Sender(channel); + var receiver = new Receiver(channel); + + } catch (Exception e) { + System.out.println(" [*] Error in Hello-World"); + } + } +} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Receiver.java b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Receiver.java new file mode 100644 index 0000000..ce8f740 --- /dev/null +++ b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Receiver.java @@ -0,0 +1,20 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; + +public class Receiver { + + private static final String QUEUE_NAME = "hello-world"; + + public Receiver(Channel channel) throws IOException { + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }; + channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); + } +} diff --git a/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Sender.java b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Sender.java new file mode 100644 index 0000000..05afcfc --- /dev/null +++ b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/java/ru/somecompany/Sender.java @@ -0,0 +1,18 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class Sender { + + private static final String QUEUE_NAME = "hello-world"; + + public Sender(Channel channel) throws IOException { + channel.queueDeclare(QUEUE_NAME, false, false, false, null); + String message = "Hello World!"; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); + System.out.println(" [x] Sent '" + message + "'"); + } +} diff --git a/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/resources/log4j.properties b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/resources/log4j.properties new file mode 100644 index 0000000..393e087 --- /dev/null +++ b/borschevskaya_anna_lab_4/helloworld-tutorial/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/images/Consumer1-scaling.PNG b/borschevskaya_anna_lab_4/images/Consumer1-scaling.PNG new file mode 100644 index 0000000..9a87b2b Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Consumer1-scaling.PNG differ diff --git a/borschevskaya_anna_lab_4/images/Consumer1.PNG b/borschevskaya_anna_lab_4/images/Consumer1.PNG new file mode 100644 index 0000000..7c60be9 Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Consumer1.PNG differ diff --git a/borschevskaya_anna_lab_4/images/Consumer2.PNG b/borschevskaya_anna_lab_4/images/Consumer2.PNG new file mode 100644 index 0000000..9592d50 Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Consumer2.PNG differ diff --git a/borschevskaya_anna_lab_4/images/Tutorial-Task1.PNG b/borschevskaya_anna_lab_4/images/Tutorial-Task1.PNG new file mode 100644 index 0000000..b2129cd Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Tutorial-Task1.PNG differ diff --git a/borschevskaya_anna_lab_4/images/Tutorial-Task2.PNG b/borschevskaya_anna_lab_4/images/Tutorial-Task2.PNG new file mode 100644 index 0000000..e29defd Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Tutorial-Task2.PNG differ diff --git a/borschevskaya_anna_lab_4/images/Tutorial-Task3.PNG b/borschevskaya_anna_lab_4/images/Tutorial-Task3.PNG new file mode 100644 index 0000000..e77d7af Binary files /dev/null and b/borschevskaya_anna_lab_4/images/Tutorial-Task3.PNG differ diff --git a/borschevskaya_anna_lab_4/publisher-app/.gitignore b/borschevskaya_anna_lab_4/publisher-app/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/publisher-app/Dockerfile b/borschevskaya_anna_lab_4/publisher-app/Dockerfile new file mode 100644 index 0000000..cc673eb --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/Dockerfile @@ -0,0 +1,21 @@ +# Используем образ Maven для сборки +FROM maven:3.8-eclipse-temurin-21-alpine AS build + +# Устанавливаем рабочую директорию +WORKDIR /app + +# Копируем остальные исходные файлы +COPY pom.xml . +COPY src src + +# Собираем весь проект +RUN mvn clean package -DskipTests + +# Используем официальный образ JDK для запуска собранного jar-файла +FROM eclipse-temurin:21-jdk-alpine + +# Копируем jar-файл из предыдущего этапа +COPY --from=build /app/target/*.jar /app.jar + +# Указываем команду для запуска приложения +CMD ["java", "-jar", "app.jar"] diff --git a/borschevskaya_anna_lab_4/publisher-app/pom.xml b/borschevskaya_anna_lab_4/publisher-app/pom.xml new file mode 100644 index 0000000..4cfb240 --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + ru.somecompany + publisher-app + 1.0-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-parent + 3.2.3 + + + + + 21 + 21 + UTF-8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + 1.18.30 + + + com.rabbitmq + amqp-client + 5.22.0 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/Main.java b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/Main.java new file mode 100644 index 0000000..1ce208a --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/Main.java @@ -0,0 +1,17 @@ +package ru.somecompany; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.ConfigurationPropertiesScan; +import org.springframework.scheduling.annotation.EnableScheduling; +import ru.somecompany.config.property.RabbitProperties; + +@EnableScheduling +@SpringBootApplication +@ConfigurationPropertiesScan(basePackageClasses = RabbitProperties.class) +public class Main { + + public static void main(String[] args) { + SpringApplication.run(Main.class, args); + } +} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java new file mode 100644 index 0000000..0cef48f --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/ConnectionFactoryConfig.java @@ -0,0 +1,40 @@ +package ru.somecompany.config; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import ru.somecompany.config.property.RabbitProperties; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +@Configuration +@RequiredArgsConstructor +public class ConnectionFactoryConfig { + + private final RabbitProperties rabbitProperties; + + @Bean + public ConnectionFactory connectionFactory() { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(rabbitProperties.getHost()); + factory.setPort(rabbitProperties.getPort()); + return factory; + } + + @Bean + public Connection connection(ConnectionFactory connectionFactory) throws IOException, TimeoutException { + return connectionFactory.newConnection(); + } + + @Bean + public Channel channel(Connection connection) throws IOException { + var channel = connection.createChannel(); + channel.exchangeDeclare(rabbitProperties.getExchange(), BuiltinExchangeType.FANOUT); + return channel; + } +} diff --git a/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java new file mode 100644 index 0000000..29a8bc5 --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/config/property/RabbitProperties.java @@ -0,0 +1,15 @@ +package ru.somecompany.config.property; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@Data +@ConfigurationProperties(prefix = "app.rabbit-properties") +public class RabbitProperties { + + private String host; + + private Integer port; + + private String exchange; +} diff --git a/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/scheduler/SenderScheduler.java b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/scheduler/SenderScheduler.java new file mode 100644 index 0000000..66fed4c --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/src/main/java/ru/somecompany/scheduler/SenderScheduler.java @@ -0,0 +1,41 @@ +package ru.somecompany.scheduler; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import lombok.RequiredArgsConstructor; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + +@Service +@RequiredArgsConstructor +public class SenderScheduler { + + private static final String EXCHANGE_NAME = "order-events"; + private static final String MESSAGE = "Поступил заказ №%d"; + private Integer index = 0; + + private final ConnectionFactory connectionFactory; + private final Connection connection; + private final Channel channel; + + @Scheduled(cron = "*/1 * * * * *") + public void sendMessage() { + try { + var message = String.format(MESSAGE, index); + + channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); + index++; + System.out.println(" [x] Sent '" + message + "'"); + } catch (IOException e) { + System.out.println(" [x] Error while send message"); + throw new RuntimeException(e); + } + } + +} diff --git a/borschevskaya_anna_lab_4/publisher-app/src/main/resources/application.yml b/borschevskaya_anna_lab_4/publisher-app/src/main/resources/application.yml new file mode 100644 index 0000000..983abc0 --- /dev/null +++ b/borschevskaya_anna_lab_4/publisher-app/src/main/resources/application.yml @@ -0,0 +1,8 @@ +server: + port: ${SERVER_PORT:8080} + +app: + rabbit-properties: + host: ${RABBIT_HOST:localhost} + port: ${RABBIT_PORT:5672} + exchange: ${EXCHANGE_NAME:order-events} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/workqueue-tutorial/pom.xml b/borschevskaya_anna_lab_4/workqueue-tutorial/pom.xml new file mode 100644 index 0000000..4a67d9f --- /dev/null +++ b/borschevskaya_anna_lab_4/workqueue-tutorial/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + ru.somecompany + workqueue-tutorial + 1.0-SNAPSHOT + + + 21 + 21 + UTF-8 + + + + + com.rabbitmq + amqp-client + 5.22.0 + + + org.slf4j + slf4j-log4j12 + 1.7.5 + + + + \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Main.java b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Main.java new file mode 100644 index 0000000..8e49bb1 --- /dev/null +++ b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Main.java @@ -0,0 +1,23 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +public class Main { + + public static final String QUEUE_NAME = "task_queue"; + + public static void main(String[] args) { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("localhost"); + try(Connection connection = factory.newConnection(); + Channel channel = connection.createChannel();) { + var sender = new Sender(channel); + sender.send("Work Queue message"); + var receiver = new Receiver(channel); + } catch (Exception e) { + System.out.println(" [*] Error in Work-Queue: " + e.getMessage()); + } + } +} \ No newline at end of file diff --git a/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Receiver.java b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Receiver.java new file mode 100644 index 0000000..4569eef --- /dev/null +++ b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Receiver.java @@ -0,0 +1,44 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static ru.somecompany.Main.QUEUE_NAME; + +public class Receiver { + + public Receiver(Channel channel) throws IOException { + channel.queueDeclare(QUEUE_NAME, true, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + channel.basicQos(1); + + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), StandardCharsets.UTF_8); + + System.out.println(" [x] Received '" + message + "'"); + try { + doWork(message); + } finally { + System.out.println(" [x] Done"); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + }; + channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); + } + + private static void doWork(String task) { + for (char ch : task.toCharArray()) { + if (ch == '.') { + try { + Thread.sleep(1000); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Sender.java b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Sender.java new file mode 100644 index 0000000..3b7ea29 --- /dev/null +++ b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/java/ru/somecompany/Sender.java @@ -0,0 +1,29 @@ +package ru.somecompany; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.MessageProperties; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; + +import static ru.somecompany.Main.QUEUE_NAME; + +public class Sender { + + private Channel channel; + + public Sender(Channel channel) throws IOException { + channel.queueDeclare(QUEUE_NAME, true, false, false, null); + this.channel = channel; + } + + public void send(String msg) throws IOException { + String message = String.join(" ", msg); + + channel.basicPublish("", QUEUE_NAME, + MessageProperties.PERSISTENT_TEXT_PLAIN, + message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "'"); + } +} diff --git a/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/resources/log4j.properties b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/resources/log4j.properties new file mode 100644 index 0000000..393e087 --- /dev/null +++ b/borschevskaya_anna_lab_4/workqueue-tutorial/src/main/resources/log4j.properties @@ -0,0 +1,8 @@ +# Root logger option +log4j.rootLogger=INFO, stdout + +# Direct log messages to stdout +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.Target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n \ No newline at end of file