From 500a13c7e6c32a5ec0e040a540e0572fcba1ebfd Mon Sep 17 00:00:00 2001 From: Inohara Date: Mon, 21 Oct 2024 14:31:39 +0400 Subject: [PATCH] =?UTF-8?q?=D0=BF=D1=8B=D1=82=D0=B0=D1=8E=D1=81=20=D1=81?= =?UTF-8?q?=D0=B4=D0=B5=D0=BB=D0=B0=D1=82=D1=8C=20=D0=B4=D0=BE=D0=B1=D0=B0?= =?UTF-8?q?=D0=B2=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D0=B2=D1=81=D0=B5=D1=85?= =?UTF-8?q?=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=B8=D0=B7=20=D0=B1?= =?UTF-8?q?=D0=BE=D0=BB=D1=8C=D1=88=D0=BE=D0=B3=D0=BE=20=D1=84=D0=B0=D0=B9?= =?UTF-8?q?=D0=BB=D0=B8=D0=BA=D0=B0,=20=D0=BF=D0=BE=D0=BA=D0=B0=20=D1=8D?= =?UTF-8?q?=D1=82=D0=BE=20=D0=BB=D0=B8=D0=B1=D0=BE=20=D0=BE=D1=87=D0=B5?= =?UTF-8?q?=D0=BD=D1=8C=20=D0=B4=D0=BE=D0=BB=D0=B3=D0=BE,=20=D0=BB=D0=B8?= =?UTF-8?q?=D0=B1=D0=BE=20=D0=BD=D0=B5=20=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=B0=D0=B5=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- db/csv_to_db.py | 44 ++++++++++++++++++++++++++++++++++++-------- db/db_connection.py | 3 ++- 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/db/csv_to_db.py b/db/csv_to_db.py index f613f77..155a2de 100644 --- a/db/csv_to_db.py +++ b/db/csv_to_db.py @@ -1,5 +1,7 @@ +from unittest.mock import inplace + import pandas as pd -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession from sqlalchemy import insert from settings import settings from db.models.base import Base @@ -18,7 +20,7 @@ async_session = async_sessionmaker(engine) def add_ids_in_csv(file: str): try: df = pd.read_csv(file) - df.insert(0, 'id', pd.Series(range(1, len(d) + 1))) + df.insert(0, 'id', pd.Series(range(1, len(df) + 1))) df.to_csv(file, index=False) except Exception as e: print(f'Exception!! {e}') @@ -57,16 +59,39 @@ async def load_data_to_db(file: str, model_class): await session.commit() +async def insert_batch(session, model_class, batch_data): + stmt = insert(model_class).values(batch_data) + await session.execute(stmt) + await session.commit() + async def test(file: str, model_class): async with async_session() as session: df = pd.read_csv(file) - df.dropna(inplace=True) - df = df.head(20) - data_records = df.to_dict(orient='records') - stmt = insert(model_class).values(data_records) - await session.execute(stmt) + # df.dropna(inplace=True) + df.pop('id') + df.insert(0, 'id', pd.Series(range(1, len(df) + 1))) - await session.commit() + # df = df.head(3000) + # num_rows = df.shape[0] + # print(f"Количество строк: {num_rows}") + + tasks = [] + data_records = df.to_dict(orient='records') + batch_size = 3000 + + for i in range(0, len(data_records), batch_size): + batch_data = data_records[i:i + batch_size] + tasks.append(insert_batch(session, model_class, batch_data)) + await asyncio.gather(*tasks) + + # for i in range(0, len(data_records), batch_size): + # batch_data = data_records[i:i + batch_size] + # stmt = insert(model_class).values(batch_data) + # await session.execute(stmt) + # await session.commit() + # + # await session.execute(stmt) + # await session.commit() async def csv_to_db(): @@ -81,3 +106,6 @@ async def csv_to_db(): # asyncio.run(csv_to_db()) + + + diff --git a/db/db_connection.py b/db/db_connection.py index 3e18f44..97a2663 100644 --- a/db/db_connection.py +++ b/db/db_connection.py @@ -1,4 +1,5 @@ -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker + from settings import settings engine = create_async_engine(url=settings.db_url_asyncpg_docker, echo=False)