Merge pull request 'podkorytova_yulia_lab_4 is ready' (#134) from podkorytova_yulia_lab_4 into main
Reviewed-on: http://student.git.athene.tech/Alexey/DAS_2023_1/pulls/134
45
podkorytova_yulia_lab_4/README.md
Normal file
@ -0,0 +1,45 @@
|
||||
# Лабораторная работа 4. Работа с брокером сообщений
|
||||
### Задание на лабораторную работу
|
||||
1. Установить брокер сообщений RabbitMQ.
|
||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||
3. Продемонстрировать работу брокера сообщений.
|
||||
|
||||
***
|
||||
### Описание работы
|
||||
Были разработаны 3 приложения на *java*:
|
||||
1. **Publisher**. Программа, которая создаёт один *exchange* с типом *fanout* и раз в секунду генерирует сообщение.
|
||||
|
||||
![](images/publisher.jpg)
|
||||
|
||||
2. **Consumer1**. Программа, которая создаёт под себя отдельную не анонимную очередь (*queue1*), создаёт *binding* на
|
||||
*exchange* и начинает принимать сообщения. Программа обрабатывает сообщения 3 секунды.
|
||||
|
||||
![](images/consumer1.jpg)
|
||||
|
||||
3. **Consumer2**. Аналогично *Consumer1*, только сообщения обрабатываются моментально и имя очереди (*queue2*)
|
||||
отличается от *Consumer1*.
|
||||
|
||||
![](images/consumer2.jpg)
|
||||
|
||||
***
|
||||
### Отчеты
|
||||
***RabbitMQ Management UI***
|
||||
|
||||
![](images/result1.jpg)
|
||||
|
||||
![](images/result2.jpg)
|
||||
|
||||
***Exchange***
|
||||
|
||||
![](images/result3.jpg)
|
||||
|
||||
***Очередь Consumer1***
|
||||
|
||||
![](images/queue1.jpg)
|
||||
|
||||
***Очередь Consumer2***
|
||||
|
||||
![](images/queue2.jpg)
|
||||
|
||||
### Ссылка на видео:
|
||||
https://drive.google.com/file/d/19OdXnNM29SjayVZJ1qdsrMFTHkAUGxZf/view?usp=sharing
|
@ -0,0 +1,50 @@
|
||||
package org.example;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
public class Consumer1 {
|
||||
private static final String EXCHANGE_NAME = "messages";
|
||||
private static final String QUEUE_NAME = "queue1";
|
||||
|
||||
public static void main(String[] argv) {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
factory.setPort(5672);
|
||||
factory.setUsername("guest");
|
||||
factory.setPassword("guest");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
// Создание не анонимной очереди с уникальным именем
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
|
||||
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Consumer 1 ожидает сообщений...");
|
||||
|
||||
// Обработка сообщений
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody());
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Consumer 1 обрабатывает " + message);
|
||||
try {
|
||||
Thread.sleep(3000); // Обработка 3 секунды
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Consumer 1 получил " + message);
|
||||
};
|
||||
|
||||
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
|
||||
while (true) {
|
||||
// Поддержание работы приложения
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package org.example;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
import com.rabbitmq.client.DeliverCallback;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
public class Consumer2 {
|
||||
private static final String EXCHANGE_NAME = "messages";
|
||||
private static final String QUEUE_NAME = "queue2";
|
||||
|
||||
public static void main(String[] argv) {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
factory.setPort(5672);
|
||||
factory.setUsername("guest");
|
||||
factory.setPassword("guest");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
// Создание не анонимной очереди с уникальным именем
|
||||
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
||||
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
|
||||
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Consumer 2 ожидает сообщений...");
|
||||
|
||||
// Обработка сообщений
|
||||
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
||||
String message = new String(delivery.getBody());
|
||||
// Обработка моментально, без задержки
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Consumer 2 получил " + message);
|
||||
};
|
||||
|
||||
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
|
||||
while (true) {
|
||||
// Поддержание работы приложения
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package org.example;
|
||||
|
||||
import com.rabbitmq.client.Channel;
|
||||
import com.rabbitmq.client.Connection;
|
||||
import com.rabbitmq.client.ConnectionFactory;
|
||||
|
||||
import java.time.LocalTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
public class Publisher {
|
||||
private static final String EXCHANGE_NAME = "messages";
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
ConnectionFactory factory = new ConnectionFactory();
|
||||
factory.setHost("localhost");
|
||||
factory.setPort(5672);
|
||||
factory.setUsername("guest");
|
||||
factory.setPassword("guest");
|
||||
try (Connection connection = factory.newConnection();
|
||||
Channel channel = connection.createChannel()) {
|
||||
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
|
||||
|
||||
while (true) {
|
||||
String message = "сообщение от пользователя c id = " + Math.round((Math.random() * 100));
|
||||
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
|
||||
System.out.println(LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")) + " Отправлено: " + message);
|
||||
Thread.sleep(1000); // Пауза в 1 секунду
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
BIN
podkorytova_yulia_lab_4/images/consumer1.JPG
Normal file
After Width: | Height: | Size: 76 KiB |
BIN
podkorytova_yulia_lab_4/images/consumer2.JPG
Normal file
After Width: | Height: | Size: 77 KiB |
BIN
podkorytova_yulia_lab_4/images/publisher.JPG
Normal file
After Width: | Height: | Size: 78 KiB |
BIN
podkorytova_yulia_lab_4/images/queue1.JPG
Normal file
After Width: | Height: | Size: 42 KiB |
BIN
podkorytova_yulia_lab_4/images/queue2.JPG
Normal file
After Width: | Height: | Size: 42 KiB |
BIN
podkorytova_yulia_lab_4/images/result1.JPG
Normal file
After Width: | Height: | Size: 90 KiB |
BIN
podkorytova_yulia_lab_4/images/result2.JPG
Normal file
After Width: | Height: | Size: 56 KiB |
BIN
podkorytova_yulia_lab_4/images/result3.JPG
Normal file
After Width: | Height: | Size: 21 KiB |