forked from Alexey/DAS_2024_1
Compare commits
No commits in common. "zhimolostnova_anna_lab_4" and "main" have entirely different histories.
zhimolostn
...
main
@ -1,5 +0,0 @@
|
||||
module FirstTutorial
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -1,65 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"hello", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
failOnError(err, "Failed to register a consumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf("Received a message: %s", d.Body)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
<-forever
|
||||
}
|
@ -1,63 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"hello", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for d := range 10 {
|
||||
body := "This message for consumer by number " + strconv.Itoa(d+1)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Failed to publish a message")
|
||||
log.Printf(" [x] Sent %s\n", body)
|
||||
}
|
||||
}
|
@ -1,80 +0,0 @@
|
||||
# Отчет по лабораторной работе №4
|
||||
|
||||
## Поставленные задачи
|
||||
|
||||
1. Установить брокер сообщений RabbitMQ.
|
||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||
3. Продемонстрировать работу брокера сообщений.
|
||||
|
||||
## Предметная область
|
||||
|
||||
Уведомление о штрафах за нарушение ПДД.
|
||||
|
||||
## Запуск работы
|
||||
|
||||
1. Убедиться, что установлены необходимые технологии:
|
||||
- **Docker**: Платформа для контейнеризации приложений.
|
||||
- **RabbitMQ**: Брокер сообщений.
|
||||
- **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду:
|
||||
```bash
|
||||
docker-compose --version
|
||||
```
|
||||
|
||||
2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов:
|
||||
```bash
|
||||
docker-compose up --build
|
||||
```
|
||||
Эта команда сначала выполнит сборку, а затем запустит контейнеры.
|
||||
|
||||
3. После успешного запуска можно перейти на RabbitMQ Management UI:
|
||||
- RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/).
|
||||
|
||||
## Технологии
|
||||
|
||||
1. Golang: основной язык программирования.
|
||||
2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания.
|
||||
3. RabbitMQ: брокер сообщений.
|
||||
|
||||
## Анализ полученных данных
|
||||
|
||||
![SlowQueueResult.png](ReportImages%2FSlowQueueResult.png)
|
||||
Для замедленной очереди можно сделать следующие выводы:
|
||||
- Все 366 сообщений доставлены потребителям, но они еще не подтверждены.
|
||||
Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям,
|
||||
и ни одно сообщение не ожидает очереди.
|
||||
- Однако, поскольку все 366 сообщений находятся в состоянии "Unacked",
|
||||
это указывает на то, что потребитель не подтверждает обработку сообщений вовремя,
|
||||
что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue).
|
||||
- Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду,
|
||||
и с той же скоростью сообщения доставляются потребителю.
|
||||
- Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду,
|
||||
что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают.
|
||||
Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число
|
||||
может продолжать расти, если скорость обработки не увеличится.
|
||||
|
||||
![FastQueueResult.png](ReportImages%2FFastQueueResult.png)
|
||||
Для быстрой очереди можно сделать следующие выводы:
|
||||
- Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки.
|
||||
Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не
|
||||
накапливаются в очереди.
|
||||
- Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и
|
||||
потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду.
|
||||
- Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все
|
||||
обрабатывается своевременно.
|
||||
|
||||
## Ход работы
|
||||
|
||||
1. **Прохождение первого урока**: директория FirstTutorial
|
||||
![FirstTutorial.png](ReportImages%2FFirstTutorial.png)
|
||||
|
||||
2. **Прохождение второго урока**: директория SecondTutorial
|
||||
![SecondTutorial.png](ReportImages%2FSecondTutorial.png)
|
||||
|
||||
3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial
|
||||
![ThirdTutorial.png](ReportImages%2FThirdTutorial.png)
|
||||
|
||||
В коде присутствуют пояснительные комментарии.
|
||||
|
||||
## Демонстрационное видео
|
||||
|
||||
Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872)
|
Binary file not shown.
Before Width: | Height: | Size: 32 KiB |
Binary file not shown.
Before Width: | Height: | Size: 253 KiB |
Binary file not shown.
Before Width: | Height: | Size: 245 KiB |
Binary file not shown.
Before Width: | Height: | Size: 33 KiB |
Binary file not shown.
Before Width: | Height: | Size: 223 KiB |
@ -1,5 +0,0 @@
|
||||
module SecondTutorial
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -1,74 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"task", // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
body := bodyFrom(os.Args)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false,
|
||||
amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Failed to publish a message")
|
||||
log.Printf(" [x] Sent %s", body)
|
||||
}
|
||||
|
||||
func bodyFrom(args []string) string {
|
||||
var s string
|
||||
if (len(args) < 2) || os.Args[1] == "" {
|
||||
s = "hello"
|
||||
} else {
|
||||
s = strings.Join(args[1:], " ")
|
||||
}
|
||||
return s
|
||||
}
|
@ -1,82 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"task", // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
err = ch.Qos(
|
||||
1, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
failOnError(err, "Failed to set QoS")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
failOnError(err, "Failed to register a consumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf("Received a message: %s", d.Body)
|
||||
countChar := bytes.Count(d.Body, []byte(""))
|
||||
t := time.Duration(countChar)
|
||||
time.Sleep(t * time.Second)
|
||||
log.Printf("Done")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
<-forever
|
||||
}
|
@ -1,17 +0,0 @@
|
||||
# Пример для Consumer 1
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o firstConsumer .
|
||||
|
||||
# Запуск
|
||||
CMD ["./firstConsumer"]
|
@ -1,92 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить exchange")
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"slow_consumer_queue", // имя очереди
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить очередь")
|
||||
|
||||
err = ch.QueueBind(
|
||||
q.Name, // имя очереди
|
||||
"", // routing key
|
||||
"tax_notifications", // exchange
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
failOnError(err, "Не удалось привязать очередь")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // очередь
|
||||
"", // firstConsumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось зарегистрировать firstConsumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf(" [x] Получено: %s", d.Body)
|
||||
time.Sleep(2 * time.Second) // Задержка в 2 секунды
|
||||
log.Printf(" [x] Обработка завершена")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||
<-forever
|
||||
}
|
@ -1,5 +0,0 @@
|
||||
module firstConsumer
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -1,17 +0,0 @@
|
||||
# Пример для publisher
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o publisher .
|
||||
|
||||
# Запуск
|
||||
CMD ["./publisher"]
|
@ -1,5 +0,0 @@
|
||||
module publisher
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -1,70 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить exchange")
|
||||
|
||||
i := 1
|
||||
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"tax_notifications", // exchange
|
||||
"", // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Не удалось отправить сообщение")
|
||||
|
||||
log.Printf(" [x] Отправлено: %s", body)
|
||||
time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду
|
||||
i++
|
||||
}
|
||||
}
|
@ -1,17 +0,0 @@
|
||||
# Пример для Consumer 1
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o secondConsumer .
|
||||
|
||||
# Запуск
|
||||
CMD ["./secondConsumer"]
|
@ -1,5 +0,0 @@
|
||||
module secondConsumer
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -1,90 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось объявить exchange")
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"fast_consumer_queue", // имя очереди
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось объявить очередь")
|
||||
|
||||
err = ch.QueueBind(
|
||||
q.Name, // имя очереди
|
||||
"", // routing key
|
||||
"tax_notifications", // exchange
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
failError(err, "Не удалось привязать очередь")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // очередь
|
||||
"", // firstConsumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось зарегистрировать firstConsumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf(" [x] Получено: %s", d.Body)
|
||||
log.Printf(" [x] Обработка завершена")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||
<-forever
|
||||
}
|
@ -1,38 +0,0 @@
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: guest
|
||||
RABBITMQ_DEFAULT_PASS: guest
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmqctl", "status"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
publisher:
|
||||
build: ThirdTutorial/publisher
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||
|
||||
first_consumer:
|
||||
build: ThirdTutorial/firstConsumer
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||
|
||||
second_consumer:
|
||||
build: ThirdTutorial/secondConsumer
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
Loading…
Reference in New Issue
Block a user