diff --git a/zhimolostnova_anna_lab_4/FirstTutorial/go.mod b/zhimolostnova_anna_lab_4/FirstTutorial/go.mod new file mode 100644 index 0000000..49ec952 --- /dev/null +++ b/zhimolostnova_anna_lab_4/FirstTutorial/go.mod @@ -0,0 +1,5 @@ +module FirstTutorial + +go 1.23.2 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/zhimolostnova_anna_lab_4/FirstTutorial/go.sum b/zhimolostnova_anna_lab_4/FirstTutorial/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/zhimolostnova_anna_lab_4/FirstTutorial/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/zhimolostnova_anna_lab_4/FirstTutorial/receiver/receive.go b/zhimolostnova_anna_lab_4/FirstTutorial/receiver/receive.go new file mode 100644 index 0000000..f510a10 --- /dev/null +++ b/zhimolostnova_anna_lab_4/FirstTutorial/receiver/receive.go @@ -0,0 +1,65 @@ +package main + +import ( + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + q, err := ch.QueueDeclare( + "hello", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf("Received a message: %s", d.Body) + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever +} diff --git a/zhimolostnova_anna_lab_4/FirstTutorial/sender/send.go b/zhimolostnova_anna_lab_4/FirstTutorial/sender/send.go new file mode 100644 index 0000000..13d8cf6 --- /dev/null +++ b/zhimolostnova_anna_lab_4/FirstTutorial/sender/send.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "log" + "strconv" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + q, err := ch.QueueDeclare( + "hello", // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + for d := range 10 { + body := "This message for consumer by number " + strconv.Itoa(d+1) + err = ch.PublishWithContext(ctx, + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body), + }) + failOnError(err, "Failed to publish a message") + log.Printf(" [x] Sent %s\n", body) + } +} diff --git a/zhimolostnova_anna_lab_4/README.md b/zhimolostnova_anna_lab_4/README.md new file mode 100644 index 0000000..0f772af --- /dev/null +++ b/zhimolostnova_anna_lab_4/README.md @@ -0,0 +1,80 @@ +# Отчет по лабораторной работе №4 + +## Поставленные задачи + +1. Установить брокер сообщений RabbitMQ. +2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования. +3. Продемонстрировать работу брокера сообщений. + +## Предметная область + +Уведомление о штрафах за нарушение ПДД. + +## Запуск работы + +1. Убедиться, что установлены необходимые технологии: + - **Docker**: Платформа для контейнеризации приложений. + - **RabbitMQ**: Брокер сообщений. + - **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду: + ```bash + docker-compose --version + ``` + +2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов: + ```bash + docker-compose up --build + ``` + Эта команда сначала выполнит сборку, а затем запустит контейнеры. + +3. После успешного запуска можно перейти на RabbitMQ Management UI: + - RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/). + +## Технологии + +1. Golang: основной язык программирования. +2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания. +3. RabbitMQ: брокер сообщений. + +## Анализ полученных данных + +![SlowQueueResult.png](ReportImages%2FSlowQueueResult.png) + Для замедленной очереди можно сделать следующие выводы: + - Все 366 сообщений доставлены потребителям, но они еще не подтверждены. + Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям, + и ни одно сообщение не ожидает очереди. + - Однако, поскольку все 366 сообщений находятся в состоянии "Unacked", + это указывает на то, что потребитель не подтверждает обработку сообщений вовремя, + что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue). + - Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду, + и с той же скоростью сообщения доставляются потребителю. + - Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду, + что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают. + Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число + может продолжать расти, если скорость обработки не увеличится. + +![FastQueueResult.png](ReportImages%2FFastQueueResult.png) + Для быстрой очереди можно сделать следующие выводы: + - Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки. + Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не + накапливаются в очереди. + - Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и + потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду. + - Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все + обрабатывается своевременно. + +## Ход работы + +1. **Прохождение первого урока**: директория FirstTutorial + ![FirstTutorial.png](ReportImages%2FFirstTutorial.png) + +2. **Прохождение второго урока**: директория SecondTutorial + ![SecondTutorial.png](ReportImages%2FSecondTutorial.png) + +3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial + ![ThirdTutorial.png](ReportImages%2FThirdTutorial.png) + +В коде присутствуют пояснительные комментарии. + +## Демонстрационное видео + +Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872) \ No newline at end of file diff --git a/zhimolostnova_anna_lab_4/ReportImages/FastQueueResult.png b/zhimolostnova_anna_lab_4/ReportImages/FastQueueResult.png new file mode 100644 index 0000000..c8ea9dc Binary files /dev/null and b/zhimolostnova_anna_lab_4/ReportImages/FastQueueResult.png differ diff --git a/zhimolostnova_anna_lab_4/ReportImages/FirstTutorial.png b/zhimolostnova_anna_lab_4/ReportImages/FirstTutorial.png new file mode 100644 index 0000000..ef28f3f Binary files /dev/null and b/zhimolostnova_anna_lab_4/ReportImages/FirstTutorial.png differ diff --git a/zhimolostnova_anna_lab_4/ReportImages/SecondTutorial.png b/zhimolostnova_anna_lab_4/ReportImages/SecondTutorial.png new file mode 100644 index 0000000..1bb9001 Binary files /dev/null and b/zhimolostnova_anna_lab_4/ReportImages/SecondTutorial.png differ diff --git a/zhimolostnova_anna_lab_4/ReportImages/SlowQueueResult.png b/zhimolostnova_anna_lab_4/ReportImages/SlowQueueResult.png new file mode 100644 index 0000000..68db929 Binary files /dev/null and b/zhimolostnova_anna_lab_4/ReportImages/SlowQueueResult.png differ diff --git a/zhimolostnova_anna_lab_4/ReportImages/ThirdTutorial.png b/zhimolostnova_anna_lab_4/ReportImages/ThirdTutorial.png new file mode 100644 index 0000000..44e867b Binary files /dev/null and b/zhimolostnova_anna_lab_4/ReportImages/ThirdTutorial.png differ diff --git a/zhimolostnova_anna_lab_4/SecondTutorial/go.mod b/zhimolostnova_anna_lab_4/SecondTutorial/go.mod new file mode 100644 index 0000000..8e28516 --- /dev/null +++ b/zhimolostnova_anna_lab_4/SecondTutorial/go.mod @@ -0,0 +1,5 @@ +module SecondTutorial + +go 1.23.2 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/zhimolostnova_anna_lab_4/SecondTutorial/go.sum b/zhimolostnova_anna_lab_4/SecondTutorial/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/zhimolostnova_anna_lab_4/SecondTutorial/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/zhimolostnova_anna_lab_4/SecondTutorial/tasks/new_task.go b/zhimolostnova_anna_lab_4/SecondTutorial/tasks/new_task.go new file mode 100644 index 0000000..f90d074 --- /dev/null +++ b/zhimolostnova_anna_lab_4/SecondTutorial/tasks/new_task.go @@ -0,0 +1,74 @@ +package main + +import ( + "context" + "log" + "os" + "strings" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + q, err := ch.QueueDeclare( + "task", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + body := bodyFrom(os.Args) + err = ch.PublishWithContext(ctx, + "", // exchange + q.Name, // routing key + false, // mandatory + false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(body), + }) + failOnError(err, "Failed to publish a message") + log.Printf(" [x] Sent %s", body) +} + +func bodyFrom(args []string) string { + var s string + if (len(args) < 2) || os.Args[1] == "" { + s = "hello" + } else { + s = strings.Join(args[1:], " ") + } + return s +} diff --git a/zhimolostnova_anna_lab_4/SecondTutorial/workers/worker.go b/zhimolostnova_anna_lab_4/SecondTutorial/workers/worker.go new file mode 100644 index 0000000..9ba57f3 --- /dev/null +++ b/zhimolostnova_anna_lab_4/SecondTutorial/workers/worker.go @@ -0,0 +1,82 @@ +package main + +import ( + "bytes" + "log" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + failOnError(err, "Failed to connect to RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + q, err := ch.QueueDeclare( + "task", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.Qos( + 1, // prefetch count + 0, // prefetch size + false, // global + ) + failOnError(err, "Failed to set QoS") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf("Received a message: %s", d.Body) + countChar := bytes.Count(d.Body, []byte("")) + t := time.Duration(countChar) + time.Sleep(t * time.Second) + log.Printf("Done") + err := d.Ack(false) + if err != nil { + return + } + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever +} diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/Dockerfile b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/Dockerfile new file mode 100644 index 0000000..a704c42 --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/Dockerfile @@ -0,0 +1,17 @@ +# Пример для Consumer 1 +FROM golang:1.23 + +WORKDIR /app + +# Копируем модули и загружаем зависимости +COPY go.mod go.sum ./ +RUN go mod download + +# Копируем исходный код +COPY . . + +# Сборка +RUN go build -o firstConsumer . + +# Запуск +CMD ["./firstConsumer"] diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/firstConsumer.go b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/firstConsumer.go new file mode 100644 index 0000000..5a0b04c --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/firstConsumer.go @@ -0,0 +1,92 @@ +package main + +import ( + "log" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/") + failOnError(err, "Не удалось подключиться к RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Не удалось открыть канал") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + err = ch.ExchangeDeclare( + "tax_notifications", // имя + "fanout", // тип + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // аргументы + ) + failOnError(err, "Не удалось объявить exchange") + + q, err := ch.QueueDeclare( + "slow_consumer_queue", // имя очереди + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // аргументы + ) + failOnError(err, "Не удалось объявить очередь") + + err = ch.QueueBind( + q.Name, // имя очереди + "", // routing key + "tax_notifications", // exchange + false, + nil, + ) + failOnError(err, "Не удалось привязать очередь") + + msgs, err := ch.Consume( + q.Name, // очередь + "", // firstConsumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // аргументы + ) + failOnError(err, "Не удалось зарегистрировать firstConsumer") + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf(" [x] Получено: %s", d.Body) + time.Sleep(2 * time.Second) // Задержка в 2 секунды + log.Printf(" [x] Обработка завершена") + err := d.Ack(false) + if err != nil { + return + } + } + }() + + log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C") + <-forever +} diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.mod b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.mod new file mode 100644 index 0000000..50e2cfc --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.mod @@ -0,0 +1,5 @@ +module firstConsumer + +go 1.23.2 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.sum b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/firstConsumer/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/Dockerfile b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/Dockerfile new file mode 100644 index 0000000..809d32e --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/Dockerfile @@ -0,0 +1,17 @@ +# Пример для publisher +FROM golang:1.23 + +WORKDIR /app + +# Копируем модули и загружаем зависимости +COPY go.mod go.sum ./ +RUN go mod download + +# Копируем исходный код +COPY . . + +# Сборка +RUN go build -o publisher . + +# Запуск +CMD ["./publisher"] diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.mod b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.mod new file mode 100644 index 0000000..014b017 --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.mod @@ -0,0 +1,5 @@ +module publisher + +go 1.23.2 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.sum b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/publisher.go b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/publisher.go new file mode 100644 index 0000000..a7e153c --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/publisher/publisher.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "log" + "strconv" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/") + failOnError(err, "Не удалось подключиться к RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failOnError(err, "Не удалось открыть канал") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + err = ch.ExchangeDeclare( + "tax_notifications", // имя + "fanout", // тип + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // аргументы + ) + failOnError(err, "Не удалось объявить exchange") + + i := 1 + + for { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i) + err = ch.PublishWithContext(ctx, + "tax_notifications", // exchange + "", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(body), + }) + failOnError(err, "Не удалось отправить сообщение") + + log.Printf(" [x] Отправлено: %s", body) + time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду + i++ + } +} diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/Dockerfile b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/Dockerfile new file mode 100644 index 0000000..d50e1b3 --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/Dockerfile @@ -0,0 +1,17 @@ +# Пример для Consumer 1 +FROM golang:1.23 + +WORKDIR /app + +# Копируем модули и загружаем зависимости +COPY go.mod go.sum ./ +RUN go mod download + +# Копируем исходный код +COPY . . + +# Сборка +RUN go build -o secondConsumer . + +# Запуск +CMD ["./secondConsumer"] diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.mod b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.mod new file mode 100644 index 0000000..d1ac9a3 --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.mod @@ -0,0 +1,5 @@ +module secondConsumer + +go 1.23.2 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.sum b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/secondConsumer.go b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/secondConsumer.go new file mode 100644 index 0000000..a4c14ee --- /dev/null +++ b/zhimolostnova_anna_lab_4/ThirdTutorial/secondConsumer/secondConsumer.go @@ -0,0 +1,90 @@ +package main + +import ( + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func failError(err error, msg string) { + if err != nil { + log.Panicf("%s: %s", msg, err) + } +} + +func main() { + conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/") + failError(err, "Не удалось подключиться к RabbitMQ") + defer func(conn *amqp.Connection) { + err := conn.Close() + if err != nil { + + } + }(conn) + + ch, err := conn.Channel() + failError(err, "Не удалось открыть канал") + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + + } + }(ch) + + err = ch.ExchangeDeclare( + "tax_notifications", // имя + "fanout", // тип + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // аргументы + ) + failError(err, "Не удалось объявить exchange") + + q, err := ch.QueueDeclare( + "fast_consumer_queue", // имя очереди + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // аргументы + ) + failError(err, "Не удалось объявить очередь") + + err = ch.QueueBind( + q.Name, // имя очереди + "", // routing key + "tax_notifications", // exchange + false, + nil, + ) + failError(err, "Не удалось привязать очередь") + + msgs, err := ch.Consume( + q.Name, // очередь + "", // firstConsumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // аргументы + ) + failError(err, "Не удалось зарегистрировать firstConsumer") + + var forever chan struct{} + + go func() { + for d := range msgs { + log.Printf(" [x] Получено: %s", d.Body) + log.Printf(" [x] Обработка завершена") + err := d.Ack(false) + if err != nil { + return + } + } + }() + + log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C") + <-forever +} diff --git a/zhimolostnova_anna_lab_4/docker-compose.yml b/zhimolostnova_anna_lab_4/docker-compose.yml new file mode 100644 index 0000000..55f04c5 --- /dev/null +++ b/zhimolostnova_anna_lab_4/docker-compose.yml @@ -0,0 +1,38 @@ +services: + rabbitmq: + image: rabbitmq:3-management + ports: + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + healthcheck: + test: ["CMD", "rabbitmqctl", "status"] + interval: 10s + timeout: 5s + retries: 5 + + publisher: + build: ThirdTutorial/publisher + depends_on: + rabbitmq: + condition: service_healthy + environment: + - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/ + + first_consumer: + build: ThirdTutorial/firstConsumer + depends_on: + rabbitmq: + condition: service_healthy + environment: + - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/ + + second_consumer: + build: ThirdTutorial/secondConsumer + depends_on: + rabbitmq: + condition: service_healthy + environment: + - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/