forked from Alexey/DAS_2024_1
Compare commits
1 Commits
main
...
zhimolostn
Author | SHA1 | Date | |
---|---|---|---|
77790c37fb |
5
zhimolostnova_anna_lab_4/FirstTutorial/go.mod
Normal file
5
zhimolostnova_anna_lab_4/FirstTutorial/go.mod
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
module FirstTutorial
|
||||||
|
|
||||||
|
go 1.23.2
|
||||||
|
|
||||||
|
require github.com/rabbitmq/amqp091-go v1.10.0
|
4
zhimolostnova_anna_lab_4/FirstTutorial/go.sum
Normal file
4
zhimolostnova_anna_lab_4/FirstTutorial/go.sum
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
65
zhimolostnova_anna_lab_4/FirstTutorial/receiver/receive.go
Normal file
65
zhimolostnova_anna_lab_4/FirstTutorial/receiver/receive.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||||
|
failOnError(err, "Failed to connect to RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Failed to open a channel")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"hello", // name
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to declare a queue")
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
q.Name, // queue
|
||||||
|
"", // consumer
|
||||||
|
true, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to register a consumer")
|
||||||
|
|
||||||
|
var forever chan struct{}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for d := range msgs {
|
||||||
|
log.Printf("Received a message: %s", d.Body)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||||
|
<-forever
|
||||||
|
}
|
63
zhimolostnova_anna_lab_4/FirstTutorial/sender/send.go
Normal file
63
zhimolostnova_anna_lab_4/FirstTutorial/sender/send.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||||
|
failOnError(err, "Failed to connect to RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Failed to open a channel")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"hello", // name
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to declare a queue")
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for d := range 10 {
|
||||||
|
body := "This message for consumer by number " + strconv.Itoa(d+1)
|
||||||
|
err = ch.PublishWithContext(ctx,
|
||||||
|
"", // exchange
|
||||||
|
q.Name, // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false, // immediate
|
||||||
|
amqp.Publishing{
|
||||||
|
ContentType: "text/plain",
|
||||||
|
Body: []byte(body),
|
||||||
|
})
|
||||||
|
failOnError(err, "Failed to publish a message")
|
||||||
|
log.Printf(" [x] Sent %s\n", body)
|
||||||
|
}
|
||||||
|
}
|
80
zhimolostnova_anna_lab_4/README.md
Normal file
80
zhimolostnova_anna_lab_4/README.md
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
# Отчет по лабораторной работе №4
|
||||||
|
|
||||||
|
## Поставленные задачи
|
||||||
|
|
||||||
|
1. Установить брокер сообщений RabbitMQ.
|
||||||
|
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||||
|
3. Продемонстрировать работу брокера сообщений.
|
||||||
|
|
||||||
|
## Предметная область
|
||||||
|
|
||||||
|
Уведомление о штрафах за нарушение ПДД.
|
||||||
|
|
||||||
|
## Запуск работы
|
||||||
|
|
||||||
|
1. Убедиться, что установлены необходимые технологии:
|
||||||
|
- **Docker**: Платформа для контейнеризации приложений.
|
||||||
|
- **RabbitMQ**: Брокер сообщений.
|
||||||
|
- **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду:
|
||||||
|
```bash
|
||||||
|
docker-compose --version
|
||||||
|
```
|
||||||
|
|
||||||
|
2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов:
|
||||||
|
```bash
|
||||||
|
docker-compose up --build
|
||||||
|
```
|
||||||
|
Эта команда сначала выполнит сборку, а затем запустит контейнеры.
|
||||||
|
|
||||||
|
3. После успешного запуска можно перейти на RabbitMQ Management UI:
|
||||||
|
- RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/).
|
||||||
|
|
||||||
|
## Технологии
|
||||||
|
|
||||||
|
1. Golang: основной язык программирования.
|
||||||
|
2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания.
|
||||||
|
3. RabbitMQ: брокер сообщений.
|
||||||
|
|
||||||
|
## Анализ полученных данных
|
||||||
|
|
||||||
|
![SlowQueueResult.png](ReportImages%2FSlowQueueResult.png)
|
||||||
|
Для замедленной очереди можно сделать следующие выводы:
|
||||||
|
- Все 366 сообщений доставлены потребителям, но они еще не подтверждены.
|
||||||
|
Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям,
|
||||||
|
и ни одно сообщение не ожидает очереди.
|
||||||
|
- Однако, поскольку все 366 сообщений находятся в состоянии "Unacked",
|
||||||
|
это указывает на то, что потребитель не подтверждает обработку сообщений вовремя,
|
||||||
|
что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue).
|
||||||
|
- Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду,
|
||||||
|
и с той же скоростью сообщения доставляются потребителю.
|
||||||
|
- Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду,
|
||||||
|
что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают.
|
||||||
|
Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число
|
||||||
|
может продолжать расти, если скорость обработки не увеличится.
|
||||||
|
|
||||||
|
![FastQueueResult.png](ReportImages%2FFastQueueResult.png)
|
||||||
|
Для быстрой очереди можно сделать следующие выводы:
|
||||||
|
- Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки.
|
||||||
|
Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не
|
||||||
|
накапливаются в очереди.
|
||||||
|
- Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и
|
||||||
|
потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду.
|
||||||
|
- Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все
|
||||||
|
обрабатывается своевременно.
|
||||||
|
|
||||||
|
## Ход работы
|
||||||
|
|
||||||
|
1. **Прохождение первого урока**: директория FirstTutorial
|
||||||
|
![FirstTutorial.png](ReportImages%2FFirstTutorial.png)
|
||||||
|
|
||||||
|
2. **Прохождение второго урока**: директория SecondTutorial
|
||||||
|
![SecondTutorial.png](ReportImages%2FSecondTutorial.png)
|
||||||
|
|
||||||
|
3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial
|
||||||
|
![ThirdTutorial.png](ReportImages%2FThirdTutorial.png)
|
||||||
|
|
||||||
|
В коде присутствуют пояснительные комментарии.
|
||||||
|
|
||||||
|
## Демонстрационное видео
|
||||||
|
|
||||||
|
Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872)
|
BIN
zhimolostnova_anna_lab_4/ReportImages/FastQueueResult.png
Normal file
BIN
zhimolostnova_anna_lab_4/ReportImages/FastQueueResult.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 32 KiB |
BIN
zhimolostnova_anna_lab_4/ReportImages/FirstTutorial.png
Normal file
BIN
zhimolostnova_anna_lab_4/ReportImages/FirstTutorial.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 253 KiB |
BIN
zhimolostnova_anna_lab_4/ReportImages/SecondTutorial.png
Normal file
BIN
zhimolostnova_anna_lab_4/ReportImages/SecondTutorial.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 245 KiB |
BIN
zhimolostnova_anna_lab_4/ReportImages/SlowQueueResult.png
Normal file
BIN
zhimolostnova_anna_lab_4/ReportImages/SlowQueueResult.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 33 KiB |
BIN
zhimolostnova_anna_lab_4/ReportImages/ThirdTutorial.png
Normal file
BIN
zhimolostnova_anna_lab_4/ReportImages/ThirdTutorial.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 223 KiB |
5
zhimolostnova_anna_lab_4/SecondTutorial/go.mod
Normal file
5
zhimolostnova_anna_lab_4/SecondTutorial/go.mod
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
module SecondTutorial
|
||||||
|
|
||||||
|
go 1.23.2
|
||||||
|
|
||||||
|
require github.com/rabbitmq/amqp091-go v1.10.0
|
4
zhimolostnova_anna_lab_4/SecondTutorial/go.sum
Normal file
4
zhimolostnova_anna_lab_4/SecondTutorial/go.sum
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
74
zhimolostnova_anna_lab_4/SecondTutorial/tasks/new_task.go
Normal file
74
zhimolostnova_anna_lab_4/SecondTutorial/tasks/new_task.go
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||||
|
failOnError(err, "Failed to connect to RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Failed to open a channel")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"task", // name
|
||||||
|
true, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to declare a queue")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
body := bodyFrom(os.Args)
|
||||||
|
err = ch.PublishWithContext(ctx,
|
||||||
|
"", // exchange
|
||||||
|
q.Name, // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false,
|
||||||
|
amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent,
|
||||||
|
ContentType: "text/plain",
|
||||||
|
Body: []byte(body),
|
||||||
|
})
|
||||||
|
failOnError(err, "Failed to publish a message")
|
||||||
|
log.Printf(" [x] Sent %s", body)
|
||||||
|
}
|
||||||
|
|
||||||
|
func bodyFrom(args []string) string {
|
||||||
|
var s string
|
||||||
|
if (len(args) < 2) || os.Args[1] == "" {
|
||||||
|
s = "hello"
|
||||||
|
} else {
|
||||||
|
s = strings.Join(args[1:], " ")
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
82
zhimolostnova_anna_lab_4/SecondTutorial/workers/worker.go
Normal file
82
zhimolostnova_anna_lab_4/SecondTutorial/workers/worker.go
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||||
|
failOnError(err, "Failed to connect to RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Failed to open a channel")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"task", // name
|
||||||
|
true, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to declare a queue")
|
||||||
|
|
||||||
|
err = ch.Qos(
|
||||||
|
1, // prefetch count
|
||||||
|
0, // prefetch size
|
||||||
|
false, // global
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to set QoS")
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
q.Name, // queue
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
failOnError(err, "Failed to register a consumer")
|
||||||
|
|
||||||
|
var forever chan struct{}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for d := range msgs {
|
||||||
|
log.Printf("Received a message: %s", d.Body)
|
||||||
|
countChar := bytes.Count(d.Body, []byte(""))
|
||||||
|
t := time.Duration(countChar)
|
||||||
|
time.Sleep(t * time.Second)
|
||||||
|
log.Printf("Done")
|
||||||
|
err := d.Ack(false)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||||
|
<-forever
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
# Пример для Consumer 1
|
||||||
|
FROM golang:1.23
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Копируем модули и загружаем зависимости
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
# Копируем исходный код
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Сборка
|
||||||
|
RUN go build -o firstConsumer .
|
||||||
|
|
||||||
|
# Запуск
|
||||||
|
CMD ["./firstConsumer"]
|
@ -0,0 +1,92 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||||
|
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Не удалось открыть канал")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
err = ch.ExchangeDeclare(
|
||||||
|
"tax_notifications", // имя
|
||||||
|
"fanout", // тип
|
||||||
|
true, // durable
|
||||||
|
false, // auto-deleted
|
||||||
|
false, // internal
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failOnError(err, "Не удалось объявить exchange")
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"slow_consumer_queue", // имя очереди
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failOnError(err, "Не удалось объявить очередь")
|
||||||
|
|
||||||
|
err = ch.QueueBind(
|
||||||
|
q.Name, // имя очереди
|
||||||
|
"", // routing key
|
||||||
|
"tax_notifications", // exchange
|
||||||
|
false,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
failOnError(err, "Не удалось привязать очередь")
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
q.Name, // очередь
|
||||||
|
"", // firstConsumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failOnError(err, "Не удалось зарегистрировать firstConsumer")
|
||||||
|
|
||||||
|
var forever chan struct{}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for d := range msgs {
|
||||||
|
log.Printf(" [x] Получено: %s", d.Body)
|
||||||
|
time.Sleep(2 * time.Second) // Задержка в 2 секунды
|
||||||
|
log.Printf(" [x] Обработка завершена")
|
||||||
|
err := d.Ack(false)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||||
|
<-forever
|
||||||
|
}
|
@ -0,0 +1,5 @@
|
|||||||
|
module firstConsumer
|
||||||
|
|
||||||
|
go 1.23.2
|
||||||
|
|
||||||
|
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -0,0 +1,4 @@
|
|||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
17
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/Dockerfile
Normal file
17
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/Dockerfile
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
# Пример для publisher
|
||||||
|
FROM golang:1.23
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Копируем модули и загружаем зависимости
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
# Копируем исходный код
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Сборка
|
||||||
|
RUN go build -o publisher .
|
||||||
|
|
||||||
|
# Запуск
|
||||||
|
CMD ["./publisher"]
|
5
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.mod
Normal file
5
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.mod
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
module publisher
|
||||||
|
|
||||||
|
go 1.23.2
|
||||||
|
|
||||||
|
require github.com/rabbitmq/amqp091-go v1.10.0
|
4
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.sum
Normal file
4
zhimolostnova_anna_lab_4/ThirdTutorial/publisher/go.sum
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -0,0 +1,70 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failOnError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||||
|
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failOnError(err, "Не удалось открыть канал")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
err = ch.ExchangeDeclare(
|
||||||
|
"tax_notifications", // имя
|
||||||
|
"fanout", // тип
|
||||||
|
true, // durable
|
||||||
|
false, // auto-deleted
|
||||||
|
false, // internal
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failOnError(err, "Не удалось объявить exchange")
|
||||||
|
|
||||||
|
i := 1
|
||||||
|
|
||||||
|
for {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i)
|
||||||
|
err = ch.PublishWithContext(ctx,
|
||||||
|
"tax_notifications", // exchange
|
||||||
|
"", // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false, // immediate
|
||||||
|
amqp.Publishing{
|
||||||
|
ContentType: "text/plain",
|
||||||
|
Body: []byte(body),
|
||||||
|
})
|
||||||
|
failOnError(err, "Не удалось отправить сообщение")
|
||||||
|
|
||||||
|
log.Printf(" [x] Отправлено: %s", body)
|
||||||
|
time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
# Пример для Consumer 1
|
||||||
|
FROM golang:1.23
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Копируем модули и загружаем зависимости
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
# Копируем исходный код
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Сборка
|
||||||
|
RUN go build -o secondConsumer .
|
||||||
|
|
||||||
|
# Запуск
|
||||||
|
CMD ["./secondConsumer"]
|
@ -0,0 +1,5 @@
|
|||||||
|
module secondConsumer
|
||||||
|
|
||||||
|
go 1.23.2
|
||||||
|
|
||||||
|
require github.com/rabbitmq/amqp091-go v1.10.0
|
@ -0,0 +1,4 @@
|
|||||||
|
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||||
|
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
@ -0,0 +1,90 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
amqp "github.com/rabbitmq/amqp091-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
func failError(err error, msg string) {
|
||||||
|
if err != nil {
|
||||||
|
log.Panicf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||||
|
failError(err, "Не удалось подключиться к RabbitMQ")
|
||||||
|
defer func(conn *amqp.Connection) {
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(conn)
|
||||||
|
|
||||||
|
ch, err := conn.Channel()
|
||||||
|
failError(err, "Не удалось открыть канал")
|
||||||
|
defer func(ch *amqp.Channel) {
|
||||||
|
err := ch.Close()
|
||||||
|
if err != nil {
|
||||||
|
|
||||||
|
}
|
||||||
|
}(ch)
|
||||||
|
|
||||||
|
err = ch.ExchangeDeclare(
|
||||||
|
"tax_notifications", // имя
|
||||||
|
"fanout", // тип
|
||||||
|
true, // durable
|
||||||
|
false, // auto-deleted
|
||||||
|
false, // internal
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failError(err, "Не удалось объявить exchange")
|
||||||
|
|
||||||
|
q, err := ch.QueueDeclare(
|
||||||
|
"fast_consumer_queue", // имя очереди
|
||||||
|
false, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failError(err, "Не удалось объявить очередь")
|
||||||
|
|
||||||
|
err = ch.QueueBind(
|
||||||
|
q.Name, // имя очереди
|
||||||
|
"", // routing key
|
||||||
|
"tax_notifications", // exchange
|
||||||
|
false,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
failError(err, "Не удалось привязать очередь")
|
||||||
|
|
||||||
|
msgs, err := ch.Consume(
|
||||||
|
q.Name, // очередь
|
||||||
|
"", // firstConsumer
|
||||||
|
false, // auto-ack
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // аргументы
|
||||||
|
)
|
||||||
|
failError(err, "Не удалось зарегистрировать firstConsumer")
|
||||||
|
|
||||||
|
var forever chan struct{}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for d := range msgs {
|
||||||
|
log.Printf(" [x] Получено: %s", d.Body)
|
||||||
|
log.Printf(" [x] Обработка завершена")
|
||||||
|
err := d.Ack(false)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||||
|
<-forever
|
||||||
|
}
|
38
zhimolostnova_anna_lab_4/docker-compose.yml
Normal file
38
zhimolostnova_anna_lab_4/docker-compose.yml
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
services:
|
||||||
|
rabbitmq:
|
||||||
|
image: rabbitmq:3-management
|
||||||
|
ports:
|
||||||
|
- "5672:5672"
|
||||||
|
- "15672:15672"
|
||||||
|
environment:
|
||||||
|
RABBITMQ_DEFAULT_USER: guest
|
||||||
|
RABBITMQ_DEFAULT_PASS: guest
|
||||||
|
healthcheck:
|
||||||
|
test: ["CMD", "rabbitmqctl", "status"]
|
||||||
|
interval: 10s
|
||||||
|
timeout: 5s
|
||||||
|
retries: 5
|
||||||
|
|
||||||
|
publisher:
|
||||||
|
build: ThirdTutorial/publisher
|
||||||
|
depends_on:
|
||||||
|
rabbitmq:
|
||||||
|
condition: service_healthy
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||||
|
|
||||||
|
first_consumer:
|
||||||
|
build: ThirdTutorial/firstConsumer
|
||||||
|
depends_on:
|
||||||
|
rabbitmq:
|
||||||
|
condition: service_healthy
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||||
|
|
||||||
|
second_consumer:
|
||||||
|
build: ThirdTutorial/secondConsumer
|
||||||
|
depends_on:
|
||||||
|
rabbitmq:
|
||||||
|
condition: service_healthy
|
||||||
|
environment:
|
||||||
|
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
Loading…
Reference in New Issue
Block a user