forked from Alexey/DAS_2024_1
Compare commits
No commits in common. "zhimolostnova_anna_lab_4" and "main" have entirely different histories.
zhimolostn
...
main
@ -1,5 +0,0 @@
|
|||||||
module FirstTutorial
|
|
||||||
|
|
||||||
go 1.23.2
|
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
@ -1,4 +0,0 @@
|
|||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
@ -1,65 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
||||||
failOnError(err, "Failed to connect to RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Failed to open a channel")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"hello", // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to declare a queue")
|
|
||||||
|
|
||||||
msgs, err := ch.Consume(
|
|
||||||
q.Name, // queue
|
|
||||||
"", // consumer
|
|
||||||
true, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to register a consumer")
|
|
||||||
|
|
||||||
var forever chan struct{}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
log.Printf("Received a message: %s", d.Body)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
|
||||||
<-forever
|
|
||||||
}
|
|
@ -1,63 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
||||||
failOnError(err, "Failed to connect to RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Failed to open a channel")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"hello", // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to declare a queue")
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for d := range 10 {
|
|
||||||
body := "This message for consumer by number " + strconv.Itoa(d+1)
|
|
||||||
err = ch.PublishWithContext(ctx,
|
|
||||||
"", // exchange
|
|
||||||
q.Name, // routing key
|
|
||||||
false, // mandatory
|
|
||||||
false, // immediate
|
|
||||||
amqp.Publishing{
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: []byte(body),
|
|
||||||
})
|
|
||||||
failOnError(err, "Failed to publish a message")
|
|
||||||
log.Printf(" [x] Sent %s\n", body)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,80 +0,0 @@
|
|||||||
# Отчет по лабораторной работе №4
|
|
||||||
|
|
||||||
## Поставленные задачи
|
|
||||||
|
|
||||||
1. Установить брокер сообщений RabbitMQ.
|
|
||||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
|
||||||
3. Продемонстрировать работу брокера сообщений.
|
|
||||||
|
|
||||||
## Предметная область
|
|
||||||
|
|
||||||
Уведомление о штрафах за нарушение ПДД.
|
|
||||||
|
|
||||||
## Запуск работы
|
|
||||||
|
|
||||||
1. Убедиться, что установлены необходимые технологии:
|
|
||||||
- **Docker**: Платформа для контейнеризации приложений.
|
|
||||||
- **RabbitMQ**: Брокер сообщений.
|
|
||||||
- **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду:
|
|
||||||
```bash
|
|
||||||
docker-compose --version
|
|
||||||
```
|
|
||||||
|
|
||||||
2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов:
|
|
||||||
```bash
|
|
||||||
docker-compose up --build
|
|
||||||
```
|
|
||||||
Эта команда сначала выполнит сборку, а затем запустит контейнеры.
|
|
||||||
|
|
||||||
3. После успешного запуска можно перейти на RabbitMQ Management UI:
|
|
||||||
- RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/).
|
|
||||||
|
|
||||||
## Технологии
|
|
||||||
|
|
||||||
1. Golang: основной язык программирования.
|
|
||||||
2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания.
|
|
||||||
3. RabbitMQ: брокер сообщений.
|
|
||||||
|
|
||||||
## Анализ полученных данных
|
|
||||||
|
|
||||||
![SlowQueueResult.png](ReportImages%2FSlowQueueResult.png)
|
|
||||||
Для замедленной очереди можно сделать следующие выводы:
|
|
||||||
- Все 366 сообщений доставлены потребителям, но они еще не подтверждены.
|
|
||||||
Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям,
|
|
||||||
и ни одно сообщение не ожидает очереди.
|
|
||||||
- Однако, поскольку все 366 сообщений находятся в состоянии "Unacked",
|
|
||||||
это указывает на то, что потребитель не подтверждает обработку сообщений вовремя,
|
|
||||||
что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue).
|
|
||||||
- Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду,
|
|
||||||
и с той же скоростью сообщения доставляются потребителю.
|
|
||||||
- Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду,
|
|
||||||
что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают.
|
|
||||||
Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число
|
|
||||||
может продолжать расти, если скорость обработки не увеличится.
|
|
||||||
|
|
||||||
![FastQueueResult.png](ReportImages%2FFastQueueResult.png)
|
|
||||||
Для быстрой очереди можно сделать следующие выводы:
|
|
||||||
- Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки.
|
|
||||||
Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не
|
|
||||||
накапливаются в очереди.
|
|
||||||
- Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и
|
|
||||||
потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду.
|
|
||||||
- Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все
|
|
||||||
обрабатывается своевременно.
|
|
||||||
|
|
||||||
## Ход работы
|
|
||||||
|
|
||||||
1. **Прохождение первого урока**: директория FirstTutorial
|
|
||||||
![FirstTutorial.png](ReportImages%2FFirstTutorial.png)
|
|
||||||
|
|
||||||
2. **Прохождение второго урока**: директория SecondTutorial
|
|
||||||
![SecondTutorial.png](ReportImages%2FSecondTutorial.png)
|
|
||||||
|
|
||||||
3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial
|
|
||||||
![ThirdTutorial.png](ReportImages%2FThirdTutorial.png)
|
|
||||||
|
|
||||||
В коде присутствуют пояснительные комментарии.
|
|
||||||
|
|
||||||
## Демонстрационное видео
|
|
||||||
|
|
||||||
Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872)
|
|
Binary file not shown.
Before Width: | Height: | Size: 32 KiB |
Binary file not shown.
Before Width: | Height: | Size: 253 KiB |
Binary file not shown.
Before Width: | Height: | Size: 245 KiB |
Binary file not shown.
Before Width: | Height: | Size: 33 KiB |
Binary file not shown.
Before Width: | Height: | Size: 223 KiB |
@ -1,5 +0,0 @@
|
|||||||
module SecondTutorial
|
|
||||||
|
|
||||||
go 1.23.2
|
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
@ -1,4 +0,0 @@
|
|||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
@ -1,74 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
||||||
failOnError(err, "Failed to connect to RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Failed to open a channel")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"task", // name
|
|
||||||
true, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to declare a queue")
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
body := bodyFrom(os.Args)
|
|
||||||
err = ch.PublishWithContext(ctx,
|
|
||||||
"", // exchange
|
|
||||||
q.Name, // routing key
|
|
||||||
false, // mandatory
|
|
||||||
false,
|
|
||||||
amqp.Publishing{
|
|
||||||
DeliveryMode: amqp.Persistent,
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: []byte(body),
|
|
||||||
})
|
|
||||||
failOnError(err, "Failed to publish a message")
|
|
||||||
log.Printf(" [x] Sent %s", body)
|
|
||||||
}
|
|
||||||
|
|
||||||
func bodyFrom(args []string) string {
|
|
||||||
var s string
|
|
||||||
if (len(args) < 2) || os.Args[1] == "" {
|
|
||||||
s = "hello"
|
|
||||||
} else {
|
|
||||||
s = strings.Join(args[1:], " ")
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
@ -1,82 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
|
||||||
failOnError(err, "Failed to connect to RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Failed to open a channel")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"task", // name
|
|
||||||
true, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to declare a queue")
|
|
||||||
|
|
||||||
err = ch.Qos(
|
|
||||||
1, // prefetch count
|
|
||||||
0, // prefetch size
|
|
||||||
false, // global
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to set QoS")
|
|
||||||
|
|
||||||
msgs, err := ch.Consume(
|
|
||||||
q.Name, // queue
|
|
||||||
"", // consumer
|
|
||||||
false, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // args
|
|
||||||
)
|
|
||||||
failOnError(err, "Failed to register a consumer")
|
|
||||||
|
|
||||||
var forever chan struct{}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
log.Printf("Received a message: %s", d.Body)
|
|
||||||
countChar := bytes.Count(d.Body, []byte(""))
|
|
||||||
t := time.Duration(countChar)
|
|
||||||
time.Sleep(t * time.Second)
|
|
||||||
log.Printf("Done")
|
|
||||||
err := d.Ack(false)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
|
||||||
<-forever
|
|
||||||
}
|
|
@ -1,17 +0,0 @@
|
|||||||
# Пример для Consumer 1
|
|
||||||
FROM golang:1.23
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Копируем модули и загружаем зависимости
|
|
||||||
COPY go.mod go.sum ./
|
|
||||||
RUN go mod download
|
|
||||||
|
|
||||||
# Копируем исходный код
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
# Сборка
|
|
||||||
RUN go build -o firstConsumer .
|
|
||||||
|
|
||||||
# Запуск
|
|
||||||
CMD ["./firstConsumer"]
|
|
@ -1,92 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
|
||||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Не удалось открыть канал")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
err = ch.ExchangeDeclare(
|
|
||||||
"tax_notifications", // имя
|
|
||||||
"fanout", // тип
|
|
||||||
true, // durable
|
|
||||||
false, // auto-deleted
|
|
||||||
false, // internal
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failOnError(err, "Не удалось объявить exchange")
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"slow_consumer_queue", // имя очереди
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failOnError(err, "Не удалось объявить очередь")
|
|
||||||
|
|
||||||
err = ch.QueueBind(
|
|
||||||
q.Name, // имя очереди
|
|
||||||
"", // routing key
|
|
||||||
"tax_notifications", // exchange
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
failOnError(err, "Не удалось привязать очередь")
|
|
||||||
|
|
||||||
msgs, err := ch.Consume(
|
|
||||||
q.Name, // очередь
|
|
||||||
"", // firstConsumer
|
|
||||||
false, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failOnError(err, "Не удалось зарегистрировать firstConsumer")
|
|
||||||
|
|
||||||
var forever chan struct{}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
log.Printf(" [x] Получено: %s", d.Body)
|
|
||||||
time.Sleep(2 * time.Second) // Задержка в 2 секунды
|
|
||||||
log.Printf(" [x] Обработка завершена")
|
|
||||||
err := d.Ack(false)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
|
||||||
<-forever
|
|
||||||
}
|
|
@ -1,5 +0,0 @@
|
|||||||
module firstConsumer
|
|
||||||
|
|
||||||
go 1.23.2
|
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
@ -1,4 +0,0 @@
|
|||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
@ -1,17 +0,0 @@
|
|||||||
# Пример для publisher
|
|
||||||
FROM golang:1.23
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Копируем модули и загружаем зависимости
|
|
||||||
COPY go.mod go.sum ./
|
|
||||||
RUN go mod download
|
|
||||||
|
|
||||||
# Копируем исходный код
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
# Сборка
|
|
||||||
RUN go build -o publisher .
|
|
||||||
|
|
||||||
# Запуск
|
|
||||||
CMD ["./publisher"]
|
|
@ -1,5 +0,0 @@
|
|||||||
module publisher
|
|
||||||
|
|
||||||
go 1.23.2
|
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
@ -1,4 +0,0 @@
|
|||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
@ -1,70 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"log"
|
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failOnError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
|
||||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failOnError(err, "Не удалось открыть канал")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
err = ch.ExchangeDeclare(
|
|
||||||
"tax_notifications", // имя
|
|
||||||
"fanout", // тип
|
|
||||||
true, // durable
|
|
||||||
false, // auto-deleted
|
|
||||||
false, // internal
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failOnError(err, "Не удалось объявить exchange")
|
|
||||||
|
|
||||||
i := 1
|
|
||||||
|
|
||||||
for {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i)
|
|
||||||
err = ch.PublishWithContext(ctx,
|
|
||||||
"tax_notifications", // exchange
|
|
||||||
"", // routing key
|
|
||||||
false, // mandatory
|
|
||||||
false, // immediate
|
|
||||||
amqp.Publishing{
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: []byte(body),
|
|
||||||
})
|
|
||||||
failOnError(err, "Не удалось отправить сообщение")
|
|
||||||
|
|
||||||
log.Printf(" [x] Отправлено: %s", body)
|
|
||||||
time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду
|
|
||||||
i++
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,17 +0,0 @@
|
|||||||
# Пример для Consumer 1
|
|
||||||
FROM golang:1.23
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
# Копируем модули и загружаем зависимости
|
|
||||||
COPY go.mod go.sum ./
|
|
||||||
RUN go mod download
|
|
||||||
|
|
||||||
# Копируем исходный код
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
# Сборка
|
|
||||||
RUN go build -o secondConsumer .
|
|
||||||
|
|
||||||
# Запуск
|
|
||||||
CMD ["./secondConsumer"]
|
|
@ -1,5 +0,0 @@
|
|||||||
module secondConsumer
|
|
||||||
|
|
||||||
go 1.23.2
|
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
@ -1,4 +0,0 @@
|
|||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
@ -1,90 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func failError(err error, msg string) {
|
|
||||||
if err != nil {
|
|
||||||
log.Panicf("%s: %s", msg, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
|
||||||
failError(err, "Не удалось подключиться к RabbitMQ")
|
|
||||||
defer func(conn *amqp.Connection) {
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(conn)
|
|
||||||
|
|
||||||
ch, err := conn.Channel()
|
|
||||||
failError(err, "Не удалось открыть канал")
|
|
||||||
defer func(ch *amqp.Channel) {
|
|
||||||
err := ch.Close()
|
|
||||||
if err != nil {
|
|
||||||
|
|
||||||
}
|
|
||||||
}(ch)
|
|
||||||
|
|
||||||
err = ch.ExchangeDeclare(
|
|
||||||
"tax_notifications", // имя
|
|
||||||
"fanout", // тип
|
|
||||||
true, // durable
|
|
||||||
false, // auto-deleted
|
|
||||||
false, // internal
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failError(err, "Не удалось объявить exchange")
|
|
||||||
|
|
||||||
q, err := ch.QueueDeclare(
|
|
||||||
"fast_consumer_queue", // имя очереди
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failError(err, "Не удалось объявить очередь")
|
|
||||||
|
|
||||||
err = ch.QueueBind(
|
|
||||||
q.Name, // имя очереди
|
|
||||||
"", // routing key
|
|
||||||
"tax_notifications", // exchange
|
|
||||||
false,
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
failError(err, "Не удалось привязать очередь")
|
|
||||||
|
|
||||||
msgs, err := ch.Consume(
|
|
||||||
q.Name, // очередь
|
|
||||||
"", // firstConsumer
|
|
||||||
false, // auto-ack
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-local
|
|
||||||
false, // no-wait
|
|
||||||
nil, // аргументы
|
|
||||||
)
|
|
||||||
failError(err, "Не удалось зарегистрировать firstConsumer")
|
|
||||||
|
|
||||||
var forever chan struct{}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for d := range msgs {
|
|
||||||
log.Printf(" [x] Получено: %s", d.Body)
|
|
||||||
log.Printf(" [x] Обработка завершена")
|
|
||||||
err := d.Ack(false)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
|
||||||
<-forever
|
|
||||||
}
|
|
@ -1,38 +0,0 @@
|
|||||||
services:
|
|
||||||
rabbitmq:
|
|
||||||
image: rabbitmq:3-management
|
|
||||||
ports:
|
|
||||||
- "5672:5672"
|
|
||||||
- "15672:15672"
|
|
||||||
environment:
|
|
||||||
RABBITMQ_DEFAULT_USER: guest
|
|
||||||
RABBITMQ_DEFAULT_PASS: guest
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "rabbitmqctl", "status"]
|
|
||||||
interval: 10s
|
|
||||||
timeout: 5s
|
|
||||||
retries: 5
|
|
||||||
|
|
||||||
publisher:
|
|
||||||
build: ThirdTutorial/publisher
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
|
||||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
|
||||||
|
|
||||||
first_consumer:
|
|
||||||
build: ThirdTutorial/firstConsumer
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
|
||||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
|
||||||
|
|
||||||
second_consumer:
|
|
||||||
build: ThirdTutorial/secondConsumer
|
|
||||||
depends_on:
|
|
||||||
rabbitmq:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
|
||||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
|
Loading…
Reference in New Issue
Block a user