forked from Alexey/DAS_2024_1
Compare commits
2 Commits
zhimolostn
...
bogdanov_d
| Author | SHA1 | Date | |
|---|---|---|---|
| 735a403027 | |||
| c67049687b |
2
bogdanov_dmitry_lab_3/.gitignore
vendored
Normal file
2
bogdanov_dmitry_lab_3/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
|
||||
/.idea
|
||||
/.venv
|
||||
22
bogdanov_dmitry_lab_3/README.md
Normal file
22
bogdanov_dmitry_lab_3/README.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Лабораторная работа №3
|
||||
|
||||
## Богданов Дмитрий ПИбд-42
|
||||
|
||||
### Для выполнения были выбраны следующие сущности:
|
||||
|
||||
* Message - содержит uuid (генерируется), text, datetime_sent, user_id
|
||||
* User - содержит uuid (генерируется), name, surname
|
||||
|
||||
Одному пользователю может быть присвоено несколько сообщений.
|
||||
|
||||
Соответственно были развернуты 2 сервиса для управления этими сущностями.
|
||||
|
||||
### Запуск лабораторной:
|
||||
Необходимо перейти в папку с файлом compose.yaml и ввести следующую команду:
|
||||
```
|
||||
docker-compose up --build -d
|
||||
```
|
||||
|
||||
## Видео с результатом запуска и тестами...
|
||||
|
||||
...можно посмотреть по данной [ссылке](https://drive.google.com/file/d/1cJz0z4KduSz1oltmAuieUW7GxxVLNPNo/view).
|
||||
27
bogdanov_dmitry_lab_3/compose.yaml
Normal file
27
bogdanov_dmitry_lab_3/compose.yaml
Normal file
@@ -0,0 +1,27 @@
|
||||
services:
|
||||
|
||||
user_service:
|
||||
container_name: userService
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./userService/Dockerfile
|
||||
expose:
|
||||
- 20001
|
||||
|
||||
message_service:
|
||||
container_name: messageService
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./messageService/Dockerfile
|
||||
expose:
|
||||
- 20002
|
||||
|
||||
nginx:
|
||||
image: nginx:latest
|
||||
ports:
|
||||
- "80:80"
|
||||
volumes:
|
||||
- ./nginx.conf:/etc/nginx/nginx.conf
|
||||
depends_on:
|
||||
- user_service
|
||||
- message_service
|
||||
11
bogdanov_dmitry_lab_3/messageService/Dockerfile
Normal file
11
bogdanov_dmitry_lab_3/messageService/Dockerfile
Normal file
@@ -0,0 +1,11 @@
|
||||
FROM python:latest
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY messageService/messageService.py .
|
||||
|
||||
CMD ["python", "messageService.py"]
|
||||
138
bogdanov_dmitry_lab_3/messageService/messageService.py
Normal file
138
bogdanov_dmitry_lab_3/messageService/messageService.py
Normal file
@@ -0,0 +1,138 @@
|
||||
from flask import Flask, request, jsonify
|
||||
from uuid import uuid4
|
||||
import uuid
|
||||
import datetime
|
||||
import requests
|
||||
|
||||
class Message:
|
||||
def __init__(self, text: str, datetime_sent: datetime, uuid_: uuid, user_id: uuid):
|
||||
if uuid_ is None:
|
||||
self.uuid_ = uuid4()
|
||||
else:
|
||||
self.uuid_ = uuid.UUID(uuid_)
|
||||
self.text = text
|
||||
self.datetime_sent = datetime_sent
|
||||
self.user_id = uuid.UUID(user_id)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'text': self.text,
|
||||
'datetime_sent': self.datetime_sent,
|
||||
'user_id': self.user_id,
|
||||
'uuid': self.uuid_
|
||||
}
|
||||
|
||||
def to_dict_for_users(self):
|
||||
return {
|
||||
'title': self.text,
|
||||
'datetime_sent': self.datetime_sent,
|
||||
'uuid': self.uuid_
|
||||
}
|
||||
|
||||
def to_dict_with_info(self, user: dict):
|
||||
return {
|
||||
'title': self.text,
|
||||
'datetime_sent': self.datetime_sent,
|
||||
'user_id': self.user_id,
|
||||
'user_info': user,
|
||||
'uuid': self.uuid_
|
||||
}
|
||||
|
||||
messages = [
|
||||
Message(text='Hi!', datetime_sent=datetime.datetime.now(), uuid_='4add0525-1857-477d-ad35-56790d400b72', user_id='94b171ea-39f6-4a67-9c67-061743f67cfd'),
|
||||
Message(text='Hello this is a message', datetime_sent=datetime.datetime.now(), uuid_='dd69758d-89e8-49b5-86bf-54ae2adb64e8', user_id='724a3192-70dd-4909-9b0f-c9060a4ab1bd'),
|
||||
Message(text='Test', datetime_sent=datetime.datetime.now(), uuid_='92389e8d-4365-457e-b37e-78abbc07f194', user_id='94b171ea-39f6-4a67-9c67-061743f67cfd'),
|
||||
Message(text='Anyone here?', datetime_sent=datetime.datetime.now(), uuid_='f3a1c526-aca2-47e2-afd3-a1c2eac92458', user_id='724a3192-70dd-4909-9b0f-c9060a4ab1bd'),
|
||||
Message(text='Mambo', datetime_sent=datetime.datetime.now(), uuid_='00abbdb5-e480-4842-bc32-f916894757eb', user_id='46672ea5-3d7b-4137-a0ac-efd898ca4db6')
|
||||
]
|
||||
|
||||
def list_jsonify():
|
||||
return jsonify([message.to_dict() for message in messages])
|
||||
|
||||
|
||||
app = Flask(__name__)
|
||||
users_url = 'http://userService:20001/'
|
||||
|
||||
@app.route('/', methods=['GET'])
|
||||
def get_all():
|
||||
return list_jsonify(), 200
|
||||
|
||||
@app.route('/info', methods=['GET'])
|
||||
def get_all_full():
|
||||
users: list[dict] = requests.get(users_url).json()
|
||||
response = []
|
||||
for message in messages:
|
||||
for user in users:
|
||||
if message.user_id == uuid.UUID(user.get('uuid')):
|
||||
response.append(message.to_dict_with_info(user))
|
||||
|
||||
return response, 200
|
||||
|
||||
@app.route('/by-user/<uuid:user_uuid>', methods=['GET'])
|
||||
def get_by_user_id(user_uuid):
|
||||
return [message.to_dict_for_users() for message in messages if message.user_id == user_uuid], 200
|
||||
|
||||
@app.route('/info/<uuid:uuid_>', methods=['GET'])
|
||||
def get_one_full(uuid_):
|
||||
for message in messages:
|
||||
if message.uuid_ == uuid_:
|
||||
response = requests.get(users_url + str(message.user_id))
|
||||
return message.to_dict_with_info(response.json()), 200
|
||||
|
||||
return f'Сообщение с uuid {uuid_} не найдено', 404
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def create():
|
||||
data = request.json
|
||||
text = data.get('text', None)
|
||||
datetime_sent = datetime.datetime.now()
|
||||
user_id = data.get('user_id', None)
|
||||
checking = requests.get(users_url + f'/check/{user_id}')
|
||||
print(checking)
|
||||
if checking.status_code == 200:
|
||||
new_message = Message(text, datetime_sent, None, user_id)
|
||||
messages.append(new_message)
|
||||
return get_one(new_message.uuid_)
|
||||
if checking.status_code == 404:
|
||||
return f'Пользователь с uuid {user_id} не существует', 404
|
||||
|
||||
return 'Неизвестная ошибка', 500
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['PUT'])
|
||||
def update_by_id(uuid_):
|
||||
data = request.json
|
||||
new_text = data.get('text', None)
|
||||
|
||||
for message in messages:
|
||||
print(message.uuid_)
|
||||
|
||||
if message.uuid_ == uuid_:
|
||||
if new_text is not None:
|
||||
message.text = new_text
|
||||
return get_one(message.uuid_)
|
||||
|
||||
return f'Сообщение с uuid {uuid_} не найдено', 404
|
||||
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['DELETE'])
|
||||
def delete(uuid_):
|
||||
for message in messages:
|
||||
if message.uuid_ == uuid_:
|
||||
messages.remove(message)
|
||||
return 'Сообщение успешно удалено', 200
|
||||
|
||||
return f'Сообщение с uuid {uuid_} не найдено', 404
|
||||
|
||||
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['GET'])
|
||||
def get_one(uuid_):
|
||||
for message in messages:
|
||||
if message.uuid_ == uuid_:
|
||||
return message.to_dict(), 200
|
||||
|
||||
return f'Сообщение с uuid {uuid_} не найдено', 404
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=20002, debug=True)
|
||||
25
bogdanov_dmitry_lab_3/nginx.conf
Normal file
25
bogdanov_dmitry_lab_3/nginx.conf
Normal file
@@ -0,0 +1,25 @@
|
||||
events { worker_connections 1024; }
|
||||
|
||||
http {
|
||||
server {
|
||||
listen 80;
|
||||
listen [::]:80;
|
||||
server_name localhost;
|
||||
|
||||
location /userService/ {
|
||||
proxy_pass http://userService:20001/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
}
|
||||
|
||||
location /messageService/ {
|
||||
proxy_pass http://messageService:20002/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
}
|
||||
}
|
||||
}
|
||||
2
bogdanov_dmitry_lab_3/requirements.txt
Normal file
2
bogdanov_dmitry_lab_3/requirements.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
Flask==3.0.3
|
||||
requests==2.32.3
|
||||
11
bogdanov_dmitry_lab_3/userService/Dockerfile
Normal file
11
bogdanov_dmitry_lab_3/userService/Dockerfile
Normal file
@@ -0,0 +1,11 @@
|
||||
FROM python:latest
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY userService/userService.py .
|
||||
|
||||
CMD ["python", "userService.py"]
|
||||
115
bogdanov_dmitry_lab_3/userService/userService.py
Normal file
115
bogdanov_dmitry_lab_3/userService/userService.py
Normal file
@@ -0,0 +1,115 @@
|
||||
from flask import Flask, jsonify, request
|
||||
from uuid import uuid4
|
||||
import uuid
|
||||
import requests
|
||||
|
||||
|
||||
class User:
|
||||
def __init__(self, name, surname, uuid_: uuid):
|
||||
if uuid_ is None:
|
||||
self.uuid_: uuid = uuid4()
|
||||
else:
|
||||
self.uuid_: uuid = uuid.UUID(uuid_)
|
||||
self.name: str = name
|
||||
self.surname: str = surname
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
"uuid": self.uuid_,
|
||||
"name": self.name,
|
||||
"surname": self.surname
|
||||
}
|
||||
|
||||
def to_dict_with_messages(self, messages: list):
|
||||
return {
|
||||
"uuid": self.uuid_,
|
||||
"name": self.name,
|
||||
"surname": self.surname,
|
||||
"messages": messages
|
||||
}
|
||||
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
users: list[User] = [
|
||||
User(name='Dr.', surname='Kino', uuid_='94b171ea-39f6-4a67-9c67-061743f67cfd'),
|
||||
User(name='Caspian', surname='Holstrom', uuid_='724a3192-70dd-4909-9b0f-c9060a4ab1bd'),
|
||||
User(name='Admin', surname='Admin', uuid_='46672ea5-3d7b-4137-a0ac-efd898ca4db6')
|
||||
]
|
||||
|
||||
messages_url = 'http://messageService:20002/'
|
||||
|
||||
|
||||
def list_jsonify():
|
||||
return jsonify([user.to_dict() for user in users])
|
||||
|
||||
@app.route('/', methods=['GET'])
|
||||
def get_all():
|
||||
return list_jsonify(), 200
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['GET'])
|
||||
def get_one(uuid_):
|
||||
for user in users:
|
||||
if user.uuid_ == uuid_:
|
||||
return user.to_dict(), 200
|
||||
|
||||
return f'Пользователь с uuid {uuid_} не найден', 404
|
||||
|
||||
@app.route('/info/<uuid:uuid_>', methods=['GET'])
|
||||
def get_one_with_messages(uuid_):
|
||||
for user in users:
|
||||
if user.uuid_ == uuid_:
|
||||
response = requests.get(messages_url + f'by-user/{uuid_}')
|
||||
print(response.json())
|
||||
return user.to_dict_with_messages(response.json()), 200
|
||||
|
||||
return f'Пользователь с uuid {uuid_} не найден', 404
|
||||
|
||||
@app.route('/check/<uuid:uuid_>', methods=['GET'])
|
||||
def check_exist(uuid_):
|
||||
for user in users:
|
||||
if user.uuid_ == uuid_:
|
||||
return '', 200
|
||||
return '', 404
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
def create():
|
||||
data = request.json
|
||||
name = data.get('name', None)
|
||||
surname = data.get('surname', None)
|
||||
if name is None or surname is None:
|
||||
return 'Недостаточно информации для создания пользователя', 404
|
||||
|
||||
new_user = User(name, surname, None)
|
||||
users.append(new_user)
|
||||
return get_one(new_user.uuid_)
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['PUT'])
|
||||
def update_by_id(uuid_):
|
||||
data = request.json
|
||||
new_name = data.get('name', None)
|
||||
new_surname = data.get('surname', None)
|
||||
|
||||
for user in users:
|
||||
if user.uuid_ == uuid_:
|
||||
if new_name is not None:
|
||||
user.name = new_name
|
||||
if new_surname is not None:
|
||||
user.surname = new_surname
|
||||
return get_one(user.uuid_)
|
||||
|
||||
return f'Пользователь с uuid {uuid_} не найден', 404
|
||||
|
||||
|
||||
@app.route('/<uuid:uuid_>', methods=['DELETE'])
|
||||
def delete(uuid_):
|
||||
for user in users:
|
||||
if user.uuid_ == uuid_:
|
||||
users.remove(user)
|
||||
return 'Пользователь удален', 200
|
||||
|
||||
return f'Пользователь с uuid {uuid_} не найден', 404
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', port=20001, debug=True)
|
||||
@@ -1,5 +0,0 @@
|
||||
module FirstTutorial
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
||||
@@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
@@ -1,65 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"hello", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
true, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
failOnError(err, "Failed to register a consumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf("Received a message: %s", d.Body)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
<-forever
|
||||
}
|
||||
@@ -1,63 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"hello", // name
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for d := range 10 {
|
||||
body := "This message for consumer by number " + strconv.Itoa(d+1)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Failed to publish a message")
|
||||
log.Printf(" [x] Sent %s\n", body)
|
||||
}
|
||||
}
|
||||
@@ -1,80 +0,0 @@
|
||||
# Отчет по лабораторной работе №4
|
||||
|
||||
## Поставленные задачи
|
||||
|
||||
1. Установить брокер сообщений RabbitMQ.
|
||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||
3. Продемонстрировать работу брокера сообщений.
|
||||
|
||||
## Предметная область
|
||||
|
||||
Уведомление о штрафах за нарушение ПДД.
|
||||
|
||||
## Запуск работы
|
||||
|
||||
1. Убедиться, что установлены необходимые технологии:
|
||||
- **Docker**: Платформа для контейнеризации приложений.
|
||||
- **RabbitMQ**: Брокер сообщений.
|
||||
- **Docker Compose**: Инструмент для запуска многоконтейнерных приложений на основе `docker-compose.yaml`. Обычно поставляется вместе с Docker. Чтобы проверить, установлена ли утилита, нужно запустить команду:
|
||||
```bash
|
||||
docker-compose --version
|
||||
```
|
||||
|
||||
2. В директории, где находится файл `docker-compose.yaml`, выполнить следующую команду для запуска всех сервисов:
|
||||
```bash
|
||||
docker-compose up --build
|
||||
```
|
||||
Эта команда сначала выполнит сборку, а затем запустит контейнеры.
|
||||
|
||||
3. После успешного запуска можно перейти на RabbitMQ Management UI:
|
||||
- RabbitMQ Management UI: [http://localhost:15672/#/](http://localhost:15672/#/).
|
||||
|
||||
## Технологии
|
||||
|
||||
1. Golang: основной язык программирования.
|
||||
2. Docker & Docker Compose: для контейнеризации сервисов и удобного развертывания.
|
||||
3. RabbitMQ: брокер сообщений.
|
||||
|
||||
## Анализ полученных данных
|
||||
|
||||

|
||||
Для замедленной очереди можно сделать следующие выводы:
|
||||
- Все 366 сообщений доставлены потребителям, но они еще не подтверждены.
|
||||
Поскольку "Ready" равен нулю, это означает, что все сообщения сразу передаются потребителям,
|
||||
и ни одно сообщение не ожидает очереди.
|
||||
- Однако, поскольку все 366 сообщений находятся в состоянии "Unacked",
|
||||
это указывает на то, что потребитель не подтверждает обработку сообщений вовремя,
|
||||
что связано с медленной обработкой (как предполагает название очереди slow_consumer_queue).
|
||||
- Публикация сообщений идет с постоянной скоростью 1 сообщение в секунду,
|
||||
и с той же скоростью сообщения доставляются потребителю.
|
||||
- Однако скорость подтверждения (Consumer ack) составляет лишь 0.60 сообщений в секунду,
|
||||
что означает, что потребитель обрабатывает и подтверждает сообщения медленнее, чем они поступают.
|
||||
Это вызывает накопление неподтвержденных сообщений в очереди (Unacked), и со временем их число
|
||||
может продолжать расти, если скорость обработки не увеличится.
|
||||
|
||||

|
||||
Для быстрой очереди можно сделать следующие выводы:
|
||||
- Очередь пуста. Т.е. нет сообщений, которые ожидают обработки или находятся в процессе обработки.
|
||||
Это означает, что потребитель обрабатывает сообщения сразу после их получения, и сообщения не
|
||||
накапливаются в очереди.
|
||||
- Сообщения публикуются и доставляются потребителю со скоростью 1 сообщение в секунду, и
|
||||
потребитель подтверждает обработку каждого сообщения также со скоростью 1 сообщение в секунду.
|
||||
- Поскольку скорость подтверждения и доставки совпадают, сообщений не накапливается, и все
|
||||
обрабатывается своевременно.
|
||||
|
||||
## Ход работы
|
||||
|
||||
1. **Прохождение первого урока**: директория FirstTutorial
|
||||

|
||||
|
||||
2. **Прохождение второго урока**: директория SecondTutorial
|
||||

|
||||
|
||||
3. **Прохождение третьего урока и выполнение задания на ЛР №3**: директория ThirdTutorial
|
||||

|
||||
|
||||
В коде присутствуют пояснительные комментарии.
|
||||
|
||||
## Демонстрационное видео
|
||||
|
||||
Видеозапись доступна по адресу: [https://vk.com/video193898050_456240872](https://vk.com/video193898050_456240872)
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 32 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 253 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 245 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 33 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 223 KiB |
@@ -1,5 +0,0 @@
|
||||
module SecondTutorial
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
||||
@@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
@@ -1,74 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"task", // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
body := bodyFrom(os.Args)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"", // exchange
|
||||
q.Name, // routing key
|
||||
false, // mandatory
|
||||
false,
|
||||
amqp.Publishing{
|
||||
DeliveryMode: amqp.Persistent,
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Failed to publish a message")
|
||||
log.Printf(" [x] Sent %s", body)
|
||||
}
|
||||
|
||||
func bodyFrom(args []string) string {
|
||||
var s string
|
||||
if (len(args) < 2) || os.Args[1] == "" {
|
||||
s = "hello"
|
||||
} else {
|
||||
s = strings.Join(args[1:], " ")
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
|
||||
failOnError(err, "Failed to connect to RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Failed to open a channel")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"task", // name
|
||||
true, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // arguments
|
||||
)
|
||||
failOnError(err, "Failed to declare a queue")
|
||||
|
||||
err = ch.Qos(
|
||||
1, // prefetch count
|
||||
0, // prefetch size
|
||||
false, // global
|
||||
)
|
||||
failOnError(err, "Failed to set QoS")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
failOnError(err, "Failed to register a consumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf("Received a message: %s", d.Body)
|
||||
countChar := bytes.Count(d.Body, []byte(""))
|
||||
t := time.Duration(countChar)
|
||||
time.Sleep(t * time.Second)
|
||||
log.Printf("Done")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||
<-forever
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
# Пример для Consumer 1
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o firstConsumer .
|
||||
|
||||
# Запуск
|
||||
CMD ["./firstConsumer"]
|
||||
@@ -1,92 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить exchange")
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"slow_consumer_queue", // имя очереди
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить очередь")
|
||||
|
||||
err = ch.QueueBind(
|
||||
q.Name, // имя очереди
|
||||
"", // routing key
|
||||
"tax_notifications", // exchange
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
failOnError(err, "Не удалось привязать очередь")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // очередь
|
||||
"", // firstConsumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось зарегистрировать firstConsumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf(" [x] Получено: %s", d.Body)
|
||||
time.Sleep(2 * time.Second) // Задержка в 2 секунды
|
||||
log.Printf(" [x] Обработка завершена")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||
<-forever
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
module firstConsumer
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
||||
@@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
@@ -1,17 +0,0 @@
|
||||
# Пример для publisher
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o publisher .
|
||||
|
||||
# Запуск
|
||||
CMD ["./publisher"]
|
||||
@@ -1,5 +0,0 @@
|
||||
module publisher
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
||||
@@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
@@ -1,70 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failOnError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failOnError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failOnError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failOnError(err, "Не удалось объявить exchange")
|
||||
|
||||
i := 1
|
||||
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
body := "Уведомление о штрафе за нарушение ПДД №" + strconv.Itoa(i)
|
||||
err = ch.PublishWithContext(ctx,
|
||||
"tax_notifications", // exchange
|
||||
"", // routing key
|
||||
false, // mandatory
|
||||
false, // immediate
|
||||
amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: []byte(body),
|
||||
})
|
||||
failOnError(err, "Не удалось отправить сообщение")
|
||||
|
||||
log.Printf(" [x] Отправлено: %s", body)
|
||||
time.Sleep(1 * time.Second) // Генерация сообщений раз в секунду
|
||||
i++
|
||||
}
|
||||
}
|
||||
@@ -1,17 +0,0 @@
|
||||
# Пример для Consumer 1
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Копируем модули и загружаем зависимости
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
# Копируем исходный код
|
||||
COPY . .
|
||||
|
||||
# Сборка
|
||||
RUN go build -o secondConsumer .
|
||||
|
||||
# Запуск
|
||||
CMD ["./secondConsumer"]
|
||||
@@ -1,5 +0,0 @@
|
||||
module secondConsumer
|
||||
|
||||
go 1.23.2
|
||||
|
||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
||||
@@ -1,4 +0,0 @@
|
||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
@@ -1,90 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
|
||||
amqp "github.com/rabbitmq/amqp091-go"
|
||||
)
|
||||
|
||||
func failError(err error, msg string) {
|
||||
if err != nil {
|
||||
log.Panicf("%s: %s", msg, err)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
conn, err := amqp.Dial("amqp://guest:guest@rabbitmq:5672/")
|
||||
failError(err, "Не удалось подключиться к RabbitMQ")
|
||||
defer func(conn *amqp.Connection) {
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(conn)
|
||||
|
||||
ch, err := conn.Channel()
|
||||
failError(err, "Не удалось открыть канал")
|
||||
defer func(ch *amqp.Channel) {
|
||||
err := ch.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(ch)
|
||||
|
||||
err = ch.ExchangeDeclare(
|
||||
"tax_notifications", // имя
|
||||
"fanout", // тип
|
||||
true, // durable
|
||||
false, // auto-deleted
|
||||
false, // internal
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось объявить exchange")
|
||||
|
||||
q, err := ch.QueueDeclare(
|
||||
"fast_consumer_queue", // имя очереди
|
||||
false, // durable
|
||||
false, // delete when unused
|
||||
false, // exclusive
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось объявить очередь")
|
||||
|
||||
err = ch.QueueBind(
|
||||
q.Name, // имя очереди
|
||||
"", // routing key
|
||||
"tax_notifications", // exchange
|
||||
false,
|
||||
nil,
|
||||
)
|
||||
failError(err, "Не удалось привязать очередь")
|
||||
|
||||
msgs, err := ch.Consume(
|
||||
q.Name, // очередь
|
||||
"", // firstConsumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // аргументы
|
||||
)
|
||||
failError(err, "Не удалось зарегистрировать firstConsumer")
|
||||
|
||||
var forever chan struct{}
|
||||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
log.Printf(" [x] Получено: %s", d.Body)
|
||||
log.Printf(" [x] Обработка завершена")
|
||||
err := d.Ack(false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf(" [*] Ожидание сообщений. Для выхода нажмите CTRL+C")
|
||||
<-forever
|
||||
}
|
||||
@@ -1,38 +0,0 @@
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: guest
|
||||
RABBITMQ_DEFAULT_PASS: guest
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmqctl", "status"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
publisher:
|
||||
build: ThirdTutorial/publisher
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||
|
||||
first_consumer:
|
||||
build: ThirdTutorial/firstConsumer
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||
|
||||
second_consumer:
|
||||
build: ThirdTutorial/secondConsumer
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
|
||||
Reference in New Issue
Block a user