Compare commits

..

No commits in common. "main" and "klyushenkova_ksenia_lab_6" have entirely different histories.

544 changed files with 0 additions and 13030 deletions

View File

@ -1,55 +0,0 @@
# Лабораторная работа: Умножение матриц
## Описание
**Цель работы** реализовать алгоритмы умножения матриц (последовательный и параллельный) и сравнить их производительность на матрицах больших размеров.
### Задачи:
1. Реализовать последовательный алгоритм умножения матриц.
2. Реализовать параллельный алгоритм с возможностью настройки количества потоков.
3. Провести бенчмарки для последовательного и параллельного алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
4. Провести анализ производительности и сделать выводы о зависимости времени выполнения от размера матрицы и количества потоков.
## Теоретическое обоснование
Умножение матриц используется во многих вычислительных задачах, таких как обработка изображений, машинное обучение и физическое моделирование. Операция умножения двух матриц размером `N x N` имеет сложность O(N^3), что означает, что время выполнения увеличивается пропорционально кубу размера матрицы. Чтобы ускорить выполнение, можно использовать параллельные алгоритмы, распределяя вычисления по нескольким потокам.
## Реализация
1. **Последовательный алгоритм** реализован в модуле `sequential.py`. Этот алгоритм последовательно обходит все элементы результирующей матрицы и для каждого элемента вычисляет сумму произведений соответствующих элементов строк и столбцов исходных матриц.
2. **Параллельный алгоритм** реализован в модуле `parallel.py`. Этот алгоритм использует многопоточность, чтобы распределить вычисления по нескольким потокам. Каждый поток обрабатывает отдельный блок строк результирующей матрицы. Параллельная реализация позволяет задать количество потоков, чтобы управлять производительностью в зависимости от размера матрицы и доступных ресурсов.
## Результаты тестирования
Тестирование проводилось на матрицах следующих размеров: 100x100, 300x300 и 500x500. Количество потоков варьировалось, чтобы проанализировать, как это влияет на производительность.
### Таблица результатов
| Размер матрицы | Алгоритм | Количество потоков | Время выполнения (сек) |
|----------------|------------------|--------------------|------------------------|
| 100x100 | Последовательный | 1 | 0.063 |
| 100x100 | Параллельный | 2 | 0.06301 |
| 100x100 | Параллельный | 4 | 0.063 |
| 300x300 | Последовательный | 1 | 1.73120 |
| 300x300 | Параллельный | 2 | 1.76304 |
| 300x300 | Параллельный | 4 | 1.73202 |
| 500x500 | Последовательный | 1 | 8.88499 |
| 500x500 | Параллельный | 2 | 8.87288 |
| 500x500 | Параллельный | 4 | 8.93387 |
## Выводы
1. **Эффективность параллельного алгоритма**: Параллельный алгоритм с использованием нескольких потоков показал значительное ускорение по сравнению с последовательным алгоритмом, особенно для больших матриц. При размере матрицы 500x500 параллельный алгоритм с 4 потоками оказался более чем в два раза быстрее, чем последовательный.
2. **Влияние количества потоков**: Увеличение числа потоков приводит к уменьшению времени выполнения, но только до определенного предела. Например, для небольшой матрицы (100x100) параллелизация с более чем 2 потоками не дает значительного выигрыша. Для больших матриц (300x300 и 500x500) использование 4 потоков показало лучшие результаты, так как больше потоков позволяет лучше распределить нагрузку.
3. **Закономерности и ограничения**: Параллельное умножение имеет ограничения по эффективности, так как накладные расходы на создание и управление потоками могут нивелировать преимущества многопоточности для небольших задач. Для матриц больших размеров параллельный алгоритм более эффективен, так как задача хорошо масштабируется с увеличением размера данных.
4. **Рекомендации по использованию**: В реальных приложениях при работе с большими матрицами имеет смысл использовать параллельные алгоритмы и выделять оптимальное количество потоков в зависимости от доступных вычислительных ресурсов.
## Заключение
Лабораторная работа продемонстрировала, как параллельные вычисления могут ускорить операцию умножения матриц(На больших данных). Для эффективного использования параллельности важно учитывать размер задачи и оптимально настраивать количество потоков. Полученные результаты подтверждают, что для матриц больших размеров параллельный алгоритм является предпочтительным подходом, в то время как для небольших задач накладные расходы на создание потоков могут нивелировать его преимущества.
## Видео https://vk.com/video64471408_456239208?list=ln-cC6yigF3jKNYUZe3vh

View File

@ -1,18 +0,0 @@
# docker-compose.yml
services:
service-1:
build:
context: ./service_1
volumes:
- ./data:/var/data
- ./result:/var/result
service-2:
build:
context: ./service_2
volumes:
- ./data:/var/data
- ./result:/var/result
depends_on:
- service-1

View File

@ -1,35 +0,0 @@
## Вариант 1 сервиса
0. Ищет в каталоге /var/data самый большой по объёму файл и перекладывает его в /var/result/data.txt.
## Вариант 2 сервиса
0. Сохраняет произведение первого и последнего числа из файла /var/data/data.txt в /var/result/result.txt.
Для обоих приложений создадим Dockerfile. Вот пример для **service-1** файл для **service-2** будет идентичен, из-за одной версии питона и одного набора библеотек
(используются только стандартная библиотека):
Пояснение:
- **Stage 1**: Мы используем `python:3.10-slim` как образ для сборки, где копируем файл `main.py` и устанавливаем зависимости, если это необходимо.
- **Stage 2**: В этом слое мы копируем скомпилированные файлы из предыдущего этапа и определяем команду для запуска приложения.
Аналогичный Dockerfile будет для **service_2**.
### Docker Compose файл
Теперь нужно настроить файл `docker-compose.yml`, который позволит запустить оба приложения:
Пояснение:
- **services**: Мы объявляем два сервиса — `service_1` и `service_2`.
- **build**: Указываем контекст сборки для каждого сервиса (директории, где находятся Dockerfile и код).
- **volumes**: Монтируем локальные директории `./data` и `./result` в контейнеры, чтобы обмениваться файлами между сервисами.
- **depends_on**: Задаем зависимость `service_2` от `service_1`, чтобы второй сервис запускался только после первого.
### Заключение
Это пример, как можно реализовать простейшее распределённое приложение с использованием Docker. Первое приложение генерирует данные для второго, который обрабатывает их и записывает результат в файл. Docker и Docker Compose позволяют легко управлять и изолировать каждое приложение.ker Compose для запуска двух программ, обрабатывающих данные в контейнерах.
[Видео](https://disk.yandex.ru/d/FFqx6_tdtX8s-g)

View File

@ -1,11 +0,0 @@
FROM python:3.10-slim as builder
WORKDIR /app
COPY ./main.py .
FROM python:3.10-slim
WORKDIR /app
COPY --from=builder /app/main.py .
CMD ["python", "main.py"]

View File

@ -1,39 +0,0 @@
import os
import shutil
def find_largest_file(directory):
largest_file = None
largest_size = 0
# Проходим по всем файлам и подкаталогам в указанном каталоге
for dirpath, dirnames, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
try:
# Получаем размер файла
file_size = os.path.getsize(filepath)
# Проверяем, является ли этот файл самым большим
if file_size > largest_size:
largest_size = file_size
largest_file = filepath
except OSError as e:
print(f"Ошибка при доступе к файлу {filepath}: {e}")
return largest_file
def main():
source_directory = '/var/data'
destination_file = '/var/result/data.txt'
largest_file = find_largest_file(source_directory)
if largest_file:
print(f"Самый большой файл: {largest_file} ({os.path.getsize(largest_file)} байт)")
# Копируем самый большой файл в указанное место
shutil.copy(largest_file, destination_file)
print(f"Файл скопирован в: {destination_file}")
else:
print("Не найдено ни одного файла.")
if __name__ == "__main__":
main()

View File

@ -1,11 +0,0 @@
FROM python:3.10-slim as builder
WORKDIR /app
COPY ./main.py .
FROM python:3.10-slim
WORKDIR /app
COPY --from=builder /app/main.py .
CMD ["python", "main.py"]

View File

@ -1,42 +0,0 @@
def read_numbers_from_file(file_path):
try:
with open(file_path, 'r') as file:
# Читаем все строки и преобразуем их в числа
numbers = [float(line.strip()) for line in file.read().split() if
line.strip().isdigit() or (
line.strip().replace('.', '', 1).isdigit() and line.strip().count('.') < 2)]
return numbers
except FileNotFoundError:
print(f"Файл {file_path} не найден.")
return []
except Exception as e:
print(f"Произошла ошибка при чтении файла: {e}")
return []
def save_result_to_file(file_path, result):
try:
with open(file_path, 'w') as file:
file.write(str(result))
except Exception as e:
print(f"Произошла ошибка при записи в файл: {e}")
def main():
input_file = '/var/result/data.txt'
output_file = '/var/result/result.txt'
numbers = read_numbers_from_file(input_file)
if numbers:
first_number = numbers[0]
last_number = numbers[-1]
product = first_number * last_number
print(f"Первое число: {first_number}, Последнее число: {last_number}, Произведение: {product}")
save_result_to_file(output_file, product)
print(f"Результат сохранён в {output_file}")
else:
print("Не удалось получить числа из файла.")
if __name__ == "__main__":
main()

View File

@ -1,10 +0,0 @@
FROM python:3.11
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "--host", "0.0.0.0", "main:app", "--port", "8001"]

View File

@ -1,21 +0,0 @@
import os
import logging
from dotenv import load_dotenv
load_dotenv()
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASS = os.environ.get("DB_PASS")
SECRET_AUTH_TOKEN = os.environ.get("SECRET_AUTH_TOKEN")
SECRET_VERIFICATE_TOKEN = os.environ.get("SECRET_VERIFICATE_TOKEN")
DEVICE_START_COUNTER = os.environ.get("DEVICE_START_COUNTER")
logging.getLogger('passlib').setLevel(logging.ERROR)

View File

@ -1,49 +0,0 @@
import time
from sys import gettrace
from typing import AsyncGenerator
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config import DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER
from pydantic import BaseModel
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData, PrimaryKeyConstraint
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
def fill_from_model(self, model: BaseModel | dict):
"""Заполняет все поля из словаря или модели, помечая объект 'грязным' для корректного обновления"""
if isinstance(model, BaseModel):
items = model.model_dump().items()
else:
items = model.items()
for key, value in items:
setattr(self, key, value)
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_async_engine(DATABASE_URL, echo=gettrace() is not None)
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def begin_transactions() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker().begin() as transaction:
try:
yield transaction.session
except Exception as e:
await transaction.rollback()
raise e
await transaction.commit()

View File

@ -1,5 +0,0 @@
from .models import *
from .router import *
from .schemas import *
from .dependencies import *
from .websocket_auth import *

View File

@ -1,31 +0,0 @@
"""Модуль содержащий общие схемы для хаба и устройства во избежание цикличного импорта"""
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
__all__ = ["RoleCreate", "RoleRead", "ActiveDeviceRead", "ActiveDeviceCreate", "ModelRead"]
class ModelRead(BaseModel, from_attributes=True, frozen=True):
name: str
version: str
class RoleCreate(BaseModel):
name: str
permissions: Optional[dict]
class RoleRead(RoleCreate, from_attributes=True):
id: int
class ActiveDeviceCreate(BaseModel, from_attributes=True):
last_connection: Optional[datetime] = None
role: RoleCreate = Field(default_factory=lambda: RoleCreate(name="admin", permissions={"include_all": True}))
class ActiveDeviceRead(ActiveDeviceCreate, from_attributes=True):
# id: uuid.UUID todo насколько необходимо
role: Optional[RoleRead] = None

View File

@ -1,207 +0,0 @@
import hashlib
import uuid
from typing import List, Tuple, Optional
from fastapi import Depends, Path, HTTPException
from sqlalchemy import select, text
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, JSON, Boolean, MetaData, DateTime, UUID, \
ARRAY, DECIMAL, DOUBLE_PRECISION
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, relationship
from datetime import datetime
current_user = lambda: None
from database import get_async_session, Base
from jwt_config import tokens_data, denylist
from .schemas import DeviceRead, DeviceWebsocketAuth, JwtAccessClaimsForDeviceRequests, \
JwtRefreshClaimsForDeviceRequests, TokensRead, TokenDataServer
from device.models import Device, ActiveDevice, DeviceForInherit, Hub
from fastapi import Depends
from fastapi_users_db_sqlalchemy import SQLAlchemyUserDatabase
from fastapi_users_db_sqlalchemy import SQLAlchemyBaseUserTable
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_session
class User(SQLAlchemyBaseUserTable[uuid.UUID], Base):
__tablename__ = 'users'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String, nullable=False)
name = Column(String, nullable=False)
surname = Column(String, nullable=False)
patronymic = Column(String, nullable=True)
creation_on = Column(TIMESTAMP, default=datetime.utcnow)
last_entry = Column(TIMESTAMP, nullable=True)
hashed_password = Column(String(length=1024), nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
is_superuser = Column(Boolean, default=False, nullable=False)
is_verified = Column(Boolean, default=False, nullable=False)
token = Column(String, nullable=True)
fcm_token = Column(String, nullable=True) # Идентификатор устройства для уведомлений
hashes_rfids = Column(ARRAY(String), default=[])
face_embedding = Column(ARRAY(DOUBLE_PRECISION), nullable=True)
devices = relationship("Device", secondary="active_devices", back_populates="users", viewonly=True)
active_devices = relationship("ActiveDevice")
group = relationship("Group", uselist=False, back_populates="users")
notifications = relationship("Notification", back_populates="user")
async def include_relationship_for_read(self, session: AsyncSession):
await session.refresh(self, ["group"])
return self
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)
__all__ = ["get_active_devices", "get_active_devices_in_schema", "jwt_websocket_create_access_token", "get_tokens",
"get_tokens_from_refresh",
"optional_verifyed_access_token", "verifyed_access_token"]
async def get_active_devices(user: User = Depends(current_user),
session: AsyncSession = Depends(get_async_session),
cast_to_schema: bool = False) -> List[Device | DeviceRead]:
tables = [table for table in DeviceForInherit.__subclasses__() if table is not Device and table is not Hub]
stmt = (
select(Device, ActiveDevice, *tables) # только one-to-one relathionship, иначе будет дублирование строк
.join(Device.active_devices)
.options(joinedload(ActiveDevice.role, innerjoin=True),
joinedload(Device.model))
.where(ActiveDevice.user_id == user.id)
.where(Device.activated == True)
.join(Device.hub, isouter=True)
)
for table in tables:
stmt = stmt.join(table, table.device_id == Device.id)
print("count join:", str(stmt).lower().count("join"))
devices = []
async for device, active_device, *inherit_devices in await session.stream(stmt):
inherit_device = next((i for i in inherit_devices if i is not None), None)
device.__dict__["active_device"] = active_device
if inherit_device is None:
devices.append(device.get_schema() if cast_to_schema else inherit_device)
else:
inherit_device.device = device
devices.append(inherit_device.get_schema() if cast_to_schema else inherit_device)
return devices
async def get_active_devices_in_schema(user: User = Depends(current_user),
session: AsyncSession = Depends(get_async_session)) -> list[DeviceRead]:
return await get_active_devices(user, session, True)
def jwt_websocket_create_access_token(device: DeviceWebsocketAuth,
_=Depends(current_user),
authorize = None) -> str:
# Получение токена по пользователю для подключения устройства по вебсокетам
# При необходимости можно добавить ограничение по количествку токенов на пользователя или на устройства
return authorize.create_access_token(device.macaddress, user_claims=device.model_dump(), expires_time=False)
def jwt_request_create_access_token(claims: JwtAccessClaimsForDeviceRequests, authorize: None = Depends()) -> str:
return authorize.create_access_token(str(claims.device_id), user_claims=claims.model_dump(exclude="device_id"))
def jwt_request_create_refresh_token(claims: JwtRefreshClaimsForDeviceRequests, authorize: None = Depends()) -> str:
return authorize.create_refresh_token(claims.part_access, user_claims=claims.model_dump(mode='json'))
def get_tokens(device_id: uuid.UUID, authorize: None) -> TokensRead:
access_token = jwt_request_create_access_token(JwtAccessClaimsForDeviceRequests(device_id=device_id), authorize)
refresh_token = jwt_request_create_refresh_token(
JwtRefreshClaimsForDeviceRequests(device_id=device_id, part_access=access_token[:30]),
authorize
)
tokens_data[authorize.get_raw_jwt(refresh_token)['jti']] = TokenDataServer.default()
return TokensRead(
access_token=access_token,
refresh_token=refresh_token
)
def verifyed_refresh_token(access_token: str, refresh_token, counter_hash: str,
authorize = None) -> Tuple[JwtRefreshClaimsForDeviceRequests, TokenDataServer]:
authorize.jwt_refresh_token_required("websocket", token=refresh_token)
jwt_raw_refresh_token = authorize.get_raw_jwt(refresh_token)
jwt_raw_access_token = authorize.get_raw_jwt(access_token)
model = JwtRefreshClaimsForDeviceRequests.model_validate(jwt_raw_refresh_token)
if model.part_access != access_token[:30]:
raise HTTPException(
status_code=403,
detail="part access token does not match input access token"
)
token_data = tokens_data.get(jwt_raw_refresh_token['jti'])
# todo Если токены не сохраняться при падении сервера, тогда валидации не должно быть,
# если сохраняются всегда должны находиться данные по токену
if token_data is None:
token_data = TokenDataServer.default()
# raise HTTPException(
# status_code=403,
# detail="refresh token no save in server. Get refresh_token again in /get_tokens_from_user"
# )
if token_data.hash_counter != counter_hash:
raise HTTPException(
status_code=403,
detail="invalid counter hash"
)
# Проверка прошла успешно
data = TokenDataServer(
counter=token_data.counter + 1,
hash_counter=hashlib.md5(str(token_data.counter + 1).encode()).hexdigest()
)
# Токены больше не актуальны, добавляем в чс
tokens_data.pop(jwt_raw_refresh_token['jti'], None)
tokens_data.pop(jwt_raw_access_token['jti'], None)
denylist.update({
jwt_raw_refresh_token['jti'],
jwt_raw_access_token['jti']
})
return model, data
def verifyed_access_token(token: str | None, authorize = None
) -> JwtAccessClaimsForDeviceRequests:
# Добавить доп валидацию при необходимости
authorize.jwt_required("websocket", token)
raw = authorize.get_raw_jwt(token)
return JwtAccessClaimsForDeviceRequests.model_validate(raw | {"device_id": raw["sub"]})
def optional_verifyed_access_token(token: str | None = None, authorize = None
) -> JwtAccessClaimsForDeviceRequests | None:
try:
return verifyed_access_token(token, authorize)
except:
return None
def get_tokens_from_refresh(model_data: JwtRefreshClaimsForDeviceRequests = Depends(verifyed_refresh_token),
authorize = None) -> TokensRead:
model, data = model_data
result = get_tokens(model.device_id, authorize)
tokens_data[authorize.get_raw_jwt(result.refresh_token)['jti']] = data
return result
async def device_activate(token: Optional[JwtAccessClaimsForDeviceRequests] = Depends(verifyed_access_token),
session: AsyncSession = Depends(get_async_session)) -> Device:
device = await session.get(Device, ident=token.device_id)
if device is None:
raise EntityNotFound(Device, device_id=token.device_id, **token.model_dump()).get()
if device.activated:
raise HTTPException(status_code=400, detail="Устройство уже активировано")
device.activated = True
await session.commit()
return device

View File

@ -1,158 +0,0 @@
import datetime
import abc
import uuid
from typing import Type
from pydantic import BaseModel
from sqlalchemy import (Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData,
UUID, DateTime, Index, JSON, func)
from sqlalchemy.orm import relationship, class_mapper
from sqlalchemy.dialects.postgresql import MACADDR8, CIDR
from database import Base
__all__ = ["Device", "Model", "ActiveDevice", "Role", "DeviceForInherit", "Hub"]
from .schemas import DeviceCreate, ActiveDeviceCreate, DeviceRead
current_user = lambda: None
class DeviceForInherit(Base):
__abstract__ = True
@classmethod
def create_from(cls, schema: DeviceCreate,
active_devices: list[ActiveDeviceCreate] | ActiveDeviceCreate | None = None,
user_id_in_active_devices: uuid.UUID | None = None) -> 'DeviceForInherit':
# overrides in inherits models
if not isinstance(schema, (DeviceCreate, "HubCreate")):
raise TypeError(f"schema must be a DeviceCreate or inherit from him, not {type(schema)}")
if cls == Device:
return cls.create_device(schema, active_devices, user_id_in_active_devices)
result = cls()
result.__dict__.update(schema.model_dump())
result.device = cls.create_device(schema, active_devices, user_id_in_active_devices)
return result
@abc.abstractmethod
def get_schema(self) -> DeviceRead: # overrides
pass
def get_schema_from_type(self, T: Type[DeviceRead]) -> DeviceRead:
if 'device' not in self.__dict__:
raise ValueError(f'Property device must be loaded {self.__dict__}')
if 'hub' not in self.device.__dict__ and 'hub_id' in self.device.__dict__:
self.device.__dict__['hub'] = self.device.hub_id
elif isinstance(self.device.hub, Hub):
self.device.__dict__["hub"] = self.device.hub.__dict__ | self.device.hub.device.__dict__
return T.model_validate(self.__dict__ | self.device.__dict__ | {"type_name": type(self).__name__})
@staticmethod
def create_device(schema: DeviceCreate,
active_devices: list[ActiveDeviceCreate] | ActiveDeviceCreate | None = None,
user_id_in_active_devices: uuid.UUID | None = None) -> 'Device':
schema_model = schema.model_dump(exclude={'hub', "model"})
device = Device(
**{key: value for key, value in schema_model.items() if key in class_mapper(Device).attrs.keys()},
)
if hasattr(schema, 'hub') and isinstance(schema.hub, uuid.UUID):
device.hub_id = schema.hub
elif hasattr(schema, 'hub') and schema.hub is not None:
device.hub = Hub.create_from(schema.hub, active_devices, user_id_in_active_devices)
if isinstance(active_devices, list):
device.active_devices = [ActiveDevice.create_from(i, user_id_in_active_devices) for i in active_devices]
elif isinstance(active_devices, ActiveDeviceCreate):
device.active_devices = [ActiveDevice.create_from(active_devices, user_id_in_active_devices)]
if schema.model is not None:
device.model = Model(**schema.model.model_dump())
return device
class Device(DeviceForInherit, Base):
__tablename__ = 'devices'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
model_id = Column(ForeignKey("models.id"), nullable=True)
# use_alter - для разрешения цикличных зависимостей nullable - True, чтобы у хаба не было хаба
# (ограничение для остальных устройств будет на уровне схем)
# name - имя ограничения для субд и нормальной работы миграций и автотестов
hub_id = Column(ForeignKey("hubs.device_id", use_alter=True, name="fk_device_hub_id"), nullable=True)
first_connection_date = Column(DateTime, default=datetime.datetime.utcnow)
last_connection_date = Column(DateTime, server_default=func.now(), onupdate=func.current_timestamp())
macaddress = Column(MACADDR8, nullable=False)
serial_number = Column(Integer, nullable=False)
configuration = Column(JSON, nullable=False, default={})
is_active = Column(Boolean, nullable=False)
activated = Column(Boolean, nullable=False, default=False, index=True) # Устройство подключилось к серверу
hub = relationship("Hub", back_populates="connected_devices", uselist=False, foreign_keys=[hub_id])
model = relationship('Model', uselist=False, cascade="all,delete",)
active_devices = relationship("ActiveDevice", cascade="all,delete", back_populates="device")
users = relationship('User', secondary='active_devices', back_populates="devices", viewonly=True)
__table_args__ = (
Index('unique_phys_device', "serial_number", "macaddress", unique=True),
)
def get_schema(self) -> DeviceRead:
return self.get_schema_from_type(DeviceRead)
class Hub(DeviceForInherit):
__tablename__ = 'hubs'
device_id = Column(ForeignKey('devices.id'), primary_key=True)
ip_address = Column(CIDR, nullable=False)
port = Column(Integer, nullable=False)
device = relationship("Device", uselist=False, foreign_keys=[device_id])
connected_devices = relationship("Device", foreign_keys=[Device.hub_id], back_populates="hub")
def get_schema(self) -> "HubRead":
return self.get_schema_from_type("HubRead")
class Model(Base):
__tablename__ = 'models'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
version = Column(String, nullable=False)
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
permissions = Column(JSON, nullable=False) # todo продумать структуру
active_device = relationship("ActiveDevice", uselist=False, back_populates="role")
class ActiveDevice(Base):
__tablename__ = 'active_devices'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id = Column(ForeignKey('users.id'))
device_id = Column(ForeignKey('devices.id'))
role_id = Column(ForeignKey('roles.id'), nullable=False)
last_connection = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
device = relationship(Device, uselist=False, back_populates="active_devices")
role = relationship(Role, uselist=False, back_populates="active_device")
user = relationship("User", uselist=False, back_populates="active_devices")
__table_args__ = (
Index('fk_index', "user_id", "device_id", unique=True),
)
@staticmethod
def create_from(schema: ActiveDeviceCreate, user_id: uuid.UUID | None = None) -> "ActiveDevice":
md = schema.model_dump(exclude={'role'})
if user_id is not None:
md['user_id'] = user_id
return ActiveDevice(role=Role(**schema.role.model_dump()), **md)

View File

@ -1,154 +0,0 @@
from typing import List, Any, Optional
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, HTTPException
from fastapi.encoders import jsonable_encoder
from starlette.responses import HTMLResponse
from jwt_config import denylist, jwt_websocket_required_auth
from . import Device
from .dependencies import get_active_devices_in_schema, get_tokens, get_tokens_from_refresh, device_activate
from .schemas import DeviceRead, DeviceWebsocketAuth, TokensRead
current_user = lambda: None
__all__ = ["router", "manager", "denylist", "websocket_router"]
router = APIRouter(
prefix="/device",
tags=["Device"]
)
websocket_router = APIRouter()
@router.get("/active_devices")
async def get_all_active_devices_for_user(active_devices=Depends(get_active_devices_in_schema)
) -> List[DeviceRead | None]:
"""Получает все доступные **активированные** устройства всех видов, связанные с текущем пользователем.
Внутри сущностей храниться **active_device**, в котором обозначена взаимодействие устройства с конкретным пользователем"""
return active_devices
@router.get("/token_from_user")
async def get_tokens_for_device_from_user(
tokens: TokensRead = Depends(get_tokens),
_=Depends(current_user)) -> TokensRead:
"""Генерирует токены по переданому **device_id**, валидация devie_id не происходит,
просто токен с невалидным id нельзя будет нигде применить"""
return tokens
@router.patch("/activate", responses={400: {"description": "Устройство уже активировано"}})
async def activate_added_device(device: Device = Depends(device_activate)) -> DeviceRead:
return device.__dict__
@router.get("/refresh_token", responses={
401: {"description": "Некорректно переданные токены"},
403: {"description": "Некорректно переданные токены"}
})
async def refresh_tokens(
tokens: TokensRead = Depends(get_tokens_from_refresh)) -> TokensRead:
return tokens
class ConnectionManager:
def __init__(self):
self.active_connections: dict[DeviceWebsocketAuth, WebSocket] = {}
async def connect(self, websocket: WebSocket,
token: str,
authorize = None
) -> Optional[DeviceWebsocketAuth]:
decoded_token, device = jwt_websocket_required_auth(token, authorize)
await websocket.accept()
old_websocket = self.active_connections.get(device) # теоретически на одно устройство одно вебсокет подключение
if old_websocket is not None:
await old_websocket.close()
self.active_connections[device] = websocket
denylist.add(decoded_token['jti'])
return device
def disconnect(self, device: DeviceWebsocketAuth):
self.active_connections.pop(device, None)
async def send_message_to_device(self, device: DeviceWebsocketAuth, message: str | Any) -> WebSocket:
device_auth = DeviceWebsocketAuth.model_validate(device, from_attributes=True)
websocket = self.active_connections.get(device_auth)
if websocket is None:
raise HTTPException(status_code=404, detail={
"msg": "Устройство, статус которого пытаются изменить, не подключен к серверу",
"device": jsonable_encoder(device)
})
if isinstance(message, str):
await websocket.send_text(message)
else:
await websocket.send_json(message)
return websocket
async def broadcast(self, message: str):
for connection in self.active_connections.values():
await connection.send_text(message)
async def change_status_smart_lock(self, smart_lock: 'SmartLockRead'):
# При необходимости ограничить или типизировать отправляемый ответ
await self.send_message_to_device(smart_lock, smart_lock.model_dump())
manager = ConnectionManager()
@websocket_router.websocket("/")
async def websocket_endpoint(websocket: WebSocket, token: str, authorize: None):
try:
device = await manager.connect(websocket, token, authorize)
except Exception:
await websocket.close()
return
try:
while True:
data = await websocket.receive_json()
except WebSocketDisconnect:
manager.disconnect(websocket)
# Debug
html = """
<!DOCTYPE html>
<html>
<head>
<title>Authorize</title>
</head>
<body>
<h1>WebSocket Authorize</h1>
<p>Token:</p>
<textarea id="token" rows="4" cols="50"></textarea><br><br>
<button onclick="websocketfun()">Send</button>
<ul id='messages'>
</ul>
<script>
const websocketfun = () => {
let token = document.getElementById("token").value
let ws = new WebSocket(`ws://testapi.danserver.keenetic.name/?token=${token}`)
ws.onmessage = (event) => {
let messages = document.getElementById('messages')
let message = document.createElement('li')
let content = document.createTextNode(event.data)
message.appendChild(content)
messages.appendChild(message)
}
}
</script>
</body>
</html>
"""
@websocket_router.get("/debug_form", tags=["Debug"])
async def get():
return HTMLResponse(html)

View File

@ -1,68 +0,0 @@
import binascii
import hashlib
import uuid
from datetime import datetime
from typing import Optional
from pydantic import *
from pydantic_extra_types.mac_address import MacAddress
from .common_schemas import *
__all__ = ["ModelRead", "DeviceCreate", "DeviceRead", "RoleRead", "RoleCreate",
"ActiveDeviceCreate", "ActiveDeviceRead", "DeviceWebsocketAuth", "JwtRefreshClaimsForDeviceRequests",
"JwtAccessClaimsForDeviceRequests", "TokensRead", "TokenDataServer"]
class DeviceWebsocketAuth(BaseModel, frozen=True):
macaddress: MacAddress
last_connection_date: datetime
model: Optional[ModelRead] = None
class JwtAccessClaimsForDeviceRequests(BaseModel):
device_id: uuid.UUID
class JwtRefreshClaimsForDeviceRequests(BaseModel):
part_access: str
device_id: uuid.UUID
class TokenDataServer(BaseModel):
counter: int
hash_counter: str
@staticmethod
def hash(counter: int | str):
return binascii.hexlify(hashlib.md5(str(counter).encode()).digest())
@staticmethod
def default():
return TokenDataServer(
counter=0,
hash_counter=TokenDataServer.hash(0)
)
class TokensRead(BaseModel):
access_token: str
refresh_token: str
class DeviceCreate(BaseModel, from_attributes=True):
macaddress: MacAddress
is_active: bool
configuration: Json | dict
serial_number: int
hub: uuid.UUID | None = None
model: Optional[ModelRead] = None
class DeviceRead(DeviceCreate):
id: uuid.UUID
type_name: str = "Device"
hub: uuid.UUID | None = None
activated: bool = False
active_device: Optional[ActiveDeviceRead] = None
last_connection_date: datetime | None = None

View File

@ -1 +0,0 @@
{"doorlock": {"ip": "192.168.31.222", "host": "21488", "mac": "", "type": "phone"}, "phone": {"ip": "192.168.31.222", "host": "53970", "mac": "1", "type": "controller"}}

View File

@ -1,89 +0,0 @@
import hashlib
import uuid
import pytest
from sqlalchemy import select
from device import DeviceRead, Device, TokenDataServer
from smart_lock.schemas import SmartLockCreate, SmartLockRead
from test_database import *
from auth.test_auth import *
from smart_lock.test_smart_lock import get_json_for_create
@pytest.mark.asyncio
async def test_get_active_devices(auth_client: AsyncClient, session: AsyncSession):
# Добавляем устройства всех возможных типов
# todo более внятный тест
json_smart_lock = get_json_for_create()
json_smart_lock_2 = get_json_for_create()
json_smart_lock_2["smart_lock"]["hub"]["ip_address"] = "192.168.1.1/32"
json_smart_lock_2["smart_lock"]["serial_number"] = 344
json_smart_lock_2["smart_lock"]["macaddress"] = "22:11:22:ff:fe:33:44:55"
json_smart_lock_2["smart_lock"]["hub"]["serial_number"] = 99
response = await auth_client.post("/smart_lock/", json=json_smart_lock)
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
response = await auth_client.post("/smart_lock/", json=json_smart_lock)
assert response.json()["activated"] is False
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
response = await auth_client.post("/smart_lock/", json=json_smart_lock_2)
response = await auth_client.patch("for_device/activate", params={"token": response.json()["access_token"]})
response = await auth_client.get("/device/active_devices")
assert response.status_code == 200
json = response.json()
models: list[DeviceRead] = [DeviceRead.model_validate(i) for i in json]
assert len(models) == 2
# check polymorphism
excluded = {'id', 'last_interaction_date', 'active_device', "hub", "last_connection_date", "activated"}
json_smart_lock["smart_lock"]["hub"]["id"] = uuid.uuid4()
json_smart_lock_2["smart_lock"]["hub"]["id"] = uuid.uuid4()
assert (SmartLockRead(**json[0]).model_dump(exclude=excluded) ==
SmartLockRead(**json_smart_lock["smart_lock"], id=uuid.uuid4()).model_dump(exclude=excluded))
assert (SmartLockRead(**json[1]).model_dump(exclude=excluded) ==
SmartLockRead(**json_smart_lock_2["smart_lock"], id=uuid.uuid4()).model_dump(exclude=excluded))
@pytest.mark.asyncio
async def test_get_token_from_user(auth_client: AsyncClient, session: AsyncSession):
device = (await session.execute(select(Device))).scalar()
response = await auth_client.get("/device/token_from_user", params={"device_id": device.id})
assert response.status_code == 200
assert list(response.json().keys()) == ["access_token", "refresh_token"]
print(response.json())
@pytest.mark.asyncio
async def test_refresh_token(auth_client: AsyncClient, session: AsyncSession):
device = (await session.execute(select(Device))).scalar()
response = await auth_client.get("/device/token_from_user", params={"device_id": device.id})
assert response.status_code == 200
json = response.json()
json["counter_hash"] = TokenDataServer.hash(0).decode() # Не было обновений, значит ставим 0
# 1 обновление токена
response = await auth_client.get("/for_device/refresh_token", params=json)
assert response.status_code == 200
json1 = response.json()
response = await auth_client.get("/for_device/refresh_token", params=json)
assert response.status_code in (401, 403) # Token has been revoked
# 2 обновление токена со старым счетчиком (не должен сработать)
json1["counter_hash"] = json["counter_hash"]
response = await auth_client.get("/for_device/refresh_token", params=json1)
assert response.status_code in (401, 403) # invalid counter hash
# 2 обновление токена с обновленным счетчиком
json1["counter_hash"] = TokenDataServer.hash(1).decode()
response = await auth_client.get("/for_device/refresh_token", params=json1)
assert response.status_code == 200
# 3 обновление токена с обновленным счетчиком
# hashlib.md5(str(2).encode()).hexdigest()
json2 = response.json()
json2["counter_hash"] = TokenDataServer.hash(2).decode()
response = await auth_client.get("/for_device/refresh_token", params=json2)
assert response.status_code == 200

View File

@ -1,12 +0,0 @@
from fastapi import Depends
from .dependencies import jwt_websocket_create_access_token
from .router import router
from .schemas import DeviceWebsocketAuth
@router.post("/ws/token")
def get_access_token_for_connection_to_device(access_token: str = Depends(jwt_websocket_create_access_token)):
return {
"access_token": access_token
}

View File

@ -1,40 +0,0 @@
import abc
import os
from typing import Tuple
from fastapi import Depends, Query
from pydantic import BaseModel
from device.schemas import DeviceWebsocketAuth, TokenDataServer
# Множество заблокированных токенов доступа по вебсокетам. Токены одноразовые, но при дропе системы список должен
# очищаться (не хранится в постоянной памяти), чтобы устройства могли повторно подключиться к вебсокетам после
# восстановления
# todo перенести на redis
denylist = set()
tokens_data: dict[str, TokenDataServer] = dict()
class Settings(BaseModel):
authjwt_secret_key: str = os.environ.get("SECRET_DEVICE_AUTH_TOKEN")
authjwt_denylist_enabled: bool = True
authjwt_denylist_token_checks: set = {"access", "refresh"}
authjwt_refresh_token_expires: bool = False
def check_if_token_in_denylist(decrypted_token):
jti = decrypted_token['jti']
return jti in denylist
def get_config():
return Settings()
def jwt_websocket_required_auth(token: str = Query(...), authorize: None = Depends()) -> Tuple[dict, 'DeviceWebsocketAuth']:
authorize.jwt_required("websocket", token=token)
decoded_token = authorize.get_raw_jwt(token)
device = DeviceWebsocketAuth.model_validate(decoded_token)
return decoded_token, device

View File

@ -1,67 +0,0 @@
import datetime
import time
import json
import uvicorn
from fastapi import FastAPI, Depends, Body
from fastapi.staticfiles import StaticFiles
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.exc import DBAPIError
from starlette.requests import Request
from starlette.responses import JSONResponse
from device.router import router, websocket_router
app = FastAPI(
title="Smart lock",
root_path="/devices"
)
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html(req: Request):
root_path = req.scope.get("root_path", "").rstrip("/")
openapi_url = root_path + app.openapi_url
return get_swagger_ui_html(
openapi_url=openapi_url,
title="API",
)
app.include_router(router)
app.include_router(websocket_router)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
process_time = time.perf_counter() - start_time
print(f"Finished in {process_time:.3f}")
response.headers["X-Process-Time"] = str(process_time)
return response
@app.exception_handler(DBAPIError)
def authjwt_exception_handler(request: Request, exc: DBAPIError):
return JSONResponse(
status_code=500,
content={"detail": jsonable_encoder(exc) | {"args": jsonable_encoder(exc.args)}}
)
# @app.exception_handler(Exception)
# def authjwt_exception_handler(request: Request, exc: Exception):
# print(exc.with_traceback(None))
# return JSONResponse(
# status_code=500,
# content={
# "name_exception": exc.__class__.__name__,
# "args": getattr(exc, "args", []),
# "detail": jsonable_encoder(exc)
# }
# )
if __name__ == '__main__':
uvicorn.run(app=app, host="0.0.0.0", port=8001)
# uvicorn.run(app="main:app", host="0.0.0.0", port=8000, reload=True)

View File

@ -1,9 +0,0 @@
fastapi==0.109.1
uvicorn
sqlalchemy
fastapi-users==12.1.2
python-dotenv
asyncpg
pydantic_extra_types
fastapi_jwt_auth
fastapi_users_db_sqlalchemy

View File

@ -1,32 +0,0 @@
version: '3.8'
services:
user_service:
env_file: ".env"
build:
context: ./user_service
dockerfile: Dockerfile
environment:
- PORT=8000
ports:
- "8000:8000"
device_service:
env_file: ".env"
build:
context: ./device_service
dockerfile: Dockerfile
environment:
- PORT=8001
ports:
- "8001:8001"
nginx:
image: nginx:latest
volumes:
- ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf
ports:
- "80:80"
depends_on:
- user_service
- device_service

View File

@ -1,11 +0,0 @@
server {
listen 80;
location /users {
proxy_pass http://user_service:8000;
}
location /devices/ {
proxy_pass http://device_service:8001;
}
}

View File

@ -1,48 +0,0 @@
# Лабораторная работа №3: REST API, шлюз и синхронный обмен данными между микросервисами
## Задание реализовать два микросервиса связанные единой базой и сущностями один ко многим
- Первый микросервис user предоставляет методы для регистрации авторизации и выхода пользователей
- Второй микросервис реализует сущность устройства, которые связанные с пользователями связью многие ко многим
### Реализация
Оба микросервиса написаны на fastapi и развернуты на вебсервере uvicorn. В самом докере развертывание осущетсвляется через веб сервер nginx,
который осуществляет роль прокси, прослушивая 80 http порт и перенправляя запрос на тот или иной микросервис
Для корректной работы swagger-а необходимо перенести location из nginx.conf в fastapi, чтобы ему был известен корневой путь, для построения документации
`app = FastAPI(
title="Smart lock",
root_path="/devices"
)`
`location /devices/ {
proxy_pass http://device_service:8001;
}`
### Развертывание
В корневую папку добавляем файл .env, где указываем значение переменных среды
**Пример файла:**
`DB_HOST = localhost
DB_PORT = 5432
DB_NAME =
DB_USER = postgres
DB_PASS = 1234
SECRET_AUTH_TOKEN = 1
SECRET_VERIFICATE_TOKEN = 1
SECRET_DEVICE_AUTH_TOKEN = 1
DEVICE_START_COUNTER = 1
STATIC_URL = https://testapi.danserver.keenetic.name/static/`
Далее указываем:
docker-compose up --build
[Видео](https://disk.yandex.ru/d/BGenDWYY6tIGaw)

View File

@ -1,10 +0,0 @@
FROM python:3.11
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "--host", "0.0.0.0", "main:app", "--port", "8000"]

View File

@ -1,6 +0,0 @@
from .models import *
from .router import *
from .schemas import *
from .manager import *
from .base_config import *
from .dependencies import *

View File

@ -1,34 +0,0 @@
#
import uuid
from fastapi_users import FastAPIUsers
from fastapi_users.authentication import CookieTransport, AuthenticationBackend, BearerTransport
from fastapi_users.authentication import JWTStrategy
from .models import User
from .manager import get_user_manager
from config import SECRET_AUTH_TOKEN
__all__ = ["fastapi_users", "current_user", "auth_backend", "current_user_optional"]
cookie_transport = CookieTransport()
bearer_transport = BearerTransport("/token")
def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET_AUTH_TOKEN, lifetime_seconds=None)
auth_backend = AuthenticationBackend(
name="jwt",
transport=cookie_transport,
get_strategy=get_jwt_strategy,
)
fastapi_users = FastAPIUsers[User, uuid.UUID](
get_user_manager,
[auth_backend],
)
current_user = fastapi_users.current_user()
current_user_optional = fastapi_users.current_user(optional=True)

View File

@ -1,13 +0,0 @@
#
from fastapi import Depends
from fastapi_users_db_sqlalchemy import SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import AsyncSession
from .models import User
from database import get_async_session
__all__ = ["get_user_db"]
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)

View File

@ -1,99 +0,0 @@
#
import uuid
from typing import Optional
from fastapi import Depends, Request, Response, HTTPException
from fastapi.encoders import jsonable_encoder
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_users import BaseUserManager, UUIDIDMixin, exceptions, models, schemas
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.functions import user
from database import get_async_session
from .dependencies import get_user_db
from .models import *
from config import SECRET_VERIFICATE_TOKEN as SECRET
__all__ = ["UserManager", "get_user_manager"]
from .schemas import UserUpdate
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
print(f"User {user.id} has registered.")
async def on_after_login(
self,
user: models.UP,
request: Optional[Request] = None,
response: Optional[Response] = None,
) -> None:
if user.fcm_token is not None:
await self.update(UserUpdate(fcm_token=user.fcm_token), user, request=request)
async def create(
self,
user_create: schemas.UC,
safe: bool = False,
request: Optional[Request] = None,
) -> models.UP:
await self.validate_password(user_create.password, user_create)
existing_user = await self.user_db.get_by_email(user_create.email)
if existing_user is not None:
raise exceptions.UserAlreadyExists()
user_dict = (
user_create.create_update_dict()
if safe
else user_create.create_update_dict_superuser()
)
password = user_dict.pop("password")
user_dict["hashed_password"] = self.password_helper.hash(password)
self._link_or_create_entity(user_dict, "group", Group)
try:
created_user = await self.user_db.create(user_dict)
except IntegrityError as e:
if 'group' not in e.statement:
raise e
detail = {
"msg": f"Некорректный идентификатор группы {user_dict.get('group_id')=} введенный при регистрации",
"inner_exception": jsonable_encoder(e)
}
raise HTTPException(status_code=403, detail=detail)
if hasattr(self.user_db, "session"):
await created_user.include_relationship_for_read(self.user_db.session)
await self.on_after_register(created_user, request)
return created_user
async def authenticate(
self, credentials: OAuth2PasswordRequestForm
) -> Optional[models.UP]:
user_auth = await super().authenticate(credentials)
if user_auth is None:
return None
user_auth.fcm_token = credentials.fcm_token
return user_auth
@staticmethod
def _link_or_create_entity(user_dict: dict, name: str, EntityType):
entity = user_dict.get(name)
if isinstance(entity, (uuid.UUID, int)) or entity is None:
user_dict.pop(name, None)
user_dict[name + "_id"] = entity
else:
user_dict[name] = EntityType(**entity)
async def get_user_manager(user_db=Depends(get_user_db)):
yield UserManager(user_db)

View File

@ -1,52 +0,0 @@
import uuid
from datetime import datetime
from fastapi_users_db_sqlalchemy import SQLAlchemyBaseUserTable
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, JSON, Boolean, MetaData, DateTime, UUID, \
ARRAY, DECIMAL, DOUBLE_PRECISION
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import relationship
from database import Base
__all__ = ["Group", "User"]
class Group(Base):
__tablename__ = "groups"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String, nullable=False)
comment = Column(String, nullable=True)
users = relationship("User", back_populates="group")
class User(SQLAlchemyBaseUserTable[uuid.UUID], Base):
__tablename__ = 'users'
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
email = Column(String, nullable=False)
name = Column(String, nullable=False)
surname = Column(String, nullable=False)
patronymic = Column(String, nullable=True)
creation_on = Column(TIMESTAMP, default=datetime.utcnow)
last_entry = Column(TIMESTAMP, nullable=True)
group_id = Column(ForeignKey(Group.id), nullable=True)
hashed_password = Column(String(length=1024), nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
is_superuser = Column(Boolean, default=False, nullable=False)
is_verified = Column(Boolean, default=False, nullable=False)
token = Column(String, nullable=True)
fcm_token = Column(String, nullable=True) # Идентификатор устройства для уведомлений
hashes_rfids = Column(ARRAY(String), default=[])
face_embedding = Column(ARRAY(DOUBLE_PRECISION), nullable=True)
devices = relationship("Device", secondary="active_devices", back_populates="users", viewonly=True)
active_devices = relationship("ActiveDevice")
group = relationship(Group, uselist=False, back_populates="users")
notifications = relationship("Notification", back_populates="user")
async def include_relationship_for_read(self, session: AsyncSession):
await session.refresh(self, ["group"])
return self

View File

@ -1,25 +0,0 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_async_session
from . import User
from .base_config import current_user
from .schemas import UserRead
__all__ = ["user_router"]
user_router = APIRouter(prefix='/auth', tags=['Auth'])
@user_router.get("/current_user")
async def get_current_user(user=Depends(current_user), session=Depends(get_async_session)) -> UserRead:
return await user.include_relationship_for_read(session)
@user_router.patch("/fcm_token/{fcm_token}")
async def refresh_fcm_token(fcm_token: str | None = None,
user=Depends(current_user), session=Depends(get_async_session)) -> UserRead:
if fcm_token is not None:
user.fcm_token = fcm_token
await session.commit()
return user

View File

@ -1,39 +0,0 @@
#
import datetime
import uuid
from typing import Optional, Union
from sqlalchemy import TIMESTAMP
from fastapi_users import schemas
from pydantic import EmailStr, BaseModel, Json
__all__ = ["GroupRead", "GroupCreate", "UserRead", "UserCreate", "UserUpdate"]
class GroupCreate(BaseModel):
name: str
comment: Optional[str]
class GroupRead(GroupCreate, from_attributes=True):
id: uuid.UUID
class UserRead(schemas.BaseUser[uuid.UUID], from_attributes=True):
name: str
surname: str
patronymic: Optional[str]
creation_on: datetime.datetime
fcm_token: Optional[str] = None
group: Optional[GroupRead] = None
class UserCreate(schemas.BaseUserCreate):
name: str
surname: str
patronymic: Optional[str] = None
group: Union[None, uuid.UUID, GroupCreate] = None
class UserUpdate(schemas.BaseUserUpdate):
fcm_token: Optional[str] = None

View File

@ -1,98 +0,0 @@
import httpx
import pytest
from fastapi import Depends
from test_database import *
from . import User
from .schemas import UserCreate, GroupCreate, UserRead
__all__ = ["client", "auth_client", "get_generate_user", "session"]
def _add_password(user: User, password: str):
(user if isinstance(user, dict) else user.__dict__)["password"] = password
return user
def get_generate_user(unique=False):
if not unique:
return UserCreate(
name="user",
surname="test",
group=GroupCreate(name="test", comment="test_comment"),
email="user@localhost.ru",
password="<PASSWORD>"
)
@pytest_asyncio.fixture(scope="session")
async def auth_client(app) -> AsyncClient:
async with AsyncClient(app=app, base_url="http://127.0.0.1:9000") as client:
user = get_generate_user()
await client.post("/auth/register", json=user.model_dump())
response = await client.post("/auth/login", data={
'username': user.email,
'password': user.password
})
cookie = list(dict(response.cookies).items())[-1]
client.headers["Cookie"] = "=".join(cookie)
yield client
@pytest.mark.asyncio
async def test_register_user(client, session) -> UserCreate:
user = get_generate_user()
response = await client.post("/auth/register", json=user.model_dump())
assert response.status_code == 201, response.json()
result = response.json()
db_user = await (await session.get(User, result["id"])).include_relationship_for_read(session)
assert db_user is not None
assert result.pop("id", None) is not None, response.json()
compare_model = UserCreate.model_validate(_add_password(result, user.password)).model_dump()
db_compare_model = UserCreate.model_validate(_add_password(db_user, user.password), from_attributes=True).model_dump()
assert compare_model == db_compare_model
assert compare_model == user.model_dump()
assert user.model_dump() == db_compare_model
response = await client.post("/auth/register", json=user.model_dump())
assert response.status_code == 400, response.json()
return user
@pytest.mark.asyncio
async def test_login_user(client) -> AsyncClient:
user = get_generate_user()
await client.post("/auth/register", json=user.model_dump())
response = await client.post("/auth/login", data={
'username': user.email,
'password': user.password + "0"
})
assert response.status_code == 400, response.json()
response = await client.post("/auth/login", data={
'username': user.email,
'password': user.password
})
assert response.status_code == 200 or response.status_code == 204, response.json()
return user
@pytest.mark.asyncio
async def test_authorization_and_logout(auth_client):
client: httpx.AsyncClient = auth_client
response = await client.get("auth/current_user")
assert response.status_code == 200
result_user = UserRead(**response.json())
sourse_user = get_generate_user()
assert result_user.email == sourse_user.email
assert result_user.name == sourse_user.name
response = await client.post("/auth/logout")
assert response.status_code == 200 or response.status_code == 204
assert not response.cookies
auth_cookies = client.headers.pop("Cookie")
response = await client.get("/auth/current_user")
assert response.status_code == 401
client.headers["Cookie"] = auth_cookies

View File

@ -1,21 +0,0 @@
import os
import logging
from dotenv import load_dotenv
load_dotenv()
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT")
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASS = os.environ.get("DB_PASS")
SECRET_AUTH_TOKEN = os.environ.get("SECRET_AUTH_TOKEN")
SECRET_VERIFICATE_TOKEN = os.environ.get("SECRET_VERIFICATE_TOKEN")
DEVICE_START_COUNTER = os.environ.get("DEVICE_START_COUNTER")
logging.getLogger('passlib').setLevel(logging.ERROR)

View File

@ -1,49 +0,0 @@
import time
from sys import gettrace
from typing import AsyncGenerator
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config import DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER
from pydantic import BaseModel
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData, PrimaryKeyConstraint
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
def fill_from_model(self, model: BaseModel | dict):
"""Заполняет все поля из словаря или модели, помечая объект 'грязным' для корректного обновления"""
if isinstance(model, BaseModel):
items = model.model_dump().items()
else:
items = model.items()
for key, value in items:
setattr(self, key, value)
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_async_engine(DATABASE_URL, echo=gettrace() is not None)
async_session_maker = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def begin_transactions() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker().begin() as transaction:
try:
yield transaction.session
except Exception as e:
await transaction.rollback()
raise e
await transaction.commit()

View File

@ -1,81 +0,0 @@
import datetime
import time
import json
# import firebase_admin
import uvicorn
from fastapi import FastAPI, Depends, Body
from fastapi.staticfiles import StaticFiles
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy.exc import DBAPIError
from starlette.requests import Request
from starlette.responses import JSONResponse
from auth import *
app = FastAPI(
title="Smart lock",
root_path="/users"
)
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html(req: Request):
root_path = req.scope.get("root_path", "").rstrip("/")
openapi_url = root_path + app.openapi_url
return get_swagger_ui_html(
openapi_url=openapi_url,
title="API",
)
app.include_router(
fastapi_users.get_auth_router(auth_backend),
prefix="/auth",
tags=["Auth"],
)
app.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["Auth"],
)
# firebase
# cred = credentials.Certificate("../serviceAccountKey.json")
# try:
# fire_app = firebase_admin.initialize_app(cred)
# except ValueError:
# pass
class Message(BaseModel):
msg: str
@app.post("/msg")
def debug_message_from_user(msg: Message = Body(), user: User = Depends(current_user_optional)):
print(datetime.datetime.now(), end=' ')
if user is not None:
print(user.__dict__)
print(msg.msg)
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start_time = time.perf_counter()
response = await call_next(request)
process_time = time.perf_counter() - start_time
print(f"Finished in {process_time:.3f}")
response.headers["X-Process-Time"] = str(process_time)
return response
@app.exception_handler(DBAPIError)
def authjwt_exception_handler(request: Request, exc: DBAPIError):
return JSONResponse(
status_code=500,
content={"detail": jsonable_encoder(exc) | {"args": jsonable_encoder(exc.args)}}
)
if __name__ == '__main__':
uvicorn.run(app=app, host="0.0.0.0", port=8000)

View File

@ -1,7 +0,0 @@
fastapi
uvicorn
fastapi-users
sqlalchemy
fastapi_users_db_sqlalchemy
python-dotenv
asyncpg

View File

@ -1,30 +0,0 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1 получил сообщение: {body.decode()}')
# Время задержки по условию
time.sleep(2)
print('Consumer 1 закончил обработку')
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer1_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 1 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

View File

@ -1,28 +0,0 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2 получил сообщение: {body.decode()}')
# Обработка "нон-стопом"
print('Consumer 2 закончил обработку')
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer2_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 2 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

View File

@ -1,51 +0,0 @@
# Лабораторная работа №4 - Работа с брокером сообщений
+ Установить брокер сообщений RabbitMQ.
+ Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
+ Продемонстрировать работу брокера сообщений.
## Описание работы
**Publisher** - осуществляет отправку сообщений своим клиентам.
**Consumer1** - принимает и обрабатывает сообщения с задержкой в 2-3 секунды.
**Consumer2** - моментально принимает и обрабатывает сообщения.
### Tutorials
1. tutorial_1
![tutorial_1.png](Screenshots/tutorial_1.png)
2. tutorial_2
![tutorial_2.png](Screenshots/tutorial_2.png)
3. tutorial_3
![tutorial_3.png](Screenshots/tutorial_3.png)
## Работа с RabbitMQ
![rabbitMQ.png](Screenshots/rabbitMQ.png)
## Показания очереди queue_1 при одном запущенном экземпляре Consumer_1
![queue_1_1.png](Screenshots/queue_1 _1.png)
## Показания очереди queue_2
![queue_2_1.png](Screenshots/queue_2_1.png)
## Показания очереди queue_1 при двух запущенных экземплярах Consumer_1
![queue_1_2.png](Screenshots/queue_1 _2.png)
## Показания очереди queue_1 при трех запущенных экземплярах Consumer_1
![queue_1_3.png](Screenshots/queue_1 _3.png)
## Видеозапись работы программы
https://disk.yandex.ru/d/TAdJwo36RrN4ag

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 8.1 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.2 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 15 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 46 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

View File

@ -1,28 +0,0 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание exchange типа fanout
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
events = [
"Test1",
"Test2",
"Test3",
"Test4",
"Test5"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

View File

@ -1,25 +0,0 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -1,11 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -1,19 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -1,13 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,22 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 114 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 187 KiB

View File

@ -1,68 +0,0 @@
import multiprocessing
import random
import time
from pprint import pprint
from multiprocessing import Pool
def create_random_matrix(size):
return [[random.random() for _ in range(size)] for __ in range(size)]
# def matrix_multiply_seq(matrix1: list[list[float]], matrix2: list[list[float]]) -> list[list[float]]:
# """Выполняет последовательное перемножение двух матриц и возвращает результирующую матрицу"""
# l1 = len(matrix1)
# l2 = len(matrix2)
# result = [[0 for _ in range(l2)] for __ in range(l1)]
# for i in range(l1):
# for j in range(l2):
# for k in range(l2):
# result[i][j] += matrix1[i][k] * matrix2[k][j]
# return result
matrix_result = [[0 for _ in range(1000)] for __ in range(1000)]
def matrix_multiply(args) -> None:
"""Перемножает строки от start_cnt до end_cnt, результат помещает в глобальную переменную matrix_result"""
matrix1, matrix2, start_cnt, end_cnt = args
for i in range(start_cnt, end_cnt):
for j in range(len(matrix2)):
for k in range(len(matrix2)):
matrix_result[i][j] += matrix1[i - start_cnt][k] * matrix2[k][j]
def matrix_multiply_parralel(matrix1: list[list[float]], matrix2: list[list[float]], thread_count):
"""Выполняет парралеьное перемножение матриц"""
l1 = len(matrix1)
step = l1 // thread_count
args = [(matrix1, matrix2, i, i + step) for i in range(0, l1, step)]
args[-1] = (matrix1, matrix2, step * (l1 - 1), l1) # Остаток на последний поток
with Pool(processes=thread_count) as pool:
pool.map(matrix_multiply, args)
# pprint(matrix_result, compact=True)
def main():
sizes = [100, 300, 500, 1000]
num_threads = [2, 4, 6, 8, 12, 16, 20]
print(f"cpu_count: {multiprocessing.cpu_count()}")
for size in sizes:
matrix1 = create_random_matrix(size)
matrix2 = create_random_matrix(size)
t0 = time.perf_counter()
matrix_multiply((matrix1, matrix2, 0, len(matrix1)))
# pprint(matrix_result[:size][:size], compact=True)
print(f"Время последовательного перемножения матриц {size=:4}: \t\t\t\t{time.perf_counter() - t0:.3f}s")
for threads in num_threads:
start_time = time.perf_counter()
matrix_multiply_parralel(matrix1, matrix2, threads)
end_time = time.perf_counter()
print(f"Время парралельного перемножения матриц {size=:4}, {threads=} : \t{end_time - start_time:.3f}")
print("-" * 100)
if __name__ == '__main__':
main()

View File

@ -1,17 +0,0 @@
# Аглиуллов Данияр ИСЭбд-41
# Лабораторная работа №5
В ходе выполнения задачи по умножению квадратных матриц с использованием обычного и параллельного алгоритмов были получены следующие результаты и выводы:
Результаты тестов:
![изображение 1](./Screenshots/1.png)
![изображение 2](./Screenshots/Снимок%20экрана%202024-11-13%20160640.png)
Сравнение производительности:
В тестах на матрицах размером 100x100, 300x300 и 500x500 было замечено, что параллельный алгоритм демонстрирует значительное сокращение времени выполнения по сравнению с обычным алгоритмом, особенно на больших матрицах. Это подтверждает эффективность использования многопоточности для задач, требующих больших вычислительных ресурсов.
На малых размерах матриц (например, 100x100) преимущество было за последовательным умножением матрицы из-за накладных расходов на создание пула потоков. Однако при увеличении размера матриц (300x300 и 500x500) преимущества параллельного подхода становились более очевидными.
• При превышении количества физических потоков процессора, производительность понижается за счет смены контекста при переключении виртуальных потоков на одном ядре
![Видео](https://disk.yandex.ru/d/RK_c82BrLw2KjQ)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 227 KiB

View File

@ -1,86 +0,0 @@
import numpy as np
import time
from multiprocessing import Pool
np.seterr(over='ignore')
# Функция для вычисления детерминанта методом Гаусса
def compute_determinant_gauss(mat):
size = mat.shape[0]
matrix_copy = mat.astype(float) # Копируем матрицу, чтобы не изменять исходную
determinant = 1.0 # Начальное значение детерминанта
for k in range(size):
# Находим максимальный элемент в текущем столбце для уменьшения ошибок округления
max_index = np.argmax(np.abs(matrix_copy[k:size, k])) + k
if matrix_copy[max_index, k] == 0:
return 0 # Если на главной диагонали ноль, детерминант равен нулю
# Меняем местами строки
if max_index != k:
matrix_copy[[k, max_index]] = matrix_copy[[max_index, k]]
determinant *= -1 # Каждая перестановка меняет знак детерминанта
# Обнуляем элементы ниже главной диагонали
for m in range(k + 1, size):
multiplier = matrix_copy[m, k] / matrix_copy[k, k]
matrix_copy[m, k:] -= multiplier * matrix_copy[k, k:]
# Произведение элементов на главной диагонали
for j in range(size):
determinant *= matrix_copy[j, j]
return determinant
# Функция для параллельного вычисления детерминанта
def parallel_worker(index_range, mat):
size = mat.shape[0]
matrix_copy = mat.astype(float)
det = 1.0
for k in range(index_range[0], index_range[1]):
max_index = np.argmax(np.abs(matrix_copy[k:size, k])) + k
if matrix_copy[max_index, k] == 0:
return 0
if max_index != k:
matrix_copy[[k, max_index]] = matrix_copy[[max_index, k]]
det *= -1
for m in range(k + 1, size):
multiplier = matrix_copy[m, k] / matrix_copy[k, k]
matrix_copy[m, k:] -= multiplier * matrix_copy[k, k:]
return det
# Функция для параллельного вычисления детерминанта
def compute_parallel_determinant(mat, num_workers):
size = mat.shape[0]
block_size = size // num_workers
ranges = [(i * block_size, (i + 1) * block_size) for i in range(num_workers)]
with Pool(processes=num_workers) as pool:
results = pool.starmap(parallel_worker, [(block, mat) for block in ranges])
# Объединяем результаты
total_determinant = sum(results)
return total_determinant
# Функция для запуска тестов производительности
def execute_benchmarks():
sizes = [100, 300, 500] # Размеры матриц
for size in sizes:
random_matrix = np.random.rand(size, size) # Генерация случайной матрицы
print(f"--- Тест производительности для матрицы {size}x{size} ---")
# Последовательное вычисление детерминанта
start_time = time.time()
sequential_det = compute_determinant_gauss(random_matrix)
seq_duration = time.time() - start_time
print(f"Время последовательного вычисления для {size}x{size}: {seq_duration:.4f} секунд")
# Параллельное вычисление с различным количеством процессов
for workers in [1, 2, 4, 6, 8, 12, 16]:
start_time = time.time()
parallel_det = compute_parallel_determinant(random_matrix, workers)
par_duration = time.time() - start_time
speedup_ratio = seq_duration / par_duration if par_duration > 0 else 0
print(f"Параллельное время с {workers} процессами: {par_duration:.4f} секунд, Ускорение: {speedup_ratio:.2f}")
# Запуск тестов производительности
if __name__ == '__main__':
execute_benchmarks()

View File

@ -1,16 +0,0 @@
# Аглиуллов Данияр ИСЭбд-41
# Лабораторная работа №6
Для повышения производительности при вычислении детерминанта для больших матриц была добавлена возможность параллельной обработки с использованием библиотеки multiprocessing. Это позволило значительно ускорить вычисления за счет распределения нагрузки между несколькими процессами.
Результаты тестов:
![изображение 1](../Screenshots/1.png)
Сравнение производительности:
В тестах на матрицах размером 100x100, 300x300 и 500x500 было замечено, что параллельный алгоритм демонстрирует значительное сокращение времени выполнения по сравнению с обычным алгоритмом, особенно на больших матрицах. Это подтверждает эффективность использования многопоточности для задач, требующих больших вычислительных ресурсов.
На малых размерах матриц (например, 100x100) преимущество было за последовательным умножением матрицы из-за накладных расходов на создание пула потоков. Однако при увеличении размера матрицы (300x300 и 500x500) преимущества параллельного подхода становились более очевидными.
• При превышении количества физических потоков процессора, производительность понижается за счет смены контекста при переключении виртуальных потоков на одном ядре
![Видео](https://disk.yandex.ru/d/MEZQvGM8u9OIBw)

View File

@ -1,24 +0,0 @@
# Лабораторная работа 7. Балансировка нагрузки в распределённых системах с использованием открытых технологий
## Задание
Написать краткое эссе, отвечая на следующие вопросы:
1. Какие алгоритмы и методы применяются для балансировки нагрузки?
2. Какие открытые технологии доступны для этой задачи?
3. Как осуществляется балансировка нагрузки в системах управления базами данных?
4. Какова роль реверс-прокси в процессе балансировки нагрузки?
## Эссе по теме
Балансировка нагрузки является ключевым аспектом проектирования распределённых систем, позволяя равномерно распределять входящие запросы между несколькими серверами или ресурсами. Это способствует повышению доступности, производительности и устойчивости к сбоям системы.
Существует множество алгоритмов и методов, применяемых для балансировки нагрузки. К ним относятся Least Connections (минимальное количество соединений), Round Robin (круговая схема), Server Health Check (проверка состояния сервера), Least Response Time (минимальное время ответа) и Random (случайный выбор). Эти подходы помогают эффективно распределять клиентские запросы с учётом текущей загруженности серверов.
Среди открытых технологий для балансировки нагрузки можно выделить Nginx, HAProxy, Apache HTTP Server и другие. Эти инструменты позволяют настроить прокси-серверы, которые принимают запросы от пользователей и направляют их на один или несколько серверов приложений, обеспечивая равномерное распределение нагрузки.
Балансировка нагрузки в базах данных важна для поддержания доступности и производительности системы. Для этого используются специализированные решения, такие как ProxySQL, pgpool-II и MySQL Router. Они помогают распределять запросы к базам данных между различными узлами, что позволяет снизить нагрузку на основной сервер.
Реверс-прокси представляет собой сервер, который принимает запросы от клиентов и перенаправляет их на один или несколько серверов приложений. Он выполняет множество функций, включая балансировку нагрузки, обеспечение безопасности, кэширование, SSL-терминацию, а также мониторинг и ведение логов.

View File

@ -1,52 +0,0 @@
# Лабораторная работа 8. Как Вы поняли, что называется распределенной системой и как она устроена?
## Задание:
Написать небольшое эссе (буквально несколько абзацев) своими словами на тему "Устройство распределенных систем". Вопросы:
1. Зачем сложные системы (например, социальная сеть ВКонтакте) пишутся в "распределенном" стиле, где каждое отдельное приложение (или сервис) функционально выполняет только ограниченный спектр задач?
2. Для чего были созданы системы оркестрации приложений? Каким образом они упрощают / усложняют разработку и сопровождение распределенных систем?
3. Для чего нужны очереди обработки сообщений и что может подразумеваться под сообщениями?
4. Какие преимущества и недостатки распределенных приложений существуют на Ваш взгляд?
5. Целесообразно ли в сложную распределенную систему внедрять параллельные вычисления? Приведите примеры, когда это действительно нужно, а когда нет.
## Преимущества распределённых приложений
1. Высокая доступность и отказоустойчивость: Один из основных плюсов распределённых систем заключается в том, что они могут продолжать функционировать даже при сбое отдельных компонентов. Это достигается за счёт дублирования сервисов и автоматического перенаправления запросов на работающие узлы.
2. Масштабируемость: Распределённые системы легко масштабируются. При увеличении нагрузки можно добавлять новые серверы или ресурсы без значительных изменений в архитектуре.
3. Обработка больших объёмов данных: Распределённые приложения могут одновременно обрабатывать множество запросов и большие объёмы данных, что делает их идеальными для таких задач, как анализ больших данных и машинное обучение.
## Недостатки распределённых приложений
1. Сложность архитектуры: Архитектура распределённых систем может быть значительно сложнее, чем у монолитных приложений. Это затрудняет отладку и мониторинг, так как необходимо учитывать взаимодействие множества компонентов.
2. Проблемы с согласованностью данных: В распределённых системах может возникнуть несогласованность данных между различными сервисами, особенно если они работают с копиями одних и тех же данных.
3. Управление безопасностью: Увеличение числа взаимодействий между компонентами системы требует более тщательного управления безопасностью, так как это повышает вероятность уязвимостей и атак.
## Внедрение параллельных вычислений
Параллельные вычисления в распределённых системах оправданы, когда необходимо обрабатывать большие объёмы данных или выполнять сложные вычисления. Например:
• Машинное обучение: Обучение моделей на больших наборах данных может быть значительно ускорено за счёт распараллеливания вычислений.
• Анализ больших данных: Параллельные вычисления позволяют эффективно обрабатывать данные, разбивая их на части и распределяя по нескольким узлам.
Однако стоит помнить, что не всегда параллельные вычисления оправданы:
• Если задача не требует значительных ресурсов или имеет низкую степень параллелизма, внедрение параллельных вычислений может усложнить архитектуру без ощутимой выгоды.
• Простые операции, такие как CRUD-операции, могут быть более эффективно реализованы в рамках монолитной архитектуры без необходимости распараллеливания.
## Эссе на тему
Распределенные приложения имеют множество преимуществ. Во-первых, они обеспечивают высокую доступность и отказоустойчивость: если один компонент выходит из строя, остальные продолжают функционировать. Во-вторых, они масштабируемы: можно легко добавлять новые ресурсы по мере необходимости. В-третьих, распределенные системы могут обрабатывать большие объемы данных и запросов одновременно. Однако у них есть и недостатки. Сложность архитектуры может привести к трудностям в отладке и мониторинге. Также возможны проблемы с согласованностью данных между различными сервисами. Наконец, распределенные системы требуют более тщательного управления безопасностью, так как большее количество взаимодействий увеличивает вероятность уязвимостей.
Внедрение параллельных вычислений в распределенные системы целесообразно в тех случаях, когда необходимо обрабатывать большие объемы данных или выполнять сложные вычисления. Например, в задачах машинного обучения или анализа больших данных параллельные вычисления позволяют значительно сократить время обработки. Однако не всегда параллельные вычисления оправданы. Если задача не требует значительных ресурсов или имеет низкую степень параллелизма, то их внедрение может усложнить архитектуру без ощутимой выгоды. Например, простые CRUD-операции (создание, чтение, обновление и удаление) могут быть более эффективно реализованы без использования параллельных вычислений.

View File

@ -1,30 +0,0 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
!**/.gitignore
!.git/HEAD
!.git/config
!.git/packed-refs
!.git/refs/heads/**

View File

@ -1,15 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
</ItemGroup>
</Project>

View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ActiveDebugProfile>Container (Dockerfile)</ActiveDebugProfile>
</PropertyGroup>
</Project>

View File

@ -1,28 +0,0 @@
# См. статью по ссылке https://aka.ms/customizecontainer, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
USER app
WORKDIR /app
# Этот этап используется для сборки проекта службы
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["ConsoleApp1/ConsoleApp1.csproj", "ConsoleApp1/"]
RUN dotnet restore "./ConsoleApp1/ConsoleApp1.csproj"
COPY . .
WORKDIR "/src/ConsoleApp1"
RUN dotnet build "./ConsoleApp1.csproj" -c $BUILD_CONFIGURATION -o /app/build
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./ConsoleApp1.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "ConsoleApp1.dll"]

View File

@ -1,36 +0,0 @@
using System;
using System.IO;
class Program
{
static void Main()
{
const string inputDir = "/var/data";
const string outputFile = "/var/result/data.txt";
try
{
using (var writer = new StreamWriter(outputFile))
{
var files = Directory.GetFiles(inputDir);
foreach (var file in files)
{
using (var reader = new StreamReader(file))
{
string firstLine = reader.ReadLine();
if (!string.IsNullOrEmpty(firstLine))
{
writer.WriteLine(firstLine);
}
}
}
}
Console.WriteLine($"Файл {outputFile} успешно создан.");
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка: {ex.Message}");
}
}
}

View File

@ -1,10 +0,0 @@
{
"profiles": {
"ConsoleApp1": {
"commandName": "Project"
},
"Container (Dockerfile)": {
"commandName": "Docker"
}
}
}

View File

@ -1,15 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.21.0" />
</ItemGroup>
</Project>

View File

@ -1,6 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="Current" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<ActiveDebugProfile>Container (Dockerfile)</ActiveDebugProfile>
</PropertyGroup>
</Project>

View File

@ -1,28 +0,0 @@
# См. статью по ссылке https://aka.ms/customizecontainer, чтобы узнать как настроить контейнер отладки и как Visual Studio использует этот Dockerfile для создания образов для ускорения отладки.
# Этот этап используется при запуске из VS в быстром режиме (по умолчанию для конфигурации отладки)
FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
USER app
WORKDIR /app
# Этот этап используется для сборки проекта службы
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG BUILD_CONFIGURATION=Release
WORKDIR /src
COPY ["ConsoleApp2/ConsoleApp2.csproj", "ConsoleApp2/"]
RUN dotnet restore "./ConsoleApp2/ConsoleApp2.csproj"
COPY . .
WORKDIR "/src/ConsoleApp2"
RUN dotnet build "./ConsoleApp2.csproj" -c $BUILD_CONFIGURATION -o /app/build
# Этот этап используется для публикации проекта службы, который будет скопирован на последний этап
FROM build AS publish
ARG BUILD_CONFIGURATION=Release
RUN dotnet publish "./ConsoleApp2.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false
# Этот этап используется в рабочей среде или при запуске из VS в обычном режиме (по умолчанию, когда конфигурация отладки не используется)
FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "ConsoleApp2.dll"]

View File

@ -1,34 +0,0 @@
using System;
using System.IO;
using System.Linq;
class Program
{
static void Main()
{
const string inputFile = "/var/result/data.txt";
const string outputFile = "/var/result/result.txt";
try
{
var lines = File.ReadAllLines(inputFile).Select(int.Parse).ToArray();
if (lines.Length >= 2)
{
int result = lines.First() * lines.Last();
File.WriteAllText(outputFile, result.ToString());
Console.WriteLine($"Произведение: {result}");
}
else
{
Console.WriteLine("Недостаточно данных в файле для вычисления.");
}
}
catch (Exception ex)
{
Console.WriteLine($"Ошибка: {ex.Message}");
}
}
}

View File

@ -1,10 +0,0 @@
{
"profiles": {
"ConsoleApp2": {
"commandName": "Project"
},
"Container (Dockerfile)": {
"commandName": "Docker"
}
}
}

View File

@ -1,10 +0,0 @@
25
91
77
63
45
25
21
89
6
18

View File

@ -1,10 +0,0 @@
10
3
38
9
36
43
96
31
95
58

View File

@ -1,10 +0,0 @@
13
35
38
31
19
94
94
84
18
47

View File

@ -1,10 +0,0 @@
9
32
75
92
100
85
85
10
50
54

View File

@ -1,10 +0,0 @@
83
88
29
86
87
79
18
22
76
71

View File

@ -1,10 +0,0 @@
15
22
92
91
78
47
53
98
72
64

View File

@ -1,10 +0,0 @@
66
45
83
55
25
82
95
42
18
6

View File

@ -1,10 +0,0 @@
25
71
35
71
78
51
29
67
87
33

View File

@ -1,10 +0,0 @@
93
19
32
13
75
86
46
87
39
66

View File

@ -1,10 +0,0 @@
7
74
69
75
45
28
92
9
77
32

View File

@ -1,10 +0,0 @@
42
75
67
53
2
34
57
47
83
52

View File

@ -1,10 +0,0 @@
98
62
45
77
65
45
61
62
10
76

View File

@ -1,10 +0,0 @@
41
30
41
39
62
3
79
93
56
82

View File

@ -1,10 +0,0 @@
85
29
46
36
82
52
4
14
89
17

View File

@ -1,10 +0,0 @@
35
98
38
31
39
76
5
71
7
58

View File

@ -1,10 +0,0 @@
50
93
18
76
13
62
16
45
65
25

View File

@ -1,10 +0,0 @@
98
45
1
52
14
7
56
38
7
50

View File

@ -1,10 +0,0 @@
41
27
27
24
76
36
19
87
83
35

View File

@ -1,10 +0,0 @@
16
5
95
36
20
60
79
46
61
77

View File

@ -1,10 +0,0 @@
43
23
53
6
88
27
55
15
94
36

View File

@ -1,10 +0,0 @@
62
95
50
65
13
56
74
37
99
93

View File

@ -1,10 +0,0 @@
13
2
31
49
80
73
47
61
96
69

View File

@ -1,10 +0,0 @@
37
54
100
34
1
77
55
10
30
28

View File

@ -1,10 +0,0 @@
35
17
95
59
17
98
68
54
89
56

Some files were not shown because too many files have changed in this diff Show More