# repository.py (from PIbd-32_Kashin_M.I_API_Cour...)
# Repository viewer metadata: 79 lines, 3.5 KiB, Python.
# Timestamp shown by the viewer: 2024-06-08 01:22:35 +04:00
from sqlalchemy import select, delete, func
from sqlalchemy.orm import joinedload
from datetime import datetime
import urllib.parse
from database import new_session, QuestionOrm
from enums import TypeMood, TypeModel
from schemas import SQuestionAdd, SQuestion
from model import predict_answer
class QuestionRepository:
    """Persistence layer for user questions and their predicted answers.

    Every public method opens its own session through ``new_session()`` and
    closes it before returning.
    """

    # A prediction whose highest score is below this value is stored as
    # NOT_FOUND_ANSWER instead of the predicted class.
    MIN_CONFIDENCE = 0.2
    NOT_FOUND_ANSWER = "Not Found"
    # When a user has more than this many stored questions after an insert,
    # the oldest one is deleted.
    MAX_QUESTIONS_PER_USER = 100

    @classmethod
    async def add_one(
        cls,
        data: SQuestionAdd,
        type_mood: TypeMood,
        type_model: TypeModel,
    ) -> tuple[int, str]:
        """Predict an answer for a question, store it, and trim old history.

        Args:
            data: Incoming question payload; its ``question`` field may be
                URL-encoded and ``email_user`` identifies the owner.
            type_mood: Mood selector; its ``value`` is URL-decoded and passed
                to the model.
            type_model: Model selector; its ``value`` is URL-decoded and
                passed to the model.

        Returns:
            A ``(question_id, answer)`` tuple for the stored row. ``answer``
            is ``"Not Found"`` when the best prediction score is below
            ``MIN_CONFIDENCE``.
        """
        question_dict = data.model_dump()

        # Decode URL-encoded parameters.
        decoded_question = urllib.parse.unquote(question_dict["question"])
        decoded_mood = urllib.parse.unquote(type_mood.value)
        decoded_model = urllib.parse.unquote(type_model.value)

        # Run the model before opening the session so that no database
        # connection is held while the prediction is computed.
        # NOTE(review): predict_answer is called synchronously; if it is slow
        # it will block the event loop -- consider offloading to an executor.
        predicted_class, prediction = predict_answer(
            decoded_question, decoded_mood, decoded_model
        )

        # Fall back to a fixed answer when no class is likely enough.
        if max(prediction) < cls.MIN_CONFIDENCE:
            answer = cls.NOT_FOUND_ANSWER
        else:
            answer = predicted_class

        # Store the decoded values rather than the raw request values.
        question_dict["question"] = decoded_question
        question_dict["type_mood"] = decoded_mood
        question_dict["type_model"] = decoded_model
        question_dict["answer"] = answer
        question_dict["question_time"] = datetime.now()

        async with new_session() as session:
            question = QuestionOrm(**question_dict)
            session.add(question)
            # Flush now so the new row has its primary key and is included in
            # the count below; read the id before commit so the return value
            # does not depend on the session's expire_on_commit setting.
            await session.flush()
            question_id = question.id

            # Count this user's rows in the database instead of loading them.
            count_query = select(func.count(QuestionOrm.id)).where(
                QuestionOrm.email_user == data.email_user
            )
            stored_count = (await session.execute(count_query)).scalar_one()

            if stored_count > cls.MAX_QUESTIONS_PER_USER:
                # Delete the single oldest record for this user.
                oldest_query = (
                    select(QuestionOrm)
                    .where(QuestionOrm.email_user == data.email_user)
                    .order_by(QuestionOrm.question_time, QuestionOrm.id)
                    .limit(1)
                )
                oldest_question = (
                    await session.execute(oldest_query)
                ).scalars().first()
                if oldest_question is not None:
                    await session.delete(oldest_question)

            await session.commit()

        return question_id, answer

    @staticmethod
    async def _fetch_questions(query) -> list[SQuestion]:
        """Execute *query* and validate every returned row as ``SQuestion``."""
        async with new_session() as session:
            result = await session.execute(query)
            question_models = result.scalars().all()
            return [
                SQuestion.model_validate(question_model)
                for question_model in question_models
            ]

    @classmethod
    async def find_all(cls) -> list[SQuestion]:
        """Return every stored question as a validated ``SQuestion``."""
        return await cls._fetch_questions(select(QuestionOrm))

    @classmethod
    async def find_by_email(cls, email_user: str) -> list[SQuestion]:
        """Return the stored questions whose ``email_user`` equals *email_user*."""
        query = select(QuestionOrm).where(QuestionOrm.email_user == email_user)
        return await cls._fetch_questions(query)

    @classmethod
    async def get_class_statistics(cls) -> dict:
        """Return a mapping of each stored answer to the number of rows with it."""
        async with new_session() as session:
            query = select(
                QuestionOrm.answer, func.count(QuestionOrm.id)
            ).group_by(QuestionOrm.answer)
            result = await session.execute(query)
            return {answer: count for answer, count in result.all()}