"""Async SQLAlchemy engine, ORM models, and table create/drop helpers."""
from datetime import datetime, timezone

from sqlalchemy import ForeignKey
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
|
|
|
# Async engine for the SQLite file ``tasks.db``, accessed through the aiosqlite driver.
engine = create_async_engine(url="sqlite+aiosqlite:///tasks.db")

# Session factory bound to the engine; ``expire_on_commit=False`` means
# instances are not expired when a session commits.
new_session = async_sessionmaker(bind=engine, expire_on_commit=False)
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base class; every model in this module inherits from it."""
|
|
|
|
class User(Base):
    """ORM model mapped to the ``users`` table.

    Columns:
        id: Integer primary key.
        username: String column with a unique constraint.
        password_hash: Required string column.

    Relationships:
        csv_files: ``CSVFile`` rows linked back through ``CSVFile.user``.
        models: ``H5Model`` rows linked back through ``H5Model.user``.
    """

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    username: Mapped[str] = mapped_column(unique=True)
    password_hash: Mapped[str]

    # Relationships (annotated so type checkers see the collection element type,
    # consistent with the ``Mapped[...]`` columns above).
    csv_files: Mapped[list["CSVFile"]] = relationship(
        "CSVFile", back_populates="user"
    )
    models: Mapped[list["H5Model"]] = relationship(
        "H5Model", back_populates="user"
    )
|
|
|
|
class CSVFile(Base):
    """ORM model mapped to the ``csv_files`` table.

    Columns:
        id: Integer primary key.
        user_id: Foreign key to ``users.id``.
        file_path: Required string column.
        uploaded_at: Timestamp filled in at insert time with the current
            UTC time as a naive ``datetime``.

    Relationships:
        user: The ``User`` row referenced by ``user_id``.
    """

    __tablename__ = "csv_files"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    file_path: Mapped[str]
    # ``datetime.utcnow`` is deprecated since Python 3.12. Build the same
    # naive-UTC value from an aware "now" so stored data keeps its format.
    uploaded_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    user: Mapped["User"] = relationship("User", back_populates="csv_files")
|
|
|
|
class H5Model(Base):
    """ORM model mapped to the ``h5_models`` table.

    Columns:
        id: Integer primary key.
        user_id: Foreign key to ``users.id``.
        path_model: Required string column.
        created_at: Timestamp filled in at insert time with the current
            UTC time as a naive ``datetime``.

    Relationships:
        user: The ``User`` row referenced by ``user_id``.
        statistics: ``ModelStatistics`` rows linked back through
            ``ModelStatistics.model``.
    """

    __tablename__ = "h5_models"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    path_model: Mapped[str]
    # ``datetime.utcnow`` is deprecated since Python 3.12. Build the same
    # naive-UTC value from an aware "now" so stored data keeps its format.
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    user: Mapped["User"] = relationship("User", back_populates="models")
    statistics: Mapped[list["ModelStatistics"]] = relationship(
        "ModelStatistics", back_populates="model"
    )
|
|
|
|
class ModelStatistics(Base):
    """ORM model mapped to the ``model_statistics`` table.

    Columns:
        id: Integer primary key.
        model_id: Foreign key to ``h5_models.id``.
        accuracy: Required float column.
        loss: Required float column.
        created_at: Timestamp filled in at insert time with the current
            UTC time as a naive ``datetime``.

    Relationships:
        model: The ``H5Model`` row referenced by ``model_id``.
    """

    __tablename__ = "model_statistics"

    id: Mapped[int] = mapped_column(primary_key=True)
    model_id: Mapped[int] = mapped_column(ForeignKey("h5_models.id"))
    accuracy: Mapped[float]
    loss: Mapped[float]
    # ``datetime.utcnow`` is deprecated since Python 3.12. Build the same
    # naive-UTC value from an aware "now" so stored data keeps its format.
    created_at: Mapped[datetime] = mapped_column(
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )

    model: Mapped["H5Model"] = relationship("H5Model", back_populates="statistics")
|
|
|
|
async def create_tables():
    """Create every table registered on ``Base.metadata``.

    The synchronous ``create_all`` is executed through ``run_sync`` on a
    connection obtained from ``engine.begin()``.
    """
    create_all = Base.metadata.create_all
    async with engine.begin() as connection:
        await connection.run_sync(create_all)
|
|
|
|
async def delete_tables():
    """Drop every table registered on ``Base.metadata``.

    The synchronous ``drop_all`` is executed through ``run_sync`` on a
    connection obtained from ``engine.begin()``.
    """
    drop_all = Base.metadata.drop_all
    async with engine.begin() as connection:
        await connection.run_sync(drop_all)
|