From 49e327005e7ac2993d93f533d358498b71a19837 Mon Sep 17 00:00:00 2001
From: AnnZhimol
Date: Fri, 25 Oct 2024 01:02:31 +0300
Subject: [PATCH] Update Database

+ch_experiment_data in ClickHouse
+experiment_data and experiment_parameters in PostgreSQL
+experiment_data has a foreign key to experiment_parameters
+new connections
+implement routes for ch_experiment_data
+new csv_to_db loading algorithm
+new methods in the repositories
+update Dockerfile
+update README: "how to init db"
---
 Dockerfile                                         |   2 +-
 README.md                                          |  23 +++-
 db/__init__.py                                     |   0
 db/clickhouse_db_connection.py                     |   7 ++
 db/csv_to_db.py                                    | 110 ++++++------------
 db/db_connection.py                                |   7 --
 db/models/__init__.py                              |   6 +
 .../ch_experimentdb_experiment_data_model.py       |  37 +++---
 db/models/experiment_data_model.py                 |   4 +-
 db/models/experiment_parameters_model.py           |   2 +-
 db/postgres_db_connection.py                       |   5 +
 db/repositories/__init__.py                        |   5 +
 .../ch_experimentdb_experiment_data_repos.py       |  85 +++++---------
 db/repositories/experiment_data_repos.py           |  88 +++++++-------
 .../experiment_parameters_repos.py                 |  95 +++++++--------
 db/repositories/load_parameters_repos.py           |  10 +-
 db/repositories/recycling_parameters_repos.py      |  10 +-
 docker-compose.yml                                 |   2 +-
 main.py                                            |  11 +-
 .../ch_experimentdb_experiment_data_router.py      |  51 +++++++-
 network/schemas.py                                 |   7 --
 requirements.txt                                   | Bin 1512 -> 1572 bytes
 settings.py                                        |   5 +-
 23 files changed, 291 insertions(+), 281 deletions(-)
 create mode 100644 db/__init__.py
 create mode 100644 db/clickhouse_db_connection.py
 delete mode 100644 db/db_connection.py
 create mode 100644 db/models/__init__.py
 create mode 100644 db/postgres_db_connection.py
 create mode 100644 db/repositories/__init__.py

diff --git a/Dockerfile b/Dockerfile
index 4bf2f1c..8bcc2c6 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.12-slim
+FROM python:3.12
 
 WORKDIR /app
 
diff --git a/README.md b/README.md
index 0ccad25..db601e8 100644
--- a/README.md
+++ b/README.md
@@ -67,4 +67,25 @@ alembic revision --autogenerate
 ```
 ```
 alembic upgrade head
-```
\ No newline at end of file
+```
+
+# Database initialization
+
+## 1. Start docker-compose
+```
+docker-compose up --build
+```
+## 2. Open a ClickHouse client
+```
+docker exec -it clickhouse-db clickhouse-client -u UserMyHouse --password NotWarningWord2 --host localhost
+```
+## 3. Create the database
+```
+CREATE DATABASE SuperService;
+```
+## 4. Initialize the data
+Open FastAPI and run the request:
+```
+http://localhost:8000/init_db_data
+```
+PostgreSQL and ClickHouse will be populated with data.
\ No newline at end of file
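For convenience, the README step can also be scripted; a minimal sketch using the `requests` package, assuming the API is reachable on the default port 8000:

```python
import requests

# Trigger the CSV import; with the background-task version of the
# endpoint below, this returns immediately while the load continues
# server-side.
response = requests.get("http://localhost:8000/init_db_data")
print(response.json())
```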
diff --git a/db/__init__.py b/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/db/clickhouse_db_connection.py b/db/clickhouse_db_connection.py
new file mode 100644
index 0000000..12f890f
--- /dev/null
+++ b/db/clickhouse_db_connection.py
@@ -0,0 +1,7 @@
+from clickhouse_sqlalchemy import make_session, get_declarative_base, engines
+from sqlalchemy import create_engine
+from settings import settings
+
+engine_clickhouse = create_engine(settings.clickhouse_url)
+BaseClickhouse = get_declarative_base()
+session_clickhouse = make_session(engine_clickhouse)
diff --git a/db/csv_to_db.py b/db/csv_to_db.py
index cdaed5a..895a0fa 100644
--- a/db/csv_to_db.py
+++ b/db/csv_to_db.py
@@ -1,8 +1,10 @@
 import asyncio
-
 import pandas as pd
-from sqlalchemy import insert
-from db.db_connection import async_session, engine
+from sqlalchemy import insert, text
+from sqlalchemy.exc import OperationalError, IntegrityError
+
+from db.clickhouse_db_connection import BaseClickhouse, engine_clickhouse, session_clickhouse
+from db.postgres_db_connection import async_session_postgres, engine_postgres
 from db.models.base import Base
 from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
 from db.models.experiment_data_model import ExperimentData
@@ -10,95 +12,49 @@ from db.models.experiment_parameters_model import ExperimentParameters
 from db.models.load_parameters_model import LoadParameters
 from db.models.recycling_parameters_model import RecyclingParameters
 
-
-def add_ids_in_csv(file: str):
-    try:
-        df = pd.read_csv(file)
-        df.insert(0, 'id', pd.Series(range(1, len(df) + 1)))
-        df.to_csv(file, index=False)
-    except Exception as e:
-        print(f'Exception!! {e}')
-
-
-def print_headers_and_types(file: str):
-    df = pd.read_csv(file)
-    headers = df.columns.tolist()
-    print(headers)
-
-    for header in headers:
-        column_type = df[header].dtype
-        print(column_type)
-
-
 async def create_all_tables():
-    print('create_all_tables')
-    async with engine.begin() as conn:
+    async with engine_postgres.begin() as conn:
         await conn.run_sync(Base.metadata.create_all)
 
-
-async def drop_all_tables():
-    print('drop_all_tables')
-    async with engine.begin() as conn:
-        await conn.run_sync(Base.metadata.drop_all)
+    BaseClickhouse.metadata.create_all(engine_clickhouse)
 
 
-async def load_data_to_db(file: str, model_class):
-    print('load_data_to_db')
-    async with async_session() as session:
-        df = pd.read_csv(file).dropna()
+async def load_data_to_postgres(file: str, model_class, chunk_size=100):
+    df = pd.read_csv(file).dropna()
 
-        # Convert the DataFrame into a format suitable for SQLAlchemy
-        data_records = df.to_dict(orient='records')
+    async with async_session_postgres() as session:
+        for start in range(0, len(df), chunk_size):
+            end = start + chunk_size
+            chunk = df.iloc[start:end].to_dict(orient='records')
+            stmt = insert(model_class).values(chunk)
+            try:
+                await session.execute(stmt)
+                await session.commit()
+            except IntegrityError as e:
+                await session.rollback()
 
-        # Batch-insert all records
-        stmt = insert(model_class).values(data_records)
-        await session.execute(stmt)
-
-        await session.commit()
 
+def load_data_to_clickhouse(file: str, model_class, chunk_size=100):
+    df = pd.read_csv(file).dropna()
 
-async def insert_batch(session, model_class, batch_data):
-    stmt = insert(model_class).values(batch_data)
-    await session.execute(stmt)
-    await session.commit()
+    with session_clickhouse as session:
+        for start in range(0, len(df), chunk_size):
+            end = start + chunk_size
+            chunk = df.iloc[start:end].to_dict(orient='records')
+            stmt = insert(model_class).values(chunk)
+            session.execute(stmt)
+            session.commit()
 
 
-async def test(file: str, model_class):
-    print('test')
-    df = pd.read_csv(file)
-    data_records = df.to_dict(orient='records')
-
-    # async with async_session() as session:
-    #     batch_size = 3000
-    #     tasks = []
-    #
-    #     for i in range(0, len(data_records), batch_size):
-    #         batch_data = data_records[i:i + batch_size]
-    #         tasks.append(insert_batch(session, model_class, batch_data))
-    #
-    #     await asyncio.gather(*tasks)
-
-    # takes about two minutes to insert
-    async with async_session() as session:
-        batch_size = 3000
-        print(batch_size)
-        for i in range(0, len(data_records), batch_size):
-            batch_data = data_records[i:i + batch_size]
-            stmt = insert(model_class).values(batch_data)
-            await session.execute(stmt)
-        await session.commit()
 
 
 async def csv_to_db():
-    await drop_all_tables()
     await create_all_tables()
-    await load_data_to_db('./db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData)
-    await test('./db/files/experiment_data.csv', ExperimentData)
-    await load_data_to_db('./db/files/load_parameters.csv', LoadParameters)
-    await load_data_to_db('./db/files/recycling_parameters.csv', RecyclingParameters)
-    await load_data_to_db('./db/files/experiment_parameters.csv', ExperimentParameters)
+    await load_data_to_postgres('./db/files/load_parameters.csv', LoadParameters)
+    await load_data_to_postgres('./db/files/recycling_parameters.csv', RecyclingParameters)
+    await load_data_to_postgres('./db/files/experiment_parameters.csv', ExperimentParameters)
 
-
-# asyncio.run(csv_to_db())
-
+    await load_data_to_postgres('./db/files/experiment_data.csv', ExperimentData)
+    load_data_to_clickhouse('./db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData)
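If the import ever needs to run outside of FastAPI (for example from a one-off script), a small driver along these lines should work, assuming the compose services are up and the CSV files are in place:

```python
import asyncio

from db.csv_to_db import csv_to_db

# Creates the PostgreSQL and ClickHouse tables, then loads each CSV
# in dependency order (parameter tables before experiment_data).
asyncio.run(csv_to_db())
```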
diff --git a/db/db_connection.py b/db/db_connection.py
deleted file mode 100644
index bc042d1..0000000
--- a/db/db_connection.py
+++ /dev/null
@@ -1,7 +0,0 @@
-from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
-
-from settings import settings
-
-engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
-
-async_session = async_sessionmaker(engine)
diff --git a/db/models/__init__.py b/db/models/__init__.py
new file mode 100644
index 0000000..4fa29b3
--- /dev/null
+++ b/db/models/__init__.py
@@ -0,0 +1,6 @@
+from .base import *
+from .experiment_parameters_model import *
+from .experiment_data_model import *
+from .ch_experimentdb_experiment_data_model import *
+from .load_parameters_model import *
+from .recycling_parameters_model import *
\ No newline at end of file
diff --git a/db/models/ch_experimentdb_experiment_data_model.py b/db/models/ch_experimentdb_experiment_data_model.py
index eb68185..5ee91af 100644
--- a/db/models/ch_experimentdb_experiment_data_model.py
+++ b/db/models/ch_experimentdb_experiment_data_model.py
@@ -1,23 +1,26 @@
-from sqlalchemy import Identity
+from clickhouse_sqlalchemy import engines
+from clickhouse_sqlalchemy.types import Float64, String
+from sqlalchemy import Integer
 from sqlalchemy.orm import Mapped, mapped_column
 
-from db.models.base import Base
+from db.clickhouse_db_connection import BaseClickhouse
 
 
-class ChExperimentDBExperimentData(Base):
-    __tablename__ = 'ch_experimentdb_experiment_data'
+class ChExperimentDBExperimentData(BaseClickhouse):
+    __tablename__ = 'experiment_data'
+    __table_args__ = (
+        engines.MergeTree(order_by='id'),
+    )
 
-    id: Mapped[int] = mapped_column(Identity(start=11, cycle=True),
-                                    primary_key=True)
-    volume: Mapped[float]
-    nitrogen_oxide_emission: Mapped[float]
-    temperature: Mapped[float]
-    co_fraction: Mapped[float]
-    co2_fraction: Mapped[float]
-    x: Mapped[float]
-    y: Mapped[float]
-    z: Mapped[float]
-    file_id: Mapped[str]
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    volume: Mapped[float] = mapped_column(Float64)
+    nitrogen_oxide_emission: Mapped[float] = mapped_column(Float64)
+    temperature: Mapped[float] = mapped_column(Float64)
+    co_fraction: Mapped[float] = mapped_column(Float64)
+    co2_fraction: Mapped[float] = mapped_column(Float64)
+    x: Mapped[float] = mapped_column(Float64)
+    y: Mapped[float] = mapped_column(Float64)
+    z: Mapped[float] = mapped_column(Float64)
+    file_id: Mapped[str] = mapped_column(String)
 
     def __repr__(self):
-        return f""
+        return f""
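Note that ClickHouse has no server-side autoincrement, so `autoincrement=True` on this column is effectively a no-op and ids must be assigned by the caller (the repository later in this patch does it with `max(id) + 1`). To double-check the DDL that clickhouse-sqlalchemy emits for this model, a quick sketch:

```python
from sqlalchemy.schema import CreateTable

from db.clickhouse_db_connection import engine_clickhouse
from db.models import ChExperimentDBExperimentData

# Render the CREATE TABLE statement (MergeTree ordered by id)
# without executing it against the server.
ddl = CreateTable(ChExperimentDBExperimentData.__table__)
print(ddl.compile(bind=engine_clickhouse))
```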
diff --git a/db/models/experiment_data_model.py b/db/models/experiment_data_model.py
index 07c2e17..fed6bd4 100644
--- a/db/models/experiment_data_model.py
+++ b/db/models/experiment_data_model.py
@@ -1,6 +1,6 @@
 from typing import Optional
 
-from sqlalchemy import Identity
+from sqlalchemy import Identity, ForeignKey
 from sqlalchemy.orm import Mapped, mapped_column
 
 from db.models.base import Base
@@ -16,7 +16,7 @@ class ExperimentData(Base):
     nox: Mapped[float]
     co2: Mapped[float]
     co: Mapped[float]
-    file_id: Mapped[Optional[str]]
+    file_id: Mapped[Optional[str]] = mapped_column(ForeignKey('experiment_parameters.experiment_hash', ondelete='SET NULL'))
 
     def __repr__(self):
         return f""
diff --git a/db/models/experiment_parameters_model.py b/db/models/experiment_parameters_model.py
index dd02a27..927be7b 100644
--- a/db/models/experiment_parameters_model.py
+++ b/db/models/experiment_parameters_model.py
@@ -17,7 +17,7 @@ class ExperimentParameters(Base):
     middle_blades_count: Mapped[int]
     load_id: Mapped[Optional[int]] = mapped_column(ForeignKey('load_parameters.id', ondelete='SET NULL'))
     recycling_id: Mapped[Optional[int]] = mapped_column(ForeignKey('recycling_parameters.id', ondelete='SET NULL'))
-    experiment_hash: Mapped[str]
+    experiment_hash: Mapped[str] = mapped_column(unique=True)
 
     def __repr__(self):
         return f""
diff --git a/db/postgres_db_connection.py b/db/postgres_db_connection.py
new file mode 100644
index 0000000..9e75643
--- /dev/null
+++ b/db/postgres_db_connection.py
@@ -0,0 +1,5 @@
+from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
+from settings import settings
+
+engine_postgres = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
+async_session_postgres = async_sessionmaker(engine_postgres, expire_on_commit=False)
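The session-per-repository style used below pairs naturally with FastAPI dependency injection; a hypothetical helper (not part of this patch) built on the new `async_session_postgres` factory could look like:

```python
from db.postgres_db_connection import async_session_postgres

# Hypothetical FastAPI dependency: yields one AsyncSession per request
# and closes it afterwards. Not included in this patch.
async def get_session():
    async with async_session_postgres() as session:
        yield session
```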
diff --git a/db/repositories/__init__.py b/db/repositories/__init__.py
new file mode 100644
index 0000000..f6f7ebc
--- /dev/null
+++ b/db/repositories/__init__.py
@@ -0,0 +1,5 @@
+from .experiment_data_repos import *
+from .experiment_parameters_repos import *
+from .load_parameters_repos import *
+from .recycling_parameters_repos import *
+from .ch_experimentdb_experiment_data_repos import *
\ No newline at end of file
diff --git a/db/repositories/ch_experimentdb_experiment_data_repos.py b/db/repositories/ch_experimentdb_experiment_data_repos.py
index 8e93ace..984673d 100644
--- a/db/repositories/ch_experimentdb_experiment_data_repos.py
+++ b/db/repositories/ch_experimentdb_experiment_data_repos.py
@@ -1,63 +1,38 @@
-from typing import Optional
-from sqlalchemy import select
-from db.db_connection import async_session
 from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
-from network.schemas import ChExperimentDBExperimentDataBody
-
+from sqlalchemy import select, func
 
 class ChExperimentDBExperimentDataRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ChExperimentDBExperimentData]]:
-        async with async_session() as session:
-            result = await session.execute(select(ChExperimentDBExperimentData))
-            return result.scalars().all()
+    def __init__(self, session):
+        self.session = session
 
+    def get_all(self):
+        return self.session.query(ChExperimentDBExperimentData).all()
 
-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ChExperimentDBExperimentData]:
-        async with async_session() as session:
-            result = session.get(ChExperimentDBExperimentData, id_)
-            return result
+    def get_by_id(self, id: int) -> ChExperimentDBExperimentData:
+        return self.session.query(ChExperimentDBExperimentData).filter(ChExperimentDBExperimentData.id == id).one_or_none()
 
+    def create(self, ch_experiment_data: ChExperimentDBExperimentData) -> ChExperimentDBExperimentData:
+        max_id_query = select(func.max(ChExperimentDBExperimentData.id))
+        max_id_result = self.session.execute(max_id_query).scalar()
 
-    @staticmethod
-    async def create(volume: float,
-                     nitrogen_oxide_emission: float,
-                     temperature: float,
-                     co_fraction: float,
-                     co2_fraction: float,
-                     x: float,
-                     y: float,
-                     z: float,
-                     file_id: str):
-        new_data = ChExperimentDBExperimentData(
-            volume=volume,
-            nitrogen_oxide_emission=nitrogen_oxide_emission,
-            temperature=temperature,
-            co_fraction=co_fraction,
-            co2_fraction=co2_fraction,
-            x=x,
-            y=y,
-            z=z,
-            file_id=file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+        max_id = max_id_result or 0
 
-    @staticmethod
-    async def create_from_pydantic(body: ChExperimentDBExperimentDataBody):
-        new_data = ChExperimentDBExperimentData(
-            volume=body.volume,
-            nitrogen_oxide_emission=body.nitrogen_oxide_emission,
-            temperature=body.temperature,
-            co_fraction=body.co_fraction,
-            co2_fraction=body.co2_fraction,
-            x=body.x,
-            y=body.y,
-            z=body.z,
-            file_id=body.file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+        ch_experiment_data.id = max_id + 1
+
+        self.session.add(ch_experiment_data)
+        self.session.commit()
+
+        return ch_experiment_data
+
+    def update(self, id: int, updated_data: dict) -> ChExperimentDBExperimentData:
+        self.session.query(ChExperimentDBExperimentData).filter(ChExperimentDBExperimentData.id == id).update(updated_data)
+        self.session.commit()
+        return self.get_by_id(id)
+
+    def delete(self, id: int) -> bool:
+        item = self.get_by_id(id)
+        if item:
+            self.session.delete(item)
+            self.session.commit()
+            return True
+        return False
\ No newline at end of file
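A usage sketch for the ClickHouse repository, which is synchronous unlike the PostgreSQL ones. Note that the `max(id) + 1` scheme in `create` is not safe under concurrent writers; it is adequate for this single-writer import path:

```python
from db.clickhouse_db_connection import session_clickhouse
from db.models import ChExperimentDBExperimentData
from db.repositories import ChExperimentDBExperimentDataRepository

repo = ChExperimentDBExperimentDataRepository(session_clickhouse)

# id is assigned inside create() as max(id) + 1.
row = repo.create(ChExperimentDBExperimentData(
    volume=1.0, nitrogen_oxide_emission=0.1, temperature=300.0,
    co_fraction=0.01, co2_fraction=0.02, x=0.0, y=0.0, z=0.0,
    file_id="demo",
))
print(repo.get_by_id(row.id))
```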
diff --git a/db/repositories/experiment_data_repos.py b/db/repositories/experiment_data_repos.py
index 8a5b8fa..9a47649 100644
--- a/db/repositories/experiment_data_repos.py
+++ b/db/repositories/experiment_data_repos.py
@@ -1,53 +1,55 @@
-from typing import Optional
-from sqlalchemy import select
-from db.db_connection import async_session
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import update, delete
+from typing import List
 from db.models.experiment_data_model import ExperimentData
 from network.schemas import ExperimentDataBody
 
 
 class ExperimentDataRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ExperimentData]]:
-        async with async_session() as session:
-            result = await session.execute(select(ExperimentData))
-            return result.scalars().all()
+    def __init__(self, session: AsyncSession):
+        self.session = session
 
-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ExperimentData]:
-        async with async_session() as session:
-            result = session.get(ExperimentData, id_)
-            return result
+    async def get_all(self) -> List[ExperimentData]:
+        result = await self.session.execute(select(ExperimentData))
+        return result.scalars().all()
 
+    async def get_by_id(self, id: int) -> ExperimentData:
+        result = await self.session.execute(select(ExperimentData).where(ExperimentData.id == id))
+        return result.scalar_one_or_none()
 
-    @staticmethod
-    async def create(direction: float,
-                     temperature: float,
-                     nox: float,
-                     co2: float,
-                     co: float,
-                     file_id: str):
-        new_data = ExperimentData(
-            direction=direction,
-            temperature=temperature,
-            nox=nox,
-            co2=co2,
-            co=co,
-            file_id=file_id
+    async def create(self, experiment_data: ExperimentData) -> ExperimentData:
+        self.session.add(experiment_data)
+        await self.session.commit()
+        await self.session.refresh(experiment_data)
+        return experiment_data
+
+    async def update(self, id: int, updated_data: dict) -> ExperimentData:
+        stmt = (
+            update(ExperimentData).
+            where(ExperimentData.id == id).
+            values(**updated_data)
         )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+        await self.session.execute(stmt)
+        await self.session.commit()
+        return await self.get_by_id(id)
 
-    @staticmethod
-    async def create_from_pydantic(body: ExperimentDataBody):
-        new_data = ExperimentData(
-            direction=body.direction,
-            temperature=body.temperature,
-            nox=body.nox,
-            co2=body.co2,
-            co=body.co,
-            file_id=body.file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    async def delete(self, id: int) -> bool:
+        stmt = delete(ExperimentData).where(ExperimentData.id == id)
+        result = await self.session.execute(stmt)
+        await self.session.commit()
+        return result.rowcount > 0
+
+    # @staticmethod
+    # async def create_from_pydantic(body: ExperimentDataBody):
+    #     new_data = ExperimentData(
+    #         direction=body.direction,
+    #         temperature=body.temperature,
+    #         nox=body.nox,
+    #         co2=body.co2,
+    #         co=body.co,
+    #         file_id=body.file_id
+    #     )
+    #     async with async_session() as session:
+    #         session.add(new_data)
+    #         await session.commit()
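The PostgreSQL repositories now expect a caller-provided `AsyncSession`; a minimal end-to-end sketch:

```python
import asyncio

from db.postgres_db_connection import async_session_postgres
from db.repositories import ExperimentDataRepository

async def demo():
    # Open a session, hand it to the repository, and read everything back.
    async with async_session_postgres() as session:
        repo = ExperimentDataRepository(session)
        rows = await repo.get_all()
        print(f"{len(rows)} experiment_data rows")

asyncio.run(demo())
```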
diff --git a/db/repositories/experiment_parameters_repos.py b/db/repositories/experiment_parameters_repos.py
index 3bb0430..b420f4a 100644
--- a/db/repositories/experiment_parameters_repos.py
+++ b/db/repositories/experiment_parameters_repos.py
@@ -1,55 +1,56 @@
-from typing import Optional
+from typing import Optional, List, Sequence
 from sqlalchemy import select
-from db.db_connection import async_session
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import update, delete
+from typing import List
 from db.models.experiment_parameters_model import ExperimentParameters
-from network.schemas import ExperimentParametersBody
-
 
 class ExperimentParametersRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ExperimentParameters]]:
-        async with async_session() as session:
-            result = await session.execute(select(ExperimentParameters))
-            return result.scalars().all()
+    def __init__(self, session: AsyncSession):
+        self.session = session
 
-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ExperimentParameters]:
-        async with async_session() as session:
-            result = session.get(ExperimentParameters, id_)
-            return result
+    async def get_all(self) -> List[ExperimentParameters]:
+        result = await self.session.execute(select(ExperimentParameters))
+        return result.scalars().all()
 
-    @staticmethod
-    async def create(outer_blades_count: int,
-                     outer_blades_length: float,
-                     outer_blades_angle: float,
-                     middle_blades_count: int,
-                     load_id: int,
-                     recycling_id: int,
-                     experiment_hash: str):
-        new_data = ExperimentParameters(
-            outer_blades_count=outer_blades_count,
-            outer_blades_length=outer_blades_length,
-            outer_blades_angle=outer_blades_angle,
-            middle_blades_count=middle_blades_count,
-            load_id=load_id,
-            recycling_id=recycling_id,
-            experiment_hash=experiment_hash
+    async def get_by_id(self, id: int) -> ExperimentParameters:
+        result = await self.session.execute(select(ExperimentParameters).where(ExperimentParameters.id == id))
+        return result.scalar_one_or_none()
+
+    async def create(self, experiment_parameters: ExperimentParameters) -> ExperimentParameters:
+        self.session.add(experiment_parameters)
+        await self.session.commit()
+        await self.session.refresh(experiment_parameters)
+        return experiment_parameters
+
+    async def update(self, id: int, updated_data: dict) -> ExperimentParameters:
+        stmt = (
+            update(ExperimentParameters).
+            where(ExperimentParameters.id == id).
+            values(**updated_data)
         )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+        await self.session.execute(stmt)
+        await self.session.commit()
+        return await self.get_by_id(id)
 
-    @staticmethod
-    async def create_from_pydantic(body: ExperimentParametersBody):
-        new_data = ExperimentParameters(
-            outer_blades_count=body.outer_blades_count,
-            outer_blades_length=body.outer_blades_length,
-            outer_blades_angle=body.outer_blades_angle,
-            middle_blades_count=body.middle_blades_count,
-            load_id=body.load_id,
-            recycling_id=body.recycling_id,
-            experiment_hash=body.experiment_hash
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
\ No newline at end of file
+    async def delete(self, id: int) -> bool:
+        stmt = delete(ExperimentParameters).where(ExperimentParameters.id == id)
+        result = await self.session.execute(stmt)
+        await self.session.commit()
+        return result.rowcount > 0
+
+    # @staticmethod
+    # async def create_from_pydantic(body: ExperimentParametersBody):
+    #     new_data = ExperimentParameters(
+    #         outer_blades_count=body.outer_blades_count,
+    #         outer_blades_length=body.outer_blades_length,
+    #         outer_blades_angle=body.outer_blades_angle,
+    #         middle_blades_count=body.middle_blades_count,
+    #         load_id=body.load_id,
+    #         recycling_id=body.recycling_id,
+    #         experiment_hash=body.experiment_hash
+    #     )
+    #     async with async_session() as session:
+    #         session.add(new_data)
+    #         await session.commit()
\ No newline at end of file
diff --git a/db/repositories/load_parameters_repos.py b/db/repositories/load_parameters_repos.py
index d8eff1d..f62b8c7 100644
--- a/db/repositories/load_parameters_repos.py
+++ b/db/repositories/load_parameters_repos.py
@@ -1,6 +1,6 @@
 from typing import Optional
 from sqlalchemy.future import select
-from db.db_connection import async_session
+from db.postgres_db_connection import async_session_postgres
 from db.models.load_parameters_model import LoadParameters
 from network.schemas import LoadParametersBody
 
@@ -8,13 +8,13 @@ from network.schemas import LoadParametersBody
 
 class LoadParametersRepository:
     @staticmethod
     async def get_all() -> Optional[list[LoadParameters]]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = await session.execute(select(LoadParameters))
             return result.scalars().all()
 
     @staticmethod
     async def get_by_id(id_: int) -> Optional[LoadParameters]:
-        async with async_session() as session:
-            result = session.get(LoadParameters, id_)
+        async with async_session_postgres() as session:
+            result = await session.get(LoadParameters, id_)
             return result
 
@@ -29,7 +29,7 @@ class LoadParametersRepository:
             secondary_air_consumption=secondary_air_consumption,
             gas_inlet_consumption=gas_inlet_consumption
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()
 
@@ -41,7 +41,7 @@ class LoadParametersRepository:
             secondary_air_consumption=body.secondary_air_consumption,
             gas_inlet_consumption=body.gas_inlet_consumption
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()
 
diff --git a/db/repositories/recycling_parameters_repos.py b/db/repositories/recycling_parameters_repos.py
index c9fe85e..ac9d477 100644
--- a/db/repositories/recycling_parameters_repos.py
+++ b/db/repositories/recycling_parameters_repos.py
@@ -1,6 +1,6 @@
 from typing import Optional
 from sqlalchemy import select
-from db.db_connection import async_session
+from db.postgres_db_connection import async_session_postgres
 from db.models.recycling_parameters_model import RecyclingParameters
 from network.schemas import RecyclingParametersBody
 
@@ -8,13 +8,13 @@ from network.schemas import RecyclingParametersBody
 
 class RecyclingParametersRepository:
     @staticmethod
     async def get_all() -> Optional[list[RecyclingParameters]]:
-        async with async_session() as session:
-            result = await session.execute(select(RecyclingParameters))
-            return result.scalars.all()
+        async with async_session_postgres() as session:
+            result = await session.execute(select(RecyclingParameters))
+            return result.scalars().all()
 
     @staticmethod
     async def get_by_id(id_: int) -> Optional[RecyclingParameters]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = await session.execute(select(RecyclingParameters).where(RecyclingParameters.id == id_))
             return result.scalars().first()
 
@@ -33,7 +33,7 @@ class RecyclingParametersRepository:
             h2o=h2o,
             o2=o2
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()
 
@@ -47,6 +47,6 @@ class RecyclingParametersRepository:
             h2o=body.h2o,
             o2=body.o2
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 445f007..3c467fc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -33,7 +33,7 @@ services:
       - db
      - clickhouse
     volumes:
-      - .:/app # Mount the current directory into the container for code access
+      - .:/app
 
 volumes:
   postgres_data:
diff --git a/main.py b/main.py
index 935e81d..a3179f3 100644
--- a/main.py
+++ b/main.py
@@ -1,10 +1,10 @@
 import asyncio
-from fastapi import FastAPI, HTTPException
+from fastapi import FastAPI, HTTPException, BackgroundTasks
 from db.csv_to_db import csv_to_db
 from network.routes import ch_experimentdb_experiment_data_router, experiment_data_router, experiment_parameters_router
 from network.routes import load_parameters_router, recycling_parameters_router
 from network.schemas import *
-from new_experiment_planner import run_experiment  # Import the function from your script
+from new_experiment_planner import run_experiment
 
 app = FastAPI()
 
@@ -45,9 +45,10 @@ def run_experiment_api(params: ExperimentParameters):
 
 # endpoint that initializes the DB from the csv files
 @app.get('/init_db_data')
-def init_db_data():
+async def init_db_data(background_tasks: BackgroundTasks):
     try:
-        asyncio.run(csv_to_db())
-        return {"status": "success", "message": "Database initialized. Data from the csv files was added successfully."}
+        background_tasks.add_task(csv_to_db)
+        return {"status": "success", "message": "DB initialization started in the background"}
     except Exception as e:
+        print(str(e))
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
diff --git a/network/routes/ch_experimentdb_experiment_data_router.py b/network/routes/ch_experimentdb_experiment_data_router.py
index 1dbe7d8..2421eed 100644
--- a/network/routes/ch_experimentdb_experiment_data_router.py
+++ b/network/routes/ch_experimentdb_experiment_data_router.py
@@ -1,28 +1,67 @@
 from fastapi import APIRouter, HTTPException
+
+from db.clickhouse_db_connection import session_clickhouse
+from db.models import ChExperimentDBExperimentData
 from db.repositories.ch_experimentdb_experiment_data_repos import ChExperimentDBExperimentDataRepository
 from network.schemas import ChExperimentDBExperimentDataBody
+from typing import List
 
 router = APIRouter()
+repository = ChExperimentDBExperimentDataRepository(session_clickhouse)
 
 
 @router.post('/create')
 async def create_ch_experimentdb_experiment_data(data: ChExperimentDBExperimentDataBody):
     try:
-        await ChExperimentDBExperimentDataRepository.create_from_pydantic(data)
+        new_record = repository.create(ChExperimentDBExperimentData(
+            volume=data.volume,
+            nitrogen_oxide_emission=data.nitrogen_oxide_emission,
+            temperature=data.temperature,
+            co_fraction=data.co_fraction,
+            co2_fraction=data.co2_fraction,
+            x=data.x,
+            y=data.y,
+            z=data.z,
+            file_id=data.file_id
+        ))
 
-        return {"message": "New record added successfully"}
+        return {"message": "New record added successfully"}
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
 
 
-@router.get('/all')
+@router.get('/all', response_model=List[ChExperimentDBExperimentDataBody])
 async def get_all_ch_experimentdb_experiment_data():
     try:
-        result = await ChExperimentDBExperimentDataRepository.get_all()
+        result = repository.get_all()
 
-        if result is not None:
+        if result:
             return result
         else:
-            return {"message": "No records in the table, or an unexpected error occurred"}
+            return {"message": "No records"}
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
+
+
+@router.get('/{id}')
+async def get_ch_experimentdb_experiment_data_by_id(id: int):
+    try:
+        record = repository.get_by_id(id)
+        if record:
+            return record
+        else:
+            raise HTTPException(status_code=404, detail="Record not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
+
+
+@router.delete('/{id}/delete')
+async def delete_ch_experimentdb_experiment_data(id: int):
+    try:
+        is_deleted = repository.delete(id)
+
+        if is_deleted:
+            return {"message": "Record deleted successfully"}
+        else:
+            raise HTTPException(status_code=404, detail="Record not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
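The new routes can be exercised end to end with a short script. The URL prefix under which main.py mounts this router is not shown in the diff, so the bare paths below are an assumption; adjust `BASE` accordingly:

```python
import requests

BASE = "http://localhost:8000"  # assumed host/port and no router prefix

payload = {
    "volume": 1.0, "nitrogen_oxide_emission": 0.1, "temperature": 300.0,
    "co_fraction": 0.01, "co2_fraction": 0.02,
    "x": 0.0, "y": 0.0, "z": 0.0, "file_id": "demo",
}
print(requests.post(f"{BASE}/create", json=payload).json())
print(requests.get(f"{BASE}/all").json())
print(requests.delete(f"{BASE}/1/delete").json())
```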
diff --git a/network/schemas.py b/network/schemas.py
index 1aa37d0..c43dc73 100644
--- a/network/schemas.py
+++ b/network/schemas.py
@@ -2,8 +2,6 @@ from typing import Optional
 
 from pydantic import BaseModel, ConfigDict
 
-
-# Data model for passing parameters
 class ExperimentParameters(BaseModel):
     outer_blades_count: str
     outer_blades_length: str
@@ -12,7 +10,6 @@ class ExperimentParameters(BaseModel):
     load: str
     recycling: str
 
-
 class ChExperimentDBExperimentDataBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     volume: float
@@ -25,7 +22,6 @@ class ChExperimentDBExperimentDataBody(BaseModel):
     z: float
     file_id: str
 
-
 class ExperimentDataBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     direction: float
@@ -35,7 +31,6 @@ class ExperimentDataBody(BaseModel):
     co: float
     file_id: Optional[str]
 
-
 class ExperimentParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     outer_blades_count: int
@@ -46,7 +41,6 @@ class ExperimentParametersBody(BaseModel):
     recycling_id: Optional[int]
     experiment_hash: str
 
-
 class LoadParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     load: int
@@ -54,7 +48,6 @@ class LoadParametersBody(BaseModel):
     secondary_air_consumption: float
     gas_inlet_consumption: float
 
-
 class RecyclingParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     load_id: Optional[int]
diff --git a/requirements.txt b/requirements.txt
index bf79f07a9e6824d432c5e72011e2c852a655cd19..64b500280364142a878da11f6f1ef8bbf174cbd5 100644
GIT binary patch
delta 46
ycmaFCy@Y4O3)abAj8aO)4229i42eLT%#gv5%8<)Y$xz2&%V5Bu$6yS^MhpNi3JN*^

delta 7
OcmZ3&^MZTB3swLPWCK9}

diff --git a/settings.py b/settings.py
index 85f38b6..27311b7 100644
--- a/settings.py
+++ b/settings.py
@@ -22,7 +22,10 @@ class Settings(BaseSettings):
         # 'postgresql+asyncpg://username:password@localhost:5432/database_name'
         return f'postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@db:{self.DB_PORT}/{self.DATABASE}'
 
+    @property
+    def clickhouse_url(self):
+        return f'clickhouse://{self.CLICKHOUSE_USER}:{self.CLICKHOUSE_PASSWORD}@clickhouse:8123/{self.DATABASE}'
+
     model_config = SettingsConfigDict(env_file=".env")
 
-
 settings = Settings()
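With the credentials from the README's ClickHouse step in the `.env` file, the new property resolves roughly as follows (the values shown are for illustration and assume the corresponding `.env` entries):

```python
from settings import settings

# Assuming .env contains CLICKHOUSE_USER=UserMyHouse,
# CLICKHOUSE_PASSWORD=NotWarningWord2 and DATABASE=SuperService:
print(settings.clickhouse_url)
# clickhouse://UserMyHouse:NotWarningWord2@clickhouse:8123/SuperService
```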