presnyakova_victoria_lab_4 #134
27
presnyakova_victoria_lab_3/docker-compose.yml
Normal file
@ -0,0 +1,27 @@
|
||||
version: '3.8'
|
||||
services:
|
||||
event_service:
|
||||
build:
|
||||
context: /event_service
|
||||
dockerfile: Dockerfile
|
||||
depends_on:
|
||||
- session_service
|
||||
expose:
|
||||
- 8081
|
||||
|
||||
session_service:
|
||||
build:
|
||||
context: /session_service
|
||||
dockerfile: Dockerfile
|
||||
expose:
|
||||
- 8082
|
||||
|
||||
nginx:
|
||||
image: nginx
|
||||
ports:
|
||||
- 8086:8086
|
||||
volumes:
|
||||
- ./nginx.conf:/etc/nginx/nginx.conf
|
||||
depends_on:
|
||||
- event_service
|
||||
- session_service
|
11
presnyakova_victoria_lab_3/event_service/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
||||
FROM python:3.9
|
||||
|
||||
EXPOSE 8081
|
||||
|
||||
RUN pip install Flask requests
|
||||
|
||||
WORKDIR /work
|
||||
|
||||
COPY event_service.py ./
|
||||
|
||||
CMD ["python", "event_service.py"]
|
48
presnyakova_victoria_lab_3/event_service/event_service.py
Normal file
@ -0,0 +1,48 @@
|
||||
from flask import Flask, jsonify, request, Response
|
||||
from datetime import datetime
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
events = {0: {'id': 0, 'session': 0, 'info': '{"defects": 1}', 'dt': datetime(2024, 10, 10, 8, 0)},
|
||||
1: {'id': 1, 'session': 1, 'info': '{"defects": 1}', 'dt': datetime(2024, 10, 10, 8, 5)},
|
||||
2: {'id': 2, 'session': 0, 'info': '{"defects": 1}', 'dt': datetime(2024, 10, 10, 8, 10)},
|
||||
3: {'id': 3, 'session': 1, 'info': '{"defects": 1}', 'dt': datetime(2024, 10, 10, 8, 15)}}
|
||||
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def get_all():
|
||||
if request.method == 'POST':
|
||||
dto = request.get_json()['dto']
|
||||
new_id = max(events.keys()) + 1
|
||||
events[new_id] = {
|
||||
'id': new_id,
|
||||
'session': dto['session'],
|
||||
'info': dto['info'],
|
||||
'dt': datetime.now()
|
||||
}
|
||||
return jsonify(events[new_id])
|
||||
return jsonify(events)
|
||||
|
||||
|
||||
@app.route('/<int:event_id>', methods=['GET', 'PUT', 'DELETE'])
|
||||
def get_by_id(event_id):
|
||||
if event_id not in events.keys():
|
||||
return Response(status=404)
|
||||
|
||||
if request.method == 'PUT':
|
||||
dto = request.get_json()['dto']
|
||||
events[event_id] = {
|
||||
'session': dto['session'],
|
||||
'info': dto['info'],
|
||||
'dt': datetime.now()
|
||||
}
|
||||
return events[event_id]
|
||||
elif request.method == 'DELETE':
|
||||
events.pop(event_id)
|
||||
return Response(status=200)
|
||||
|
||||
return jsonify(events[event_id])
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', use_reloader=False, port=8081)
|
28
presnyakova_victoria_lab_3/nginx.conf
Normal file
@ -0,0 +1,28 @@
|
||||
events {
|
||||
worker_connections 1024;
|
||||
}
|
||||
|
||||
http {
|
||||
server {
|
||||
listen 8086;
|
||||
listen [::]:8086;
|
||||
server_name localhost;
|
||||
|
||||
location /event_service/ {
|
||||
proxy_pass http://event_service:8081/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-Proto $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Prefix $scheme;
|
||||
}
|
||||
|
||||
location /session_service/ {
|
||||
proxy_pass http://session_service:8082/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-Proto $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Prefix $scheme;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
31
presnyakova_victoria_lab_3/readme.md
Normal file
@ -0,0 +1,31 @@
|
||||
# Задание:
|
||||
|
||||
- Создать 2 микросервиса, реализующих CRUD на связанных сущностях.
|
||||
- Реализовать механизм синхронного обмена сообщениями между микросервисами.
|
||||
- Реализовать шлюз на основе прозрачного прокси-сервера nginx.
|
||||
|
||||
Вариант: Сессии и события
|
||||
|
||||
## Выполнение
|
||||
|
||||
Были написаны два сервиса на языке python:
|
||||
|
||||
- Сервис session_service, хранящий данные о сессиях и реализующий CRUD операции с ними через HTTP запросы.
|
||||
- Сервис event_service, хранящий данные о событиях и реализующий CRUD операции с ними через HTTP запросы.
|
||||
|
||||
- Сервисы синхронно сообщены - сервис сессия запрашивает данные у сервиса событий для получения событий сессии.
|
||||
|
||||
Для сервисов прописаны файлы Dockerfile, описывающие создание контейнеров:
|
||||
|
||||
Для обоих контейнеров выбирается Python 3.9.
|
||||
Оба контейнера проявляют порты, на которых работает приложение: 8081 для событий и 8082 для сессий.
|
||||
В оба контейнера устанавливаются пакеты Flask и requests.
|
||||
Выбирается рабочая директория /work и туда копируются файлы скриптов.
|
||||
Командой запускаются сами скрипты.
|
||||
Общий yaml-файл развёртки был настроен следующим образом:
|
||||
|
||||
блок services, где перечислены разворачиваемые сервисы.
|
||||
для каждого сервиса прописан build, где объявляется его папка и докерфайл создания и зависимости.
|
||||
для сервиса nginx прописан порт для отображения вовне.
|
||||
|
||||
## Видео https://drive.google.com/file/d/1my-e51bAoxUaMHKpGJmd9ZXSR5GnmORS/view?usp=sharing
|
11
presnyakova_victoria_lab_3/session_service/Dockerfile
Normal file
@ -0,0 +1,11 @@
|
||||
FROM python:3.9
|
||||
|
||||
EXPOSE 8082
|
||||
|
||||
RUN pip install Flask requests flasgger
|
||||
|
||||
WORKDIR /work
|
||||
|
||||
COPY session_service.py ./
|
||||
|
||||
CMD ["python", "session_service.py"]
|
@ -0,0 +1,62 @@
|
||||
from flask import Flask, jsonify, request, Response
|
||||
from datetime import datetime
|
||||
import requests
|
||||
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
sessions = {0: {'id': 0, 'info': '{"distance": 1}', 'dt_start': datetime(2024, 10, 10, 8, 0),
|
||||
'dt_end': datetime(2024, 10, 11, 8, 0), 'events': [0, 1]},
|
||||
1: {'id': 1, 'info': '{"distance": 1}', 'dt_start': datetime(2024, 10, 12, 8, 0),
|
||||
'dt_end': datetime(2024, 10, 13, 8, 0), 'events': [0, 1]},
|
||||
2: {'id': 2, 'info': '{"distance": 1}', 'dt_start': datetime(2024, 10, 14, 8, 0),
|
||||
'dt_end': datetime(2024, 10, 15, 8, 0), 'events': [0, 1]},
|
||||
3: {'id': 3, 'info': '{"distance": 1}', 'dt_start': datetime(2024, 10, 16, 8, 0),
|
||||
'dt_end': datetime(2024, 10, 17, 8, 0), 'events': [0, 1]},
|
||||
}
|
||||
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
|
||||
def get_all():
|
||||
if request.method == 'POST':
|
||||
dto = request.get_json()['dto']
|
||||
new_id = max(sessions.keys()) + 1
|
||||
sessions[new_id] = {
|
||||
'id': new_id,
|
||||
'info': dto['info'],
|
||||
'dt_start': datetime.now(),
|
||||
'dt_end': datetime.now() + datetime.timedelta(days=1)
|
||||
|
||||
}
|
||||
return jsonify(sessions[new_id])
|
||||
return jsonify(sessions)
|
||||
|
||||
|
||||
@app.route('/<int:session_id>', methods=['GET', 'PUT', 'DELETE'])
|
||||
def get_by_id(session_id):
|
||||
if session_id not in sessions.keys():
|
||||
return Response(status=404)
|
||||
|
||||
if request.method == 'PUT':
|
||||
dto = request.get_json()['dto']
|
||||
sessions[session_id] = {
|
||||
'info': dto['info'],
|
||||
'dt_start': datetime.now(),
|
||||
'dt_end': datetime.now() + datetime.timedelta(days=1),
|
||||
}
|
||||
return sessions[session_id]
|
||||
elif request.method == 'DELETE':
|
||||
sessions.pop(session_id)
|
||||
return Response(status=200)
|
||||
|
||||
session = sessions[session_id]
|
||||
events = []
|
||||
for event_id in session['events']:
|
||||
events.append(requests.get("http://event_service:8081/" + str(event_id)).json())
|
||||
session['events'] = events
|
||||
|
||||
return jsonify(session)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(host='0.0.0.0', use_reloader=False, port=8082)
|
BIN
presnyakova_victoria_lab_4/lessons/lesson_1/img.png
Normal file
After Width: | Height: | Size: 33 KiB |
BIN
presnyakova_victoria_lab_4/lessons/lesson_1/img_1.png
Normal file
After Width: | Height: | Size: 28 KiB |
25
presnyakova_victoria_lab_4/lessons/lesson_1/receive.py
Normal file
@ -0,0 +1,25 @@
|
||||
import pika, sys, os
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
11
presnyakova_victoria_lab_4/lessons/lesson_1/send.py
Normal file
@ -0,0 +1,11 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
BIN
presnyakova_victoria_lab_4/lessons/lesson_2/img.png
Normal file
After Width: | Height: | Size: 114 KiB |
BIN
presnyakova_victoria_lab_4/lessons/lesson_2/img_1.png
Normal file
After Width: | Height: | Size: 88 KiB |
19
presnyakova_victoria_lab_4/lessons/lesson_2/new_task.py
Normal file
@ -0,0 +1,19 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
22
presnyakova_victoria_lab_4/lessons/lesson_2/worker.py
Normal file
@ -0,0 +1,22 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
13
presnyakova_victoria_lab_4/lessons/lesson_3/emit_log.py
Normal file
@ -0,0 +1,13 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
BIN
presnyakova_victoria_lab_4/lessons/lesson_3/img.png
Normal file
After Width: | Height: | Size: 39 KiB |
BIN
presnyakova_victoria_lab_4/lessons/lesson_3/img_1.png
Normal file
After Width: | Height: | Size: 30 KiB |
22
presnyakova_victoria_lab_4/lessons/lesson_3/receive_logs.py
Normal file
@ -0,0 +1,22 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
19
presnyakova_victoria_lab_4/publisher_app/app.py
Normal file
@ -0,0 +1,19 @@
|
||||
from flask import Flask, render_template, request
|
||||
from publisher import publish_messages
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
return "это publisher"
|
||||
|
||||
|
||||
@app.route('/publish', methods=['GET'])
|
||||
def publish():
|
||||
publish_messages()
|
||||
return 'Начало детекции дефектов'
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
27
presnyakova_victoria_lab_4/publisher_app/consumer_1.py
Normal file
@ -0,0 +1,27 @@
|
||||
import pika
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print("Скрипт 1 обрабатывает дефект '{}'".format(body.decode()))
|
||||
|
||||
|
||||
def consume_messages():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
|
||||
channel.queue_declare(queue='example_queue')
|
||||
channel.queue_bind(exchange=exchange_name, queue='example_queue')
|
||||
channel.basic_consume(queue='example_queue', on_message_callback=callback, auto_ack=True)
|
||||
print('Скрипт 1 ожидает дефектов')
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
print('Прервано. Останавливаем...')
|
||||
channel.stop_consuming()
|
||||
connection.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
consume_messages()
|
27
presnyakova_victoria_lab_4/publisher_app/consumer_2.py
Normal file
@ -0,0 +1,27 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print("Скрипт 2 обрабатывает дефект '{}'".format(body.decode()))
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def consume_messages():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
|
||||
channel.queue_declare(queue='example_queue_2')
|
||||
channel.queue_bind(exchange=exchange_name, queue='example_queue_2')
|
||||
channel.basic_consume(queue='example_queue_2', on_message_callback=callback, auto_ack=True)
|
||||
print('Скрипт 2 ожидает дефектов')
|
||||
try:
|
||||
channel.start_consuming()
|
||||
except KeyboardInterrupt:
|
||||
print('Прервано. Останавливаем...')
|
||||
channel.stop_consuming()
|
||||
connection.close()
|
||||
|
||||
if __name__ == '__main__':
|
||||
consume_messages()
|
19
presnyakova_victoria_lab_4/publisher_app/publisher.py
Normal file
@ -0,0 +1,19 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
|
||||
def publish_messages():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
exchange_name = 'logs'
|
||||
channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
|
||||
channel.queue_declare(queue='example_queue')
|
||||
# Отдаем сообщение в 'logs' exchange каждую секунду
|
||||
while True:
|
||||
message = "Дефект обнаружен" # Сообщение
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f"Отправлено сообщение: {message}")
|
||||
time.sleep(1)
|
||||
print("Отправлено '{}'".format(message))
|
||||
connection.close()
|
54
presnyakova_victoria_lab_4/readme.md
Normal file
@ -0,0 +1,54 @@
|
||||
# Лабораторная работа №4 - Работа с брокером сообщений
|
||||
Lesson 1:
|
||||
![img.png](lessons/lesson_1/img.png) ![img.png](lessons/lesson_1/img_1.png)
|
||||
Lesson 2:
|
||||
![img.png](lessons/lesson_2/img.png) ![img.png](lessons/lesson_2/img_1.png)
|
||||
Lesson 3:
|
||||
![img.png](lessons/lesson_3/img.png) ![img.png](lessons/lesson_3/img_1.png)
|
||||
|
||||
|
||||
# Задачи
|
||||
|
||||
Необходимо выбрать предметную область и разработать следующие приложения:
|
||||
|
||||
1. **Publisher**.
|
||||
Программа, которая создаёт один **exchange** с типом _fanout_.
|
||||
Программа должна раз в секунду генерировать сообщения в журнал событий согласно вашей предметной области.
|
||||
Например, событие "пришёл заказ" или "сообщение от пользователя" или "необходимо создать отчёт".
|
||||
2. **Consumer 1**.
|
||||
Программа, которая создаёт под себя отдельную не анонимную (!) очередь (**queue**) (то есть имя queue НЕ пустая строка), создаёт **binding** на **exchange** и начинает принимать сообщения (_consume_).
|
||||
Программа должна обрабатывать сообщения 2-3 секунды.
|
||||
Можно реализовать через обычный _Thread.Sleep_ (для C#).
|
||||
3. **Consumer 2**.
|
||||
Аналогично _Consumer 1_, только сообщения необходимо обрабатывать моментально.
|
||||
Только имя очереди должно отличаться от _Consumer 1_.
|
||||
|
||||
Далее необходимо собрать и запустить приложения одновременно по одному экземпляру.
|
||||
|
||||
# Запуск
|
||||
|
||||
Проект запускается в ide просто по нажатию у питон файла на функцию мейн.
|
||||
Нужно последовательно запустить функцию мейн у файлов app.py, consumer1.py, consumer2.py.
|
||||
Очередь сообщений запускается такой командой
|
||||
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
|
||||
|
||||
# Описание работы:
|
||||
Развернули два приложения
|
||||
Сервисы используем из предыдущей работы
|
||||
Предметная область - Скрипты и дефекты
|
||||
|
||||
1. Consumer 1 - Скрипт 1:
|
||||
2. Consumer 2 - Скрипт 2:
|
||||
|
||||
Оба скрипта обрабатывают дефекты.
|
||||
|
||||
Flask-приложение с RabbitMQ, использующего библиотеку pika для publisher и для consumers.
|
||||
Надо обязательно вызвать метод publish, иначе обработка дефектов не начнется.
|
||||
Приложение Flask (app.py), для источника дефектов (publisher) и двух скриптов-потребителей (consumer1.py и consumer2.py).
|
||||
Запускаем приложение Flask и обоих потребителей запускаем отдельно в разных терминалах.
|
||||
Consumer ы будут прослушивать сообщения, опубликованные publisher ом.
|
||||
|
||||
![img.png](screens/img.png)
|
||||
|
||||
# Ссылка на видео
|
||||
https://drive.google.com/file/d/1P8kKSZAre24I04DXL1WqkPtdL6RnDOPA/view?usp=sharing
|
BIN
presnyakova_victoria_lab_4/screens/img.png
Normal file
After Width: | Height: | Size: 84 KiB |