DAS_2024_1/agliullov_daniyar_lab_3/user_service/database.py

50 lines
1.8 KiB
Python
Raw Normal View History

2024-12-31 08:10:29 +04:00
import time
from sys import gettrace
from typing import AsyncGenerator
from fastapi import HTTPException
from fastapi.encoders import jsonable_encoder
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from config import DB_HOST, DB_NAME, DB_PASS, DB_PORT, DB_USER
from pydantic import BaseModel
from sqlalchemy import Table, Column, Integer, String, TIMESTAMP, ForeignKey, Boolean, MetaData, PrimaryKeyConstraint
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    """Declarative base class for the ORM models of this service."""

    def fill_from_model(self, model: BaseModel | dict):
        """Copy every field of *model* onto this object.

        *model* is either a pydantic ``BaseModel`` (dumped with
        ``model_dump()``) or a plain mapping. Each key/value pair is
        assigned with ``setattr`` so that, as the original author notes,
        the object is marked "dirty" and gets updated correctly.
        """
        values = model.model_dump() if isinstance(model, BaseModel) else model
        for field_name, field_value in values.items():
            setattr(self, field_name, field_value)
# Connection URL for PostgreSQL via the asyncpg driver, built from config values.
# NOTE(review): the credentials are interpolated verbatim; a password containing
# characters such as "@" or "/" would presumably need URL-encoding - confirm.
DATABASE_URL = f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

# SQL echo is enabled only when a trace function is installed
# (sys.gettrace() returns non-None, e.g. under a debugger).
engine = create_async_engine(
    DATABASE_URL,
    echo=(gettrace() is not None),
)

# Factory producing AsyncSession objects bound to the engine above;
# loaded attributes are not expired after commit.
async_session_maker = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield a single session created by ``async_session_maker``.

    The session is entered as an async context manager, so it is exited
    once the generator finishes or is closed. No commit is issued here;
    committing is left to the consumer.
    """
    session_cm = async_session_maker()
    async with session_cm as db_session:
        yield db_session
async def begin_transactions() -> AsyncGenerator[AsyncSession, None]:
    """Yield a session wrapped in a transaction.

    A new session is created from ``async_session_maker`` and a
    transaction is begun on it before the session is yielded.

    * If the consumer finishes normally, the transaction is committed.
    * If an exception is thrown into the generator, the transaction is
      rolled back and the original exception propagates unchanged.
    * In both cases the session itself is closed afterwards.

    Yields:
        The ``AsyncSession`` on which the transaction was begun.
    """
    # Both context managers are needed: ``session.begin()`` handles
    # commit/rollback on exit, while the outer session context closes the
    # session (the previous version never closed it). No explicit
    # commit()/rollback() calls are required - the transaction context
    # manager performs them, and the exception is re-raised as-is.
    async with async_session_maker() as session:
        async with session.begin():
            yield session