Compare commits

..

1 Commits

Author SHA1 Message Date
77790c37fb lab 4 complete 2024-10-09 17:12:11 +03:00
27 changed files with 750 additions and 0 deletions

View File

@ -0,0 +1,5 @@
module FirstTutorial
go 1.23.2
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -0,0 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -0,0 +1,65 @@
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}

View File

@ -0,0 +1,63 @@
package main
import (
"context"
"log"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for d := range 10 {
body := "This message for consumer by number " + strconv.Itoa(d+1)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s\n", body)
}
}

View File

@ -0,0 +1,80 @@
# Отчет по лабораторной работе №4
## Поставленные задачи
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
## Предметная область
Уведомление о штрафах за нарушение ПДД.
## Запуск работы
1. Убедиться, что установлены необходимые технологии:
- **Docker**: Платформа для контейнеризации приложений.
- **RabbitMQ**: Брокер сообщений.
- **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду:
```bash
docker-compose --version
```
2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов:
```bash
docker-compose up --build
```
Эта команда сначала выполнит сборку, а затем запустит контейнеры.
3. После успешного запуска можно перейти на RabbitMQ Management UI:
- RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/).
## Технологии
1. Golang: основной язык программирования.
2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания.
3. RabbitMQ: брокер сообщений.
## Анализ полученных данных
![SlowQueueResult.png](ReportImages%2FSlowQueueResult.png)
Для замедленной очереди можно сделать следующие выводы:
- Все 366 сообщений доставлены потребителям, но они еще не подтверждены.
Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям,
и ни одно сообщение не ожидает очереди.
- Однако, поскольку все 366 сообщений находятся в состоянии "Unacked",
это указывает на то, что потребитель не подтверждает обработку сообщений вовремя,
что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue).
- Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду,
и с той же скоростью сообщения доставляются потребителю.
- Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду,
что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают.
Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число
может продолжать расти, если скорость обработки не увеличится.
![FastQueueResult.png](ReportImages%2FFastQueueResult.png)
Для быстрой очереди можно сделать следующие выводы:
- Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки.
Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не
накапливаются в очереди.
- Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и
потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду.
- Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все
обрабатывается своевременно.
## Ход работы
1. **Прохождение первого урока**: директория FirstTutorial
![FirstTutorial.png](ReportImages%2FFirstTutorial.png)
2. **Прохождение второго урока**: директория SecondTutorial
![SecondTutorial.png](ReportImages%2FSecondTutorial.png)
3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial
![ThirdTutorial.png](ReportImages%2FThirdTutorial.png)
В коде присутствуют пояснительные комментарии.
## Демонстрационное видео
Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872)

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 253 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 245 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 223 KiB

View File

@ -0,0 +1,5 @@
module SecondTutorial
go 1.23.2
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -0,0 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -0,0 +1,74 @@
package main
import (
"context"
"log"
"os"
"strings"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
q, err := ch.QueueDeclare(
"task", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
func bodyFrom(args []string) string {
var s string
if (len(args) < 2) || os.Args[1] == "" {
s = "hello"
} else {
s = strings.Join(args[1:], " ")
}
return s
}

View File

@ -0,0 +1,82 @@
package main
import (
"bytes"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
q, err := ch.QueueDeclare(
"task", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
countChar := bytes.Count(d.Body, []byte(""))
t := time.Duration(countChar)
time.Sleep(t * time.Second)
log.Printf("Done")
err := d.Ack(false)
if err != nil {
return
}
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}

View File

@ -0,0 +1,17 @@
# Пример для Consumer 1
FROM golang:1.23
WORKDIR /app
# Копируем модули и загружаем зависимости
COPY go.mod go.sum ./
RUN go mod download
# Копируем исходный код
COPY . .
# Сборка
RUN go build -o firstConsumer .
# Запуск
CMD ["./firstConsumer"]

View File

@ -0,0 +1,92 @@
package main
import (
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
err = ch.ExchangeDeclare(
"tax_notifications", // имя
"fanout", // тип
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // аргументы
)
failOnError(err, "Не удалось объявить exchange")
q, err := ch.QueueDeclare(
"slow_consumer_queue", // имя очереди
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // аргументы
)
failOnError(err, "Не удалось объявить очередь")
err = ch.QueueBind(
q.Name, // имя очереди
"", // routing key
"tax_notifications", // exchange
false,
nil,
)
failOnError(err, "Не удалось привязать очередь")
msgs, err := ch.Consume(
q.Name, // очередь
"", // firstConsumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // аргументы
)
failOnError(err, "Не удалось зарегистрировать firstConsumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] Получено: %s", d.Body)
time.Sleep(2 * time.Second) // Задержка в 2 секунды
log.Printf(" [x] Обработка завершена")
err := d.Ack(false)
if err != nil {
return
}
}
}()
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
<-forever
}

View File

@ -0,0 +1,5 @@
module firstConsumer
go 1.23.2
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -0,0 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -0,0 +1,17 @@
# Пример для publisher
FROM golang:1.23
WORKDIR /app
# Копируем модули и загружаем зависимости
COPY go.mod go.sum ./
RUN go mod download
# Копируем исходный код
COPY . .
# Сборка
RUN go build -o publisher .
# Запуск
CMD ["./publisher"]

View File

@ -0,0 +1,5 @@
module publisher
go 1.23.2
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -0,0 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -0,0 +1,70 @@
package main
import (
"context"
"log"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func failOnError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
failOnError(err, "Не удалось подключиться к RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failOnError(err, "Не удалось открыть канал")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
err = ch.ExchangeDeclare(
"tax_notifications", // имя
"fanout", // тип
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // аргументы
)
failOnError(err, "Не удалось объявить exchange")
i := 1
for {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i)
err = ch.PublishWithContext(ctx,
"tax_notifications", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Не удалось отправить сообщение")
log.Printf(" [x] Отправлено: %s", body)
time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду
i++
}
}

View File

@ -0,0 +1,17 @@
# Пример для Consumer 1
FROM golang:1.23
WORKDIR /app
# Копируем модули и загружаем зависимости
COPY go.mod go.sum ./
RUN go mod download
# Копируем исходный код
COPY . .
# Сборка
RUN go build -o secondConsumer .
# Запуск
CMD ["./secondConsumer"]

View File

@ -0,0 +1,5 @@
module secondConsumer
go 1.23.2
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -0,0 +1,4 @@
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -0,0 +1,90 @@
package main
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
func failError(err error, msg string) {
if err != nil {
log.Panicf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
failError(err, "Не удалось подключиться к RabbitMQ")
defer func(conn *amqp.Connection) {
err := conn.Close()
if err != nil {
}
}(conn)
ch, err := conn.Channel()
failError(err, "Не удалось открыть канал")
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
}
}(ch)
err = ch.ExchangeDeclare(
"tax_notifications", // имя
"fanout", // тип
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // аргументы
)
failError(err, "Не удалось объявить exchange")
q, err := ch.QueueDeclare(
"fast_consumer_queue", // имя очереди
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // аргументы
)
failError(err, "Не удалось объявить очередь")
err = ch.QueueBind(
q.Name, // имя очереди
"", // routing key
"tax_notifications", // exchange
false,
nil,
)
failError(err, "Не удалось привязать очередь")
msgs, err := ch.Consume(
q.Name, // очередь
"", // firstConsumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // аргументы
)
failError(err, "Не удалось зарегистрировать firstConsumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(" [x] Получено: %s", d.Body)
log.Printf(" [x] Обработка завершена")
err := d.Ack(false)
if err != nil {
return
}
}
}()
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
<-forever
}

View File

@ -0,0 +1,38 @@
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
publisher:
build: ThirdTutorial/publisher
depends_on:
rabbitmq:
condition: service_healthy
environment:
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
first_consumer:
build: ThirdTutorial/firstConsumer
depends_on:
rabbitmq:
condition: service_healthy
environment:
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
second_consumer:
build: ThirdTutorial/secondConsumer
depends_on:
rabbitmq:
condition: service_healthy
environment:
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/