import asyncio

import pandas as pd
from sqlalchemy import insert

from db.db_connection import async_session, engine
from db.models.base import Base
from db.models.ch_experimentdb_experiment_data_model import ChExperimentDBExperimentData
from db.models.experiment_data_model import ExperimentData
from db.models.experiment_parameters_model import ExperimentParameters
from db.models.load_parameters_model import LoadParameters
from db.models.recycling_parameters_model import RecyclingParameters


def add_ids_in_csv(file: str) -> None:
    """Prepend a sequential 1-based ``id`` column to *file*, rewriting it in place."""
    try:
        df = pd.read_csv(file)
        df.insert(0, 'id', pd.Series(range(1, len(df) + 1)))
        df.to_csv(file, index=False)
    except Exception as e:
        # Deliberate best-effort: report the failure and continue instead of
        # aborting the surrounding pipeline.
        print(f'Exception!! {e}')


def print_headers_and_types(file: str) -> None:
    """Debug helper: print the CSV's column names, then each column's dtype."""
    df = pd.read_csv(file)
    headers = df.columns.tolist()
    print(headers)
    for header in headers:
        column_type = df[header].dtype
        print(column_type)


async def create_all_tables() -> None:
    """Create every table registered on ``Base.metadata``."""
    print('create_all_tables')
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def drop_all_tables() -> None:
    """Drop every table registered on ``Base.metadata``."""
    print('drop_all_tables')
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


async def load_data_to_db(file: str, model_class) -> None:
    """Bulk-insert every NaN-free row of *file* into *model_class*'s table.

    The whole file goes in as a single INSERT statement, so this is only
    suitable for CSVs that fit comfortably in one statement.
    """
    print('load_data_to_db')
    async with async_session() as session:
        df = pd.read_csv(file).dropna()
        # Convert the DataFrame into a list of per-row dicts for SQLAlchemy.
        data_records = df.to_dict(orient='records')
        # One batched insert covering every record.
        stmt = insert(model_class).values(data_records)
        await session.execute(stmt)
        await session.commit()


async def insert_batch(session, model_class, batch_data) -> None:
    """Insert one batch of records on *session* and commit immediately."""
    stmt = insert(model_class).values(batch_data)
    await session.execute(stmt)
    await session.commit()


async def test(file: str, model_class) -> None:
    """Load *file* into *model_class*'s table in batches of 3000 rows.

    Unlike :func:`load_data_to_db`, rows containing NaN are kept and the
    insert is chunked, so very large CSVs don't produce one oversized
    statement.  (A concurrent ``asyncio.gather`` variant was tried here and
    removed — sharing one session across tasks gave no speedup.)
    """
    print('test')
    df = pd.read_csv(file)
    data_records = df.to_dict(orient='records')
    async with async_session() as session:
        batch_size = 3000
        print(batch_size)
        for i in range(0, len(data_records), batch_size):
            batch_data = data_records[i:i + batch_size]
            stmt = insert(model_class).values(batch_data)
            await session.execute(stmt)
            # NOTE(review): the collapsed original is ambiguous on whether the
            # commit sat inside or after this loop; per-batch commit matches
            # the sibling insert_batch() helper — confirm against history.
            await session.commit()


async def csv_to_db() -> None:
    """Recreate the schema, then load every CSV fixture into its table."""
    await drop_all_tables()
    await create_all_tables()
    await load_data_to_db('./db/files/ch_experimentdb_experiment_data.csv', ChExperimentDBExperimentData)
    await test('./db/files/experiment_data.csv', ExperimentData)
    await load_data_to_db('./db/files/load_parameters.csv', LoadParameters)
    await load_data_to_db('./db/files/recycling_parameters.csv', RecyclingParameters)
    await load_data_to_db('./db/files/experiment_parameters.csv', ExperimentParameters)


# asyncio.run(csv_to_db())