# Данный GITIGNORE-файл был автоматически создан Microsoft(R) Visual Studio.
# Лабораторная работа 4
## Описание
Данная лабораторная работа предназначена для изучения проектирования приложений при помощи брокера сообщений.
### Уроки из RabbitMQ Tutorials
1. ![img.png](images/img1.png)
2. ![img.png](images/img2.png)
3. ![img.png](images/img3.png)
### Задание
Ссылка на демонстрацию работы программы:
## Выводы
1. Запуск 1 и 2 Consumer'а: Consumer-2 (без задержки) график Queued Messages держится на нуле, в то время как у Consumer-1 график возрастает (скорость обработки сообщений < 1 с).
2. Запуск 2 первых Consumer'ов: обрабатывают сообщения последовательно и накапливают очередь.
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = String.join(" ", argv);
channel.basicPublish("", TASK_QUEUE_NAME,
System.out.println(" [x] Sent '" + message + "'");
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class OrderConsumer1 {
private static final String EXCHANGE_NAME = "orders";
private static final String QUEUE_NAME = "consumer1_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Consumer 1 ожидает сообщений.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Consumer 1 получил сообщение, обработает через 3 с.: " + message);
try {
} catch (InterruptedException e) {
throw new RuntimeException(e);
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class OrderConsumer2 {
private static final String EXCHANGE_NAME = "orders";
private static final String QUEUE_NAME = "consumer2_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Создаём exchange и очередь
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println(" [*] Consumer 2 ожидает сообщений.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println(" [x] Consumer 2 моментально обработал сообщение: " + message);
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
package ru.ulstu;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class OrderPublisher {
private static final String EXCHANGE_NAME = "orders";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
while (true) {
String message = "Новый заказ #" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Отправлено сообщение: " + message);
# Лабораторная работа 5
## Описание
Задание заключается в реализации алгоритмов умножения больших квадратных матриц. Необходимо разработать два алгоритма: последовательный и параллельный. А также провести бенчмарки, а затем описать результаты в отчете.
**100x100 матрица**:
- **4 потока** — наилучший результат.
- **10 потоков** — медленнее на почти в половину.
- **6 и 8 потоков** — хуже 4 потоков.
- **1 и 2 потока** — значительно медленнее.
**300x300 матрица**:
- **4 потока** — лучший результат.
- **8 потоков** — чуть хуже.
- **10 потоков** — медленнее.
- **1 и 2 потока** — значительно медленнее.
**500x500 матрица**:
- **8 потоков** — лучший результат.
- **6 и 10 потоков** — немного хуже.
- **4 потока** — значительно медленнее.
- **1 поток** — самый медленный.
**Ссылка на демонстрацию работы программы**:
- Если операция сложнее, рост производительности происходит с увеличением числа потоков.
- Слишком много потоков увеличивает накладные расходы (например, 10 потоков). Это может быть связано, например, с:
1. **Переключением контекстов**: Когда потоков больше, чем ядер процессора, операционная система часто переключает контексты, что занимает время.
2. **Конкуренцией за ресурсы**: Много потоков конкурируют за ограниченные ресурсы, такие как процессорное время и кэш.
3. **Управлением потоками**: С увеличением числа потоков растёт нагрузка на систему, связанную с их созданием, управлением и завершением.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class MatrixMultiplier {
private final int[][] matrixA;
private final int[][] matrixB;
private final int[][] result;
private final int size;
public MatrixMultiplier(int size) {
this.size = size;
this.matrixA = generateMatrix(size);
this.matrixB = generateMatrix(size);
this.result = new int[size][size];
private int[][] generateMatrix(int size) {
int[][] matrix = new int[size][size];
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
matrix[i][j] = (int) (Math.random() * 10);
return matrix;
public void multiplySequential() {
for (int i = 0; i < size; i++) {
for (int j = 0; j < size; j++) {
for (int k = 0; k < size; k++) {
result[i][j] += matrixA[i][k] * matrixB[k][j];
public void multiplyParallel(int numThreads) throws InterruptedException {
if (numThreads == 1) {
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
int chunkSize = (int) Math.ceil((double) size / numThreads);
for (int thread = 0; thread < numThreads; thread++) {
final int startRow = thread * chunkSize;
final int endRow = Math.min(startRow + chunkSize, size);
executor.submit(() -> {
for (int i = startRow; i < endRow; i++) {
for (int j = 0; j < size; j++) {
for (int k = 0; k < size; k++) {
result[i][j] += matrixA[i][k] * matrixB[k][j];
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
private void resetResult() {
for (int i = 0; i < size; i++) {
Arrays.fill(result[i], 0);
static class Result {
int threads;
long time;
Result(int threads, long time) {
this.threads = threads;
this.time = time;
public static void main(String[] args) throws InterruptedException {
int[] matrixSizes = {100, 300, 500};
int[] threadCounts = {1, 2, 4, 6, 8, 10};
int runs = 5; // количество прогонов
for (int size : matrixSizes) {
System.out.println("\nРазмер матрицы: " + size + "x" + size);
MatrixMultiplier multiplier = new MatrixMultiplier(size);
List<Result> results = new ArrayList<>();
for (int threads : threadCounts) {
long totalDuration = 0;
for (int run = 0; run < runs; run++) {
long startTime = System.nanoTime();
long endTime = System.nanoTime();
totalDuration += (endTime - startTime);
long averageDuration = totalDuration / runs;
results.add(new Result(threads, averageDuration));
// Сортировка по времени выполнения
results.sort(Comparator.comparingLong(r -> r.time));
System.out.println("Результаты (среднее время за " + runs + " прогонов):");
for (Result result : results) {
System.out.printf("Потоки: %d, среднее время: %d нс\n", result.threads, result.time);
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FastDeterminantCalculator {
public static void main(String[] args) {
int[] sizes = {100, 300, 500};
int[] threads = {1, 4, 8, 10};
for (int size : sizes) {
BigDecimal[][] matrix = generateMatrix(size);
for (int threadCount : threads) {
long start = System.currentTimeMillis();
BigDecimal determinant = calculateDeterminant(matrix, threadCount);
long end = System.currentTimeMillis();
System.out.printf("Matrix size: %dx%d, Threads: %d, Time: %d ms\n",
size, size, threadCount, (end - start));
public static BigDecimal[][] generateMatrix(int size) {
BigDecimal[][] matrix = new BigDecimal[size][size];
for (int i = 0; i < size; i++) {
BigDecimal rowSum = BigDecimal.ZERO;
for (int j = 0; j < size; j++) {
matrix[i][j] = BigDecimal.valueOf(Math.random() * 10);
if (i != j) {
rowSum = rowSum.add(matrix[i][j]);
matrix[i][i] = rowSum.add(BigDecimal.valueOf(Math.random() * 10 + 1));
return matrix;
public static BigDecimal calculateDeterminant(BigDecimal[][] matrix, int threadCount) {
int size = matrix.length;
BigDecimal[][] lu = new BigDecimal[size][size];
int[] permutations = new int[size];
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
if (!luDecomposition(matrix, lu, permutations, executor)) {
return BigDecimal.ZERO; // Матрица вырожденная
BigDecimal determinant = BigDecimal.ONE;
for (int i = 0; i < size; i++) {
determinant = determinant.multiply(lu[i][i]);
if (permutations[i] != i) {
determinant = determinant.negate(); // Меняем знак при перестановке
return determinant;
public static boolean luDecomposition(BigDecimal[][] matrix, BigDecimal[][] lu, int[] permutations, ExecutorService executor) {
int size = matrix.length;
for (int i = 0; i < size; i++) {
System.arraycopy(matrix[i], 0, lu[i], 0, size);
permutations[i] = i;
for (int k = 0; k < size; k++) {
int pivot = k;
for (int i = k + 1; i < size; i++) {
if (lu[i][k].abs().compareTo(lu[pivot][k].abs()) > 0) {
pivot = i;
if (lu[pivot][k].abs().compareTo(BigDecimal.valueOf(1e-10)) < 0) {
return false;
if (pivot != k) {
BigDecimal[] temp = lu[k];
lu[k] = lu[pivot];
lu[pivot] = temp;
int tempPerm = permutations[k];
permutations[k] = permutations[pivot];
permutations[pivot] = tempPerm;
CountDownLatch latch = new CountDownLatch(size - k - 1);
for (int i = k + 1; i < size; i++) {
int row = i;
int finalK = k;
executor.submit(() -> {
MathContext mc = new MathContext(20, RoundingMode.HALF_UP);
lu[row][finalK] = lu[row][finalK].divide(lu[finalK][finalK], mc);
for (int j = finalK + 1; j < size; j++) {
lu[row][j] = lu[row][j].subtract(lu[row][finalK].multiply(lu[finalK][j], mc));
try {
} catch (InterruptedException e) {
return false;
return true;
# Лабораторная работа 6
## Описание
Задание заключается в реализации алгоритмов нахождения детерминанта квадратной матрицы. Необходимо разработать два алгоритма: последовательный и параллельный. А также провести бенчмарки, а затем описать результаты в отчете.
**100x100 матрица**:
- **8 потоков** — наилучший результат.
- **10 потоков** — результат немного хуже.
- **4 потока** — примерно такой же результат как на 10 потоках.
- **1 поток** — наихудший результат.
**300x300 матрица**:
- **10 потока** — лучший результат.
- **8 потоков** — чуть хуже.
- **4 потока** — ещё медленее.
- **1 поток** — наихудший результат.
**500x500 матрица**:
- **10 потока** — лучший результат.
- **8 потоков** — чуть хуже.
- **4 потока** — ещё медленее.
- **1 поток** — наихудший результат.
**Ссылка на демонстрацию работы программы**:
- Если операция сложнее, рост производительности происходит с увеличением числа потоков.
- Слишком много потоков увеличивает накладные расходы (замтено только на неочень сложных операциях). Это может быть связано, например, с:
1. **Переключением контекстов**: Когда потоков больше, чем ядер процессора, операционная система часто переключает контексты, что занимает время.
2. **Конкуренцией за ресурсы**: Много потоков конкурируют за ограниченные ресурсы, такие как процессорное время и кэш.
3. **Управлением потоками**: С увеличением числа потоков растёт нагрузка на систему, связанную с их созданием, управлением и завершением.
Балансировка нагрузки — это способ распределения запросов между серверами для предотвращения их перегрузки и обеспечения быстродействия системы.
Для этого используются алгоритмы, такие как Round Robin, Least Connections, IP Hash.
Среди популярных открытых технологий — Nginx, HAProxy и Traefik. Nginx часто работает как реверс-прокси,
распределяя запросы и обрабатывая SSL. HAProxy подходит для высоконагруженных систем, а Traefik автоматически
настраивает маршрутизацию в облачных кластерах.
В базах данных балансировка нагрузки позволяет направлять запросы на чтение к репликам, а на запись — к основному узлу.
Инструменты, такие как ProxySQL, помогают автоматизировать этот процесс.
Реверс-прокси не только распределяет нагрузку, но и повышает безопасность системы, скрывая её внутреннюю архитектуру.
Таким образом, открытые технологии играют ключевую роль в создании масштабируемых и надёжных систем.
Распределённые системы являются основой современных сервисов, включая социальные сети. Их устройство предполагает разделение задач на микросервисы,
где каждый компонент выполняет узкоспециализированную функцию. Это упрощает разработку, позволяет масштабировать только необходимые части системы и
делает её более устойчивой к сбоям.
Для управления такими системами используются инструменты оркестрации, например, Kubernetes и Docker Swarm. Они автоматизируют развёртывание,
масштабирование и обновление сервисов, упрощая сопровождение. Однако их использование требует опыта и может осложнить отладку.
Очереди сообщений, такие как RabbitMQ или Kafka, помогают асинхронно передавать данные между сервисами. Это снижает нагрузку и обеспечивает надёжное взаимодействие,
передавая запросы, уведомления или данные для обработки.
Распределённые системы обладают преимуществами в виде масштабируемости, устойчивости и гибкости разработки.
Однако их сложность может стать серьёзным вызовом при проектировании и сопровождении.
Параллельные вычисления полезны, например, для обработки больших объёмов данных или машинного обучения,
но в некоторых случаях последовательная обработка более предпочтительна. Такой подход требует анализа задач, чтобы избежать излишней сложности.
image: mariadb:10.6.4-focal
command: '--default-authentication-plugin=mysql_native_password'
- wp_db_data:/var/lib/mysql
restart: always
- MYSQL_ROOT_PASSWORD=somewordpress
- MYSQL_DATABASE=wordpress
- MYSQL_USER=wordpress
- MYSQL_PASSWORD=wordpress
- 3306
- 33060
image: wordpress:latest
- wp_data:/var/www/html
- 52384:80
restart: always
image: postgres:latest
container_name: db
POSTGRES_DB: postgres
- db_data:/var/lib/postgresql
image: redmine:latest
container_name: redmine
- "11001:3000"
- db
image: gitea/gitea:latest
container_name: gitea
- "11002:3000"
- USER_UID=1000
- USER_GID=1000
- gitea_data:/data
▎Лабораторная работа №1 - Знакомство с Docker и Docker Compose
Цель: Изучение современных технологий контейнеризации.
1. Установить Docker.
2. Изучить применение и принципы работы Docker.
3. Изучить утилиту Docker Compose и структуру файла docker-compose.yml.
4. Развернуть не менее трех различных сервисов с помощью Docker Compose.
▎Разворачивание сервисов
Необходимо развернуть минимум три сервиса из предложенного списка:
1. Redmine - система учета багов (баг-трекер).
2. WordPress - популярная система управления контентом.
3. Gitea - сервис для хранения git-репозиториев.
▎Требования к Docker Compose:
• Несколько контейнеров.
• Хотя бы один volume.
• Хотя бы один проброшенный порт на хост.
Система должна быть развернута полностью, включая создание администратора и корректное функционирование, что будет подтверждено скриншотами.
▎Ход работы
1. Установил Docker Desktop на Windows.
2. Проверил установку с помощью команды docker --version.
3. Развернул сервисы (описаны ниже).
▎Объяснение работы кода:
1. Образ сборки:
image: wordpress:latest - используется последний официальный образ WordPress.
2. Имя контейнера:
container_name: wordpress - имя контейнера устанавливается как wordpress.
3. Проброс портов:
Порт 80 контейнера пробрасывается на порт 8080 хоста.
4. Тома для хранения данных:
- wordpress_data:/var/www/html
Том wordpress_data монтируется в директорию /var/www/html контейнера для хранения данных WordPress.
5. Переменные окружения для WordPress:
WORDPRESS_DB_USER: example_user
WORDPRESS_DB_PASSWORD: example_password
Параметры для подключения к базе данных.
6. Зависимость от базы данных:
depends_on: db - указывает, что WordPress зависит от контейнера базы данных (db).
1. Образ сборки:
image: redmine:latest - используется последний официальный образ Redmine.
2. Имя контейнера:
container_name: redmine - имя контейнера устанавливается как redmine.
3. Проброс портов:
ports:"8081:3000" - порт 3000 контейнера пробрасывается на порт 8081 хоста.
4. Переменные окружения для Redmine:
REDMINE_DB_MYSQL: redmine_db
REDMINE_DB_PASSWORD: redmine_password
Параметры для подключения к базе данных.
5. Зависимость от базы данных:
depends_on: - redmine_db - Redmine зависит от контейнера с базой данных.
1. Образ сборки:
image: gitea/gitea:latest - используется последний официальный образ Gitea.
2. Имя контейнера:
container_name: gitea - имя контейнера устанавливается как gitea.
3. Проброс портов:
ports:"8082:3000" - порт 3000 контейнера пробрасывается на порт 8082 хоста.
4. Тома для хранения данных:
- gitea_data:/data
# См. статью по ссылке, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
FROM AS base
USER app
# Этот этап используется для сборки проекта службы
FROM AS build
COPY ["Consumer1.csproj", "."]
RUN dotnet restore "./Consumer1.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "./Consumer1.csproj" -c $BUILD_CONFIGURATION -o /app/build
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
FROM build AS publish
RUN dotnet publish "./Consumer1.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
FROM base AS final
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Consumer1.dll"]
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory
HostName = "rabbitmq",
UserName = "admin",
Password = "admin"
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
var queueName = "slow_queue";
var exchangeName = "logs_exchange";
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: "");
Console.WriteLine("[Consumer1] Waiting for messages...");
while (true)
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[Consumer1] Received: {message}");
Thread.Sleep(new Random().Next(2000, 3000));
Console.WriteLine("[Consumer1] Done processing");
channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
return Task.CompletedTask;
await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer);
"profiles": {
"Consumer1": {
"commandName": "Project"
"Container (Dockerfile)": {
"commandName": "Docker"
# См. статью по ссылке, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
FROM AS base
USER app
# Этот этап используется для сборки проекта службы
FROM AS build
COPY ["Consumer2.csproj", "."]
RUN dotnet restore "./Consumer2.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "./Consumer2.csproj" -c $BUILD_CONFIGURATION -o /app/build
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
FROM build AS publish
RUN dotnet publish "./Consumer2.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
FROM base AS final
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Consumer2.dll"]
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Data.Common;
using System.Text;
using System.Threading.Channels;
var factory = new ConnectionFactory
HostName = "rabbitmq",
UserName = "admin",
Password = "admin"
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
var queueName = "fast_queue";
var exchangeName = "logs_exchange";
await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: "");
Console.WriteLine("[Consumer2] Waiting for messages...");
while (true)
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[Consumer2] Received: {message}");
Console.WriteLine("[Consumer2] Done processing");
channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
return Task.CompletedTask;
await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer);
"profiles": {
"Consumer2": {
"commandName": "Project"
"Container (Dockerfile)": {
"commandName": "Docker"
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello", durable: false,
exclusive: false, autoDelete: false,arguments: null);
Console.WriteLine("[*] Waiting for messages...");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [*] Received {message}");
return Task.CompletedTask;
await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello", durable: false,
exclusive: false, autoDelete: false, arguments: null);
const string message = "Hello, World! ~from Artem";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body);
Console.WriteLine($" [x] Sent {message}");
Console.WriteLine(" Press [enter] to exit.");
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
# См. статью по ссылке, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
FROM AS base
USER app
# Этот этап используется для сборки проекта службы
FROM AS build
COPY ["Publisher.csproj", "."]
RUN dotnet restore "./Publisher.csproj"
COPY . .
WORKDIR "/src/."
RUN dotnet build "./Publisher.csproj" -c $BUILD_CONFIGURATION -o /app/build
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
FROM build AS publish
RUN dotnet publish "./Publisher.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
FROM base AS final
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "Publisher.dll"]
@ -1,10 +0,0 @@
"profiles": {
"Publisher": {
"commandName": "Project"
"Container (Dockerfile)": {
"commandName": "Docker"
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory
HostName = "rabbitmq",
UserName = "admin",
Password = "admin"
using var connection = await factory.CreateConnectionAsync();
Console.WriteLine("Connection established.");
using var channel = await connection.CreateChannelAsync();
Console.WriteLine("Channel created.");
await channel.ExchangeDeclareAsync(exchange: "logs_exchange", type: ExchangeType.Fanout);
while (true)
var message = $"Event: {GenerateRandomEvent()}";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "logs_exchange", routingKey: string.Empty, body: body);
Console.WriteLine($"[Publisher] Sent: {message}");
await Task.Delay(1000);
static string GenerateRandomEvent()
var events = new[] { "Order Received", "User Message", "Create Report" };
return events[new Random().Next(events.Length)] + " #" + new Random().Next(0, 99);
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="Current" xmlns="">
<ActiveDebugProfile>Container (Dockerfile)</ActiveDebugProfile>
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties
Persistent = true
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "task_queue", mandatory: true,
basicProperties: properties, body: body);
Console.WriteLine($" [x] Sent {message}");
static string GetMessage(string[] args)
return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "task_queue", durable: true, exclusive: false,
autoDelete: false, arguments: null);
await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) =>
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);
Console.WriteLine(" [x] Done");
// here channel could also be accessed as ((AsyncEventingBasicConsumer)sender).Channel
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
await channel.BasicConsumeAsync("task_queue", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
Console.WriteLine($" [x] Sent {message}");
Console.WriteLine(" Press [enter] to exit.");
static string GetMessage(string[] args)
return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!");
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "logs",
type: ExchangeType.Fanout);
// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] {message}");
return Task.CompletedTask;
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
<Project Sdk="Microsoft.NET.Sdk">
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
image: rabbitmq:management
container_name: rabbitmq
restart: always
- "5672:5672"
- "15672:15672"
- my_network
context: ./Publisher
restart: always
- rabbitmq
RABBIT_EXCHANGE: 'logs_exchange'
- my_network
context: ./Consumer1
restart: always
- rabbitmq
RABBIT_EXCHANGE: 'logs_exchange'
RABBIT_QUEUE: 'slow_queue'
- my_network
context: ./Consumer2
restart: always
- rabbitmq
RABBIT_EXCHANGE: 'logs_exchange'
RABBIT_QUEUE: 'fast_queue'
- my_network
driver: bridge
# Лабораторная работа 4 - Работа с брокером сообщений
## ПИбд-42 || Алейкин Артем
### Описание
В данной лабораторной работе мы познакомились с такой утилитой как RabbitMQ.
### Туториалы
1. HelloWorld - Tutorial
![Консольный вывод - первый туториал](./Images/Туториал_1.png)
2. Work Queues - Tutorial
![Консольный вывод - второй туториал](./Images/Туториал_2.png)
3. Publish/Subscribe - Tutorial
![Консольный вывод - третий туториал](./Images/Туториал_3.png)
### Основное задание
Было разработано 3 приложения: Publisher, Consumer1 и Consumer2.
Первое отвечало за доставку сообщений в очереди. Оно генерирует одно сообщение раз в секунду.
Второе и Третье за обработку этих сообщений из очередей, но Consumer1 имел искусственную задержку в 2-3 секунды, в то время как Consumer2 таких ограничений не имел и работу.
### Шаги для запуска:
1. Запуск контейнеров:
docker-compose up -d
В результате мы можем посмотреть графики по этой ссылке http://localhost:15672/
![График Consumer1 - медленный](./Images/Лаба_Отчет2.png)
![График Consumer2 - быстрый](./Images/Лаба_Отчет1.png)
После этого было добавлено еще 3 клиента типа Consumer1 и только после этого их суммарной производительности стало хватать для обработки сообщений.
![График Consumer1 для нескольких клиентов - медленный](./Images/Лаба_Отчет3.png)
![График Consumer2 - быстрый](./Images/Лаба_Отчет4.png)
Видео демонстрации работы:
# .gitignore
Для выполнения второй лабораторной работы по созданию распределённого приложения с использованием Docker и Docker Compose, давайте разберем все этапы, шаг за шагом. Я предлагаю реализовать вариант программы 1 и программу 2 следующим образом:
### 1. Вариант программы 1
Программа будет искать в каталоге `/var/data` файл с наибольшим количеством строк и перекладывать его в `/var/result/data.txt`.
### 2. Вариант программы 2
Программа будет искать наименьшее число из файла `/var/data/data.txt` и сохранять его третью степень в файл `/var/result/result.txt`.
### Структура проекта
1. `moiseev-vv-lab_2/worker-1`: Программа для нахождения файла с наибольшим количеством строк.
2. `moiseev-vv-lab_2/worker-2`: Программа для нахождения минимального числа в файле и записи его третьей степени.
### Шаги реализации:
#### 1. Реализация программы 1
#### 2. Реализация программы 2
Для обоих приложений создадим Dockerfile. Вот пример для **worker-1**:
- **Stage 1**: Мы используем `python:3.10-slim` как образ для сборки, где копируем файл `` и устанавливаем зависимости, если это необходимо.
- **Stage 2**: В этом слое мы копируем скомпилированные файлы из предыдущего этапа и определяем команду для запуска приложения.
Аналогичный Dockerfile будет для **worker-2**.
### Docker Compose файл
Теперь нужно настроить файл `docker-compose.yml`, который позволит запустить оба приложения:
- **services**: Мы объявляем два сервиса — `worker-1` и `worker-2`.
- **build**: Указываем контекст сборки для каждого сервиса (директории, где находятся Dockerfile и код).
- **volumes**: Монтируем локальные директории `./data` и `./result` в контейнеры, чтобы обмениваться файлами между сервисами.
- **depends_on**: Задаем зависимость `worker-2` от `worker-1`, чтобы второй сервис запускался только после первого.
### .gitignore
Для предотвращения попадания ненужных файлов в репозиторий, добавляем файл `.gitignore`. Пример для Python проектов:
# .gitignore
### Шаги для сборки и запуска
1. Склонировать репозиторий и перейти в директорию с лабораторной работой:
git clone <репозиторий>
cd moiseev-vv-lab_2
2. Скопировать файлы для `worker-1` и `worker-2` в соответствующие папки.
3. Создать файл `docker-compose.yml`.
4. Запустить приложение с помощью команды:
docker-compose up --build
5. Проверить вывод, результаты должны быть в директориях `./data` и `./result`.
### Заключение
Это пример, как можно реализовать простейшее распределённое приложение с использованием Docker. Первое приложение генерирует данные для второго, который обрабатывает их и записывает результат в файл. Docker и Docker Compose позволяют легко управлять и изолировать каждое приложение.ker Compose для запуска двух программ, обрабатывающих данные в контейнерах.
## Видео ВК
FROM python:3.9-slim
COPY . /app
CMD ["python", ""]
import os
import random
def generate_random_files(directory, num_files, num_lines_per_file, min_value, max_value):
os.makedirs(directory, exist_ok=True)
for i in range(num_files):
file_path = os.path.join(directory, f"file_{i + 1}.txt")
with open(file_path, 'w') as f:
for _ in range(num_lines_per_file):
random_number = random.randint(min_value, max_value)
print(f"Generated file: {file_path}")
def main():
data_directory = '/var/data'
num_files = 10
num_lines_per_file = 12
min_value = 1
max_value = 100
generate_random_files(data_directory, num_files, num_lines_per_file, min_value, max_value)
print(f"Generated {num_files} files in {data_directory}")
if __name__ == "__main__":
# docker-compose.yml
context: ./worker-1
- ./data:/var/data
- ./result:/var/result
- worker-2
context: ./worker-2
- ./data:/var/data
- ./result:/var/result
# worker-1/Dockerfile
# Stage 1: Build the application
FROM python:3.10-slim as builder
COPY ./ .
# Stage 2: Set up the runtime environment
FROM python:3.10-slim
COPY --from=builder /app/ .
CMD ["python", ""]
# worker-1/
import os
def find_file_with_most_lines(directory):
files = os.listdir(directory)
max_lines = 0
target_file = None
for filename in files:
filepath = os.path.join(directory, filename)
if os.path.isfile(filepath):
with open(filepath, 'r') as file:
lines = file.readlines()
if len(lines) > max_lines:
max_lines = len(lines)
target_file = filepath
return target_file
def main():
source_directory = '/var/data'
result_file = '/var/result/data.txt'
file_to_copy = find_file_with_most_lines(source_directory)
if file_to_copy:
with open(file_to_copy, 'r') as source, open(result_file, 'w') as dest:
print(f"File with the most lines: {file_to_copy} copied to {result_file}")
print("No files found in the source directory.")
if __name__ == "__main__":
# worker-1/Dockerfile
# Stage 1: Build the application
FROM python:3.10-slim as builder
COPY ./ .
# Stage 2: Set up the runtime environment
FROM python:3.10-slim
COPY --from=builder /app/ .
CMD ["python", ""]