diff --git a/db/csv_to_db.py b/db/csv_to_db.py index 155a2de..cdaed5a 100644 --- a/db/csv_to_db.py +++ b/db/csv_to_db.py @@ -1,20 +1,14 @@ -from unittest.mock import inplace +import asyncio import pandas as pd -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy import insert -from settings import settings +from db.db_connection import async_session, engine from db.models.base import Base from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData from db.models.experiment_data_model import ExperimentData from db.models.experiment_parameters_model import ExperimentParameters from db.models.load_parameters_model import LoadParameters from db.models.recycling_parameters_model import RecyclingParameters -import asyncio - -engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True) - -async_session = async_sessionmaker(engine) def add_ids_in_csv(file: str): @@ -37,16 +31,19 @@ def print_headers_and_types(file: str): async def create_all_tables(): + print('create_all_tables') async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) async def drop_all_tables(): + print('drop_all_tables') async with engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) async def load_data_to_db(file: str, model_class): + print('load_data_to_db') async with async_session() as session: df = pd.read_csv(file).dropna() # Преобразование данных из DataFrame в формат, подходящий для SQLAlchemy @@ -65,33 +62,29 @@ async def insert_batch(session, model_class, batch_data): await session.commit() async def test(file: str, model_class): + print('test') + df = pd.read_csv(file) + data_records = df.to_dict(orient='records') + + # async with async_session() as session: + # batch_size = 3000 + # tasks = [] + # + # for i in range(0, len(data_records), batch_size): + # batch_data = data_records[i:i + batch_size] + # tasks.append(insert_batch(session, model_class, batch_data)) + # + # await asyncio.gather(*tasks) + + # две минуты добавляет async with async_session() as session: - df = pd.read_csv(file) - # df.dropna(inplace=True) - df.pop('id') - df.insert(0, 'id', pd.Series(range(1, len(df) + 1))) - - # df = df.head(3000) - # num_rows = df.shape[0] - # print(f"Количество строк: {num_rows}") - - tasks = [] - data_records = df.to_dict(orient='records') batch_size = 3000 - + print(batch_size) for i in range(0, len(data_records), batch_size): batch_data = data_records[i:i + batch_size] - tasks.append(insert_batch(session, model_class, batch_data)) - await asyncio.gather(*tasks) - - # for i in range(0, len(data_records), batch_size): - # batch_data = data_records[i:i + batch_size] - # stmt = insert(model_class).values(batch_data) - # await session.execute(stmt) - # await session.commit() - # - # await session.execute(stmt) - # await session.commit() + stmt = insert(model_class).values(batch_data) + await session.execute(stmt) + await session.commit() async def csv_to_db(): diff --git a/db/db_connection.py b/db/db_connection.py index 97a2663..bc042d1 100644 --- a/db/db_connection.py +++ b/db/db_connection.py @@ -2,6 +2,6 @@ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from settings import settings -engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=False) +engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True) async_session = async_sessionmaker(engine) diff --git a/db/repositories/ch_experimentdb_experiment_data_repos.py b/db/repositories/ch_experimentdb_experiment_data_repos.py index 071008d..8e93ace 100644 --- a/db/repositories/ch_experimentdb_experiment_data_repos.py +++ b/db/repositories/ch_experimentdb_experiment_data_repos.py @@ -2,6 +2,7 @@ from typing import Optional from sqlalchemy import select from db.db_connection import async_session from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData +from network.schemas import ChExperimentDBExperimentDataBody class ChExperimentDBExperimentDataRepository: @@ -43,3 +44,20 @@ class ChExperimentDBExperimentDataRepository: async with async_session() as session: session.add(new_data) await session.commit() + + @staticmethod + async def create_from_pydantic(body: ChExperimentDBExperimentDataBody): + new_data = ChExperimentDBExperimentData( + volume=body.volume, + nitrogen_oxide_emission=body.nitrogen_oxide_emission, + temperature=body.temperature, + co_fraction=body.co_fraction, + co2_fraction=body.co2_fraction, + x=body.x, + y=body.y, + z=body.z, + file_id=body.file_id + ) + async with async_session() as session: + session.add(new_data) + await session.commit() diff --git a/db/repositories/experiment_data_repos.py b/db/repositories/experiment_data_repos.py index 4ad2f5d..8a5b8fa 100644 --- a/db/repositories/experiment_data_repos.py +++ b/db/repositories/experiment_data_repos.py @@ -2,6 +2,7 @@ from typing import Optional from sqlalchemy import select from db.db_connection import async_session from db.models.experiment_data_model import ExperimentData +from network.schemas import ExperimentDataBody class ExperimentDataRepository: @@ -36,3 +37,17 @@ class ExperimentDataRepository: async with async_session() as session: session.add(new_data) await session.commit() + + @staticmethod + async def create_from_pydantic(body: ExperimentDataBody): + new_data = ExperimentData( + direction=body.direction, + temperature=body.temperature, + nox=body.nox, + co2=body.co2, + co=body.co, + file_id=body.file_id + ) + async with async_session() as session: + session.add(new_data) + await session.commit() diff --git a/db/repositories/experiment_parameters_repos.py b/db/repositories/experiment_parameters_repos.py index c76caaf..3bb0430 100644 --- a/db/repositories/experiment_parameters_repos.py +++ b/db/repositories/experiment_parameters_repos.py @@ -2,6 +2,7 @@ from typing import Optional from sqlalchemy import select from db.db_connection import async_session from db.models.experiment_parameters_model import ExperimentParameters +from network.schemas import ExperimentParametersBody class ExperimentParametersRepository: @@ -37,3 +38,18 @@ class ExperimentParametersRepository: async with async_session() as session: session.add(new_data) await session.commit() + + @staticmethod + async def create_from_pydantic(body: ExperimentParametersBody): + new_data = ExperimentParameters( + outer_blades_count=body.outer_blades_count, + outer_blades_length=body.outer_blades_length, + outer_blades_angle=body.outer_blades_angle, + middle_blades_count=body.middle_blades_count, + load_id=body.load_id, + recycling_id=body.recycling_id, + experiment_hash=body.experiment_hash + ) + async with async_session() as session: + session.add(new_data) + await session.commit() \ No newline at end of file diff --git a/db/repositories/load_parameters_repos.py b/db/repositories/load_parameters_repos.py index d0ea033..d8eff1d 100644 --- a/db/repositories/load_parameters_repos.py +++ b/db/repositories/load_parameters_repos.py @@ -2,7 +2,7 @@ from typing import Optional from sqlalchemy.future import select from db.db_connection import async_session from db.models.load_parameters_model import LoadParameters -from network.bodies import LoadParametersBody +from network.schemas import LoadParametersBody class LoadParametersRepository: diff --git a/db/repositories/recycling_parameters_repos.py b/db/repositories/recycling_parameters_repos.py index 6e16c18..c9fe85e 100644 --- a/db/repositories/recycling_parameters_repos.py +++ b/db/repositories/recycling_parameters_repos.py @@ -2,6 +2,7 @@ from typing import Optional from sqlalchemy import select from db.db_connection import async_session from db.models.recycling_parameters_model import RecyclingParameters +from network.schemas import RecyclingParametersBody class RecyclingParametersRepository: @@ -35,3 +36,17 @@ class RecyclingParametersRepository: async with async_session() as session: session.add(new_data) await session.commit() + + @staticmethod + async def create_from_pydantic(body: RecyclingParametersBody): + new_data = RecyclingParameters( + load_id=body.load_id, + recycling_level=body.recycling_level, + co2=body.co2, + n2=body.n2, + h2o=body.h2o, + o2=body.o2 + ) + async with async_session() as session: + session.add(new_data) + await session.commit() \ No newline at end of file diff --git a/main.py b/main.py index c6430fb..935e81d 100644 --- a/main.py +++ b/main.py @@ -1,23 +1,28 @@ import asyncio -from typing import Optional -from fastapi import FastAPI, HTTPException, Body, Response -from pydantic import BaseModel -from new_experiment_planner import run_experiment # Импортируем функцию из твоего скрипта +from fastapi import FastAPI, HTTPException from db.csv_to_db import csv_to_db -from db.repositories.load_parameters_repos import LoadParametersRepository -from network.bodies import LoadParametersBody +from network.routes import ch_experimentdb_experiment_data_router, experiment_data_router, experiment_parameters_router +from network.routes import load_parameters_router, recycling_parameters_router +from network.schemas import * +from new_experiment_planner import run_experiment # Импортируем функцию из твоего скрипта app = FastAPI() -# Модель данных для передачи параметров -class ExperimentParameters(BaseModel): - outer_blades_count: str - outer_blades_length: str - outer_blades_angle: str - middle_blades_count: str - load: str - recycling: str - +app.include_router(ch_experimentdb_experiment_data_router.router, + prefix="/ch_experimentdb_experiment_data", + tags=["ch_experimentdb_experiment_data"]) +app.include_router(experiment_data_router.router, + prefix="/experiment_data", + tags=["experiment_data"]) +app.include_router(experiment_parameters_router.router, + prefix="/experiment_parameters", + tags=["experiment_parameters"]) +app.include_router(load_parameters_router.router, + prefix="/load_parameters", + tags=["load_parameters"]) +app.include_router(recycling_parameters_router.router, + prefix="/recycling_parameters", + tags=["recycling_parameters"]) # Эндпоинт для запуска эксперимента @@ -38,6 +43,7 @@ def run_experiment_api(params: ExperimentParameters): raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") +# эндпоинт инициализации бд из csv файлов @app.get('/init_db_data') def init_db_data(): try: @@ -45,27 +51,3 @@ def init_db_data(): return {"status": "success", "message": "База данных инициализирована. Данные из файлов csv успешно добавлены."} except Exception as e: raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") - - -@app.post('/create') -async def test(data: LoadParametersBody): - try: - await LoadParametersRepository.create_from_pydantic(data) - - return {"status": "success", "message": "Новая запись успешно добавлена"} - except Exception as e: - raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") - - -@app.get('/all') -async def get_all(): - try: - result = await LoadParametersRepository.get_all() - - if result is not None: - # return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]} - return result - else: - return {"status": "success", "message":"result is not None"} - except Exception as e: - raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") \ No newline at end of file diff --git a/network/bodies.py b/network/bodies.py deleted file mode 100644 index 15636d5..0000000 --- a/network/bodies.py +++ /dev/null @@ -1,9 +0,0 @@ -from pydantic import BaseModel, ConfigDict - - -class LoadParametersBody(BaseModel): - model_config = ConfigDict(from_attributes=True) - load:int - primary_air_consumption: float - secondary_air_consumption:float - gas_inlet_consumption:float \ No newline at end of file diff --git a/network/routes/ch_experimentdb_experiment_data_router.py b/network/routes/ch_experimentdb_experiment_data_router.py new file mode 100644 index 0000000..d3f4375 --- /dev/null +++ b/network/routes/ch_experimentdb_experiment_data_router.py @@ -0,0 +1,28 @@ +from fastapi import APIRouter, HTTPException +from db.repositories.ch_experimentdb_experiment_data_repos import ChExperimentDBExperimentDataRepository +from network.schemas import ChExperimentDBExperimentDataBody + +router = APIRouter() + + +@router.post('/create') +async def test(data: ChExperimentDBExperimentDataBody): + try: + await ChExperimentDBExperimentDataRepository.create_from_pydantic(data) + + return {"message": "Новая запись успешно добавлена"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") + + +@router.get('/all') +async def get_all(): + try: + result = await ChExperimentDBExperimentDataRepository.get_all() + + if result is not None: + return result + else: + return {"message": "Нет записей в , либо произошла непредвиденная ошибка"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/network/routes/experiment_data_router.py b/network/routes/experiment_data_router.py new file mode 100644 index 0000000..5dfc268 --- /dev/null +++ b/network/routes/experiment_data_router.py @@ -0,0 +1,29 @@ +from fastapi import APIRouter, HTTPException +from db.repositories.experiment_data_repos import ExperimentDataRepository +from network.schemas import ExperimentDataBody + +router = APIRouter() + + +@router.post('/create') +async def test(data: ExperimentDataBody): + try: + await ExperimentDataRepository.create_from_pydantic(data) + + return {"message": "Новая запись успешно добавлена"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") + + +@router.get('/all') +async def get_all(): + try: + result = await ExperimentDataRepository.get_all() + + if result is not None: + # return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]} + return result + else: + return {"message": "Нет записей в , либо произошла непредвиденная ошибка"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/network/routes/experiment_parameters_router.py b/network/routes/experiment_parameters_router.py new file mode 100644 index 0000000..c19fecb --- /dev/null +++ b/network/routes/experiment_parameters_router.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, HTTPException +from db.repositories.experiment_parameters_repos import ExperimentParametersRepository +from network.schemas import ExperimentParametersBody + +router = APIRouter() + +@router.post('/create') +async def create_experiment_parameters(data: ExperimentParametersBody): + try: + await ExperimentParametersRepository.create_from_pydantic(data) + return {"message": "Новая запись успешно добавлена"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") + +@router.get('/all') +async def get_all_experiment_parameters(): + try: + result = await ExperimentParametersRepository.get_all() + + if result: + return result + else: + return {"message": "Нет записей в , либо произошла непредвиденная ошибка"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/network/routes/load_parameters_router.py b/network/routes/load_parameters_router.py new file mode 100644 index 0000000..81135a1 --- /dev/null +++ b/network/routes/load_parameters_router.py @@ -0,0 +1,29 @@ +from fastapi import APIRouter, HTTPException +from db.repositories.load_parameters_repos import LoadParametersRepository +from network.schemas import LoadParametersBody + +router = APIRouter() + + +@router.post('/create') +async def test(data: LoadParametersBody): + try: + await LoadParametersRepository.create_from_pydantic(data) + + return {"message": "Новая запись успешно добавлена"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") + + +@router.get('/all') +async def get_all(): + try: + result = await LoadParametersRepository.get_all() + + if result is not None: + # return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]} + return result + else: + return {"message": "Нет записей в , либо произошла непредвиденная ошибка"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/network/routes/recycling_parameters_router.py b/network/routes/recycling_parameters_router.py new file mode 100644 index 0000000..1bd4bb9 --- /dev/null +++ b/network/routes/recycling_parameters_router.py @@ -0,0 +1,25 @@ +from fastapi import APIRouter, HTTPException +from db.repositories.recycling_parameters_repos import RecyclingParametersRepository +from network.schemas import RecyclingParametersBody + +router = APIRouter() + +@router.post('/create') +async def create_recycling_parameters(data: RecyclingParametersBody): + try: + await RecyclingParametersRepository.create_from_pydantic(data) + return {"message": "Новая запись успешно добавлена"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") + +@router.get('/all') +async def get_all_recycling_parameters(): + try: + result = await RecyclingParametersRepository.get_all() + + if result: + return result + else: + return {"message": "Нет записей в , либо произошла непредвиденная ошибка"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}") diff --git a/network/schemas.py b/network/schemas.py new file mode 100644 index 0000000..1aa37d0 --- /dev/null +++ b/network/schemas.py @@ -0,0 +1,65 @@ +from typing import Optional + +from pydantic import BaseModel, ConfigDict + + +# Модель данных для передачи параметров +class ExperimentParameters(BaseModel): + outer_blades_count: str + outer_blades_length: str + outer_blades_angle: str + middle_blades_count: str + load: str + recycling: str + + +class ChExperimentDBExperimentDataBody(BaseModel): + model_config = ConfigDict(from_attributes=True) + volume: float + nitrogen_oxide_emission: float + temperature: float + co_fraction: float + co2_fraction: float + x: float + y: float + z: float + file_id: str + + +class ExperimentDataBody(BaseModel): + model_config = ConfigDict(from_attributes=True) + direction: float + temperature: float + nox: float + co2: float + co: float + file_id: Optional[str] + + +class ExperimentParametersBody(BaseModel): + model_config = ConfigDict(from_attributes=True) + outer_blades_count: int + outer_blades_length: float + outer_blades_angle: float + middle_blades_count: int + load_id: Optional[int] + recycling_id: Optional[int] + experiment_hash: str + + +class LoadParametersBody(BaseModel): + model_config = ConfigDict(from_attributes=True) + load: int + primary_air_consumption: float + secondary_air_consumption: float + gas_inlet_consumption: float + + +class RecyclingParametersBody(BaseModel): + model_config = ConfigDict(from_attributes=True) + load_id: Optional[int] + recycling_level: int + co2: float + n2: float + h2o: float + o2: float