diff --git a/db/csv_to_db.py b/db/csv_to_db.py index f613f77..155a2de 100644 --- a/db/csv_to_db.py +++ b/db/csv_to_db.py @@ -1,5 +1,7 @@ +from unittest.mock import inplace + import pandas as pd -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy import insert from settings import settings from db.models.base import Base @@ -18,7 +20,7 @@ async_session = async_sessionmaker(engine) def add_ids_in_csv(file: str): try: df = pd.read_csv(file) - df.insert(0, 'id', pd.Series(range(1, len(d) + 1))) + df.insert(0, 'id', pd.Series(range(1, len(df) + 1))) df.to_csv(file, index=False) except Exception as e: print(f'Exception!! {e}') @@ -57,16 +59,39 @@ async def load_data_to_db(file: str, model_class): await session.commit() +async def insert_batch(session, model_class, batch_data): + stmt = insert(model_class).values(batch_data) + await session.execute(stmt) + await session.commit() + async def test(file: str, model_class): async with async_session() as session: df = pd.read_csv(file) - df.dropna(inplace=True) - df = df.head(20) - data_records = df.to_dict(orient='records') - stmt = insert(model_class).values(data_records) - await session.execute(stmt) + # df.dropna(inplace=True) + df.pop('id') + df.insert(0, 'id', pd.Series(range(1, len(df) + 1))) - await session.commit() + # df = df.head(3000) + # num_rows = df.shape[0] + # print(f"Количество строк: {num_rows}") + + tasks = [] + data_records = df.to_dict(orient='records') + batch_size = 3000 + + for i in range(0, len(data_records), batch_size): + batch_data = data_records[i:i + batch_size] + tasks.append(insert_batch(session, model_class, batch_data)) + await asyncio.gather(*tasks) + + # for i in range(0, len(data_records), batch_size): + # batch_data = data_records[i:i + batch_size] + # stmt = insert(model_class).values(batch_data) + # await session.execute(stmt) + # await session.commit() + # + # await session.execute(stmt) + # await session.commit() async def csv_to_db(): @@ -81,3 +106,6 @@ async def csv_to_db(): # asyncio.run(csv_to_db()) + + + diff --git a/db/db_connection.py b/db/db_connection.py index 3e18f44..97a2663 100644 --- a/db/db_connection.py +++ b/db/db_connection.py @@ -1,4 +1,5 @@ -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker + from settings import settings engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=False)