PIbd-42_SSPR/db/csv_to_db.py

84 lines
2.7 KiB
Python
Raw Normal View History

2024-10-14 16:57:57 +04:00
import pandas as pd
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy import insert
from settings import settings
from db.models.base import Base
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
from db.models.experiment_data_model import ExperimentData
from db.models.experiment_parameters_model import ExperimentParameters
from db.models.load_parameters_model import LoadParameters
from db.models.recycling_parameters_model import RecyclingParameters
import asyncio
2024-10-14 16:57:57 +04:00
# Module-level async SQLAlchemy engine shared by every helper in this file.
# The connection URL is read from settings.db_url_asyncpg_docker; echo=True
# turns on SQLAlchemy's logging of emitted SQL statements.
engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=True)
2024-10-14 16:57:57 +04:00
# Session factory bound to the module-level engine; the loaders below call
# async_session() to open a session for each bulk insert.
async_session = async_sessionmaker(engine)
2024-10-14 16:57:57 +04:00
def add_ids_in_csv(file: str):
    """Prepend a sequential ``id`` column (1..N) to a CSV file, in place.

    The file is read with pandas, an ``id`` column numbered from 1 is
    inserted as the first column, and the result is written back to the
    same path without the DataFrame index.

    This is a best-effort helper: any exception (unreadable file, an ``id``
    column that already exists, write failure, ...) is caught and reported
    on stdout instead of being raised.

    Args:
        file: Path of the CSV file to rewrite.
    """
    try:
        df = pd.read_csv(file)
        # One id per row, starting at 1. The original referenced an
        # undefined name here, so the column was never actually added.
        df.insert(0, 'id', pd.Series(range(1, len(df) + 1)))
        df.to_csv(file, index=False)
    except Exception as e:
        print(f'Exception!! {e}')
def print_headers_and_types(file: str):
    """Print a CSV file's column names, then the dtype of each column.

    The header list is printed on one line, followed by one line per
    column (in file order) with the dtype pandas inferred for it.

    Args:
        file: Path of the CSV file to inspect.
    """
    table = pd.read_csv(file)
    print(table.columns.to_list())
    # Iterating ``dtypes`` yields the per-column dtype in column order.
    for inferred_dtype in table.dtypes:
        print(inferred_dtype)
async def create_all_tables():
    """Create every table registered on ``Base.metadata``.

    ``create_all`` is a synchronous API, so it is dispatched through
    ``run_sync`` on a connection obtained from ``engine.begin()``.
    """
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.create_all)
async def drop_all_tables():
    """Drop every table registered on ``Base.metadata``.

    ``drop_all`` is a synchronous API, so it is dispatched through
    ``run_sync`` on a connection obtained from ``engine.begin()``.
    """
    async with engine.begin() as connection:
        await connection.run_sync(Base.metadata.drop_all)
async def load_data_to_db(file: str, model_class):
    """Bulk-insert the rows of a CSV file into the table of ``model_class``.

    Rows containing any missing value are dropped first. The remaining rows
    are inserted in batches inside a single session and committed once at
    the end, so the load is still all-or-nothing.

    Args:
        file: Path of the CSV file to load.
        model_class: Mapped model whose table receives the rows; the CSV
            headers are used as the keys of each inserted record.
    """
    # Parse the file before opening a session so no session is held open
    # while pandas reads the CSV.
    df = pd.read_csv(file).dropna()
    # Convert the DataFrame into a list of per-row dicts for SQLAlchemy.
    data_records = df.to_dict(orient='records')
    if not data_records:
        # Nothing survived dropna(); do not emit an INSERT with an empty
        # VALUES list.
        return

    # A multi-row VALUES insert binds one parameter per cell, and asyncpg
    # rejects statements with more than 32767 query arguments. Only half of
    # that budget is used, leaving headroom in case the model contributes
    # extra columns per row (e.g. Python-side defaults) -- TODO confirm
    # against the models.
    param_budget = 32767 // 2
    rows_per_statement = max(1, param_budget // max(1, len(df.columns)))

    async with async_session() as session:
        # Batched insert of all records, one statement per batch.
        for start in range(0, len(data_records), rows_per_statement):
            batch = data_records[start:start + rows_per_statement]
            await session.execute(insert(model_class).values(batch))
        await session.commit()
async def test(file: str, model_class):
    """Insert only the first 20 complete rows of a CSV file.

    Rows with any missing value are discarded, the first 20 of the
    remaining rows are inserted into the table of ``model_class`` with a
    single multi-row INSERT, and the session is committed.

    Args:
        file: Path of the CSV file to sample.
        model_class: Mapped model whose table receives the rows.
    """
    async with async_session() as db:
        sample = pd.read_csv(file).dropna().head(20)
        rows = sample.to_dict(orient='records')
        await db.execute(insert(model_class).values(rows))
        await db.commit()
async def csv_to_db():
    """Rebuild the schema and reload every CSV fixture into the database.

    All tables are dropped and recreated, then each CSV under
    ``./db/files`` is loaded in a fixed order. ``experiment_data.csv`` goes
    through ``test`` rather than ``load_data_to_db``; every other file uses
    ``load_data_to_db``.
    """
    await drop_all_tables()
    await create_all_tables()

    # (loader, csv path, model) triples, executed strictly in this order.
    steps = (
        (load_data_to_db, './db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData),
        (test, './db/files/experiment_data.csv', ExperimentData),
        (load_data_to_db, './db/files/load_parameters.csv', LoadParameters),
        (load_data_to_db, './db/files/recycling_parameters.csv', RecyclingParameters),
        (load_data_to_db, './db/files/experiment_parameters.csv', ExperimentParameters),
    )
    for loader, csv_path, model in steps:
        await loader(csv_path, model)
# asyncio.run(csv_to_db())