Бд +- подготовили, надеюсь нейронка быстро получится все сделать
This commit is contained in:
parent
03e6dfa97c
commit
96f8e782cc
18
database.py
18
database.py
@ -1,30 +1,26 @@
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||
|
||||
engine = create_async_engine("sqlite+aiosqlite:///tasks.db")
|
||||
engine = create_async_engine("sqlite+aiosqlite:///questions.db")
|
||||
new_session = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
class Model(DeclarativeBase):
|
||||
pass
|
||||
|
||||
|
||||
class TaskOrm(Model):
|
||||
__tablename__ = "tasks"
|
||||
class QuestionOrm(Model):
|
||||
__tablename__ = "questions"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
name: Mapped[str]
|
||||
description: Mapped[Optional[str]]
|
||||
|
||||
id_user: Mapped[int]
|
||||
type_question: Mapped[bool]
|
||||
question: Mapped[str]
|
||||
answer: Mapped[Optional[str]]
|
||||
|
||||
async def create_tables():
|
||||
# https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Model.metadata.create_all)
|
||||
|
||||
|
||||
async def delete_tables():
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Model.metadata.drop_all)
|
||||
|
16
main.py
16
main.py
@ -1,20 +1,22 @@
|
||||
from fastapi import FastAPI
|
||||
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from database import create_tables, delete_tables
|
||||
from router import router as tasks_router
|
||||
from router import router as questions_router
|
||||
|
||||
# Настройка логирования
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
await delete_tables()
|
||||
print("База очищена")
|
||||
logger.info("База данных очищена")
|
||||
await create_tables()
|
||||
print("База готова к работе")
|
||||
logger.info("База данных готова к работе")
|
||||
yield
|
||||
print("Выключение")
|
||||
|
||||
logger.info("Выключение")
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
app.include_router(tasks_router)
|
||||
app.include_router(questions_router)
|
||||
|
@ -1,26 +1,24 @@
|
||||
from sqlalchemy import select
|
||||
|
||||
from database import new_session, TaskOrm
|
||||
from schemas import STaskAdd, STask
|
||||
from database import new_session, QuestionOrm
|
||||
from schemas import SQuestionAdd, SQuestion
|
||||
|
||||
|
||||
class TaskRepository:
|
||||
class QuestionRepository:
|
||||
@classmethod
|
||||
async def add_one(cls, data: STaskAdd) -> int:
|
||||
async def add_one(cls, data: SQuestionAdd) -> int:
|
||||
async with new_session() as session:
|
||||
task_dict = data.model_dump()
|
||||
|
||||
task = TaskOrm(**task_dict)
|
||||
session.add(task)
|
||||
question_dict = data.model_dump()
|
||||
question = QuestionOrm(**question_dict)
|
||||
session.add(question)
|
||||
await session.flush()
|
||||
await session.commit()
|
||||
return task.id
|
||||
return question.id
|
||||
|
||||
@classmethod
|
||||
async def find_all(cls) -> list[STask]:
|
||||
async def find_all(cls) -> list[SQuestion]:
|
||||
async with new_session() as session:
|
||||
query = select(TaskOrm)
|
||||
query = select(QuestionOrm)
|
||||
result = await session.execute(query)
|
||||
task_models = result.scalars().all()
|
||||
task_schemas = [STask.model_validate(task_model) for task_model in task_models]
|
||||
return task_schemas
|
||||
question_models = result.scalars().all()
|
||||
question_schemas = [SQuestion.model_validate(question_model) for question_model in question_models]
|
||||
return question_schemas
|
||||
|
27
router.py
27
router.py
@ -1,25 +1,22 @@
|
||||
from typing import Annotated
|
||||
|
||||
from fastapi import APIRouter, Depends
|
||||
|
||||
from repository import TaskRepository
|
||||
from schemas import STaskAdd, STask, STaskId
|
||||
from repository import QuestionRepository
|
||||
from schemas import SQuestionAdd, SQuestion, SQuestionId
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/tasks",
|
||||
tags=["Таски"],
|
||||
prefix="/questions",
|
||||
tags=["Questions"],
|
||||
)
|
||||
|
||||
|
||||
@router.post("")
|
||||
async def add_task(
|
||||
task: Annotated[STaskAdd, Depends()],
|
||||
) -> STaskId:
|
||||
task_id = await TaskRepository.add_one(task)
|
||||
return {"ok": True, "task_id": task_id}
|
||||
|
||||
async def add_question(
|
||||
question: Annotated[SQuestionAdd, Depends()],
|
||||
) -> SQuestionId:
|
||||
question_id = await QuestionRepository.add_one(question)
|
||||
return {"ok": True, "question_id": question_id}
|
||||
|
||||
@router.get("")
|
||||
async def get_tasks() -> list[STask]:
|
||||
tasks = await TaskRepository.find_all()
|
||||
return tasks
|
||||
async def get_questions() -> list[SQuestion]:
|
||||
questions = await QuestionRepository.find_all()
|
||||
return questions
|
||||
|
18
schemas.py
18
schemas.py
@ -1,19 +1,17 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
class SQuestionAdd(BaseModel):
|
||||
id_user: int
|
||||
type_question: bool
|
||||
question: str
|
||||
|
||||
class STaskAdd(BaseModel):
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
|
||||
|
||||
class STask(STaskAdd):
|
||||
class SQuestion(SQuestionAdd):
|
||||
id: int
|
||||
answer: Optional[str] = None
|
||||
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
|
||||
class STaskId(BaseModel):
|
||||
class SQuestionId(BaseModel):
|
||||
ok: bool = True
|
||||
task_id: int
|
||||
question_id: int
|
||||
|
Loading…
Reference in New Issue
Block a user