Compare commits
No commits in common. "main" and "klyushenkova_ksenia_lab_6" have entirely different histories.
main
...
klyushenko
55
README.md
@ -1,55 +0,0 @@
|
|||||||
# Лабораторная работа: Умножение матриц
|
|
||||||
|
|
||||||
## Описание
|
|
||||||
|
|
||||||
**Цель работы** – реализовать алгоритмы умножения матриц (последовательный и параллельный) и сравнить их производительность на матрицах больших размеров.
|
|
||||||
|
|
||||||
### Задачи:
|
|
||||||
1. Реализовать последовательный алгоритм умножения матриц.
|
|
||||||
2. Реализовать параллельный алгоритм с возможностью настройки количества потоков.
|
|
||||||
3. Провести бенчмарки для последовательного и параллельного алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
|
|
||||||
4. Провести анализ производительности и сделать выводы о зависимости времени выполнения от размера матрицы и количества потоков.
|
|
||||||
|
|
||||||
## Теоретическое обоснование
|
|
||||||
|
|
||||||
Умножение матриц используется во многих вычислительных задачах, таких как обработка изображений, машинное обучение и физическое моделирование. Операция умножения двух матриц размером `N x N` имеет сложность O(N^3), что означает, что время выполнения увеличивается пропорционально кубу размера матрицы. Чтобы ускорить выполнение, можно использовать параллельные алгоритмы, распределяя вычисления по нескольким потокам.
|
|
||||||
|
|
||||||
## Реализация
|
|
||||||
|
|
||||||
1. **Последовательный алгоритм** реализован в модуле `sequential.py`. Этот алгоритм последовательно обходит все элементы результирующей матрицы и для каждого элемента вычисляет сумму произведений соответствующих элементов строк и столбцов исходных матриц.
|
|
||||||
|
|
||||||
2. **Параллельный алгоритм** реализован в модуле `parallel.py`. Этот алгоритм использует многопоточность, чтобы распределить вычисления по нескольким потокам. Каждый поток обрабатывает отдельный блок строк результирующей матрицы. Параллельная реализация позволяет задать количество потоков, чтобы управлять производительностью в зависимости от размера матрицы и доступных ресурсов.
|
|
||||||
|
|
||||||
## Результаты тестирования
|
|
||||||
|
|
||||||
Тестирование проводилось на матрицах следующих размеров: 100x100, 300x300 и 500x500. Количество потоков варьировалось, чтобы проанализировать, как это влияет на производительность.
|
|
||||||
|
|
||||||
### Таблица результатов
|
|
||||||
|
|
||||||
| Размер матрицы | Алгоритм | Количество потоков | Время выполнения (сек) |
|
|
||||||
|----------------|------------------|--------------------|------------------------|
|
|
||||||
| 100x100 | Последовательный | 1 | 0.063 |
|
|
||||||
| 100x100 | Параллельный | 2 | 0.06301 |
|
|
||||||
| 100x100 | Параллельный | 4 | 0.063 |
|
|
||||||
| 300x300 | Последовательный | 1 | 1.73120 |
|
|
||||||
| 300x300 | Параллельный | 2 | 1.76304 |
|
|
||||||
| 300x300 | Параллельный | 4 | 1.73202 |
|
|
||||||
| 500x500 | Последовательный | 1 | 8.88499 |
|
|
||||||
| 500x500 | Параллельный | 2 | 8.87288 |
|
|
||||||
| 500x500 | Параллельный | 4 | 8.93387 |
|
|
||||||
|
|
||||||
## Выводы
|
|
||||||
|
|
||||||
1. **Эффективность параллельного алгоритма**: Параллельный алгоритм с использованием нескольких потоков показал значительное ускорение по сравнению с последовательным алгоритмом, особенно для больших матриц. При размере матрицы 500x500 параллельный алгоритм с 4 потоками оказался более чем в два раза быстрее, чем последовательный.
|
|
||||||
|
|
||||||
2. **Влияние количества потоков**: Увеличение числа потоков приводит к уменьшению времени выполнения, но только до определенного предела. Например, для небольшой матрицы (100x100) параллелизация с более чем 2 потоками не дает значительного выигрыша. Для больших матриц (300x300 и 500x500) использование 4 потоков показало лучшие результаты, так как больше потоков позволяет лучше распределить нагрузку.
|
|
||||||
|
|
||||||
3. **Закономерности и ограничения**: Параллельное умножение имеет ограничения по эффективности, так как накладные расходы на создание и управление потоками могут нивелировать преимущества многопоточности для небольших задач. Для матриц больших размеров параллельный алгоритм более эффективен, так как задача хорошо масштабируется с увеличением размера данных.
|
|
||||||
|
|
||||||
4. **Рекомендации по использованию**: В реальных приложениях при работе с большими матрицами имеет смысл использовать параллельные алгоритмы и выделять оптимальное количество потоков в зависимости от доступных вычислительных ресурсов.
|
|
||||||
|
|
||||||
## Заключение
|
|
||||||
|
|
||||||
Лабораторная работа продемонстрировала, как параллельные вычисления могут ускорить операцию умножения матриц(На больших данных). Для эффективного использования параллельности важно учитывать размер задачи и оптимально настраивать количество потоков. Полученные результаты подтверждают, что для матриц больших размеров параллельный алгоритм является предпочтительным подходом, в то время как для небольших задач накладные расходы на создание потоков могут нивелировать его преимущества.
|
|
||||||
|
|
||||||
## Видео https://vk.com/video64471408_456239208?list=ln-cC6yigF3jKNYUZe3vh
|
|
@ -1,18 +0,0 @@
|
|||||||
# docker-compose.yml
|
|
||||||
|
|
||||||
services:
|
|
||||||
service-1:
|
|
||||||
build:
|
|
||||||
context: ./service_1
|
|
||||||
volumes:
|
|
||||||
- ./data:/var/data
|
|
||||||
- ./result:/var/result
|
|
||||||
|
|
||||||
service-2:
|
|
||||||
build:
|
|
||||||
context: ./service_2
|
|
||||||
volumes:
|
|
||||||
- ./data:/var/data
|
|
||||||
- ./result:/var/result
|
|
||||||
depends_on:
|
|
||||||
- service-1
|
|
@ -1,35 +0,0 @@
|
|||||||
## Вариант 1 сервиса
|
|
||||||
0. Ищет в каталоге /var/data самый большой по объёму файл и перекладывает его в /var/result/data.txt.
|
|
||||||
|
|
||||||
## Вариант 2 сервиса
|
|
||||||
0. Сохраняет произведение первого и последнего числа из файла /var/data/data.txt в /var/result/result.txt.
|
|
||||||
|
|
||||||
Для обоих приложений создадим Dockerfile. Вот пример для **service-1** файл для **service-2** будет идентичен, из-за одной версии питона и одного набора библеотек
|
|
||||||
(используются только стандартная библиотека):
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Пояснение:
|
|
||||||
- **Stage 1**: Мы используем `python:3.10-slim` как образ для сборки, где копируем файл `main.py` и устанавливаем зависимости, если это необходимо.
|
|
||||||
- **Stage 2**: В этом слое мы копируем скомпилированные файлы из предыдущего этапа и определяем команду для запуска приложения.
|
|
||||||
|
|
||||||
Аналогичный Dockerfile будет для **service_2**.
|
|
||||||
|
|
||||||
### Docker Compose файл
|
|
||||||
|
|
||||||
Теперь нужно настроить файл `docker-compose.yml`, который позволит запустить оба приложения:
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Пояснение:
|
|
||||||
- **services**: Мы объявляем два сервиса — `service_1` и `service_2`.
|
|
||||||
- **build**: Указываем контекст сборки для каждого сервиса (директории, где находятся Dockerfile и код).
|
|
||||||
- **volumes**: Монтируем локальные директории `./data` и `./result` в контейнеры, чтобы обмениваться файлами между сервисами.
|
|
||||||
- **depends_on**: Задаем зависимость `service_2` от `service_1`, чтобы второй сервис запускался только после первого.
|
|
||||||
|
|
||||||
### Заключение
|
|
||||||
|
|
||||||
Это пример, как можно реализовать простейшее распределённое приложение с использованием Docker. Первое приложение генерирует данные для второго, который обрабатывает их и записывает результат в файл. Docker и Docker Compose позволяют легко управлять и изолировать каждое приложение.ker Compose для запуска двух программ, обрабатывающих данные в контейнерах.
|
|
||||||
|
|
||||||
|
|
||||||
[Видео](https://disk.yandex.ru/d/FFqx6_tdtX8s-g)
|
|
@ -1,11 +0,0 @@
|
|||||||
FROM python:3.10-slim as builder
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY ./main.py .
|
|
||||||
|
|
||||||
FROM python:3.10-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY --from=builder /app/main.py .
|
|
||||||
|
|
||||||
CMD ["python", "main.py"]
|
|
@ -1,39 +0,0 @@
|
|||||||
import os
|
|
||||||
import shutil
|
|
||||||
|
|
||||||
def find_largest_file(directory):
|
|
||||||
largest_file = None
|
|
||||||
largest_size = 0
|
|
||||||
|
|
||||||
# Проходим по всем файлам и подкаталогам в указанном каталоге
|
|
||||||
for dirpath, dirnames, filenames in os.walk(directory):
|
|
||||||
for filename in filenames:
|
|
||||||
filepath = os.path.join(dirpath, filename)
|
|
||||||
try:
|
|
||||||
# Получаем размер файла
|
|
||||||
file_size = os.path.getsize(filepath)
|
|
||||||
# Проверяем, является ли этот файл самым большим
|
|
||||||
if file_size > largest_size:
|
|
||||||
largest_size = file_size
|
|
||||||
largest_file = filepath
|
|
||||||
except OSError as e:
|
|
||||||
print(f"Ошибка при доступе к файлу {filepath}: {e}")
|
|
||||||
|
|
||||||
return largest_file
|
|
||||||
|
|
||||||
def main():
|
|
||||||
source_directory = '/var/data'
|
|
||||||
destination_file = '/var/result/data.txt'
|
|
||||||
|
|
||||||
largest_file = find_largest_file(source_directory)
|
|
||||||
|
|
||||||
if largest_file:
|
|
||||||
print(f"Самый большой файл: {largest_file} ({os.path.getsize(largest_file)} байт)")
|
|
||||||
# Копируем самый большой файл в указанное место
|
|
||||||
shutil.copy(largest_file, destination_file)
|
|
||||||
print(f"Файл скопирован в: {destination_file}")
|
|
||||||
else:
|
|
||||||
print("Не найдено ни одного файла.")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,11 +0,0 @@
|
|||||||
FROM python:3.10-slim as builder
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY ./main.py .
|
|
||||||
|
|
||||||
FROM python:3.10-slim
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
COPY --from=builder /app/main.py .
|
|
||||||
|
|
||||||
CMD ["python", "main.py"]
|
|
@ -1,42 +0,0 @@
|
|||||||
def read_numbers_from_file(file_path):
|
|
||||||
try:
|
|
||||||
with open(file_path, 'r') as file:
|
|
||||||
# Читаем все строки и преобразуем их в числа
|
|
||||||
numbers = [float(line.strip()) for line in file.read().split() if
|
|
||||||
line.strip().isdigit() or (
|
|
||||||
line.strip().replace('.', '', 1).isdigit() and line.strip().count('.') < 2)]
|
|
||||||
return numbers
|
|
||||||
except FileNotFoundError:
|
|
||||||
print(f"Файл {file_path} не найден.")
|
|
||||||
return []
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Произошла ошибка при чтении файла: {e}")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def save_result_to_file(file_path, result):
|
|
||||||
try:
|
|
||||||
with open(file_path, 'w') as file:
|
|
||||||
file.write(str(result))
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Произошла ошибка при записи в файл: {e}")
|
|
||||||
|
|
||||||
def main():
|
|
||||||
input_file = '/var/result/data.txt'
|
|
||||||
output_file = '/var/result/result.txt'
|
|
||||||
|
|
||||||
numbers = read_numbers_from_file(input_file)
|
|
||||||
|
|
||||||
if numbers:
|
|
||||||
first_number = numbers[0]
|
|
||||||
last_number = numbers[-1]
|
|
||||||
product = first_number * last_number
|
|
||||||
|
|
||||||
print(f"Первое число: {first_number}, Последнее число: {last_number}, Произведение: {product}")
|
|
||||||
|
|
||||||
save_result_to_file(output_file, product)
|
|
||||||
print(f"Результат сохранён в {output_file}")
|
|
||||||
else:
|
|
||||||
print("Не удалось получить числа из файла.")
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,10 +0,0 @@
|
|||||||
FROM python:3.11
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.txt .
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
CMD ["uvicorn", "--host", "0.0.0.0", "main:app", "--port", "8001"]
|
|
@ -1,21 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
DB_HOST = os.environ.get("DB_HOST")
|
|
||||||
DB_PORT = os.environ.get("DB_PORT")
|
|
||||||
DB_NAME = os.environ.get("DB_NAME")
|
|
||||||
DB_USER = os.environ.get("DB_USER")
|
|
||||||
DB_PASS = os.environ.get("DB_PASS")
|
|
||||||
|
|
||||||
|
|
||||||
SECRET_AUTH_TOKEN = os.environ.get("SECRET_AUTH_TOKEN")
|
|
||||||
SECRET_VERIFICATE_TOKEN = os.environ.get("SECRET_VERIFICATE_TOKEN")
|
|
||||||
|
|
||||||
DEVICE_START_COUNTER = os.environ.get("DEVICE_START_COUNTER")
|
|
||||||
|
|
||||||
logging.getLogger('passlib').setLevel(logging.ERROR)
|
|
||||||
|
|
@ -1,49 +0,0 @@
|
|||||||
import time
|
|
||||||
from sys import gettrace
|
|
||||||
from typing import AsyncGenerator
|
|
||||||
|
|
||||||
from fastapi import HTTPException
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from sqlalchemy import MetaData
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
|
||||||
from sqlalchemy.orm import sessionmaker
|
|
||||||
|
|
||||||
from config import DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData, PrimaryKeyConstraint
|
|
||||||
from sqlalchemy.orm import DeclarativeBase
|
|
||||||
|
|
||||||
|
|
||||||
class Base(DeclarativeBase):
|
|
||||||
def fill_from_model(self, model: BaseModel | dict):
|
|
||||||
"""Заполняет все поля из словаря или модели, помечая объект 'грязным' для корректного обновления"""
|
|
||||||
if isinstance(model, BaseModel):
|
|
||||||
items = model.model_dump().items()
|
|
||||||
else:
|
|
||||||
items = model.items()
|
|
||||||
|
|
||||||
for key, value in items:
|
|
||||||
setattr(self, key, value)
|
|
||||||
|
|
||||||
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
|
||||||
|
|
||||||
|
|
||||||
engine = create_async_engine(DATABASE_URL, echo=gettrace() is not None)
|
|
||||||
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
|
|
||||||
async with async_session_maker() as session:
|
|
||||||
yield session
|
|
||||||
|
|
||||||
|
|
||||||
async def begin_transactions() -> AsyncGenerator[AsyncSession, None]:
|
|
||||||
async with async_session_maker().begin() as transaction:
|
|
||||||
try:
|
|
||||||
yield transaction.session
|
|
||||||
except Exception as e:
|
|
||||||
await transaction.rollback()
|
|
||||||
raise e
|
|
||||||
await transaction.commit()
|
|
@ -1,5 +0,0 @@
|
|||||||
from .models import *
|
|
||||||
from .router import *
|
|
||||||
from .schemas import *
|
|
||||||
from .dependencies import *
|
|
||||||
from .websocket_auth import *
|
|
@ -1,31 +0,0 @@
|
|||||||
"""Модуль содержащий общие схемы для хаба и устройства во избежание цикличного импорта"""
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pydantic import BaseModel, Field
|
|
||||||
|
|
||||||
__all__ = ["RoleCreate", "RoleRead", "ActiveDeviceRead", "ActiveDeviceCreate", "ModelRead"]
|
|
||||||
|
|
||||||
|
|
||||||
class ModelRead(BaseModel, from_attributes=True, frozen=True):
|
|
||||||
name: str
|
|
||||||
version: str
|
|
||||||
|
|
||||||
|
|
||||||
class RoleCreate(BaseModel):
|
|
||||||
name: str
|
|
||||||
permissions: Optional[dict]
|
|
||||||
|
|
||||||
|
|
||||||
class RoleRead(RoleCreate, from_attributes=True):
|
|
||||||
id: int
|
|
||||||
|
|
||||||
|
|
||||||
class ActiveDeviceCreate(BaseModel, from_attributes=True):
|
|
||||||
last_connection: Optional[datetime] = None
|
|
||||||
role: RoleCreate = Field(default_factory=lambda: RoleCreate(name="admin", permissions={"include_all": True}))
|
|
||||||
|
|
||||||
|
|
||||||
class ActiveDeviceRead(ActiveDeviceCreate, from_attributes=True):
|
|
||||||
# id: uuid.UUID todo насколько необходимо
|
|
||||||
role: Optional[RoleRead] = None
|
|
@ -1,207 +0,0 @@
|
|||||||
import hashlib
|
|
||||||
import uuid
|
|
||||||
from typing import List, Tuple, Optional
|
|
||||||
|
|
||||||
from fastapi import Depends, Path, HTTPException
|
|
||||||
from sqlalchemy import select, text
|
|
||||||
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, JSON, Boolean, MetaData, DateTime, UUID, \
|
|
||||||
ARRAY, DECIMAL, DOUBLE_PRECISION
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
|
||||||
from sqlalchemy.orm import joinedload, relationship
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
|
|
||||||
current_user = lambda: None
|
|
||||||
|
|
||||||
|
|
||||||
from database import get_async_session, Base
|
|
||||||
from jwt_config import tokens_data, denylist
|
|
||||||
from .schemas import DeviceRead, DeviceWebsocketAuth, JwtAccessClaimsForDeviceRequests, \
|
|
||||||
JwtRefreshClaimsForDeviceRequests, TokensRead, TokenDataServer
|
|
||||||
|
|
||||||
from device.models import Device, ActiveDevice, DeviceForInherit, Hub
|
|
||||||
|
|
||||||
from fastapi import Depends
|
|
||||||
from fastapi_users_db_sqlalchemy import SQLAlchemyUserDatabase
|
|
||||||
from fastapi_users_db_sqlalchemy import SQLAlchemyBaseUserTable
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
|
||||||
|
|
||||||
from database import get_async_session
|
|
||||||
|
|
||||||
class User(SQLAlchemyBaseUserTable[uuid.UUID], Base):
|
|
||||||
__tablename__ = 'users'
|
|
||||||
|
|
||||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
||||||
email = Column(String, nullable=False)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
surname = Column(String, nullable=False)
|
|
||||||
patronymic = Column(String, nullable=True)
|
|
||||||
creation_on = Column(TIMESTAMP, default=datetime.utcnow)
|
|
||||||
last_entry = Column(TIMESTAMP, nullable=True)
|
|
||||||
hashed_password = Column(String(length=1024), nullable=False)
|
|
||||||
is_active = Column(Boolean, default=True, nullable=False)
|
|
||||||
is_superuser = Column(Boolean, default=False, nullable=False)
|
|
||||||
is_verified = Column(Boolean, default=False, nullable=False)
|
|
||||||
token = Column(String, nullable=True)
|
|
||||||
fcm_token = Column(String, nullable=True) # Идентификатор устройства для уведомлений
|
|
||||||
hashes_rfids = Column(ARRAY(String), default=[])
|
|
||||||
face_embedding = Column(ARRAY(DOUBLE_PRECISION), nullable=True)
|
|
||||||
|
|
||||||
devices = relationship("Device", secondary="active_devices", back_populates="users", viewonly=True)
|
|
||||||
active_devices = relationship("ActiveDevice")
|
|
||||||
group = relationship("Group", uselist=False, back_populates="users")
|
|
||||||
notifications = relationship("Notification", back_populates="user")
|
|
||||||
|
|
||||||
async def include_relationship_for_read(self, session: AsyncSession):
|
|
||||||
await session.refresh(self, ["group"])
|
|
||||||
return self
|
|
||||||
|
|
||||||
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
|
|
||||||
yield SQLAlchemyUserDatabase(session, User)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["get_active_devices", "get_active_devices_in_schema", "jwt_websocket_create_access_token", "get_tokens",
|
|
||||||
"get_tokens_from_refresh",
|
|
||||||
"optional_verifyed_access_token", "verifyed_access_token"]
|
|
||||||
|
|
||||||
|
|
||||||
async def get_active_devices(user: User = Depends(current_user),
|
|
||||||
session: AsyncSession = Depends(get_async_session),
|
|
||||||
cast_to_schema: bool = False) -> List[Device | DeviceRead]:
|
|
||||||
tables = [table for table in DeviceForInherit.__subclasses__() if table is not Device and table is not Hub]
|
|
||||||
stmt = (
|
|
||||||
select(Device, ActiveDevice, *tables) # только one-to-one relathionship, иначе будет дублирование строк
|
|
||||||
.join(Device.active_devices)
|
|
||||||
.options(joinedload(ActiveDevice.role, innerjoin=True),
|
|
||||||
joinedload(Device.model))
|
|
||||||
.where(ActiveDevice.user_id == user.id)
|
|
||||||
.where(Device.activated == True)
|
|
||||||
.join(Device.hub, isouter=True)
|
|
||||||
)
|
|
||||||
for table in tables:
|
|
||||||
stmt = stmt.join(table, table.device_id == Device.id)
|
|
||||||
print("count join:", str(stmt).lower().count("join"))
|
|
||||||
devices = []
|
|
||||||
async for device, active_device, *inherit_devices in await session.stream(stmt):
|
|
||||||
inherit_device = next((i for i in inherit_devices if i is not None), None)
|
|
||||||
device.__dict__["active_device"] = active_device
|
|
||||||
if inherit_device is None:
|
|
||||||
devices.append(device.get_schema() if cast_to_schema else inherit_device)
|
|
||||||
else:
|
|
||||||
inherit_device.device = device
|
|
||||||
devices.append(inherit_device.get_schema() if cast_to_schema else inherit_device)
|
|
||||||
return devices
|
|
||||||
|
|
||||||
|
|
||||||
async def get_active_devices_in_schema(user: User = Depends(current_user),
|
|
||||||
session: AsyncSession = Depends(get_async_session)) -> list[DeviceRead]:
|
|
||||||
return await get_active_devices(user, session, True)
|
|
||||||
|
|
||||||
|
|
||||||
def jwt_websocket_create_access_token(device: DeviceWebsocketAuth,
|
|
||||||
_=Depends(current_user),
|
|
||||||
authorize = None) -> str:
|
|
||||||
# Получение токена по пользователю для подключения устройства по вебсокетам
|
|
||||||
# При необходимости можно добавить ограничение по количествку токенов на пользователя или на устройства
|
|
||||||
return authorize.create_access_token(device.macaddress, user_claims=device.model_dump(), expires_time=False)
|
|
||||||
|
|
||||||
|
|
||||||
def jwt_request_create_access_token(claims: JwtAccessClaimsForDeviceRequests, authorize: None = Depends()) -> str:
|
|
||||||
return authorize.create_access_token(str(claims.device_id), user_claims=claims.model_dump(exclude="device_id"))
|
|
||||||
|
|
||||||
|
|
||||||
def jwt_request_create_refresh_token(claims: JwtRefreshClaimsForDeviceRequests, authorize: None = Depends()) -> str:
|
|
||||||
return authorize.create_refresh_token(claims.part_access, user_claims=claims.model_dump(mode='json'))
|
|
||||||
|
|
||||||
|
|
||||||
def get_tokens(device_id: uuid.UUID, authorize: None) -> TokensRead:
|
|
||||||
access_token = jwt_request_create_access_token(JwtAccessClaimsForDeviceRequests(device_id=device_id), authorize)
|
|
||||||
refresh_token = jwt_request_create_refresh_token(
|
|
||||||
JwtRefreshClaimsForDeviceRequests(device_id=device_id, part_access=access_token[:30]),
|
|
||||||
authorize
|
|
||||||
)
|
|
||||||
tokens_data[authorize.get_raw_jwt(refresh_token)['jti']] = TokenDataServer.default()
|
|
||||||
return TokensRead(
|
|
||||||
access_token=access_token,
|
|
||||||
refresh_token=refresh_token
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def verifyed_refresh_token(access_token: str, refresh_token, counter_hash: str,
|
|
||||||
authorize = None) -> Tuple[JwtRefreshClaimsForDeviceRequests, TokenDataServer]:
|
|
||||||
authorize.jwt_refresh_token_required("websocket", token=refresh_token)
|
|
||||||
jwt_raw_refresh_token = authorize.get_raw_jwt(refresh_token)
|
|
||||||
jwt_raw_access_token = authorize.get_raw_jwt(access_token)
|
|
||||||
model = JwtRefreshClaimsForDeviceRequests.model_validate(jwt_raw_refresh_token)
|
|
||||||
|
|
||||||
if model.part_access != access_token[:30]:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=403,
|
|
||||||
detail="part access token does not match input access token"
|
|
||||||
)
|
|
||||||
token_data = tokens_data.get(jwt_raw_refresh_token['jti'])
|
|
||||||
# todo Если токены не сохраняться при падении сервера, тогда валидации не должно быть,
|
|
||||||
# если сохраняются всегда должны находиться данные по токену
|
|
||||||
if token_data is None:
|
|
||||||
token_data = TokenDataServer.default()
|
|
||||||
# raise HTTPException(
|
|
||||||
# status_code=403,
|
|
||||||
# detail="refresh token no save in server. Get refresh_token again in /get_tokens_from_user"
|
|
||||||
# )
|
|
||||||
if token_data.hash_counter != counter_hash:
|
|
||||||
raise HTTPException(
|
|
||||||
status_code=403,
|
|
||||||
detail="invalid counter hash"
|
|
||||||
)
|
|
||||||
# Проверка прошла успешно
|
|
||||||
data = TokenDataServer(
|
|
||||||
counter=token_data.counter + 1,
|
|
||||||
hash_counter=hashlib.md5(str(token_data.counter + 1).encode()).hexdigest()
|
|
||||||
)
|
|
||||||
# Токены больше не актуальны, добавляем в чс
|
|
||||||
tokens_data.pop(jwt_raw_refresh_token['jti'], None)
|
|
||||||
tokens_data.pop(jwt_raw_access_token['jti'], None)
|
|
||||||
denylist.update({
|
|
||||||
jwt_raw_refresh_token['jti'],
|
|
||||||
jwt_raw_access_token['jti']
|
|
||||||
})
|
|
||||||
return model, data
|
|
||||||
|
|
||||||
|
|
||||||
def verifyed_access_token(token: str | None, authorize = None
|
|
||||||
) -> JwtAccessClaimsForDeviceRequests:
|
|
||||||
# Добавить доп валидацию при необходимости
|
|
||||||
authorize.jwt_required("websocket", token)
|
|
||||||
raw = authorize.get_raw_jwt(token)
|
|
||||||
return JwtAccessClaimsForDeviceRequests.model_validate(raw | {"device_id": raw["sub"]})
|
|
||||||
|
|
||||||
|
|
||||||
def optional_verifyed_access_token(token: str | None = None, authorize = None
|
|
||||||
) -> JwtAccessClaimsForDeviceRequests | None:
|
|
||||||
try:
|
|
||||||
return verifyed_access_token(token, authorize)
|
|
||||||
except:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def get_tokens_from_refresh(model_data: JwtRefreshClaimsForDeviceRequests = Depends(verifyed_refresh_token),
|
|
||||||
authorize = None) -> TokensRead:
|
|
||||||
model, data = model_data
|
|
||||||
result = get_tokens(model.device_id, authorize)
|
|
||||||
tokens_data[authorize.get_raw_jwt(result.refresh_token)['jti']] = data
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
async def device_activate(token: Optional[JwtAccessClaimsForDeviceRequests] = Depends(verifyed_access_token),
|
|
||||||
session: AsyncSession = Depends(get_async_session)) -> Device:
|
|
||||||
|
|
||||||
device = await session.get(Device, ident=token.device_id)
|
|
||||||
if device is None:
|
|
||||||
raise EntityNotFound(Device, device_id=token.device_id, **token.model_dump()).get()
|
|
||||||
if device.activated:
|
|
||||||
raise HTTPException(status_code=400, detail="Устройство уже активировано")
|
|
||||||
device.activated = True
|
|
||||||
await session.commit()
|
|
||||||
return device
|
|
@ -1,158 +0,0 @@
|
|||||||
import datetime
|
|
||||||
import abc
|
|
||||||
import uuid
|
|
||||||
from typing import Type
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from sqlalchemy import (Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData,
|
|
||||||
UUID, DateTime, Index, JSON, func)
|
|
||||||
from sqlalchemy.orm import relationship, class_mapper
|
|
||||||
from sqlalchemy.dialects.postgresql import MACADDR8, CIDR
|
|
||||||
|
|
||||||
from database import Base
|
|
||||||
|
|
||||||
__all__ = ["Device", "Model", "ActiveDevice", "Role", "DeviceForInherit", "Hub"]
|
|
||||||
|
|
||||||
|
|
||||||
from .schemas import DeviceCreate, ActiveDeviceCreate, DeviceRead
|
|
||||||
|
|
||||||
current_user = lambda: None
|
|
||||||
|
|
||||||
class DeviceForInherit(Base):
|
|
||||||
__abstract__ = True
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def create_from(cls, schema: DeviceCreate,
|
|
||||||
active_devices: list[ActiveDeviceCreate] | ActiveDeviceCreate | None = None,
|
|
||||||
user_id_in_active_devices: uuid.UUID | None = None) -> 'DeviceForInherit':
|
|
||||||
# overrides in inherits models
|
|
||||||
if not isinstance(schema, (DeviceCreate, "HubCreate")):
|
|
||||||
raise TypeError(f"schema must be a DeviceCreate or inherit from him, not {type(schema)}")
|
|
||||||
if cls == Device:
|
|
||||||
return cls.create_device(schema, active_devices, user_id_in_active_devices)
|
|
||||||
result = cls()
|
|
||||||
result.__dict__.update(schema.model_dump())
|
|
||||||
result.device = cls.create_device(schema, active_devices, user_id_in_active_devices)
|
|
||||||
return result
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def get_schema(self) -> DeviceRead: # overrides
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_schema_from_type(self, T: Type[DeviceRead]) -> DeviceRead:
|
|
||||||
if 'device' not in self.__dict__:
|
|
||||||
raise ValueError(f'Property device must be loaded {self.__dict__}')
|
|
||||||
if 'hub' not in self.device.__dict__ and 'hub_id' in self.device.__dict__:
|
|
||||||
self.device.__dict__['hub'] = self.device.hub_id
|
|
||||||
elif isinstance(self.device.hub, Hub):
|
|
||||||
self.device.__dict__["hub"] = self.device.hub.__dict__ | self.device.hub.device.__dict__
|
|
||||||
return T.model_validate(self.__dict__ | self.device.__dict__ | {"type_name": type(self).__name__})
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_device(schema: DeviceCreate,
|
|
||||||
active_devices: list[ActiveDeviceCreate] | ActiveDeviceCreate | None = None,
|
|
||||||
user_id_in_active_devices: uuid.UUID | None = None) -> 'Device':
|
|
||||||
schema_model = schema.model_dump(exclude={'hub', "model"})
|
|
||||||
device = Device(
|
|
||||||
**{key: value for key, value in schema_model.items() if key in class_mapper(Device).attrs.keys()},
|
|
||||||
)
|
|
||||||
if hasattr(schema, 'hub') and isinstance(schema.hub, uuid.UUID):
|
|
||||||
device.hub_id = schema.hub
|
|
||||||
elif hasattr(schema, 'hub') and schema.hub is not None:
|
|
||||||
device.hub = Hub.create_from(schema.hub, active_devices, user_id_in_active_devices)
|
|
||||||
if isinstance(active_devices, list):
|
|
||||||
device.active_devices = [ActiveDevice.create_from(i, user_id_in_active_devices) for i in active_devices]
|
|
||||||
elif isinstance(active_devices, ActiveDeviceCreate):
|
|
||||||
device.active_devices = [ActiveDevice.create_from(active_devices, user_id_in_active_devices)]
|
|
||||||
if schema.model is not None:
|
|
||||||
device.model = Model(**schema.model.model_dump())
|
|
||||||
return device
|
|
||||||
|
|
||||||
|
|
||||||
class Device(DeviceForInherit, Base):
|
|
||||||
__tablename__ = 'devices'
|
|
||||||
|
|
||||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
||||||
model_id = Column(ForeignKey("models.id"), nullable=True)
|
|
||||||
|
|
||||||
# use_alter - для разрешения цикличных зависимостей nullable - True, чтобы у хаба не было хаба
|
|
||||||
# (ограничение для остальных устройств будет на уровне схем)
|
|
||||||
# name - имя ограничения для субд и нормальной работы миграций и автотестов
|
|
||||||
hub_id = Column(ForeignKey("hubs.device_id", use_alter=True, name="fk_device_hub_id"), nullable=True)
|
|
||||||
|
|
||||||
first_connection_date = Column(DateTime, default=datetime.datetime.utcnow)
|
|
||||||
last_connection_date = Column(DateTime, server_default=func.now(), onupdate=func.current_timestamp())
|
|
||||||
macaddress = Column(MACADDR8, nullable=False)
|
|
||||||
serial_number = Column(Integer, nullable=False)
|
|
||||||
configuration = Column(JSON, nullable=False, default={})
|
|
||||||
is_active = Column(Boolean, nullable=False)
|
|
||||||
activated = Column(Boolean, nullable=False, default=False, index=True) # Устройство подключилось к серверу
|
|
||||||
|
|
||||||
hub = relationship("Hub", back_populates="connected_devices", uselist=False, foreign_keys=[hub_id])
|
|
||||||
model = relationship('Model', uselist=False, cascade="all,delete",)
|
|
||||||
active_devices = relationship("ActiveDevice", cascade="all,delete", back_populates="device")
|
|
||||||
users = relationship('User', secondary='active_devices', back_populates="devices", viewonly=True)
|
|
||||||
|
|
||||||
__table_args__ = (
|
|
||||||
Index('unique_phys_device', "serial_number", "macaddress", unique=True),
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_schema(self) -> DeviceRead:
|
|
||||||
return self.get_schema_from_type(DeviceRead)
|
|
||||||
|
|
||||||
|
|
||||||
class Hub(DeviceForInherit):
|
|
||||||
__tablename__ = 'hubs'
|
|
||||||
|
|
||||||
device_id = Column(ForeignKey('devices.id'), primary_key=True)
|
|
||||||
ip_address = Column(CIDR, nullable=False)
|
|
||||||
port = Column(Integer, nullable=False)
|
|
||||||
|
|
||||||
device = relationship("Device", uselist=False, foreign_keys=[device_id])
|
|
||||||
connected_devices = relationship("Device", foreign_keys=[Device.hub_id], back_populates="hub")
|
|
||||||
|
|
||||||
def get_schema(self) -> "HubRead":
|
|
||||||
return self.get_schema_from_type("HubRead")
|
|
||||||
|
|
||||||
|
|
||||||
class Model(Base):
|
|
||||||
__tablename__ = 'models'
|
|
||||||
|
|
||||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
version = Column(String, nullable=False)
|
|
||||||
|
|
||||||
|
|
||||||
class Role(Base):
|
|
||||||
__tablename__ = 'roles'
|
|
||||||
|
|
||||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
permissions = Column(JSON, nullable=False) # todo продумать структуру
|
|
||||||
|
|
||||||
active_device = relationship("ActiveDevice", uselist=False, back_populates="role")
|
|
||||||
|
|
||||||
|
|
||||||
class ActiveDevice(Base):
|
|
||||||
__tablename__ = 'active_devices'
|
|
||||||
|
|
||||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
||||||
user_id = Column(ForeignKey('users.id'))
|
|
||||||
device_id = Column(ForeignKey('devices.id'))
|
|
||||||
role_id = Column(ForeignKey('roles.id'), nullable=False)
|
|
||||||
last_connection = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
|
|
||||||
|
|
||||||
device = relationship(Device, uselist=False, back_populates="active_devices")
|
|
||||||
role = relationship(Role, uselist=False, back_populates="active_device")
|
|
||||||
user = relationship("User", uselist=False, back_populates="active_devices")
|
|
||||||
|
|
||||||
__table_args__ = (
|
|
||||||
Index('fk_index', "user_id", "device_id", unique=True),
|
|
||||||
)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def create_from(schema: ActiveDeviceCreate, user_id: uuid.UUID | None = None) -> "ActiveDevice":
|
|
||||||
md = schema.model_dump(exclude={'role'})
|
|
||||||
if user_id is not None:
|
|
||||||
md['user_id'] = user_id
|
|
||||||
return ActiveDevice(role=Role(**schema.role.model_dump()), **md)
|
|
@ -1,154 +0,0 @@
|
|||||||
from typing import List, Any, Optional
|
|
||||||
|
|
||||||
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from starlette.responses import HTMLResponse
|
|
||||||
|
|
||||||
from jwt_config import denylist, jwt_websocket_required_auth
|
|
||||||
from . import Device
|
|
||||||
from .dependencies import get_active_devices_in_schema, get_tokens, get_tokens_from_refresh, device_activate
|
|
||||||
from .schemas import DeviceRead, DeviceWebsocketAuth, TokensRead
|
|
||||||
|
|
||||||
|
|
||||||
current_user = lambda: None
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["router", "manager", "denylist", "websocket_router"]
|
|
||||||
|
|
||||||
|
|
||||||
router = APIRouter(
|
|
||||||
prefix="/device",
|
|
||||||
tags=["Device"]
|
|
||||||
)
|
|
||||||
websocket_router = APIRouter()
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/active_devices")
|
|
||||||
async def get_all_active_devices_for_user(active_devices=Depends(get_active_devices_in_schema)
|
|
||||||
) -> List[DeviceRead | None]:
|
|
||||||
"""Получает все доступные **активированные** устройства всех видов, связанные с текущем пользователем.
|
|
||||||
Внутри сущностей храниться **active_device**, в котором обозначена взаимодействие устройства с конкретным пользователем"""
|
|
||||||
return active_devices
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/token_from_user")
|
|
||||||
async def get_tokens_for_device_from_user(
|
|
||||||
tokens: TokensRead = Depends(get_tokens),
|
|
||||||
_=Depends(current_user)) -> TokensRead:
|
|
||||||
"""Генерирует токены по переданому **device_id**, валидация devie_id не происходит,
|
|
||||||
просто токен с невалидным id нельзя будет нигде применить"""
|
|
||||||
return tokens
|
|
||||||
|
|
||||||
|
|
||||||
@router.patch("/activate", responses={400: {"description": "Устройство уже активировано"}})
|
|
||||||
async def activate_added_device(device: Device = Depends(device_activate)) -> DeviceRead:
|
|
||||||
return device.__dict__
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/refresh_token", responses={
|
|
||||||
401: {"description": "Некорректно переданные токены"},
|
|
||||||
403: {"description": "Некорректно переданные токены"}
|
|
||||||
})
|
|
||||||
async def refresh_tokens(
|
|
||||||
tokens: TokensRead = Depends(get_tokens_from_refresh)) -> TokensRead:
|
|
||||||
return tokens
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionManager:
|
|
||||||
def __init__(self):
|
|
||||||
self.active_connections: dict[DeviceWebsocketAuth, WebSocket] = {}
|
|
||||||
|
|
||||||
async def connect(self, websocket: WebSocket,
|
|
||||||
token: str,
|
|
||||||
authorize = None
|
|
||||||
) -> Optional[DeviceWebsocketAuth]:
|
|
||||||
|
|
||||||
decoded_token, device = jwt_websocket_required_auth(token, authorize)
|
|
||||||
|
|
||||||
await websocket.accept()
|
|
||||||
old_websocket = self.active_connections.get(device) # теоретически на одно устройство одно вебсокет подключение
|
|
||||||
if old_websocket is not None:
|
|
||||||
await old_websocket.close()
|
|
||||||
self.active_connections[device] = websocket
|
|
||||||
denylist.add(decoded_token['jti'])
|
|
||||||
return device
|
|
||||||
|
|
||||||
def disconnect(self, device: DeviceWebsocketAuth):
|
|
||||||
self.active_connections.pop(device, None)
|
|
||||||
|
|
||||||
async def send_message_to_device(self, device: DeviceWebsocketAuth, message: str | Any) -> WebSocket:
|
|
||||||
device_auth = DeviceWebsocketAuth.model_validate(device, from_attributes=True)
|
|
||||||
websocket = self.active_connections.get(device_auth)
|
|
||||||
if websocket is None:
|
|
||||||
raise HTTPException(status_code=404, detail={
|
|
||||||
"msg": "Устройство, статус которого пытаются изменить, не подключен к серверу",
|
|
||||||
"device": jsonable_encoder(device)
|
|
||||||
})
|
|
||||||
if isinstance(message, str):
|
|
||||||
await websocket.send_text(message)
|
|
||||||
else:
|
|
||||||
await websocket.send_json(message)
|
|
||||||
return websocket
|
|
||||||
|
|
||||||
async def broadcast(self, message: str):
|
|
||||||
for connection in self.active_connections.values():
|
|
||||||
await connection.send_text(message)
|
|
||||||
|
|
||||||
async def change_status_smart_lock(self, smart_lock: 'SmartLockRead'):
|
|
||||||
# При необходимости ограничить или типизировать отправляемый ответ
|
|
||||||
await self.send_message_to_device(smart_lock, smart_lock.model_dump())
|
|
||||||
|
|
||||||
|
|
||||||
manager = ConnectionManager()
|
|
||||||
|
|
||||||
|
|
||||||
@websocket_router.websocket("/")
|
|
||||||
async def websocket_endpoint(websocket: WebSocket, token: str, authorize: None):
|
|
||||||
try:
|
|
||||||
device = await manager.connect(websocket, token, authorize)
|
|
||||||
except Exception:
|
|
||||||
await websocket.close()
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
data = await websocket.receive_json()
|
|
||||||
except WebSocketDisconnect:
|
|
||||||
manager.disconnect(websocket)
|
|
||||||
|
|
||||||
|
|
||||||
# Debug
|
|
||||||
|
|
||||||
html = """
|
|
||||||
<!DOCTYPE html>
|
|
||||||
<html>
|
|
||||||
<head>
|
|
||||||
<title>Authorize</title>
|
|
||||||
</head>
|
|
||||||
<body>
|
|
||||||
<h1>WebSocket Authorize</h1>
|
|
||||||
<p>Token:</p>
|
|
||||||
<textarea id="token" rows="4" cols="50"></textarea><br><br>
|
|
||||||
<button onclick="websocketfun()">Send</button>
|
|
||||||
<ul id='messages'>
|
|
||||||
</ul>
|
|
||||||
<script>
|
|
||||||
const websocketfun = () => {
|
|
||||||
let token = document.getElementById("token").value
|
|
||||||
let ws = new WebSocket(`ws://testapi.danserver.keenetic.name/?token=${token}`)
|
|
||||||
ws.onmessage = (event) => {
|
|
||||||
let messages = document.getElementById('messages')
|
|
||||||
let message = document.createElement('li')
|
|
||||||
let content = document.createTextNode(event.data)
|
|
||||||
message.appendChild(content)
|
|
||||||
messages.appendChild(message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
</script>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
@websocket_router.get("/debug_form", tags=["Debug"])
|
|
||||||
async def get():
|
|
||||||
return HTMLResponse(html)
|
|
@ -1,68 +0,0 @@
|
|||||||
import binascii
|
|
||||||
import hashlib
|
|
||||||
import uuid
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from pydantic import *
|
|
||||||
from pydantic_extra_types.mac_address import MacAddress
|
|
||||||
|
|
||||||
from .common_schemas import *
|
|
||||||
|
|
||||||
__all__ = ["ModelRead", "DeviceCreate", "DeviceRead", "RoleRead", "RoleCreate",
|
|
||||||
"ActiveDeviceCreate", "ActiveDeviceRead", "DeviceWebsocketAuth", "JwtRefreshClaimsForDeviceRequests",
|
|
||||||
"JwtAccessClaimsForDeviceRequests", "TokensRead", "TokenDataServer"]
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceWebsocketAuth(BaseModel, frozen=True):
|
|
||||||
macaddress: MacAddress
|
|
||||||
last_connection_date: datetime
|
|
||||||
model: Optional[ModelRead] = None
|
|
||||||
|
|
||||||
|
|
||||||
class JwtAccessClaimsForDeviceRequests(BaseModel):
|
|
||||||
device_id: uuid.UUID
|
|
||||||
|
|
||||||
|
|
||||||
class JwtRefreshClaimsForDeviceRequests(BaseModel):
|
|
||||||
part_access: str
|
|
||||||
device_id: uuid.UUID
|
|
||||||
|
|
||||||
|
|
||||||
class TokenDataServer(BaseModel):
|
|
||||||
counter: int
|
|
||||||
hash_counter: str
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def hash(counter: int | str):
|
|
||||||
return binascii.hexlify(hashlib.md5(str(counter).encode()).digest())
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def default():
|
|
||||||
return TokenDataServer(
|
|
||||||
counter=0,
|
|
||||||
hash_counter=TokenDataServer.hash(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TokensRead(BaseModel):
|
|
||||||
access_token: str
|
|
||||||
refresh_token: str
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceCreate(BaseModel, from_attributes=True):
|
|
||||||
macaddress: MacAddress
|
|
||||||
is_active: bool
|
|
||||||
configuration: Json | dict
|
|
||||||
serial_number: int
|
|
||||||
hub: uuid.UUID | None = None
|
|
||||||
model: Optional[ModelRead] = None
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceRead(DeviceCreate):
|
|
||||||
id: uuid.UUID
|
|
||||||
type_name: str = "Device"
|
|
||||||
hub: uuid.UUID | None = None
|
|
||||||
activated: bool = False
|
|
||||||
active_device: Optional[ActiveDeviceRead] = None
|
|
||||||
last_connection_date: datetime | None = None
|
|
@ -1 +0,0 @@
|
|||||||
{"doorlock": {"ip": "192.168.31.222", "host": "21488", "mac": "", "type": "phone"}, "phone": {"ip": "192.168.31.222", "host": "53970", "mac": "1", "type": "controller"}}
|
|
@ -1,89 +0,0 @@
|
|||||||
import hashlib
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from sqlalchemy import select
|
|
||||||
|
|
||||||
from device import DeviceRead, Device, TokenDataServer
|
|
||||||
from smart_lock.schemas import SmartLockCreate, SmartLockRead
|
|
||||||
from test_database import *
|
|
||||||
from auth.test_auth import *
|
|
||||||
from smart_lock.test_smart_lock import get_json_for_create
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_active_devices(auth_client: AsyncClient, session: AsyncSession):
|
|
||||||
# Добавляем устройства всех возможных типов
|
|
||||||
# todo более внятный тест
|
|
||||||
json_smart_lock = get_json_for_create()
|
|
||||||
json_smart_lock_2 = get_json_for_create()
|
|
||||||
json_smart_lock_2["smart_lock"]["hub"]["ip_address"] = "192.168.1.1/32"
|
|
||||||
json_smart_lock_2["smart_lock"]["serial_number"] = 344
|
|
||||||
json_smart_lock_2["smart_lock"]["macaddress"] = "22:11:22:ff:fe:33:44:55"
|
|
||||||
json_smart_lock_2["smart_lock"]["hub"]["serial_number"] = 99
|
|
||||||
|
|
||||||
response = await auth_client.post("/smart_lock/", json=json_smart_lock)
|
|
||||||
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
|
|
||||||
response = await auth_client.post("/smart_lock/", json=json_smart_lock)
|
|
||||||
assert response.json()["activated"] is False
|
|
||||||
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
|
|
||||||
response = await auth_client.post("/smart_lock/", json=json_smart_lock_2)
|
|
||||||
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
|
|
||||||
|
|
||||||
response = await auth_client.get("/device/active_devices")
|
|
||||||
assert response.status_code == 200
|
|
||||||
json = response.json()
|
|
||||||
models: list[DeviceRead] = [DeviceRead.model_validate(i) for i in json]
|
|
||||||
assert len(models) == 2
|
|
||||||
|
|
||||||
# check polymorphism
|
|
||||||
excluded = {'id', 'last_interaction_date', 'active_device', "hub", "last_connection_date", "activated"}
|
|
||||||
json_smart_lock["smart_lock"]["hub"]["id"] = uuid.uuid4()
|
|
||||||
json_smart_lock_2["smart_lock"]["hub"]["id"] = uuid.uuid4()
|
|
||||||
assert (SmartLockRead(**json[0]).model_dump(exclude=excluded) ==
|
|
||||||
SmartLockRead(**json_smart_lock["smart_lock"], id=uuid.uuid4()).model_dump(exclude=excluded))
|
|
||||||
assert (SmartLockRead(**json[1]).model_dump(exclude=excluded) ==
|
|
||||||
SmartLockRead(**json_smart_lock_2["smart_lock"], id=uuid.uuid4()).model_dump(exclude=excluded))
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_token_from_user(auth_client: AsyncClient, session: AsyncSession):
|
|
||||||
device = (await session.execute(select(Device))).scalar()
|
|
||||||
response = await auth_client.get("/device/token_from_user", params={"device_id": device.id})
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert list(response.json().keys()) == ["access_token", "refresh_token"]
|
|
||||||
print(response.json())
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_refresh_token(auth_client: AsyncClient, session: AsyncSession):
|
|
||||||
device = (await session.execute(select(Device))).scalar()
|
|
||||||
response = await auth_client.get("/device/token_from_user", params={"device_id": device.id})
|
|
||||||
assert response.status_code == 200
|
|
||||||
json = response.json()
|
|
||||||
json["counter_hash"] = TokenDataServer.hash(0).decode() # Не было обновений, значит ставим 0
|
|
||||||
|
|
||||||
# 1 обновление токена
|
|
||||||
response = await auth_client.get("/for_device/refresh_token", params=json)
|
|
||||||
assert response.status_code == 200
|
|
||||||
json1 = response.json()
|
|
||||||
|
|
||||||
response = await auth_client.get("/for_device/refresh_token", params=json)
|
|
||||||
assert response.status_code in (401, 403) # Token has been revoked
|
|
||||||
|
|
||||||
# 2 обновление токена со старым счетчиком (не должен сработать)
|
|
||||||
json1["counter_hash"] = json["counter_hash"]
|
|
||||||
response = await auth_client.get("/for_device/refresh_token", params=json1)
|
|
||||||
assert response.status_code in (401, 403) # invalid counter hash
|
|
||||||
|
|
||||||
# 2 обновление токена с обновленным счетчиком
|
|
||||||
json1["counter_hash"] = TokenDataServer.hash(1).decode()
|
|
||||||
response = await auth_client.get("/for_device/refresh_token", params=json1)
|
|
||||||
assert response.status_code == 200
|
|
||||||
|
|
||||||
# 3 обновление токена с обновленным счетчиком
|
|
||||||
# hashlib.md5(str(2).encode()).hexdigest()
|
|
||||||
json2 = response.json()
|
|
||||||
json2["counter_hash"] = TokenDataServer.hash(2).decode()
|
|
||||||
response = await auth_client.get("/for_device/refresh_token", params=json2)
|
|
||||||
assert response.status_code == 200
|
|
@ -1,12 +0,0 @@
|
|||||||
from fastapi import Depends
|
|
||||||
|
|
||||||
from .dependencies import jwt_websocket_create_access_token
|
|
||||||
from .router import router
|
|
||||||
from .schemas import DeviceWebsocketAuth
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/ws/token")
|
|
||||||
def get_access_token_for_connection_to_device(access_token: str = Depends(jwt_websocket_create_access_token)):
|
|
||||||
return {
|
|
||||||
"access_token": access_token
|
|
||||||
}
|
|
@ -1,40 +0,0 @@
|
|||||||
import abc
|
|
||||||
import os
|
|
||||||
from typing import Tuple
|
|
||||||
|
|
||||||
from fastapi import Depends, Query
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from device.schemas import DeviceWebsocketAuth, TokenDataServer
|
|
||||||
|
|
||||||
# Множество заблокированных токенов доступа по вебсокетам. Токены одноразовые, но при дропе системы список должен
|
|
||||||
# очищаться (не хранится в постоянной памяти), чтобы устройства могли повторно подключиться к вебсокетам после
|
|
||||||
# восстановления
|
|
||||||
# todo перенести на redis
|
|
||||||
denylist = set()
|
|
||||||
tokens_data: dict[str, TokenDataServer] = dict()
|
|
||||||
|
|
||||||
|
|
||||||
class Settings(BaseModel):
|
|
||||||
authjwt_secret_key: str = os.environ.get("SECRET_DEVICE_AUTH_TOKEN")
|
|
||||||
authjwt_denylist_enabled: bool = True
|
|
||||||
authjwt_denylist_token_checks: set = {"access", "refresh"}
|
|
||||||
authjwt_refresh_token_expires: bool = False
|
|
||||||
|
|
||||||
|
|
||||||
def check_if_token_in_denylist(decrypted_token):
|
|
||||||
jti = decrypted_token['jti']
|
|
||||||
return jti in denylist
|
|
||||||
|
|
||||||
|
|
||||||
def get_config():
|
|
||||||
return Settings()
|
|
||||||
|
|
||||||
|
|
||||||
def jwt_websocket_required_auth(token: str = Query(...), authorize: None = Depends()) -> Tuple[dict, 'DeviceWebsocketAuth']:
|
|
||||||
authorize.jwt_required("websocket", token=token)
|
|
||||||
|
|
||||||
decoded_token = authorize.get_raw_jwt(token)
|
|
||||||
device = DeviceWebsocketAuth.model_validate(decoded_token)
|
|
||||||
|
|
||||||
return decoded_token, device
|
|
@ -1,67 +0,0 @@
|
|||||||
import datetime
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
|
|
||||||
import uvicorn
|
|
||||||
from fastapi import FastAPI, Depends, Body
|
|
||||||
from fastapi.staticfiles import StaticFiles
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from sqlalchemy.exc import DBAPIError
|
|
||||||
from starlette.requests import Request
|
|
||||||
from starlette.responses import JSONResponse
|
|
||||||
|
|
||||||
from device.router import router, websocket_router
|
|
||||||
|
|
||||||
app = FastAPI(
|
|
||||||
title="Smart lock",
|
|
||||||
root_path="/devices"
|
|
||||||
)
|
|
||||||
|
|
||||||
@app.get("/docs", include_in_schema=False)
|
|
||||||
async def custom_swagger_ui_html(req: Request):
|
|
||||||
root_path = req.scope.get("root_path", "").rstrip("/")
|
|
||||||
openapi_url = root_path + app.openapi_url
|
|
||||||
return get_swagger_ui_html(
|
|
||||||
openapi_url=openapi_url,
|
|
||||||
title="API",
|
|
||||||
)
|
|
||||||
|
|
||||||
app.include_router(router)
|
|
||||||
app.include_router(websocket_router)
|
|
||||||
|
|
||||||
|
|
||||||
@app.middleware("http")
|
|
||||||
async def add_process_time_header(request: Request, call_next):
|
|
||||||
start_time = time.perf_counter()
|
|
||||||
response = await call_next(request)
|
|
||||||
process_time = time.perf_counter() - start_time
|
|
||||||
print(f"Finished in {process_time:.3f}")
|
|
||||||
response.headers["X-Process-Time"] = str(process_time)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
@app.exception_handler(DBAPIError)
|
|
||||||
def authjwt_exception_handler(request: Request, exc: DBAPIError):
|
|
||||||
return JSONResponse(
|
|
||||||
status_code=500,
|
|
||||||
content={"detail": jsonable_encoder(exc) | {"args": jsonable_encoder(exc.args)}}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# @app.exception_handler(Exception)
|
|
||||||
# def authjwt_exception_handler(request: Request, exc: Exception):
|
|
||||||
# print(exc.with_traceback(None))
|
|
||||||
# return JSONResponse(
|
|
||||||
# status_code=500,
|
|
||||||
# content={
|
|
||||||
# "name_exception": exc.__class__.__name__,
|
|
||||||
# "args": getattr(exc, "args", []),
|
|
||||||
# "detail": jsonable_encoder(exc)
|
|
||||||
# }
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
uvicorn.run(app=app, host="0.0.0.0", port=8001)
|
|
||||||
# uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)
|
|
@ -1,9 +0,0 @@
|
|||||||
fastapi==0.109.1
|
|
||||||
uvicorn
|
|
||||||
sqlalchemy
|
|
||||||
fastapi-users==12.1.2
|
|
||||||
python-dotenv
|
|
||||||
asyncpg
|
|
||||||
pydantic_extra_types
|
|
||||||
fastapi_jwt_auth
|
|
||||||
fastapi_users_db_sqlalchemy
|
|
@ -1,32 +0,0 @@
|
|||||||
version: '3.8'
|
|
||||||
|
|
||||||
services:
|
|
||||||
user_service:
|
|
||||||
env_file: ".env"
|
|
||||||
build:
|
|
||||||
context: ./user_service
|
|
||||||
dockerfile: Dockerfile
|
|
||||||
environment:
|
|
||||||
- PORT=8000
|
|
||||||
ports:
|
|
||||||
- "8000:8000"
|
|
||||||
|
|
||||||
device_service:
|
|
||||||
env_file: ".env"
|
|
||||||
build:
|
|
||||||
context: ./device_service
|
|
||||||
dockerfile: Dockerfile
|
|
||||||
environment:
|
|
||||||
- PORT=8001
|
|
||||||
ports:
|
|
||||||
- "8001:8001"
|
|
||||||
|
|
||||||
nginx:
|
|
||||||
image: nginx:latest
|
|
||||||
volumes:
|
|
||||||
- ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
|
|
||||||
ports:
|
|
||||||
- "80:80"
|
|
||||||
depends_on:
|
|
||||||
- user_service
|
|
||||||
- device_service
|
|
@ -1,11 +0,0 @@
|
|||||||
server {
|
|
||||||
listen 80;
|
|
||||||
|
|
||||||
location /users {
|
|
||||||
proxy_pass http://user_service:8000;
|
|
||||||
}
|
|
||||||
|
|
||||||
location /devices/ {
|
|
||||||
proxy_pass http://device_service:8001;
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,48 +0,0 @@
|
|||||||
# Лабораторная работа №3: REST API, шлюз и синхронный обмен данными между микросервисами
|
|
||||||
|
|
||||||
## Задание реализовать два микросервиса связанные единой базой и сущностями один ко многим
|
|
||||||
|
|
||||||
- Первый микросервис user предоставляет методы для регистрации авторизации и выхода пользователей
|
|
||||||
- Второй микросервис реализует сущность устройства, которые связанные с пользователями связью многие ко многим
|
|
||||||
|
|
||||||
### Реализация
|
|
||||||
Оба микросервиса написаны на fastapi и развернуты на вебсервере uvicorn. В самом докере развертывание осущетсвляется через веб сервер nginx,
|
|
||||||
который осуществляет роль прокси, прослушивая 80 http порт и перенправляя запрос на тот или иной микросервис
|
|
||||||
|
|
||||||
Для корректной работы swagger-а необходимо перенести location из nginx.conf в fastapi, чтобы ему был известен корневой путь, для построения документации
|
|
||||||
|
|
||||||
`app = FastAPI(
|
|
||||||
title="Smart lock",
|
|
||||||
root_path="/devices"
|
|
||||||
)`
|
|
||||||
|
|
||||||
`location /devices/ {
|
|
||||||
|
|
||||||
proxy_pass http://device_service:8001;
|
|
||||||
|
|
||||||
}`
|
|
||||||
|
|
||||||
### Развертывание
|
|
||||||
В корневую папку добавляем файл .env, где указываем значение переменных среды
|
|
||||||
|
|
||||||
**Пример файла:**
|
|
||||||
|
|
||||||
`DB_HOST = localhost
|
|
||||||
DB_PORT = 5432
|
|
||||||
DB_NAME =
|
|
||||||
DB_USER = postgres
|
|
||||||
DB_PASS = 1234
|
|
||||||
|
|
||||||
SECRET_AUTH_TOKEN = 1
|
|
||||||
SECRET_VERIFICATE_TOKEN = 1
|
|
||||||
SECRET_DEVICE_AUTH_TOKEN = 1
|
|
||||||
|
|
||||||
DEVICE_START_COUNTER = 1
|
|
||||||
|
|
||||||
STATIC_URL = https://testapi.danserver.keenetic.name/static/`
|
|
||||||
|
|
||||||
Далее указываем:
|
|
||||||
|
|
||||||
docker-compose up --build
|
|
||||||
|
|
||||||
[Видео](https://disk.yandex.ru/d/BGenDWYY6tIGaw)
|
|
@ -1,10 +0,0 @@
|
|||||||
FROM python:3.11
|
|
||||||
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
COPY requirements.txt .
|
|
||||||
RUN pip install --no-cache-dir -r requirements.txt
|
|
||||||
|
|
||||||
COPY . .
|
|
||||||
|
|
||||||
CMD ["uvicorn", "--host", "0.0.0.0", "main:app", "--port", "8000"]
|
|
@ -1,6 +0,0 @@
|
|||||||
from .models import *
|
|
||||||
from .router import *
|
|
||||||
from .schemas import *
|
|
||||||
from .manager import *
|
|
||||||
from .base_config import *
|
|
||||||
from .dependencies import *
|
|
@ -1,34 +0,0 @@
|
|||||||
#
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from fastapi_users import FastAPIUsers
|
|
||||||
from fastapi_users.authentication import CookieTransport, AuthenticationBackend, BearerTransport
|
|
||||||
from fastapi_users.authentication import JWTStrategy
|
|
||||||
|
|
||||||
from .models import User
|
|
||||||
from .manager import get_user_manager
|
|
||||||
from config import SECRET_AUTH_TOKEN
|
|
||||||
|
|
||||||
__all__ = ["fastapi_users", "current_user", "auth_backend", "current_user_optional"]
|
|
||||||
|
|
||||||
cookie_transport = CookieTransport()
|
|
||||||
bearer_transport = BearerTransport("/token")
|
|
||||||
|
|
||||||
|
|
||||||
def get_jwt_strategy() -> JWTStrategy:
|
|
||||||
return JWTStrategy(secret=SECRET_AUTH_TOKEN, lifetime_seconds=None)
|
|
||||||
|
|
||||||
|
|
||||||
auth_backend = AuthenticationBackend(
|
|
||||||
name="jwt",
|
|
||||||
transport=cookie_transport,
|
|
||||||
get_strategy=get_jwt_strategy,
|
|
||||||
)
|
|
||||||
|
|
||||||
fastapi_users = FastAPIUsers[User, uuid.UUID](
|
|
||||||
get_user_manager,
|
|
||||||
[auth_backend],
|
|
||||||
)
|
|
||||||
|
|
||||||
current_user = fastapi_users.current_user()
|
|
||||||
current_user_optional = fastapi_users.current_user(optional=True)
|
|
@ -1,13 +0,0 @@
|
|||||||
#
|
|
||||||
from fastapi import Depends
|
|
||||||
from fastapi_users_db_sqlalchemy import SQLAlchemyUserDatabase
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
|
||||||
|
|
||||||
from .models import User
|
|
||||||
from database import get_async_session
|
|
||||||
|
|
||||||
__all__ = ["get_user_db"]
|
|
||||||
|
|
||||||
|
|
||||||
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
|
|
||||||
yield SQLAlchemyUserDatabase(session, User)
|
|
@ -1,99 +0,0 @@
|
|||||||
#
|
|
||||||
import uuid
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from fastapi import Depends, Request, Response, HTTPException
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from fastapi.security import OAuth2PasswordRequestForm
|
|
||||||
from fastapi_users import BaseUserManager, UUIDIDMixin, exceptions, models, schemas
|
|
||||||
from sqlalchemy.exc import IntegrityError
|
|
||||||
from sqlalchemy.sql.functions import user
|
|
||||||
|
|
||||||
from database import get_async_session
|
|
||||||
from .dependencies import get_user_db
|
|
||||||
from .models import *
|
|
||||||
from config import SECRET_VERIFICATE_TOKEN as SECRET
|
|
||||||
|
|
||||||
__all__ = ["UserManager", "get_user_manager"]
|
|
||||||
|
|
||||||
from .schemas import UserUpdate
|
|
||||||
|
|
||||||
|
|
||||||
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
|
|
||||||
reset_password_token_secret = SECRET
|
|
||||||
verification_token_secret = SECRET
|
|
||||||
|
|
||||||
async def on_after_register(self, user: User, request: Optional[Request] = None):
|
|
||||||
print(f"User {user.id} has registered.")
|
|
||||||
|
|
||||||
async def on_after_login(
|
|
||||||
self,
|
|
||||||
user: models.UP,
|
|
||||||
request: Optional[Request] = None,
|
|
||||||
response: Optional[Response] = None,
|
|
||||||
) -> None:
|
|
||||||
if user.fcm_token is not None:
|
|
||||||
await self.update(UserUpdate(fcm_token=user.fcm_token), user, request=request)
|
|
||||||
|
|
||||||
async def create(
|
|
||||||
self,
|
|
||||||
user_create: schemas.UC,
|
|
||||||
safe: bool = False,
|
|
||||||
request: Optional[Request] = None,
|
|
||||||
) -> models.UP:
|
|
||||||
await self.validate_password(user_create.password, user_create)
|
|
||||||
|
|
||||||
existing_user = await self.user_db.get_by_email(user_create.email)
|
|
||||||
if existing_user is not None:
|
|
||||||
raise exceptions.UserAlreadyExists()
|
|
||||||
|
|
||||||
user_dict = (
|
|
||||||
user_create.create_update_dict()
|
|
||||||
if safe
|
|
||||||
else user_create.create_update_dict_superuser()
|
|
||||||
)
|
|
||||||
password = user_dict.pop("password")
|
|
||||||
user_dict["hashed_password"] = self.password_helper.hash(password)
|
|
||||||
|
|
||||||
self._link_or_create_entity(user_dict, "group", Group)
|
|
||||||
|
|
||||||
try:
|
|
||||||
created_user = await self.user_db.create(user_dict)
|
|
||||||
except IntegrityError as e:
|
|
||||||
if 'group' not in e.statement:
|
|
||||||
raise e
|
|
||||||
detail = {
|
|
||||||
"msg": f"Некорректный идентификатор группы {user_dict.get('group_id')=} введенный при регистрации",
|
|
||||||
"inner_exception": jsonable_encoder(e)
|
|
||||||
}
|
|
||||||
raise HTTPException(status_code=403, detail=detail)
|
|
||||||
if hasattr(self.user_db, "session"):
|
|
||||||
await created_user.include_relationship_for_read(self.user_db.session)
|
|
||||||
|
|
||||||
await self.on_after_register(created_user, request)
|
|
||||||
|
|
||||||
return created_user
|
|
||||||
|
|
||||||
async def authenticate(
|
|
||||||
self, credentials: OAuth2PasswordRequestForm
|
|
||||||
) -> Optional[models.UP]:
|
|
||||||
user_auth = await super().authenticate(credentials)
|
|
||||||
if user_auth is None:
|
|
||||||
return None
|
|
||||||
user_auth.fcm_token = credentials.fcm_token
|
|
||||||
return user_auth
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _link_or_create_entity(user_dict: dict, name: str, EntityType):
|
|
||||||
entity = user_dict.get(name)
|
|
||||||
if isinstance(entity, (uuid.UUID, int)) or entity is None:
|
|
||||||
user_dict.pop(name, None)
|
|
||||||
user_dict[name + "_id"] = entity
|
|
||||||
else:
|
|
||||||
user_dict[name] = EntityType(**entity)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def get_user_manager(user_db=Depends(get_user_db)):
|
|
||||||
yield UserManager(user_db)
|
|
@ -1,52 +0,0 @@
|
|||||||
import uuid
|
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from fastapi_users_db_sqlalchemy import SQLAlchemyBaseUserTable
|
|
||||||
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, JSON, Boolean, MetaData, DateTime, UUID, \
|
|
||||||
ARRAY, DECIMAL, DOUBLE_PRECISION
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
|
||||||
from sqlalchemy.orm import relationship
|
|
||||||
|
|
||||||
from database import Base
|
|
||||||
|
|
||||||
__all__ = ["Group", "User"]
|
|
||||||
|
|
||||||
|
|
||||||
class Group(Base):
|
|
||||||
__tablename__ = "groups"
|
|
||||||
|
|
||||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
comment = Column(String, nullable=True)
|
|
||||||
|
|
||||||
users = relationship("User", back_populates="group")
|
|
||||||
|
|
||||||
|
|
||||||
class User(SQLAlchemyBaseUserTable[uuid.UUID], Base):
|
|
||||||
__tablename__ = 'users'
|
|
||||||
|
|
||||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
|
||||||
email = Column(String, nullable=False)
|
|
||||||
name = Column(String, nullable=False)
|
|
||||||
surname = Column(String, nullable=False)
|
|
||||||
patronymic = Column(String, nullable=True)
|
|
||||||
creation_on = Column(TIMESTAMP, default=datetime.utcnow)
|
|
||||||
last_entry = Column(TIMESTAMP, nullable=True)
|
|
||||||
group_id = Column(ForeignKey(Group.id), nullable=True)
|
|
||||||
hashed_password = Column(String(length=1024), nullable=False)
|
|
||||||
is_active = Column(Boolean, default=True, nullable=False)
|
|
||||||
is_superuser = Column(Boolean, default=False, nullable=False)
|
|
||||||
is_verified = Column(Boolean, default=False, nullable=False)
|
|
||||||
token = Column(String, nullable=True)
|
|
||||||
fcm_token = Column(String, nullable=True) # Идентификатор устройства для уведомлений
|
|
||||||
hashes_rfids = Column(ARRAY(String), default=[])
|
|
||||||
face_embedding = Column(ARRAY(DOUBLE_PRECISION), nullable=True)
|
|
||||||
|
|
||||||
devices = relationship("Device", secondary="active_devices", back_populates="users", viewonly=True)
|
|
||||||
active_devices = relationship("ActiveDevice")
|
|
||||||
group = relationship(Group, uselist=False, back_populates="users")
|
|
||||||
notifications = relationship("Notification", back_populates="user")
|
|
||||||
|
|
||||||
async def include_relationship_for_read(self, session: AsyncSession):
|
|
||||||
await session.refresh(self, ["group"])
|
|
||||||
return self
|
|
@ -1,25 +0,0 @@
|
|||||||
from fastapi import APIRouter, Depends
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession
|
|
||||||
|
|
||||||
from database import get_async_session
|
|
||||||
from . import User
|
|
||||||
from .base_config import current_user
|
|
||||||
from .schemas import UserRead
|
|
||||||
|
|
||||||
__all__ = ["user_router"]
|
|
||||||
|
|
||||||
user_router = APIRouter(prefix='/auth', tags=['Auth'])
|
|
||||||
|
|
||||||
|
|
||||||
@user_router.get("/current_user")
|
|
||||||
async def get_current_user(user=Depends(current_user), session=Depends(get_async_session)) -> UserRead:
|
|
||||||
return await user.include_relationship_for_read(session)
|
|
||||||
|
|
||||||
|
|
||||||
@user_router.patch("/fcm_token/{fcm_token}")
|
|
||||||
async def refresh_fcm_token(fcm_token: str | None = None,
|
|
||||||
user=Depends(current_user), session=Depends(get_async_session)) -> UserRead:
|
|
||||||
if fcm_token is not None:
|
|
||||||
user.fcm_token = fcm_token
|
|
||||||
await session.commit()
|
|
||||||
return user
|
|
@ -1,39 +0,0 @@
|
|||||||
#
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
from typing import Optional, Union
|
|
||||||
|
|
||||||
from sqlalchemy import TIMESTAMP
|
|
||||||
from fastapi_users import schemas
|
|
||||||
from pydantic import EmailStr, BaseModel, Json
|
|
||||||
|
|
||||||
__all__ = ["GroupRead", "GroupCreate", "UserRead", "UserCreate", "UserUpdate"]
|
|
||||||
|
|
||||||
|
|
||||||
class GroupCreate(BaseModel):
|
|
||||||
name: str
|
|
||||||
comment: Optional[str]
|
|
||||||
|
|
||||||
|
|
||||||
class GroupRead(GroupCreate, from_attributes=True):
|
|
||||||
id: uuid.UUID
|
|
||||||
|
|
||||||
|
|
||||||
class UserRead(schemas.BaseUser[uuid.UUID], from_attributes=True):
|
|
||||||
name: str
|
|
||||||
surname: str
|
|
||||||
patronymic: Optional[str]
|
|
||||||
creation_on: datetime.datetime
|
|
||||||
fcm_token: Optional[str] = None
|
|
||||||
group: Optional[GroupRead] = None
|
|
||||||
|
|
||||||
|
|
||||||
class UserCreate(schemas.BaseUserCreate):
|
|
||||||
name: str
|
|
||||||
surname: str
|
|
||||||
patronymic: Optional[str] = None
|
|
||||||
group: Union[None, uuid.UUID, GroupCreate] = None
|
|
||||||
|
|
||||||
|
|
||||||
class UserUpdate(schemas.BaseUserUpdate):
|
|
||||||
fcm_token: Optional[str] = None
|
|
@ -1,98 +0,0 @@
|
|||||||
import httpx
|
|
||||||
import pytest
|
|
||||||
from fastapi import Depends
|
|
||||||
|
|
||||||
from test_database import *
|
|
||||||
from . import User
|
|
||||||
from .schemas import UserCreate, GroupCreate, UserRead
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["client", "auth_client", "get_generate_user", "session"]
|
|
||||||
|
|
||||||
|
|
||||||
def _add_password(user: User, password: str):
|
|
||||||
(user if isinstance(user, dict) else user.__dict__)["password"] = password
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
def get_generate_user(unique=False):
|
|
||||||
if not unique:
|
|
||||||
return UserCreate(
|
|
||||||
name="user",
|
|
||||||
surname="test",
|
|
||||||
group=GroupCreate(name="test", comment="test_comment"),
|
|
||||||
email="user@localhost.ru",
|
|
||||||
password="<PASSWORD>"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest_asyncio.fixture(scope="session")
|
|
||||||
async def auth_client(app) -> AsyncClient:
|
|
||||||
async with AsyncClient(app=app, base_url="http://127.0.0.1:9000") as client:
|
|
||||||
user = get_generate_user()
|
|
||||||
await client.post("/auth/register", json=user.model_dump())
|
|
||||||
response = await client.post("/auth/login", data={
|
|
||||||
'username': user.email,
|
|
||||||
'password': user.password
|
|
||||||
})
|
|
||||||
cookie = list(dict(response.cookies).items())[-1]
|
|
||||||
client.headers["Cookie"] = "=".join(cookie)
|
|
||||||
yield client
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_register_user(client, session) -> UserCreate:
|
|
||||||
user = get_generate_user()
|
|
||||||
|
|
||||||
response = await client.post("/auth/register", json=user.model_dump())
|
|
||||||
assert response.status_code == 201, response.json()
|
|
||||||
|
|
||||||
result = response.json()
|
|
||||||
db_user = await (await session.get(User, result["id"])).include_relationship_for_read(session)
|
|
||||||
|
|
||||||
assert db_user is not None
|
|
||||||
assert result.pop("id", None) is not None, response.json()
|
|
||||||
|
|
||||||
compare_model = UserCreate.model_validate(_add_password(result, user.password)).model_dump()
|
|
||||||
db_compare_model = UserCreate.model_validate(_add_password(db_user, user.password), from_attributes=True).model_dump()
|
|
||||||
assert compare_model == db_compare_model
|
|
||||||
assert compare_model == user.model_dump()
|
|
||||||
assert user.model_dump() == db_compare_model
|
|
||||||
response = await client.post("/auth/register", json=user.model_dump())
|
|
||||||
assert response.status_code == 400, response.json()
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_login_user(client) -> AsyncClient:
|
|
||||||
user = get_generate_user()
|
|
||||||
await client.post("/auth/register", json=user.model_dump())
|
|
||||||
response = await client.post("/auth/login", data={
|
|
||||||
'username': user.email,
|
|
||||||
'password': user.password + "0"
|
|
||||||
})
|
|
||||||
assert response.status_code == 400, response.json()
|
|
||||||
response = await client.post("/auth/login", data={
|
|
||||||
'username': user.email,
|
|
||||||
'password': user.password
|
|
||||||
})
|
|
||||||
assert response.status_code == 200 or response.status_code == 204, response.json()
|
|
||||||
return user
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_authorization_and_logout(auth_client):
|
|
||||||
client: httpx.AsyncClient = auth_client
|
|
||||||
response = await client.get("auth/current_user")
|
|
||||||
assert response.status_code == 200
|
|
||||||
result_user = UserRead(**response.json())
|
|
||||||
sourse_user = get_generate_user()
|
|
||||||
assert result_user.email == sourse_user.email
|
|
||||||
assert result_user.name == sourse_user.name
|
|
||||||
response = await client.post("/auth/logout")
|
|
||||||
assert response.status_code == 200 or response.status_code == 204
|
|
||||||
assert not response.cookies
|
|
||||||
auth_cookies = client.headers.pop("Cookie")
|
|
||||||
response = await client.get("/auth/current_user")
|
|
||||||
assert response.status_code == 401
|
|
||||||
client.headers["Cookie"] = auth_cookies
|
|
@ -1,21 +0,0 @@
|
|||||||
import os
|
|
||||||
import logging
|
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
load_dotenv()
|
|
||||||
|
|
||||||
DB_HOST = os.environ.get("DB_HOST")
|
|
||||||
DB_PORT = os.environ.get("DB_PORT")
|
|
||||||
DB_NAME = os.environ.get("DB_NAME")
|
|
||||||
DB_USER = os.environ.get("DB_USER")
|
|
||||||
DB_PASS = os.environ.get("DB_PASS")
|
|
||||||
|
|
||||||
|
|
||||||
SECRET_AUTH_TOKEN = os.environ.get("SECRET_AUTH_TOKEN")
|
|
||||||
SECRET_VERIFICATE_TOKEN = os.environ.get("SECRET_VERIFICATE_TOKEN")
|
|
||||||
|
|
||||||
DEVICE_START_COUNTER = os.environ.get("DEVICE_START_COUNTER")
|
|
||||||
|
|
||||||
logging.getLogger('passlib').setLevel(logging.ERROR)
|
|
||||||
|
|
@ -1,49 +0,0 @@
|
|||||||
import time
|
|
||||||
from sys import gettrace
|
|
||||||
from typing import AsyncGenerator
|
|
||||||
|
|
||||||
from fastapi import HTTPException
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from sqlalchemy import MetaData
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
|
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
|
||||||
from sqlalchemy.orm import sessionmaker
|
|
||||||
|
|
||||||
from config import DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData, PrimaryKeyConstraint
|
|
||||||
from sqlalchemy.orm import DeclarativeBase
|
|
||||||
|
|
||||||
|
|
||||||
class Base(DeclarativeBase):
|
|
||||||
def fill_from_model(self, model: BaseModel | dict):
|
|
||||||
"""Заполняет все поля из словаря или модели, помечая объект 'грязным' для корректного обновления"""
|
|
||||||
if isinstance(model, BaseModel):
|
|
||||||
items = model.model_dump().items()
|
|
||||||
else:
|
|
||||||
items = model.items()
|
|
||||||
|
|
||||||
for key, value in items:
|
|
||||||
setattr(self, key, value)
|
|
||||||
|
|
||||||
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
|
|
||||||
|
|
||||||
|
|
||||||
engine = create_async_engine(DATABASE_URL, echo=gettrace() is not None)
|
|
||||||
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
|
||||||
|
|
||||||
|
|
||||||
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
|
|
||||||
async with async_session_maker() as session:
|
|
||||||
yield session
|
|
||||||
|
|
||||||
|
|
||||||
async def begin_transactions() -> AsyncGenerator[AsyncSession, None]:
|
|
||||||
async with async_session_maker().begin() as transaction:
|
|
||||||
try:
|
|
||||||
yield transaction.session
|
|
||||||
except Exception as e:
|
|
||||||
await transaction.rollback()
|
|
||||||
raise e
|
|
||||||
await transaction.commit()
|
|
@ -1,81 +0,0 @@
|
|||||||
import datetime
|
|
||||||
import time
|
|
||||||
import json
|
|
||||||
|
|
||||||
# import firebase_admin
|
|
||||||
import uvicorn
|
|
||||||
from fastapi import FastAPI, Depends, Body
|
|
||||||
from fastapi.staticfiles import StaticFiles
|
|
||||||
from fastapi.encoders import jsonable_encoder
|
|
||||||
from pydantic import BaseModel
|
|
||||||
from sqlalchemy.exc import DBAPIError
|
|
||||||
from starlette.requests import Request
|
|
||||||
from starlette.responses import JSONResponse
|
|
||||||
|
|
||||||
from auth import *
|
|
||||||
|
|
||||||
app = FastAPI(
|
|
||||||
title="Smart lock",
|
|
||||||
root_path="/users"
|
|
||||||
)
|
|
||||||
|
|
||||||
@app.get("/docs", include_in_schema=False)
|
|
||||||
async def custom_swagger_ui_html(req: Request):
|
|
||||||
root_path = req.scope.get("root_path", "").rstrip("/")
|
|
||||||
openapi_url = root_path + app.openapi_url
|
|
||||||
return get_swagger_ui_html(
|
|
||||||
openapi_url=openapi_url,
|
|
||||||
title="API",
|
|
||||||
)
|
|
||||||
|
|
||||||
app.include_router(
|
|
||||||
fastapi_users.get_auth_router(auth_backend),
|
|
||||||
prefix="/auth",
|
|
||||||
tags=["Auth"],
|
|
||||||
)
|
|
||||||
|
|
||||||
app.include_router(
|
|
||||||
fastapi_users.get_register_router(UserRead, UserCreate),
|
|
||||||
prefix="/auth",
|
|
||||||
tags=["Auth"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# firebase
|
|
||||||
# cred = credentials.Certificate("../serviceAccountKey.json")
|
|
||||||
# try:
|
|
||||||
# fire_app = firebase_admin.initialize_app(cred)
|
|
||||||
# except ValueError:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
|
|
||||||
class Message(BaseModel):
|
|
||||||
msg: str
|
|
||||||
|
|
||||||
|
|
||||||
@app.post("/msg")
|
|
||||||
def debug_message_from_user(msg: Message = Body(), user: User = Depends(current_user_optional)):
|
|
||||||
print(datetime.datetime.now(), end=' ')
|
|
||||||
if user is not None:
|
|
||||||
print(user.__dict__)
|
|
||||||
print(msg.msg)
|
|
||||||
|
|
||||||
|
|
||||||
@app.middleware("http")
|
|
||||||
async def add_process_time_header(request: Request, call_next):
|
|
||||||
start_time = time.perf_counter()
|
|
||||||
response = await call_next(request)
|
|
||||||
process_time = time.perf_counter() - start_time
|
|
||||||
print(f"Finished in {process_time:.3f}")
|
|
||||||
response.headers["X-Process-Time"] = str(process_time)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
@app.exception_handler(DBAPIError)
|
|
||||||
def authjwt_exception_handler(request: Request, exc: DBAPIError):
|
|
||||||
return JSONResponse(
|
|
||||||
status_code=500,
|
|
||||||
content={"detail": jsonable_encoder(exc) | {"args": jsonable_encoder(exc.args)}}
|
|
||||||
)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
uvicorn.run(app=app, host="0.0.0.0", port=8000)
|
|
@ -1,7 +0,0 @@
|
|||||||
fastapi
|
|
||||||
uvicorn
|
|
||||||
fastapi-users
|
|
||||||
sqlalchemy
|
|
||||||
fastapi_users_db_sqlalchemy
|
|
||||||
python-dotenv
|
|
||||||
asyncpg
|
|
@ -1,30 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f'Consumer 1 получил сообщение: {body.decode()}')
|
|
||||||
|
|
||||||
# Время задержки по условию
|
|
||||||
time.sleep(2)
|
|
||||||
|
|
||||||
print('Consumer 1 закончил обработку')
|
|
||||||
|
|
||||||
|
|
||||||
def consume_events_1():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
# Создание очереди
|
|
||||||
channel.queue_declare(queue='consumer1_queue')
|
|
||||||
# Привязка очереди
|
|
||||||
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
|
|
||||||
|
|
||||||
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
print('Consumer 1 начал ожидать сообщения...')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
consume_events_1()
|
|
@ -1,28 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f'Consumer 2 получил сообщение: {body.decode()}')
|
|
||||||
|
|
||||||
# Обработка "нон-стопом"
|
|
||||||
print('Consumer 2 закончил обработку')
|
|
||||||
|
|
||||||
|
|
||||||
def consume_events_2():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
# Создание очереди
|
|
||||||
channel.queue_declare(queue='consumer2_queue')
|
|
||||||
|
|
||||||
# Привязка очереди
|
|
||||||
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
|
|
||||||
|
|
||||||
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
print('Consumer 2 начал ожидать сообщения...')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
consume_events_2()
|
|
@ -1,51 +0,0 @@
|
|||||||
# Лабораторная работа №4 - Работа с брокером сообщений
|
|
||||||
|
|
||||||
+ Установить брокер сообщений RabbitMQ.
|
|
||||||
+ Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
|
||||||
+ Продемонстрировать работу брокера сообщений.
|
|
||||||
|
|
||||||
## Описание работы
|
|
||||||
|
|
||||||
**Publisher** - осуществляет отправку сообщений своим клиентам.
|
|
||||||
|
|
||||||
**Consumer1** - принимает и обрабатывает сообщения с задержкой в 2-3 секунды.
|
|
||||||
|
|
||||||
**Consumer2** - моментально принимает и обрабатывает сообщения.
|
|
||||||
|
|
||||||
### Tutorials
|
|
||||||
|
|
||||||
1. tutorial_1
|
|
||||||
|
|
||||||
![tutorial_1.png](Screenshots/tutorial_1.png)
|
|
||||||
|
|
||||||
2. tutorial_2
|
|
||||||
|
|
||||||
![tutorial_2.png](Screenshots/tutorial_2.png)
|
|
||||||
|
|
||||||
3. tutorial_3
|
|
||||||
|
|
||||||
![tutorial_3.png](Screenshots/tutorial_3.png)
|
|
||||||
|
|
||||||
## Работа с RabbitMQ
|
|
||||||
|
|
||||||
![rabbitMQ.png](Screenshots/rabbitMQ.png)
|
|
||||||
|
|
||||||
## Показания очереди queue_1 при одном запущенном экземпляре Consumer_1
|
|
||||||
|
|
||||||
![queue_1_1.png](Screenshots/queue_1 _1.png)
|
|
||||||
|
|
||||||
## Показания очереди queue_2
|
|
||||||
|
|
||||||
![queue_2_1.png](Screenshots/queue_2_1.png)
|
|
||||||
|
|
||||||
## Показания очереди queue_1 при двух запущенных экземплярах Consumer_1
|
|
||||||
![queue_1_2.png](Screenshots/queue_1 _2.png)
|
|
||||||
|
|
||||||
## Показания очереди queue_1 при трех запущенных экземплярах Consumer_1
|
|
||||||
|
|
||||||
![queue_1_3.png](Screenshots/queue_1 _3.png)
|
|
||||||
|
|
||||||
## Видеозапись работы программы
|
|
||||||
|
|
||||||
https://disk.yandex.ru/d/TAdJwo36RrN4ag
|
|
||||||
|
|
Before Width: | Height: | Size: 36 KiB |
Before Width: | Height: | Size: 8.1 KiB |
Before Width: | Height: | Size: 9.2 KiB |
Before Width: | Height: | Size: 33 KiB |
Before Width: | Height: | Size: 15 KiB |
Before Width: | Height: | Size: 43 KiB |
Before Width: | Height: | Size: 46 KiB |
Before Width: | Height: | Size: 24 KiB |
@ -1,28 +0,0 @@
|
|||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
|
|
||||||
def publish_events():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
# Создание exchange типа fanout
|
|
||||||
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
|
|
||||||
|
|
||||||
events = [
|
|
||||||
"Test1",
|
|
||||||
"Test2",
|
|
||||||
"Test3",
|
|
||||||
"Test4",
|
|
||||||
"Test5"
|
|
||||||
]
|
|
||||||
|
|
||||||
while True:
|
|
||||||
event = events[int(time.time()) % len(events)]
|
|
||||||
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
|
|
||||||
print(f'Отправлено: {event}')
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
publish_events()
|
|
@ -1,25 +0,0 @@
|
|||||||
import pika, sys, os
|
|
||||||
|
|
||||||
def main():
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
try:
|
|
||||||
main()
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
print('Interrupted')
|
|
||||||
try:
|
|
||||||
sys.exit(0)
|
|
||||||
except SystemExit:
|
|
||||||
os._exit(0)
|
|
@ -1,11 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='hello')
|
|
||||||
|
|
||||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
|
||||||
print(" [x] Sent 'Hello World!'")
|
|
||||||
connection.close()
|
|
@ -1,19 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
|
||||||
channel.basic_publish(
|
|
||||||
exchange='',
|
|
||||||
routing_key='task_queue',
|
|
||||||
body=message,
|
|
||||||
properties=pika.BasicProperties(
|
|
||||||
delivery_mode=pika.DeliveryMode.Persistent
|
|
||||||
))
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
@ -1,23 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
import pika
|
|
||||||
import time
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.queue_declare(queue='task_queue', durable=True)
|
|
||||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
|
||||||
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] Received {body.decode()}")
|
|
||||||
time.sleep(body.count(b'.'))
|
|
||||||
print(" [x] Done")
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
|
|
||||||
|
|
||||||
channel.basic_qos(prefetch_count=1)
|
|
||||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
@ -1,13 +0,0 @@
|
|||||||
import pika
|
|
||||||
import sys
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
|
||||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
|
||||||
print(f" [x] Sent {message}")
|
|
||||||
connection.close()
|
|
@ -1,22 +0,0 @@
|
|||||||
import pika
|
|
||||||
|
|
||||||
connection = pika.BlockingConnection(
|
|
||||||
pika.ConnectionParameters(host='localhost'))
|
|
||||||
channel = connection.channel()
|
|
||||||
|
|
||||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
|
||||||
|
|
||||||
result = channel.queue_declare(queue='', exclusive=True)
|
|
||||||
queue_name = result.method.queue
|
|
||||||
|
|
||||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
|
||||||
|
|
||||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
|
||||||
print(f" [x] {body}")
|
|
||||||
|
|
||||||
channel.basic_consume(
|
|
||||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
|
||||||
|
|
||||||
channel.start_consuming()
|
|
Before Width: | Height: | Size: 114 KiB |
Before Width: | Height: | Size: 187 KiB |
@ -1,68 +0,0 @@
|
|||||||
import multiprocessing
|
|
||||||
import random
|
|
||||||
import time
|
|
||||||
from pprint import pprint
|
|
||||||
from multiprocessing import Pool
|
|
||||||
|
|
||||||
|
|
||||||
def create_random_matrix(size):
|
|
||||||
return [[random.random() for _ in range(size)] for __ in range(size)]
|
|
||||||
|
|
||||||
# def matrix_multiply_seq(matrix1: list[list[float]], matrix2: list[list[float]]) -> list[list[float]]:
|
|
||||||
# """Выполняет последовательное перемножение двух матриц и возвращает результирующую матрицу"""
|
|
||||||
# l1 = len(matrix1)
|
|
||||||
# l2 = len(matrix2)
|
|
||||||
# result = [[0 for _ in range(l2)] for __ in range(l1)]
|
|
||||||
# for i in range(l1):
|
|
||||||
# for j in range(l2):
|
|
||||||
# for k in range(l2):
|
|
||||||
# result[i][j] += matrix1[i][k] * matrix2[k][j]
|
|
||||||
# return result
|
|
||||||
|
|
||||||
|
|
||||||
matrix_result = [[0 for _ in range(1000)] for __ in range(1000)]
|
|
||||||
|
|
||||||
def matrix_multiply(args) -> None:
|
|
||||||
"""Перемножает строки от start_cnt до end_cnt, результат помещает в глобальную переменную matrix_result"""
|
|
||||||
matrix1, matrix2, start_cnt, end_cnt = args
|
|
||||||
for i in range(start_cnt, end_cnt):
|
|
||||||
for j in range(len(matrix2)):
|
|
||||||
for k in range(len(matrix2)):
|
|
||||||
matrix_result[i][j] += matrix1[i - start_cnt][k] * matrix2[k][j]
|
|
||||||
|
|
||||||
|
|
||||||
def matrix_multiply_parralel(matrix1: list[list[float]], matrix2: list[list[float]], thread_count):
|
|
||||||
"""Выполняет парралеьное перемножение матриц"""
|
|
||||||
l1 = len(matrix1)
|
|
||||||
|
|
||||||
step = l1 // thread_count
|
|
||||||
args = [(matrix1, matrix2, i, i + step) for i in range(0, l1, step)]
|
|
||||||
args[-1] = (matrix1, matrix2, step * (l1 - 1), l1) # Остаток на последний поток
|
|
||||||
|
|
||||||
with Pool(processes=thread_count) as pool:
|
|
||||||
pool.map(matrix_multiply, args)
|
|
||||||
# pprint(matrix_result, compact=True)
|
|
||||||
|
|
||||||
def main():
|
|
||||||
sizes = [100, 300, 500, 1000]
|
|
||||||
num_threads = [2, 4, 6, 8, 12, 16, 20]
|
|
||||||
print(f"cpu_count: {multiprocessing.cpu_count()}")
|
|
||||||
|
|
||||||
for size in sizes:
|
|
||||||
matrix1 = create_random_matrix(size)
|
|
||||||
matrix2 = create_random_matrix(size)
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
matrix_multiply((matrix1, matrix2, 0, len(matrix1)))
|
|
||||||
# pprint(matrix_result[:size][:size], compact=True)
|
|
||||||
print(f"Время последовательного перемножения матриц {size=:4}: \t\t\t\t{time.perf_counter() - t0:.3f}s")
|
|
||||||
|
|
||||||
for threads in num_threads:
|
|
||||||
start_time = time.perf_counter()
|
|
||||||
matrix_multiply_parralel(matrix1, matrix2, threads)
|
|
||||||
end_time = time.perf_counter()
|
|
||||||
print(f"Время парралельного перемножения матриц {size=:4}, {threads=} : \t{end_time - start_time:.3f}")
|
|
||||||
|
|
||||||
print("-" * 100)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
main()
|
|
@ -1,17 +0,0 @@
|
|||||||
# Аглиуллов Данияр ИСЭбд-41
|
|
||||||
# Лабораторная работа №5
|
|
||||||
|
|
||||||
В ходе выполнения задачи по умножению квадратных матриц с использованием обычного и параллельного алгоритмов были получены следующие результаты и выводы:
|
|
||||||
|
|
||||||
Результаты тестов:
|
|
||||||
![изображение 1](./Screenshots/1.png)
|
|
||||||
![изображение 2](./Screenshots/Снимок%20экрана%202024-11-13%20160640.png)
|
|
||||||
|
|
||||||
Сравнение производительности:
|
|
||||||
|
|
||||||
• В тестах на матрицах размером 100x100, 300x300 и 500x500 было замечено, что параллельный алгоритм демонстрирует значительное сокращение времени выполнения по сравнению с обычным алгоритмом, особенно на больших матрицах. Это подтверждает эффективность использования многопоточности для задач, требующих больших вычислительных ресурсов.
|
|
||||||
|
|
||||||
• На малых размерах матриц (например, 100x100) преимущество было за последовательным умножением матрицы из-за накладных расходов на создание пула потоков. Однако при увеличении размера матриц (300x300 и 500x500) преимущества параллельного подхода становились более очевидными.
|
|
||||||
• При превышении количества физических потоков процессора, производительность понижается за счет смены контекста при переключении виртуальных потоков на одном ядре
|
|
||||||
|
|
||||||
![Видео](https://disk.yandex.ru/d/RK_c82BrLw2KjQ)
|
|
Before Width: | Height: | Size: 227 KiB |
@ -1,86 +0,0 @@
|
|||||||
import numpy as np
|
|
||||||
import time
|
|
||||||
from multiprocessing import Pool
|
|
||||||
|
|
||||||
np.seterr(over='ignore')
|
|
||||||
|
|
||||||
# Функция для вычисления детерминанта методом Гаусса
|
|
||||||
def compute_determinant_gauss(mat):
|
|
||||||
size = mat.shape[0]
|
|
||||||
matrix_copy = mat.astype(float) # Копируем матрицу, чтобы не изменять исходную
|
|
||||||
determinant = 1.0 # Начальное значение детерминанта
|
|
||||||
|
|
||||||
for k in range(size):
|
|
||||||
# Находим максимальный элемент в текущем столбце для уменьшения ошибок округления
|
|
||||||
max_index = np.argmax(np.abs(matrix_copy[k:size, k])) + k
|
|
||||||
if matrix_copy[max_index, k] == 0:
|
|
||||||
return 0 # Если на главной диагонали ноль, детерминант равен нулю
|
|
||||||
# Меняем местами строки
|
|
||||||
if max_index != k:
|
|
||||||
matrix_copy[[k, max_index]] = matrix_copy[[max_index, k]]
|
|
||||||
determinant *= -1 # Каждая перестановка меняет знак детерминанта
|
|
||||||
# Обнуляем элементы ниже главной диагонали
|
|
||||||
for m in range(k + 1, size):
|
|
||||||
multiplier = matrix_copy[m, k] / matrix_copy[k, k]
|
|
||||||
matrix_copy[m, k:] -= multiplier * matrix_copy[k, k:]
|
|
||||||
|
|
||||||
# Произведение элементов на главной диагонали
|
|
||||||
for j in range(size):
|
|
||||||
determinant *= matrix_copy[j, j]
|
|
||||||
return determinant
|
|
||||||
|
|
||||||
# Функция для параллельного вычисления детерминанта
|
|
||||||
def parallel_worker(index_range, mat):
|
|
||||||
size = mat.shape[0]
|
|
||||||
matrix_copy = mat.astype(float)
|
|
||||||
det = 1.0
|
|
||||||
|
|
||||||
for k in range(index_range[0], index_range[1]):
|
|
||||||
max_index = np.argmax(np.abs(matrix_copy[k:size, k])) + k
|
|
||||||
if matrix_copy[max_index, k] == 0:
|
|
||||||
return 0
|
|
||||||
if max_index != k:
|
|
||||||
matrix_copy[[k, max_index]] = matrix_copy[[max_index, k]]
|
|
||||||
det *= -1
|
|
||||||
for m in range(k + 1, size):
|
|
||||||
multiplier = matrix_copy[m, k] / matrix_copy[k, k]
|
|
||||||
matrix_copy[m, k:] -= multiplier * matrix_copy[k, k:]
|
|
||||||
return det
|
|
||||||
|
|
||||||
# Функция для параллельного вычисления детерминанта
|
|
||||||
def compute_parallel_determinant(mat, num_workers):
|
|
||||||
size = mat.shape[0]
|
|
||||||
block_size = size // num_workers
|
|
||||||
ranges = [(i * block_size, (i + 1) * block_size) for i in range(num_workers)]
|
|
||||||
|
|
||||||
with Pool(processes=num_workers) as pool:
|
|
||||||
results = pool.starmap(parallel_worker, [(block, mat) for block in ranges])
|
|
||||||
|
|
||||||
# Объединяем результаты
|
|
||||||
total_determinant = sum(results)
|
|
||||||
return total_determinant
|
|
||||||
|
|
||||||
# Функция для запуска тестов производительности
|
|
||||||
def execute_benchmarks():
|
|
||||||
sizes = [100, 300, 500] # Размеры матриц
|
|
||||||
for size in sizes:
|
|
||||||
random_matrix = np.random.rand(size, size) # Генерация случайной матрицы
|
|
||||||
print(f"--- Тест производительности для матрицы {size}x{size} ---")
|
|
||||||
|
|
||||||
# Последовательное вычисление детерминанта
|
|
||||||
start_time = time.time()
|
|
||||||
sequential_det = compute_determinant_gauss(random_matrix)
|
|
||||||
seq_duration = time.time() - start_time
|
|
||||||
print(f"Время последовательного вычисления для {size}x{size}: {seq_duration:.4f} секунд")
|
|
||||||
|
|
||||||
# Параллельное вычисление с различным количеством процессов
|
|
||||||
for workers in [1, 2, 4, 6, 8, 12, 16]:
|
|
||||||
start_time = time.time()
|
|
||||||
parallel_det = compute_parallel_determinant(random_matrix, workers)
|
|
||||||
par_duration = time.time() - start_time
|
|
||||||
speedup_ratio = seq_duration / par_duration if par_duration > 0 else 0
|
|
||||||
print(f"Параллельное время с {workers} процессами: {par_duration:.4f} секунд, Ускорение: {speedup_ratio:.2f}")
|
|
||||||
|
|
||||||
# Запуск тестов производительности
|
|
||||||
if __name__ == '__main__':
|
|
||||||
execute_benchmarks()
|
|
@ -1,16 +0,0 @@
|
|||||||
# Аглиуллов Данияр ИСЭбд-41
|
|
||||||
# Лабораторная работа №6
|
|
||||||
|
|
||||||
Для повышения производительности при вычислении детерминанта для больших матриц была добавлена возможность параллельной обработки с использованием библиотеки multiprocessing. Это позволило значительно ускорить вычисления за счет распределения нагрузки между несколькими процессами.
|
|
||||||
|
|
||||||
Результаты тестов:
|
|
||||||
![изображение 1](../Screenshots/1.png)
|
|
||||||
|
|
||||||
Сравнение производительности:
|
|
||||||
|
|
||||||
• В тестах на матрицах размером 100x100, 300x300 и 500x500 было замечено, что параллельный алгоритм демонстрирует значительное сокращение времени выполнения по сравнению с обычным алгоритмом, особенно на больших матрицах. Это подтверждает эффективность использования многопоточности для задач, требующих больших вычислительных ресурсов.
|
|
||||||
|
|
||||||
• На малых размерах матриц (например, 100x100) преимущество было за последовательным умножением матрицы из-за накладных расходов на создание пула потоков. Однако при увеличении размера матрицы (300x300 и 500x500) преимущества параллельного подхода становились более очевидными.
|
|
||||||
• При превышении количества физических потоков процессора, производительность понижается за счет смены контекста при переключении виртуальных потоков на одном ядре
|
|
||||||
|
|
||||||
![Видео](https://disk.yandex.ru/d/MEZQvGM8u9OIBw)
|
|
@ -1,24 +0,0 @@
|
|||||||
# Лабораторная работа 7. Балансировка нагрузки в распределённых системах с использованием открытых технологий
|
|
||||||
|
|
||||||
## Задание
|
|
||||||
|
|
||||||
Написать краткое эссе, отвечая на следующие вопросы:
|
|
||||||
|
|
||||||
1. Какие алгоритмы и методы применяются для балансировки нагрузки?
|
|
||||||
|
|
||||||
2. Какие открытые технологии доступны для этой задачи?
|
|
||||||
|
|
||||||
3. Как осуществляется балансировка нагрузки в системах управления базами данных?
|
|
||||||
|
|
||||||
4. Какова роль реверс-прокси в процессе балансировки нагрузки?
|
|
||||||
## Эссе по теме
|
|
||||||
|
|
||||||
Балансировка нагрузки является ключевым аспектом проектирования распределённых систем, позволяя равномерно распределять входящие запросы между несколькими серверами или ресурсами. Это способствует повышению доступности, производительности и устойчивости к сбоям системы.
|
|
||||||
|
|
||||||
Существует множество алгоритмов и методов, применяемых для балансировки нагрузки. К ним относятся Least Connections (минимальное количество соединений), Round Robin (круговая схема), Server Health Check (проверка состояния сервера), Least Response Time (минимальное время ответа) и Random (случайный выбор). Эти подходы помогают эффективно распределять клиентские запросы с учётом текущей загруженности серверов.
|
|
||||||
|
|
||||||
Среди открытых технологий для балансировки нагрузки можно выделить Nginx, HAProxy, Apache HTTP Server и другие. Эти инструменты позволяют настроить прокси-серверы, которые принимают запросы от пользователей и направляют их на один или несколько серверов приложений, обеспечивая равномерное распределение нагрузки.
|
|
||||||
|
|
||||||
Балансировка нагрузки в базах данных важна для поддержания доступности и производительности системы. Для этого используются специализированные решения, такие как ProxySQL, pgpool-II и MySQL Router. Они помогают распределять запросы к базам данных между различными узлами, что позволяет снизить нагрузку на основной сервер.
|
|
||||||
|
|
||||||
Реверс-прокси представляет собой сервер, который принимает запросы от клиентов и перенаправляет их на один или несколько серверов приложений. Он выполняет множество функций, включая балансировку нагрузки, обеспечение безопасности, кэширование, SSL-терминацию, а также мониторинг и ведение логов.
|
|
@ -1,52 +0,0 @@
|
|||||||
# Лабораторная работа 8. Как Вы поняли, что называется распределенной системой и как она устроена?
|
|
||||||
|
|
||||||
## Задание:
|
|
||||||
Написать небольшое эссе (буквально несколько абзацев) своими словами на тему "Устройство распределенных систем". Вопросы:
|
|
||||||
|
|
||||||
1. Зачем сложные системы (например, социальная сеть ВКонтакте) пишутся в "распределенном" стиле, где каждое отдельное приложение (или сервис) функционально выполняет только ограниченный спектр задач?
|
|
||||||
|
|
||||||
2. Для чего были созданы системы оркестрации приложений? Каким образом они упрощают / усложняют разработку и сопровождение распределенных систем?
|
|
||||||
|
|
||||||
3. Для чего нужны очереди обработки сообщений и что может подразумеваться под сообщениями?
|
|
||||||
|
|
||||||
4. Какие преимущества и недостатки распределенных приложений существуют на Ваш взгляд?
|
|
||||||
|
|
||||||
5. Целесообразно ли в сложную распределенную систему внедрять параллельные вычисления? Приведите примеры, когда это действительно нужно, а когда нет.
|
|
||||||
|
|
||||||
|
|
||||||
## Преимущества распределённых приложений
|
|
||||||
|
|
||||||
1. Высокая доступность и отказоустойчивость: Один из основных плюсов распределённых систем заключается в том, что они могут продолжать функционировать даже при сбое отдельных компонентов. Это достигается за счёт дублирования сервисов и автоматического перенаправления запросов на работающие узлы.
|
|
||||||
|
|
||||||
2. Масштабируемость: Распределённые системы легко масштабируются. При увеличении нагрузки можно добавлять новые серверы или ресурсы без значительных изменений в архитектуре.
|
|
||||||
|
|
||||||
3. Обработка больших объёмов данных: Распределённые приложения могут одновременно обрабатывать множество запросов и большие объёмы данных, что делает их идеальными для таких задач, как анализ больших данных и машинное обучение.
|
|
||||||
|
|
||||||
## Недостатки распределённых приложений
|
|
||||||
|
|
||||||
1. Сложность архитектуры: Архитектура распределённых систем может быть значительно сложнее, чем у монолитных приложений. Это затрудняет отладку и мониторинг, так как необходимо учитывать взаимодействие множества компонентов.
|
|
||||||
|
|
||||||
2. Проблемы с согласованностью данных: В распределённых системах может возникнуть несогласованность данных между различными сервисами, особенно если они работают с копиями одних и тех же данных.
|
|
||||||
|
|
||||||
3. Управление безопасностью: Увеличение числа взаимодействий между компонентами системы требует более тщательного управления безопасностью, так как это повышает вероятность уязвимостей и атак.
|
|
||||||
|
|
||||||
## Внедрение параллельных вычислений
|
|
||||||
|
|
||||||
Параллельные вычисления в распределённых системах оправданы, когда необходимо обрабатывать большие объёмы данных или выполнять сложные вычисления. Например:
|
|
||||||
|
|
||||||
• Машинное обучение: Обучение моделей на больших наборах данных может быть значительно ускорено за счёт распараллеливания вычислений.
|
|
||||||
|
|
||||||
• Анализ больших данных: Параллельные вычисления позволяют эффективно обрабатывать данные, разбивая их на части и распределяя по нескольким узлам.
|
|
||||||
|
|
||||||
Однако стоит помнить, что не всегда параллельные вычисления оправданы:
|
|
||||||
|
|
||||||
• Если задача не требует значительных ресурсов или имеет низкую степень параллелизма, внедрение параллельных вычислений может усложнить архитектуру без ощутимой выгоды.
|
|
||||||
|
|
||||||
• Простые операции, такие как CRUD-операции, могут быть более эффективно реализованы в рамках монолитной архитектуры без необходимости распараллеливания.
|
|
||||||
|
|
||||||
|
|
||||||
## Эссе на тему
|
|
||||||
|
|
||||||
Распределенные приложения имеют множество преимуществ. Во-первых, они обеспечивают высокую доступность и отказоустойчивость: если один компонент выходит из строя, остальные продолжают функционировать. Во-вторых, они масштабируемы: можно легко добавлять новые ресурсы по мере необходимости. В-третьих, распределенные системы могут обрабатывать большие объемы данных и запросов одновременно. Однако у них есть и недостатки. Сложность архитектуры может привести к трудностям в отладке и мониторинге. Также возможны проблемы с согласованностью данных между различными сервисами. Наконец, распределенные системы требуют более тщательного управления безопасностью, так как большее количество взаимодействий увеличивает вероятность уязвимостей.
|
|
||||||
|
|
||||||
Внедрение параллельных вычислений в распределенные системы целесообразно в тех случаях, когда необходимо обрабатывать большие объемы данных или выполнять сложные вычисления. Например, в задачах машинного обучения или анализа больших данных параллельные вычисления позволяют значительно сократить время обработки. Однако не всегда параллельные вычисления оправданы. Если задача не требует значительных ресурсов или имеет низкую степень параллелизма, то их внедрение может усложнить архитектуру без ощутимой выгоды. Например, простые CRUD-операции (создание, чтение, обновление и удаление) могут быть более эффективно реализованы без использования параллельных вычислений.
|
|
@ -1,30 +0,0 @@
|
|||||||
**/.classpath
|
|
||||||
**/.dockerignore
|
|
||||||
**/.env
|
|
||||||
**/.git
|
|
||||||
**/.gitignore
|
|
||||||
**/.project
|
|
||||||
**/.settings
|
|
||||||
**/.toolstarget
|
|
||||||
**/.vs
|
|
||||||
**/.vscode
|
|
||||||
**/*.*proj.user
|
|
||||||
**/*.dbmdl
|
|
||||||
**/*.jfm
|
|
||||||
**/azds.yaml
|
|
||||||
**/bin
|
|
||||||
**/charts
|
|
||||||
**/docker-compose*
|
|
||||||
**/Dockerfile*
|
|
||||||
**/node_modules
|
|
||||||
**/npm-debug.log
|
|
||||||
**/obj
|
|
||||||
**/secrets.dev.yaml
|
|
||||||
**/values.dev.yaml
|
|
||||||
LICENSE
|
|
||||||
README.md
|
|
||||||
!**/.gitignore
|
|
||||||
!.git/HEAD
|
|
||||||
!.git/config
|
|
||||||
!.git/packed-refs
|
|
||||||
!.git/refs/heads/**
|
|
@ -1,15 +0,0 @@
|
|||||||
<Project Sdk="Microsoft.NET.Sdk">
|
|
||||||
|
|
||||||
<PropertyGroup>
|
|
||||||
<OutputType>Exe</OutputType>
|
|
||||||
<TargetFramework>net8.0</TargetFramework>
|
|
||||||
<ImplicitUsings>enable</ImplicitUsings>
|
|
||||||
<Nullable>enable</Nullable>
|
|
||||||
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
|
|
||||||
</PropertyGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
</Project>
|
|
@ -1,6 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
|
||||||
<PropertyGroup>
|
|
||||||
<ActiveDebugProfile>Container (Dockerfile)</ActiveDebugProfile>
|
|
||||||
</PropertyGroup>
|
|
||||||
</Project>
|
|
@ -1,28 +0,0 @@
|
|||||||
# См. статью по ссылке https://aka.ms/customizecontainer, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
|
|
||||||
|
|
||||||
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
|
|
||||||
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
|
|
||||||
USER app
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
|
|
||||||
# Этот этап используется для сборки проекта службы
|
|
||||||
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
|
|
||||||
ARG BUILD_CONFIGURATION=Release
|
|
||||||
WORKDIR /src
|
|
||||||
COPY ["ConsoleApp1/ConsoleApp1.csproj", "ConsoleApp1/"]
|
|
||||||
RUN dotnet restore "./ConsoleApp1/ConsoleApp1.csproj"
|
|
||||||
COPY . .
|
|
||||||
WORKDIR "/src/ConsoleApp1"
|
|
||||||
RUN dotnet build "./ConsoleApp1.csproj" -c $BUILD_CONFIGURATION -o /app/build
|
|
||||||
|
|
||||||
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
|
|
||||||
FROM build AS publish
|
|
||||||
ARG BUILD_CONFIGURATION=Release
|
|
||||||
RUN dotnet publish "./ConsoleApp1.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
|
|
||||||
|
|
||||||
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
|
|
||||||
FROM base AS final
|
|
||||||
WORKDIR /app
|
|
||||||
COPY --from=publish /app/publish .
|
|
||||||
ENTRYPOINT ["dotnet", "ConsoleApp1.dll"]
|
|
@ -1,36 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.IO;
|
|
||||||
|
|
||||||
class Program
|
|
||||||
{
|
|
||||||
static void Main()
|
|
||||||
{
|
|
||||||
const string inputDir = "/var/data";
|
|
||||||
const string outputFile = "/var/result/data.txt";
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
using (var writer = new StreamWriter(outputFile))
|
|
||||||
{
|
|
||||||
var files = Directory.GetFiles(inputDir);
|
|
||||||
foreach (var file in files)
|
|
||||||
{
|
|
||||||
using (var reader = new StreamReader(file))
|
|
||||||
{
|
|
||||||
string firstLine = reader.ReadLine();
|
|
||||||
if (!string.IsNullOrEmpty(firstLine))
|
|
||||||
{
|
|
||||||
writer.WriteLine(firstLine);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Console.WriteLine($"Файл {outputFile} успешно создан.");
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
Console.WriteLine($"Ошибка: {ex.Message}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
{
|
|
||||||
"profiles": {
|
|
||||||
"ConsoleApp1": {
|
|
||||||
"commandName": "Project"
|
|
||||||
},
|
|
||||||
"Container (Dockerfile)": {
|
|
||||||
"commandName": "Docker"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,15 +0,0 @@
|
|||||||
<Project Sdk="Microsoft.NET.Sdk">
|
|
||||||
|
|
||||||
<PropertyGroup>
|
|
||||||
<OutputType>Exe</OutputType>
|
|
||||||
<TargetFramework>net8.0</TargetFramework>
|
|
||||||
<ImplicitUsings>enable</ImplicitUsings>
|
|
||||||
<Nullable>enable</Nullable>
|
|
||||||
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
|
|
||||||
</PropertyGroup>
|
|
||||||
|
|
||||||
<ItemGroup>
|
|
||||||
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
|
|
||||||
</ItemGroup>
|
|
||||||
|
|
||||||
</Project>
|
|
@ -1,6 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="utf-8"?>
|
|
||||||
<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
|
||||||
<PropertyGroup>
|
|
||||||
<ActiveDebugProfile>Container (Dockerfile)</ActiveDebugProfile>
|
|
||||||
</PropertyGroup>
|
|
||||||
</Project>
|
|
@ -1,28 +0,0 @@
|
|||||||
# См. статью по ссылке https://aka.ms/customizecontainer, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
|
|
||||||
|
|
||||||
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
|
|
||||||
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
|
|
||||||
USER app
|
|
||||||
WORKDIR /app
|
|
||||||
|
|
||||||
|
|
||||||
# Этот этап используется для сборки проекта службы
|
|
||||||
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
|
|
||||||
ARG BUILD_CONFIGURATION=Release
|
|
||||||
WORKDIR /src
|
|
||||||
COPY ["ConsoleApp2/ConsoleApp2.csproj", "ConsoleApp2/"]
|
|
||||||
RUN dotnet restore "./ConsoleApp2/ConsoleApp2.csproj"
|
|
||||||
COPY . .
|
|
||||||
WORKDIR "/src/ConsoleApp2"
|
|
||||||
RUN dotnet build "./ConsoleApp2.csproj" -c $BUILD_CONFIGURATION -o /app/build
|
|
||||||
|
|
||||||
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
|
|
||||||
FROM build AS publish
|
|
||||||
ARG BUILD_CONFIGURATION=Release
|
|
||||||
RUN dotnet publish "./ConsoleApp2.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
|
|
||||||
|
|
||||||
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
|
|
||||||
FROM base AS final
|
|
||||||
WORKDIR /app
|
|
||||||
COPY --from=publish /app/publish .
|
|
||||||
ENTRYPOINT ["dotnet", "ConsoleApp2.dll"]
|
|
@ -1,34 +0,0 @@
|
|||||||
using System;
|
|
||||||
using System.IO;
|
|
||||||
using System.Linq;
|
|
||||||
|
|
||||||
class Program
|
|
||||||
{
|
|
||||||
static void Main()
|
|
||||||
{
|
|
||||||
const string inputFile = "/var/result/data.txt";
|
|
||||||
const string outputFile = "/var/result/result.txt";
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
var lines = File.ReadAllLines(inputFile).Select(int.Parse).ToArray();
|
|
||||||
|
|
||||||
if (lines.Length >= 2)
|
|
||||||
{
|
|
||||||
int result = lines.First() * lines.Last();
|
|
||||||
|
|
||||||
File.WriteAllText(outputFile, result.ToString());
|
|
||||||
|
|
||||||
Console.WriteLine($"Произведение: {result}");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
Console.WriteLine("Недостаточно данных в файле для вычисления.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
Console.WriteLine($"Ошибка: {ex.Message}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
{
|
|
||||||
"profiles": {
|
|
||||||
"ConsoleApp2": {
|
|
||||||
"commandName": "Project"
|
|
||||||
},
|
|
||||||
"Container (Dockerfile)": {
|
|
||||||
"commandName": "Docker"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,10 +0,0 @@
|
|||||||
25
|
|
||||||
91
|
|
||||||
77
|
|
||||||
63
|
|
||||||
45
|
|
||||||
25
|
|
||||||
21
|
|
||||||
89
|
|
||||||
6
|
|
||||||
18
|
|
@ -1,10 +0,0 @@
|
|||||||
10
|
|
||||||
3
|
|
||||||
38
|
|
||||||
9
|
|
||||||
36
|
|
||||||
43
|
|
||||||
96
|
|
||||||
31
|
|
||||||
95
|
|
||||||
58
|
|
@ -1,10 +0,0 @@
|
|||||||
13
|
|
||||||
35
|
|
||||||
38
|
|
||||||
31
|
|
||||||
19
|
|
||||||
94
|
|
||||||
94
|
|
||||||
84
|
|
||||||
18
|
|
||||||
47
|
|
@ -1,10 +0,0 @@
|
|||||||
9
|
|
||||||
32
|
|
||||||
75
|
|
||||||
92
|
|
||||||
100
|
|
||||||
85
|
|
||||||
85
|
|
||||||
10
|
|
||||||
50
|
|
||||||
54
|
|
@ -1,10 +0,0 @@
|
|||||||
83
|
|
||||||
88
|
|
||||||
29
|
|
||||||
86
|
|
||||||
87
|
|
||||||
79
|
|
||||||
18
|
|
||||||
22
|
|
||||||
76
|
|
||||||
71
|
|
@ -1,10 +0,0 @@
|
|||||||
15
|
|
||||||
22
|
|
||||||
92
|
|
||||||
91
|
|
||||||
78
|
|
||||||
47
|
|
||||||
53
|
|
||||||
98
|
|
||||||
72
|
|
||||||
64
|
|
@ -1,10 +0,0 @@
|
|||||||
66
|
|
||||||
45
|
|
||||||
83
|
|
||||||
55
|
|
||||||
25
|
|
||||||
82
|
|
||||||
95
|
|
||||||
42
|
|
||||||
18
|
|
||||||
6
|
|
@ -1,10 +0,0 @@
|
|||||||
25
|
|
||||||
71
|
|
||||||
35
|
|
||||||
71
|
|
||||||
78
|
|
||||||
51
|
|
||||||
29
|
|
||||||
67
|
|
||||||
87
|
|
||||||
33
|
|
@ -1,10 +0,0 @@
|
|||||||
93
|
|
||||||
19
|
|
||||||
32
|
|
||||||
13
|
|
||||||
75
|
|
||||||
86
|
|
||||||
46
|
|
||||||
87
|
|
||||||
39
|
|
||||||
66
|
|
@ -1,10 +0,0 @@
|
|||||||
7
|
|
||||||
74
|
|
||||||
69
|
|
||||||
75
|
|
||||||
45
|
|
||||||
28
|
|
||||||
92
|
|
||||||
9
|
|
||||||
77
|
|
||||||
32
|
|
@ -1,10 +0,0 @@
|
|||||||
42
|
|
||||||
75
|
|
||||||
67
|
|
||||||
53
|
|
||||||
2
|
|
||||||
34
|
|
||||||
57
|
|
||||||
47
|
|
||||||
83
|
|
||||||
52
|
|
@ -1,10 +0,0 @@
|
|||||||
98
|
|
||||||
62
|
|
||||||
45
|
|
||||||
77
|
|
||||||
65
|
|
||||||
45
|
|
||||||
61
|
|
||||||
62
|
|
||||||
10
|
|
||||||
76
|
|
@ -1,10 +0,0 @@
|
|||||||
41
|
|
||||||
30
|
|
||||||
41
|
|
||||||
39
|
|
||||||
62
|
|
||||||
3
|
|
||||||
79
|
|
||||||
93
|
|
||||||
56
|
|
||||||
82
|
|
@ -1,10 +0,0 @@
|
|||||||
85
|
|
||||||
29
|
|
||||||
46
|
|
||||||
36
|
|
||||||
82
|
|
||||||
52
|
|
||||||
4
|
|
||||||
14
|
|
||||||
89
|
|
||||||
17
|
|
@ -1,10 +0,0 @@
|
|||||||
35
|
|
||||||
98
|
|
||||||
38
|
|
||||||
31
|
|
||||||
39
|
|
||||||
76
|
|
||||||
5
|
|
||||||
71
|
|
||||||
7
|
|
||||||
58
|
|
@ -1,10 +0,0 @@
|
|||||||
50
|
|
||||||
93
|
|
||||||
18
|
|
||||||
76
|
|
||||||
13
|
|
||||||
62
|
|
||||||
16
|
|
||||||
45
|
|
||||||
65
|
|
||||||
25
|
|
@ -1,10 +0,0 @@
|
|||||||
98
|
|
||||||
45
|
|
||||||
1
|
|
||||||
52
|
|
||||||
14
|
|
||||||
7
|
|
||||||
56
|
|
||||||
38
|
|
||||||
7
|
|
||||||
50
|
|
@ -1,10 +0,0 @@
|
|||||||
41
|
|
||||||
27
|
|
||||||
27
|
|
||||||
24
|
|
||||||
76
|
|
||||||
36
|
|
||||||
19
|
|
||||||
87
|
|
||||||
83
|
|
||||||
35
|
|
@ -1,10 +0,0 @@
|
|||||||
16
|
|
||||||
5
|
|
||||||
95
|
|
||||||
36
|
|
||||||
20
|
|
||||||
60
|
|
||||||
79
|
|
||||||
46
|
|
||||||
61
|
|
||||||
77
|
|
@ -1,10 +0,0 @@
|
|||||||
43
|
|
||||||
23
|
|
||||||
53
|
|
||||||
6
|
|
||||||
88
|
|
||||||
27
|
|
||||||
55
|
|
||||||
15
|
|
||||||
94
|
|
||||||
36
|
|
@ -1,10 +0,0 @@
|
|||||||
62
|
|
||||||
95
|
|
||||||
50
|
|
||||||
65
|
|
||||||
13
|
|
||||||
56
|
|
||||||
74
|
|
||||||
37
|
|
||||||
99
|
|
||||||
93
|
|
@ -1,10 +0,0 @@
|
|||||||
13
|
|
||||||
2
|
|
||||||
31
|
|
||||||
49
|
|
||||||
80
|
|
||||||
73
|
|
||||||
47
|
|
||||||
61
|
|
||||||
96
|
|
||||||
69
|
|
@ -1,10 +0,0 @@
|
|||||||
37
|
|
||||||
54
|
|
||||||
100
|
|
||||||
34
|
|
||||||
1
|
|
||||||
77
|
|
||||||
55
|
|
||||||
10
|
|
||||||
30
|
|
||||||
28
|
|
@ -1,10 +0,0 @@
|
|||||||
35
|
|
||||||
17
|
|
||||||
95
|
|
||||||
59
|
|
||||||
17
|
|
||||||
98
|
|
||||||
68
|
|
||||||
54
|
|
||||||
89
|
|
||||||
56
|
|