77 lines
2.5 KiB
Python
77 lines
2.5 KiB
Python
from typing import Optional, Sequence
|
|
|
|
from sqlalchemy import delete as delete_
|
|
from sqlalchemy import update as update_
|
|
from sqlalchemy.future import select
|
|
|
|
from db.models.load_parameters_model import LoadParameters
|
|
from db.postgres_db_connection import async_session_postgres
|
|
|
|
|
|
async def get_all() -> Sequence[LoadParameters]:
|
|
async with async_session_postgres() as session:
|
|
result = await session.execute(select(LoadParameters))
|
|
return result.scalars().all()
|
|
|
|
|
|
async def get_by_id(id: int) -> LoadParameters:
|
|
async with async_session_postgres() as session:
|
|
result = await session.execute(select(LoadParameters).where(LoadParameters.id == id))
|
|
return result.scalar_one_or_none()
|
|
|
|
|
|
async def create(new_data: LoadParameters):
|
|
async with async_session_postgres() as session:
|
|
session.add(new_data)
|
|
await session.commit()
|
|
|
|
|
|
async def update(id: int, updated_data: dict):
|
|
async with async_session_postgres() as session:
|
|
stmt = (
|
|
update_(LoadParameters).
|
|
where(LoadParameters.id == id).
|
|
values(**updated_data)
|
|
)
|
|
await session.execute(stmt)
|
|
await session.commit()
|
|
|
|
|
|
async def delete(id: int) -> bool:
|
|
async with async_session_postgres() as session:
|
|
stmt = delete_(LoadParameters).where(LoadParameters.id == id)
|
|
result = await session.execute(stmt)
|
|
await session.commit()
|
|
return result.rowcount > 0
|
|
|
|
|
|
# class LoadParametersRepository:
|
|
# def __init__(self, session: AsyncSession):
|
|
# self.session = session
|
|
#
|
|
# async def get_all(self) -> Sequence[LoadParameters]:
|
|
# result = await self.session.execute(select(LoadParameters))
|
|
# return result.scalars().all()
|
|
#
|
|
# async def get_by_id(self, id: int) -> LoadParameters:
|
|
# result = await self.session.execute(select(LoadParameters).where(LoadParameters.id == id))
|
|
# return result.scalar_one_or_none()
|
|
#
|
|
# async def create(self, new_data: LoadParameters):
|
|
# self.session.add(new_data)
|
|
# await self.session.commit()
|
|
#
|
|
# async def update(self, id: int, updated_data: dict):
|
|
# stmt = (
|
|
# update_(LoadParameters).
|
|
# where(LoadParameters.id == id).
|
|
# values(**updated_data)
|
|
# )
|
|
# await self.session.execute(stmt)
|
|
# await self.session.commit()
|
|
#
|
|
# async def delete(self, id: int):
|
|
# stmt = delete_(LoadParameters).where(LoadParameters.id == id)
|
|
# await self.session.execute(stmt)
|
|
# await self.session.commit()
|