Update Database:

+ch_experiment_data in ClickHouse
+experiment_data and experiment_parameters in PostgreSQL
+experiment_data has a foreign key referencing experiment_parameters
+new connections
+implement routes for ch_experiment_data
+new csv_to_db algorithm
+new methods in repos
+update Dockerfile
+update README: "how to init db"
AnnZhimol committed 2024-10-25 01:02:31 +03:00
parent 40e97696fd
commit 49e327005e
23 changed files with 291 additions and 281 deletions

Dockerfile

@@ -1,4 +1,4 @@
-FROM python:3.12-slim
+FROM python:3.12
 WORKDIR /app

README.md

@@ -68,3 +68,24 @@ alembic revision --autogenerate
 ```
 alembic upgrade head
 ```
+# Database initialization
+## 1. Start docker-compose
+```
+docker-compose up --build
+```
+## 2. Connect to ClickHouse
+```
+docker exec -it clickhouse-db clickhouse-client -u UserMyHouse --password NotWarningWord2 --host localhost
+```
+## 3. Create the database
+```
+CREATE DATABASE SuperService;
+```
+## 4. Initialize the data
+Open FastAPI and run the request:
+```
+http://localhost:8000/init_db_data
+```
+PostgreSQL and ClickHouse will be populated with data.
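Step 4 can also be triggered from a terminal once the stack is up; a minimal sketch (an editorial example, not part of this commit), assuming the API listens on the default port:
```
curl http://localhost:8000/init_db_data
```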

db/__init__.py (new file, empty)

db/clickhouse_db_connection.py (new file)

@@ -0,0 +1,7 @@
+from clickhouse_sqlalchemy import make_session, get_declarative_base, engines
+from sqlalchemy import create_engine
+from settings import settings
+
+engine_clickhouse = create_engine(settings.clickhouse_url)
+BaseClickhouse = get_declarative_base()
+session_clickhouse = make_session(engine_clickhouse)
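For orientation, a sketch of how this module is used (an editorial example, not part of the commit; it mirrors the Query API that the repositories below call on this same session):
```
# Editorial sketch: reading a few rows through the synchronous ClickHouse session.
from db.clickhouse_db_connection import session_clickhouse
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData

rows = session_clickhouse.query(ChExperimentDBExperimentData).limit(5).all()
for row in rows:
    print(row.id, row.volume)
```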

db/csv_to_db.py

@@ -1,8 +1,10 @@
 import asyncio
 import pandas as pd
-from sqlalchemy import insert
-from db.db_connection import async_session, engine
+from sqlalchemy import insert, text
+from sqlalchemy.exc import OperationalError, IntegrityError
+from db.clickhouse_db_connection import BaseClickhouse, engine_clickhouse, session_clickhouse
+from db.postgres_db_connection import async_session_postgres, engine_postgres
 from db.models.base import Base
 from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
 from db.models.experiment_data_model import ExperimentData
@@ -10,95 +12,49 @@ from db.models.experiment_parameters_model import ExperimentParameters
 from db.models.load_parameters_model import LoadParameters
 from db.models.recycling_parameters_model import RecyclingParameters

-def add_ids_in_csv(file: str):
-    try:
-        df = pd.read_csv(file)
-        df.insert(0, 'id', pd.Series(range(1, len(df) + 1)))
-        df.to_csv(file, index=False)
-    except Exception as e:
-        print(f'Exception!! {e}')
-
-def print_headers_and_types(file: str):
-    df = pd.read_csv(file)
-    headers = df.columns.tolist()
-    print(headers)
-    for header in headers:
-        column_type = df[header].dtype
-        print(column_type)
-
 async def create_all_tables():
-    print('create_all_tables')
-    async with engine.begin() as conn:
+    async with engine_postgres.begin() as conn:
         await conn.run_sync(Base.metadata.create_all)
+    BaseClickhouse.metadata.create_all(engine_clickhouse)

-async def drop_all_tables():
-    print('drop_all_tables')
-    async with engine.begin() as conn:
-        await conn.run_sync(Base.metadata.drop_all)
-
-async def load_data_to_db(file: str, model_class):
-    print('load_data_to_db')
-    async with async_session() as session:
-        df = pd.read_csv(file).dropna()
-        # Convert the DataFrame into a format suitable for SQLAlchemy
-        data_records = df.to_dict(orient='records')
-        # Bulk insert of all records
-        stmt = insert(model_class).values(data_records)
-        await session.execute(stmt)
-        await session.commit()
-
-async def insert_batch(session, model_class, batch_data):
-    stmt = insert(model_class).values(batch_data)
-    await session.execute(stmt)
-    await session.commit()
-
-async def test(file: str, model_class):
-    print('test')
-    df = pd.read_csv(file)
-    data_records = df.to_dict(orient='records')
-
-    # async with async_session() as session:
-    #     batch_size = 3000
-    #     tasks = []
-    #
-    #     for i in range(0, len(data_records), batch_size):
-    #         batch_data = data_records[i:i + batch_size]
-    #         tasks.append(insert_batch(session, model_class, batch_data))
-    #
-    #     await asyncio.gather(*tasks)
-    # takes about two minutes
-    async with async_session() as session:
-        batch_size = 3000
-        print(batch_size)
-        for i in range(0, len(data_records), batch_size):
-            batch_data = data_records[i:i + batch_size]
-            stmt = insert(model_class).values(batch_data)
-            await session.execute(stmt)
-            await session.commit()
+async def load_data_to_postgres(file: str, model_class, chunk_size=100):
+    df = pd.read_csv(file).dropna()
+    async with async_session_postgres() as session:
+        for start in range(0, len(df), chunk_size):
+            end = start + chunk_size
+            chunk = df.iloc[start:end].to_dict(orient='records')
+            stmt = insert(model_class).values(chunk)
+            try:
+                await session.execute(stmt)
+                await session.commit()
+            except IntegrityError as e:
+                await session.rollback()
+
+def load_data_to_clickhouse(file: str, model_class, chunk_size=100):
+    df = pd.read_csv(file).dropna()
+    with session_clickhouse as session:
+        for start in range(0, len(df), chunk_size):
+            end = start + chunk_size
+            chunk = df.iloc[start:end].to_dict(orient='records')
+            stmt = insert(model_class).values(chunk)
+            session.execute(stmt)
+            session.commit()

 async def csv_to_db():
-    await drop_all_tables()
     await create_all_tables()
-    await load_data_to_db('./db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData)
-    await test('./db/files/experiment_data.csv', ExperimentData)
-    await load_data_to_db('./db/files/load_parameters.csv', LoadParameters)
-    await load_data_to_db('./db/files/recycling_parameters.csv', RecyclingParameters)
-    await load_data_to_db('./db/files/experiment_parameters.csv', ExperimentParameters)
-
-# asyncio.run(csv_to_db())
+    await load_data_to_postgres('./db/files/load_parameters.csv', LoadParameters)
+    await load_data_to_postgres('./db/files/recycling_parameters.csv', RecyclingParameters)
+    await load_data_to_postgres('./db/files/experiment_parameters.csv', ExperimentParameters)
+    await load_data_to_postgres('./db/files/experiment_data.csv', ExperimentData)
+    load_data_to_clickhouse('./db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData)

db/db_connection.py (deleted)

@@ -1,7 +0,0 @@
-from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
-
-from settings import settings
-
-engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
-async_session = async_sessionmaker(engine)

db/models/__init__.py (new file)

@@ -0,0 +1,6 @@
+from .base import *
+from .experiment_parameters_model import *
+from .experiment_data_model import *
+from .ch_experimentdb_experiment_data_model import *
+from .load_parameters_model import *
+from .recycling_parameters_model import *

db/models/ch_experimentdb_experiment_data_model.py

@@ -1,23 +1,26 @@
-from sqlalchemy import Identity
+from clickhouse_sqlalchemy import engines
+from clickhouse_sqlalchemy.types import Float64, String
+from sqlalchemy import Integer
 from sqlalchemy.orm import Mapped, mapped_column
-from db.models.base import Base
+from db.clickhouse_db_connection import BaseClickhouse

-class ChExperimentDBExperimentData(Base):
-    __tablename__ = 'ch_experimentdb_experiment_data'
-
-    id: Mapped[int] = mapped_column(Identity(start=11, cycle=True),
-                                    primary_key=True)
-    volume: Mapped[float]
-    nitrogen_oxide_emission: Mapped[float]
-    temperature: Mapped[float]
-    co_fraction: Mapped[float]
-    co2_fraction: Mapped[float]
-    x: Mapped[float]
-    y: Mapped[float]
-    z: Mapped[float]
-    file_id: Mapped[str]
+class ChExperimentDBExperimentData(BaseClickhouse):
+    __tablename__ = 'experiment_data'
+    __table_args__ = (
+        engines.MergeTree(order_by='id'),
+    )
+
+    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
+    volume: Mapped[float] = mapped_column(Float64)
+    nitrogen_oxide_emission: Mapped[float] = mapped_column(Float64)
+    temperature: Mapped[float] = mapped_column(Float64)
+    co_fraction: Mapped[float] = mapped_column(Float64)
+    co2_fraction: Mapped[float] = mapped_column(Float64)
+    x: Mapped[float] = mapped_column(Float64)
+    y: Mapped[float] = mapped_column(Float64)
+    z: Mapped[float] = mapped_column(Float64)
+    file_id: Mapped[str] = mapped_column(String)

     def __repr__(self):
-        return f"<ChExperimentDBExperimentData>"
+        return f"<ChExperimentDBExperimentData id={self.id}, volume={self.volume}>"

db/models/experiment_data_model.py

@@ -1,6 +1,6 @@
 from typing import Optional
-from sqlalchemy import Identity
+from sqlalchemy import Identity, ForeignKey
 from sqlalchemy.orm import Mapped, mapped_column
 from db.models.base import Base

@@ -16,7 +16,7 @@ class ExperimentData(Base):
     nox: Mapped[float]
     co2: Mapped[float]
     co: Mapped[float]
-    file_id: Mapped[Optional[str]]
+    file_id: Mapped[Optional[str]] = mapped_column(ForeignKey('experiment_parameters.experiment_hash', ondelete='SET NULL'))

     def __repr__(self):
         return f"<ExperimentData>"

db/models/experiment_parameters_model.py

@@ -17,7 +17,7 @@ class ExperimentParameters(Base):
     middle_blades_count: Mapped[int]
     load_id: Mapped[Optional[int]] = mapped_column(ForeignKey('load_parameters.id', ondelete='SET NULL'))
     recycling_id: Mapped[Optional[int]] = mapped_column(ForeignKey('recycling_parameters.id', ondelete='SET NULL'))
-    experiment_hash: Mapped[str]
+    experiment_hash: Mapped[str] = mapped_column(unique=True)

     def __repr__(self):
         return f"<ExperimentParameters>"

db/postgres_db_connection.py (new file)

@@ -0,0 +1,5 @@
+from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
+from settings import settings
+
+engine_postgres = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
+async_session_postgres = async_sessionmaker(engine_postgres, expire_on_commit=False)

db/repositories/__init__.py (new file)

@@ -0,0 +1,5 @@
+from .experiment_data_repos import *
+from .experiment_parameters_repos import *
+from .load_parameters_repos import *
+from .recycling_parameters_repos import *
+from .ch_experimentdb_experiment_data_repos import *

db/repositories/ch_experimentdb_experiment_data_repos.py

@@ -1,63 +1,38 @@
-from typing import Optional
-from sqlalchemy import select
-from db.db_connection import async_session
 from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
-from network.schemas import ChExperimentDBExperimentDataBody
+from sqlalchemy import select, func

 class ChExperimentDBExperimentDataRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ChExperimentDBExperimentData]]:
-        async with async_session() as session:
-            result = await session.execute(select(ChExperimentDBExperimentData))
-            return result.scalars().all()
+    def __init__(self, session):
+        self.session = session
+
+    def get_all(self):
+        return self.session.query(ChExperimentDBExperimentData).all()

-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ChExperimentDBExperimentData]:
-        async with async_session() as session:
-            result = session.get(ChExperimentDBExperimentData, id_)
-            return result
+    def get_by_id(self, id: int) -> ChExperimentDBExperimentData:
+        return self.session.query(ChExperimentDBExperimentData).filter(ChExperimentDBExperimentData.id == id).one_or_none()

-    @staticmethod
-    async def create(volume: float,
-                     nitrogen_oxide_emission: float,
-                     temperature: float,
-                     co_fraction: float,
-                     co2_fraction: float,
-                     x: float,
-                     y: float,
-                     z: float,
-                     file_id: str):
-        new_data = ChExperimentDBExperimentData(
-            volume=volume,
-            nitrogen_oxide_emission=nitrogen_oxide_emission,
-            temperature=temperature,
-            co_fraction=co_fraction,
-            co2_fraction=co2_fraction,
-            x=x,
-            y=y,
-            z=z,
-            file_id=file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    def create(self, ch_experiment_data: ChExperimentDBExperimentData) -> ChExperimentDBExperimentData:
+        max_id_query = select(func.max(ChExperimentDBExperimentData.id))
+        max_id_result = self.session.execute(max_id_query).scalar()
+        max_id = max_id_result or 0
+
+        ch_experiment_data.id = max_id + 1
+        self.session.add(ch_experiment_data)
+        self.session.commit()
+
+        return ch_experiment_data

-    @staticmethod
-    async def create_from_pydantic(body: ChExperimentDBExperimentDataBody):
-        new_data = ChExperimentDBExperimentData(
-            volume=body.volume,
-            nitrogen_oxide_emission=body.nitrogen_oxide_emission,
-            temperature=body.temperature,
-            co_fraction=body.co_fraction,
-            co2_fraction=body.co2_fraction,
-            x=body.x,
-            y=body.y,
-            z=body.z,
-            file_id=body.file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    def update(self, id: int, updated_data: dict) -> ChExperimentDBExperimentData:
+        self.session.query(ChExperimentDBExperimentData).filter(ChExperimentDBExperimentData.id == id).update(updated_data)
+        self.session.commit()
+        return self.get_by_id(id)
+
+    def delete(self, id: int) -> bool:
+        item = self.get_by_id(id)
+        if item:
+            self.session.delete(item)
+            self.session.commit()
+            return True
+        return False
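Since ClickHouse MergeTree tables have no real autoincrement, `create` assigns the next id as max(id) + 1; note that this is not safe under concurrent inserts. A usage sketch (editorial, not part of the commit; the field values are placeholders):
```
# Editorial sketch: wiring the repository to the synchronous ClickHouse session.
from db.clickhouse_db_connection import session_clickhouse
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
from db.repositories.ch_experimentdb_experiment_data_repos import ChExperimentDBExperimentDataRepository

repo = ChExperimentDBExperimentDataRepository(session_clickhouse)
record = repo.create(ChExperimentDBExperimentData(
    volume=1.0, nitrogen_oxide_emission=0.1, temperature=950.0,
    co_fraction=0.02, co2_fraction=0.12, x=0.0, y=0.0, z=0.0,
    file_id='example',  # placeholder values, for illustration only
))
print(record.id)  # max(id) + 1, assigned by the repository
```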

db/repositories/experiment_data_repos.py

@@ -1,53 +1,55 @@
-from typing import Optional
-from sqlalchemy import select
-from db.db_connection import async_session
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import update, delete
+from typing import List
 from db.models.experiment_data_model import ExperimentData
 from network.schemas import ExperimentDataBody

 class ExperimentDataRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ExperimentData]]:
-        async with async_session() as session:
-            result = await session.execute(select(ExperimentData))
-            return result.scalars().all()
+    def __init__(self, session: AsyncSession):
+        self.session = session

-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ExperimentData]:
-        async with async_session() as session:
-            result = session.get(ExperimentData, id_)
-            return result
+    async def get_all(self) -> List[ExperimentData]:
+        result = await self.session.execute(select(ExperimentData))
+        return result.scalars().all()
+
+    async def get_by_id(self, id: int) -> ExperimentData:
+        result = await self.session.execute(select(ExperimentData).where(ExperimentData.id == id))
+        return result.scalar_one_or_none()
+
+    async def create(self, experiment_data: ExperimentData) -> ExperimentData:
+        self.session.add(experiment_data)
+        await self.session.commit()
+        await self.session.refresh(experiment_data)
+        return experiment_data

-    @staticmethod
-    async def create(direction: float,
-                     temperature: float,
-                     nox: float,
-                     co2: float,
-                     co: float,
-                     file_id: str):
-        new_data = ExperimentData(
-            direction=direction,
-            temperature=temperature,
-            nox=nox,
-            co2=co2,
-            co=co,
-            file_id=file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    async def update(self, id: int, updated_data: dict) -> ExperimentData:
+        stmt = (
+            update(ExperimentData).
+            where(ExperimentData.id == id).
+            values(**updated_data)
+        )
+        await self.session.execute(stmt)
+        await self.session.commit()
+        return await self.get_by_id(id)

-    @staticmethod
-    async def create_from_pydantic(body: ExperimentDataBody):
-        new_data = ExperimentData(
-            direction=body.direction,
-            temperature=body.temperature,
-            nox=body.nox,
-            co2=body.co2,
-            co=body.co,
-            file_id=body.file_id
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    async def delete(self, id: int) -> bool:
+        stmt = delete(ExperimentData).where(ExperimentData.id == id)
+        result = await self.session.execute(stmt)
+        await self.session.commit()
+        return result.rowcount > 0
+
+    # @staticmethod
+    # async def create_from_pydantic(body: ExperimentDataBody):
+    #     new_data = ExperimentData(
+    #         direction=body.direction,
+    #         temperature=body.temperature,
+    #         nox=body.nox,
+    #         co2=body.co2,
+    #         co=body.co,
+    #         file_id=body.file_id
+    #     )
+    #     async with async_session() as session:
+    #         session.add(new_data)
+    #         await session.commit()
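The repositories now receive a session instead of opening one per call, so the caller owns the transaction scope. A usage sketch (editorial, assuming the async_session_postgres factory introduced above):
```
# Editorial sketch: using the async repository with the Postgres session factory.
import asyncio

from db.postgres_db_connection import async_session_postgres
from db.repositories.experiment_data_repos import ExperimentDataRepository

async def main():
    async with async_session_postgres() as session:
        repo = ExperimentDataRepository(session)
        rows = await repo.get_all()
        print(len(rows))

asyncio.run(main())
```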

db/repositories/experiment_parameters_repos.py

@@ -1,55 +1,56 @@
-from typing import Optional
+from typing import Optional, List, Sequence
 from sqlalchemy import select
-from db.db_connection import async_session
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy.future import select
+from sqlalchemy import update, delete
+from typing import List
 from db.models.experiment_parameters_model import ExperimentParameters
-from network.schemas import ExperimentParametersBody

 class ExperimentParametersRepository:
-    @staticmethod
-    async def get_all() -> Optional[list[ExperimentParameters]]:
-        async with async_session() as session:
-            result = await session.execute(select(ExperimentParameters))
-            return result.scalars().all()
+    def __init__(self, session: AsyncSession):
+        self.session = session

-    @staticmethod
-    async def get_by_id(id_: int) -> Optional[ExperimentParameters]:
-        async with async_session() as session:
-            result = session.get(ExperimentParameters, id_)
-            return result
+    async def get_all(self) -> List[ExperimentParameters]:
+        result = await self.session.execute(select(ExperimentParameters))
+        return result.scalars().all()
+
+    async def get_by_id(self, id: int) -> ExperimentParameters:
+        result = await self.session.execute(select(ExperimentParameters).where(ExperimentParameters.id == id))
+        return result.scalar_one_or_none()
+
+    async def create(self, experiment_parameters: ExperimentParameters) -> ExperimentParameters:
+        self.session.add(experiment_parameters)
+        await self.session.commit()
+        await self.session.refresh(experiment_parameters)
+        return experiment_parameters

-    @staticmethod
-    async def create(outer_blades_count: int,
-                     outer_blades_length: float,
-                     outer_blades_angle: float,
-                     middle_blades_count: int,
-                     load_id: int,
-                     recycling_id: int,
-                     experiment_hash: str):
-        new_data = ExperimentParameters(
-            outer_blades_count=outer_blades_count,
-            outer_blades_length=outer_blades_length,
-            outer_blades_angle=outer_blades_angle,
-            middle_blades_count=middle_blades_count,
-            load_id=load_id,
-            recycling_id=recycling_id,
-            experiment_hash=experiment_hash
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    async def update(self, id: int, updated_data: dict) -> ExperimentParameters:
+        stmt = (
+            update(ExperimentParameters).
+            where(ExperimentParameters.id == id).
+            values(**updated_data)
+        )
+        await self.session.execute(stmt)
+        await self.session.commit()
+        return await self.get_by_id(id)

-    @staticmethod
-    async def create_from_pydantic(body: ExperimentParametersBody):
-        new_data = ExperimentParameters(
-            outer_blades_count=body.outer_blades_count,
-            outer_blades_length=body.outer_blades_length,
-            outer_blades_angle=body.outer_blades_angle,
-            middle_blades_count=body.middle_blades_count,
-            load_id=body.load_id,
-            recycling_id=body.recycling_id,
-            experiment_hash=body.experiment_hash
-        )
-        async with async_session() as session:
-            session.add(new_data)
-            await session.commit()
+    async def delete(self, id: int) -> bool:
+        stmt = delete(ExperimentParameters).where(ExperimentParameters.id == id)
+        result = await self.session.execute(stmt)
+        await self.session.commit()
+        return result.rowcount > 0
+
+    # @staticmethod
+    # async def create_from_pydantic(body: ExperimentParametersBody):
+    #     new_data = ExperimentParameters(
+    #         outer_blades_count=body.outer_blades_count,
+    #         outer_blades_length=body.outer_blades_length,
+    #         outer_blades_angle=body.outer_blades_angle,
+    #         middle_blades_count=body.middle_blades_count,
+    #         load_id=body.load_id,
+    #         recycling_id=body.recycling_id,
+    #         experiment_hash=body.experiment_hash
+    #     )
+    #     async with async_session() as session:
+    #         session.add(new_data)
+    #         await session.commit()

db/repositories/load_parameters_repos.py

@@ -1,6 +1,6 @@
 from typing import Optional
 from sqlalchemy.future import select
-from db.db_connection import async_session
+from db.postgres_db_connection import async_session_postgres
 from db.models.load_parameters_model import LoadParameters
 from network.schemas import LoadParametersBody

@@ -8,13 +8,13 @@ from network.schemas import LoadParametersBody
 class LoadParametersRepository:
     @staticmethod
     async def get_all() -> Optional[list[LoadParameters]]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = await session.execute(select(LoadParameters))
             return result.scalars().all()

     @staticmethod
     async def get_by_id(id_: int) -> Optional[LoadParameters]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = session.get(LoadParameters, id_)
             return result

@@ -29,7 +29,7 @@ class LoadParametersRepository:
             secondary_air_consumption=secondary_air_consumption,
             gas_inlet_consumption=gas_inlet_consumption
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()

@@ -41,7 +41,7 @@ class LoadParametersRepository:
             secondary_air_consumption=body.secondary_air_consumption,
             gas_inlet_consumption=body.gas_inlet_consumption
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()

db/repositories/recycling_parameters_repos.py

@@ -1,6 +1,6 @@
 from typing import Optional
 from sqlalchemy import select
-from db.db_connection import async_session
+from db.postgres_db_connection import async_session_postgres
 from db.models.recycling_parameters_model import RecyclingParameters
 from network.schemas import RecyclingParametersBody

@@ -8,13 +8,13 @@ from network.schemas import RecyclingParametersBody
 class RecyclingParametersRepository:
     @staticmethod
     async def get_all() -> Optional[list[RecyclingParameters]]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = await session.execute(select(RecyclingParameters))
             return result.scalars().all()

     @staticmethod
     async def get_by_id(id_: int) -> Optional[RecyclingParameters]:
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             result = await session.execute(select(RecyclingParameters).where(RecyclingParameters.id == id_))
             return result.scalars().first()

@@ -33,7 +33,7 @@ class RecyclingParametersRepository:
             h2o=h2o,
             o2=o2
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()

@@ -47,6 +47,6 @@ class RecyclingParametersRepository:
             h2o=body.h2o,
             o2=body.o2
         )
-        async with async_session() as session:
+        async with async_session_postgres() as session:
             session.add(new_data)
             await session.commit()

docker-compose.yml

@@ -33,7 +33,7 @@ services:
       - db
       - clickhouse
     volumes:
-      - .:/app  # Bind the current directory into the container so the code is accessible
+      - .:/app

 volumes:
   postgres_data:

main.py

@@ -1,10 +1,10 @@
 import asyncio
-from fastapi import FastAPI, HTTPException
+from fastapi import FastAPI, HTTPException, BackgroundTasks
 from db.csv_to_db import csv_to_db
 from network.routes import ch_experimentdb_experiment_data_router, experiment_data_router, experiment_parameters_router
 from network.routes import load_parameters_router, recycling_parameters_router
 from network.schemas import *
-from new_experiment_planner import run_experiment  # Import the function from your script
+from new_experiment_planner import run_experiment

 app = FastAPI()

@@ -45,9 +45,10 @@ def run_experiment_api(params: ExperimentParameters):
 # endpoint to initialize the DB from csv files
 @app.get('/init_db_data')
-def init_db_data():
+async def init_db_data(background_tasks: BackgroundTasks):
     try:
-        asyncio.run(csv_to_db())
-        return {"status": "success", "message": "Database initialized. Data from the csv files was added successfully."}
+        background_tasks.add_task(csv_to_db)
+        return {"status": "success", "message": "Database initialization started in the background"}
     except Exception as e:
-        print(str(e.with_traceback()))
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
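With BackgroundTasks, csv_to_db now runs after the response is sent, so the endpoint returns immediately while the import proceeds. For a one-off local run without the HTTP layer, the entry point the old code kept in a comment still applies as a sketch:
```
# Editorial sketch: running the CSV import directly, outside FastAPI.
import asyncio

from db.csv_to_db import csv_to_db

asyncio.run(csv_to_db())
```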

network/routes/ch_experimentdb_experiment_data_router.py

@@ -1,28 +1,67 @@
 from fastapi import APIRouter, HTTPException
+from db.clickhouse_db_connection import session_clickhouse
+from db.models import ChExperimentDBExperimentData
 from db.repositories.ch_experimentdb_experiment_data_repos import ChExperimentDBExperimentDataRepository
 from network.schemas import ChExperimentDBExperimentDataBody
+from typing import List

 router = APIRouter()
+repository = ChExperimentDBExperimentDataRepository(session_clickhouse)

 @router.post('/create')
 async def create_ch_experimentdb_experiment_data(data: ChExperimentDBExperimentDataBody):
     try:
-        await ChExperimentDBExperimentDataRepository.create_from_pydantic(data)
-        return {"message": "New <ChExperimentDBExperimentData> record added successfully"}
+        new_record = repository.create(ChExperimentDBExperimentData(
+            volume=data.volume,
+            nitrogen_oxide_emission=data.nitrogen_oxide_emission,
+            temperature=data.temperature,
+            co_fraction=data.co_fraction,
+            co2_fraction=data.co2_fraction,
+            x=data.x,
+            y=data.y,
+            z=data.z,
+            file_id=data.file_id
+        ))
+        return {"message": "New record added successfully"}
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

-@router.get('/all')
+@router.get('/all', response_model=List[ChExperimentDBExperimentDataBody])
 async def get_all_ch_experimentdb_experiment_data():
     try:
-        result = await ChExperimentDBExperimentDataRepository.get_all()
-        if result is not None:
+        result = repository.get_all()
+        if result:
             return result
         else:
-            return {"message": "No records in <ChExperimentDBExperimentData>, or an unexpected error occurred"}
+            return {"message": "No records"}
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
+
+@router.get('/{id}')
+async def get_ch_experimentdb_experiment_data_by_id(id: int):
+    try:
+        record = repository.get_by_id(id)
+        if record:
+            return record
+        else:
+            raise HTTPException(status_code=404, detail="Record not found")
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
+
+@router.delete('/{id}/delete')
+async def delete_ch_experimentdb_experiment_data(id: int):
+    try:
+        is_deleted = repository.delete(id)
+        if is_deleted:
+            return {"message": "Record deleted successfully"}
+        else:
+            raise HTTPException(status_code=404, detail="Record not found")
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")

network/schemas.py

@@ -2,8 +2,6 @@ from typing import Optional
 from pydantic import BaseModel, ConfigDict

-# Data model for passing parameters
-
 class ExperimentParameters(BaseModel):
     outer_blades_count: str
     outer_blades_length: str

@@ -12,7 +10,6 @@ class ExperimentParameters(BaseModel):
     load: str
     recycling: str

-
 class ChExperimentDBExperimentDataBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     volume: float

@@ -25,7 +22,6 @@ class ChExperimentDBExperimentDataBody(BaseModel):
     z: float
     file_id: str

-
 class ExperimentDataBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     direction: float

@@ -35,7 +31,6 @@ class ExperimentDataBody(BaseModel):
     co: float
     file_id: Optional[str]

-
 class ExperimentParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     outer_blades_count: int

@@ -46,7 +41,6 @@ class ExperimentParametersBody(BaseModel):
     recycling_id: Optional[int]
     experiment_hash: str

-
 class LoadParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     load: int

@@ -54,7 +48,6 @@ class LoadParametersBody(BaseModel):
     secondary_air_consumption: float
     gas_inlet_consumption: float

-
 class RecyclingParametersBody(BaseModel):
     model_config = ConfigDict(from_attributes=True)
     load_id: Optional[int]

Binary file not shown.

settings.py

@@ -22,7 +22,10 @@ class Settings(BaseSettings):
         # 'postgresql+asyncpg://username:password@localhost:5432/database_name'
         return f'postgresql+asyncpg://{self.POSTGRES_USER}:{self.POSTGRES_PASSWORD}@db:{self.DB_PORT}/{self.DATABASE}'

+    @property
+    def clickhouse_url(self):
+        return f'clickhouse://{self.CLICKHOUSE_USER}:{self.CLICKHOUSE_PASSWORD}@clickhouse:8123/{self.DATABASE}'
+
     model_config = SettingsConfigDict(env_file=".env")

 settings = Settings()
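The two URL properties assume these settings are supplied via .env; a minimal sketch (editorial: the ClickHouse values are the README's examples, the Postgres values are placeholders, and any other fields of Settings are omitted):
```
# Placeholder Postgres credentials
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
DB_PORT=5432
DATABASE=SuperService
# ClickHouse credentials from the README example
CLICKHOUSE_USER=UserMyHouse
CLICKHOUSE_PASSWORD=NotWarningWord2
```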