добавила в репозиториях новый метод, добавила роуты для сущностей; сделала в них запрос получения всех записей и запрос на добавление; пыталась сделать добавление из большого файлика быстрее, но не получается - итого инициализирует все данные за 2 и больше минут
This commit is contained in:
parent
500a13c7e6
commit
97d0206339
@ -1,20 +1,14 @@
|
||||
from unittest.mock import inplace
|
||||
import asyncio
|
||||
|
||||
import pandas as pd
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
|
||||
from sqlalchemy import insert
|
||||
from settings import settings
|
||||
from db.db_connection import async_session, engine
|
||||
from db.models.base import Base
|
||||
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
|
||||
from db.models.experiment_data_model import ExperimentData
|
||||
from db.models.experiment_parameters_model import ExperimentParameters
|
||||
from db.models.load_parameters_model import LoadParameters
|
||||
from db.models.recycling_parameters_model import RecyclingParameters
|
||||
import asyncio
|
||||
|
||||
engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
|
||||
|
||||
async_session = async_sessionmaker(engine)
|
||||
|
||||
|
||||
def add_ids_in_csv(file: str):
|
||||
@ -37,16 +31,19 @@ def print_headers_and_types(file: str):
|
||||
|
||||
|
||||
async def create_all_tables():
|
||||
print('create_all_tables')
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
|
||||
async def drop_all_tables():
|
||||
print('drop_all_tables')
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.drop_all)
|
||||
|
||||
|
||||
async def load_data_to_db(file: str, model_class):
|
||||
print('load_data_to_db')
|
||||
async with async_session() as session:
|
||||
df = pd.read_csv(file).dropna()
|
||||
# Преобразование данных из DataFrame в формат, подходящий для SQLAlchemy
|
||||
@ -65,33 +62,29 @@ async def insert_batch(session, model_class, batch_data):
|
||||
await session.commit()
|
||||
|
||||
async def test(file: str, model_class):
|
||||
async with async_session() as session:
|
||||
print('test')
|
||||
df = pd.read_csv(file)
|
||||
# df.dropna(inplace=True)
|
||||
df.pop('id')
|
||||
df.insert(0, 'id', pd.Series(range(1, len(df) + 1)))
|
||||
|
||||
# df = df.head(3000)
|
||||
# num_rows = df.shape[0]
|
||||
# print(f"Количество строк: {num_rows}")
|
||||
|
||||
tasks = []
|
||||
data_records = df.to_dict(orient='records')
|
||||
batch_size = 3000
|
||||
|
||||
for i in range(0, len(data_records), batch_size):
|
||||
batch_data = data_records[i:i + batch_size]
|
||||
tasks.append(insert_batch(session, model_class, batch_data))
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
# async with async_session() as session:
|
||||
# batch_size = 3000
|
||||
# tasks = []
|
||||
#
|
||||
# for i in range(0, len(data_records), batch_size):
|
||||
# batch_data = data_records[i:i + batch_size]
|
||||
# stmt = insert(model_class).values(batch_data)
|
||||
# await session.execute(stmt)
|
||||
# await session.commit()
|
||||
# tasks.append(insert_batch(session, model_class, batch_data))
|
||||
#
|
||||
# await session.execute(stmt)
|
||||
# await session.commit()
|
||||
# await asyncio.gather(*tasks)
|
||||
|
||||
# две минуты добавляет
|
||||
async with async_session() as session:
|
||||
batch_size = 3000
|
||||
print(batch_size)
|
||||
for i in range(0, len(data_records), batch_size):
|
||||
batch_data = data_records[i:i + batch_size]
|
||||
stmt = insert(model_class).values(batch_data)
|
||||
await session.execute(stmt)
|
||||
await session.commit()
|
||||
|
||||
|
||||
async def csv_to_db():
|
||||
|
@ -2,6 +2,6 @@ from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
|
||||
|
||||
from settings import settings
|
||||
|
||||
engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=False)
|
||||
engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
|
||||
|
||||
async_session = async_sessionmaker(engine)
|
||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
||||
from sqlalchemy import select
|
||||
from db.db_connection import async_session
|
||||
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
|
||||
from network.schemas import ChExperimentDBExperimentDataBody
|
||||
|
||||
|
||||
class ChExperimentDBExperimentDataRepository:
|
||||
@ -43,3 +44,20 @@ class ChExperimentDBExperimentDataRepository:
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
||||
@staticmethod
|
||||
async def create_from_pydantic(body: ChExperimentDBExperimentDataBody):
|
||||
new_data = ChExperimentDBExperimentData(
|
||||
volume=body.volume,
|
||||
nitrogen_oxide_emission=body.nitrogen_oxide_emission,
|
||||
temperature=body.temperature,
|
||||
co_fraction=body.co_fraction,
|
||||
co2_fraction=body.co2_fraction,
|
||||
x=body.x,
|
||||
y=body.y,
|
||||
z=body.z,
|
||||
file_id=body.file_id
|
||||
)
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
||||
from sqlalchemy import select
|
||||
from db.db_connection import async_session
|
||||
from db.models.experiment_data_model import ExperimentData
|
||||
from network.schemas import ExperimentDataBody
|
||||
|
||||
|
||||
class ExperimentDataRepository:
|
||||
@ -36,3 +37,17 @@ class ExperimentDataRepository:
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
||||
@staticmethod
|
||||
async def create_from_pydantic(body: ExperimentDataBody):
|
||||
new_data = ExperimentData(
|
||||
direction=body.direction,
|
||||
temperature=body.temperature,
|
||||
nox=body.nox,
|
||||
co2=body.co2,
|
||||
co=body.co,
|
||||
file_id=body.file_id
|
||||
)
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
||||
from sqlalchemy import select
|
||||
from db.db_connection import async_session
|
||||
from db.models.experiment_parameters_model import ExperimentParameters
|
||||
from network.schemas import ExperimentParametersBody
|
||||
|
||||
|
||||
class ExperimentParametersRepository:
|
||||
@ -37,3 +38,18 @@ class ExperimentParametersRepository:
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
||||
@staticmethod
|
||||
async def create_from_pydantic(body: ExperimentParametersBody):
|
||||
new_data = ExperimentParameters(
|
||||
outer_blades_count=body.outer_blades_count,
|
||||
outer_blades_length=body.outer_blades_length,
|
||||
outer_blades_angle=body.outer_blades_angle,
|
||||
middle_blades_count=body.middle_blades_count,
|
||||
load_id=body.load_id,
|
||||
recycling_id=body.recycling_id,
|
||||
experiment_hash=body.experiment_hash
|
||||
)
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
@ -2,7 +2,7 @@ from typing import Optional
|
||||
from sqlalchemy.future import select
|
||||
from db.db_connection import async_session
|
||||
from db.models.load_parameters_model import LoadParameters
|
||||
from network.bodies import LoadParametersBody
|
||||
from network.schemas import LoadParametersBody
|
||||
|
||||
|
||||
class LoadParametersRepository:
|
||||
|
@ -2,6 +2,7 @@ from typing import Optional
|
||||
from sqlalchemy import select
|
||||
from db.db_connection import async_session
|
||||
from db.models.recycling_parameters_model import RecyclingParameters
|
||||
from network.schemas import RecyclingParametersBody
|
||||
|
||||
|
||||
class RecyclingParametersRepository:
|
||||
@ -35,3 +36,17 @@ class RecyclingParametersRepository:
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
||||
|
||||
@staticmethod
|
||||
async def create_from_pydantic(body: RecyclingParametersBody):
|
||||
new_data = RecyclingParameters(
|
||||
load_id=body.load_id,
|
||||
recycling_level=body.recycling_level,
|
||||
co2=body.co2,
|
||||
n2=body.n2,
|
||||
h2o=body.h2o,
|
||||
o2=body.o2
|
||||
)
|
||||
async with async_session() as session:
|
||||
session.add(new_data)
|
||||
await session.commit()
|
60
main.py
60
main.py
@ -1,23 +1,28 @@
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
from fastapi import FastAPI, HTTPException, Body, Response
|
||||
from pydantic import BaseModel
|
||||
from new_experiment_planner import run_experiment # Импортируем функцию из твоего скрипта
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from db.csv_to_db import csv_to_db
|
||||
from db.repositories.load_parameters_repos import LoadParametersRepository
|
||||
from network.bodies import LoadParametersBody
|
||||
from network.routes import ch_experimentdb_experiment_data_router, experiment_data_router, experiment_parameters_router
|
||||
from network.routes import load_parameters_router, recycling_parameters_router
|
||||
from network.schemas import *
|
||||
from new_experiment_planner import run_experiment # Импортируем функцию из твоего скрипта
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# Модель данных для передачи параметров
|
||||
class ExperimentParameters(BaseModel):
|
||||
outer_blades_count: str
|
||||
outer_blades_length: str
|
||||
outer_blades_angle: str
|
||||
middle_blades_count: str
|
||||
load: str
|
||||
recycling: str
|
||||
|
||||
app.include_router(ch_experimentdb_experiment_data_router.router,
|
||||
prefix="/ch_experimentdb_experiment_data",
|
||||
tags=["ch_experimentdb_experiment_data"])
|
||||
app.include_router(experiment_data_router.router,
|
||||
prefix="/experiment_data",
|
||||
tags=["experiment_data"])
|
||||
app.include_router(experiment_parameters_router.router,
|
||||
prefix="/experiment_parameters",
|
||||
tags=["experiment_parameters"])
|
||||
app.include_router(load_parameters_router.router,
|
||||
prefix="/load_parameters",
|
||||
tags=["load_parameters"])
|
||||
app.include_router(recycling_parameters_router.router,
|
||||
prefix="/recycling_parameters",
|
||||
tags=["recycling_parameters"])
|
||||
|
||||
|
||||
# Эндпоинт для запуска эксперимента
|
||||
@ -38,6 +43,7 @@ def run_experiment_api(params: ExperimentParameters):
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
# эндпоинт инициализации бд из csv файлов
|
||||
@app.get('/init_db_data')
|
||||
def init_db_data():
|
||||
try:
|
||||
@ -45,27 +51,3 @@ def init_db_data():
|
||||
return {"status": "success", "message": "База данных инициализирована. Данные из файлов csv успешно добавлены."}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
@app.post('/create')
|
||||
async def test(data: LoadParametersBody):
|
||||
try:
|
||||
await LoadParametersRepository.create_from_pydantic(data)
|
||||
|
||||
return {"status": "success", "message": "Новая запись успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
@app.get('/all')
|
||||
async def get_all():
|
||||
try:
|
||||
result = await LoadParametersRepository.get_all()
|
||||
|
||||
if result is not None:
|
||||
# return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]}
|
||||
return result
|
||||
else:
|
||||
return {"status": "success", "message":"result is not None"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
@ -1,9 +0,0 @@
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
class LoadParametersBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
load:int
|
||||
primary_air_consumption: float
|
||||
secondary_air_consumption:float
|
||||
gas_inlet_consumption:float
|
28
network/routes/ch_experimentdb_experiment_data_router.py
Normal file
28
network/routes/ch_experimentdb_experiment_data_router.py
Normal file
@ -0,0 +1,28 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from db.repositories.ch_experimentdb_experiment_data_repos import ChExperimentDBExperimentDataRepository
|
||||
from network.schemas import ChExperimentDBExperimentDataBody
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post('/create')
|
||||
async def test(data: ChExperimentDBExperimentDataBody):
|
||||
try:
|
||||
await ChExperimentDBExperimentDataRepository.create_from_pydantic(data)
|
||||
|
||||
return {"message": "Новая запись <ChExperimentDBExperimentData> успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
@router.get('/all')
|
||||
async def get_all():
|
||||
try:
|
||||
result = await ChExperimentDBExperimentDataRepository.get_all()
|
||||
|
||||
if result is not None:
|
||||
return result
|
||||
else:
|
||||
return {"message": "Нет записей в <ChExperimentDBExperimentData>, либо произошла непредвиденная ошибка"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
29
network/routes/experiment_data_router.py
Normal file
29
network/routes/experiment_data_router.py
Normal file
@ -0,0 +1,29 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from db.repositories.experiment_data_repos import ExperimentDataRepository
|
||||
from network.schemas import ExperimentDataBody
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post('/create')
|
||||
async def test(data: ExperimentDataBody):
|
||||
try:
|
||||
await ExperimentDataRepository.create_from_pydantic(data)
|
||||
|
||||
return {"message": "Новая запись <ExperimentData> успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
@router.get('/all')
|
||||
async def get_all():
|
||||
try:
|
||||
result = await ExperimentDataRepository.get_all()
|
||||
|
||||
if result is not None:
|
||||
# return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]}
|
||||
return result
|
||||
else:
|
||||
return {"message": "Нет записей в <ExperimentData>, либо произошла непредвиденная ошибка"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
25
network/routes/experiment_parameters_router.py
Normal file
25
network/routes/experiment_parameters_router.py
Normal file
@ -0,0 +1,25 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from db.repositories.experiment_parameters_repos import ExperimentParametersRepository
|
||||
from network.schemas import ExperimentParametersBody
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.post('/create')
|
||||
async def create_experiment_parameters(data: ExperimentParametersBody):
|
||||
try:
|
||||
await ExperimentParametersRepository.create_from_pydantic(data)
|
||||
return {"message": "Новая запись <ExperimentParameters> успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
@router.get('/all')
|
||||
async def get_all_experiment_parameters():
|
||||
try:
|
||||
result = await ExperimentParametersRepository.get_all()
|
||||
|
||||
if result:
|
||||
return result
|
||||
else:
|
||||
return {"message": "Нет записей в <ExperimentParameters>, либо произошла непредвиденная ошибка"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
29
network/routes/load_parameters_router.py
Normal file
29
network/routes/load_parameters_router.py
Normal file
@ -0,0 +1,29 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from db.repositories.load_parameters_repos import LoadParametersRepository
|
||||
from network.schemas import LoadParametersBody
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post('/create')
|
||||
async def test(data: LoadParametersBody):
|
||||
try:
|
||||
await LoadParametersRepository.create_from_pydantic(data)
|
||||
|
||||
return {"message": "Новая запись <LoadParameters> успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
|
||||
@router.get('/all')
|
||||
async def get_all():
|
||||
try:
|
||||
result = await LoadParametersRepository.get_all()
|
||||
|
||||
if result is not None:
|
||||
# return {"status": "success", "data": [LoadParametersBody.model_validate(param) for param in result]}
|
||||
return result
|
||||
else:
|
||||
return {"message": "Нет записей в <LoadParameters>, либо произошла непредвиденная ошибка"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
25
network/routes/recycling_parameters_router.py
Normal file
25
network/routes/recycling_parameters_router.py
Normal file
@ -0,0 +1,25 @@
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from db.repositories.recycling_parameters_repos import RecyclingParametersRepository
|
||||
from network.schemas import RecyclingParametersBody
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@router.post('/create')
|
||||
async def create_recycling_parameters(data: RecyclingParametersBody):
|
||||
try:
|
||||
await RecyclingParametersRepository.create_from_pydantic(data)
|
||||
return {"message": "Новая запись <RecyclingParameters> успешно добавлена"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
||||
|
||||
@router.get('/all')
|
||||
async def get_all_recycling_parameters():
|
||||
try:
|
||||
result = await RecyclingParametersRepository.get_all()
|
||||
|
||||
if result:
|
||||
return result
|
||||
else:
|
||||
return {"message": "Нет записей в <RecyclingParameters>, либо произошла непредвиденная ошибка"}
|
||||
except Exception as e:
|
||||
raise HTTPException(status_code=500, detail=f"An error occurred: {str(e)}")
|
65
network/schemas.py
Normal file
65
network/schemas.py
Normal file
@ -0,0 +1,65 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, ConfigDict
|
||||
|
||||
|
||||
# Модель данных для передачи параметров
|
||||
class ExperimentParameters(BaseModel):
|
||||
outer_blades_count: str
|
||||
outer_blades_length: str
|
||||
outer_blades_angle: str
|
||||
middle_blades_count: str
|
||||
load: str
|
||||
recycling: str
|
||||
|
||||
|
||||
class ChExperimentDBExperimentDataBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
volume: float
|
||||
nitrogen_oxide_emission: float
|
||||
temperature: float
|
||||
co_fraction: float
|
||||
co2_fraction: float
|
||||
x: float
|
||||
y: float
|
||||
z: float
|
||||
file_id: str
|
||||
|
||||
|
||||
class ExperimentDataBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
direction: float
|
||||
temperature: float
|
||||
nox: float
|
||||
co2: float
|
||||
co: float
|
||||
file_id: Optional[str]
|
||||
|
||||
|
||||
class ExperimentParametersBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
outer_blades_count: int
|
||||
outer_blades_length: float
|
||||
outer_blades_angle: float
|
||||
middle_blades_count: int
|
||||
load_id: Optional[int]
|
||||
recycling_id: Optional[int]
|
||||
experiment_hash: str
|
||||
|
||||
|
||||
class LoadParametersBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
load: int
|
||||
primary_air_consumption: float
|
||||
secondary_air_consumption: float
|
||||
gas_inlet_consumption: float
|
||||
|
||||
|
||||
class RecyclingParametersBody(BaseModel):
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
load_id: Optional[int]
|
||||
recycling_level: int
|
||||
co2: float
|
||||
n2: float
|
||||
h2o: float
|
||||
o2: float
|
Loading…
Reference in New Issue
Block a user