Faster INSERT methods #497
This is the result from a test run using the referenced FAQ:

```
Python: 3.8.10 (default, Jun 22 2022, 20:18:18) [GCC 9.4.0]
sqlalchemy v1.4.27 (future=True)

SQLA ORM:
          Total time for 10 records 44.846 secs

SQLA ORM pk given:
          Total time for 10 records 37.176 secs

SQLA ORM-Enabled INSERT:
          Total time for 10 records 42.995 secs

SQLA ORM bulk_save_objects:
          Total time for 10 records 22.663 secs

SQLA ORM bulk_save_objects, return_defaults:
          Total time for 10 records 48.917 secs

SQLA ORM bulk_insert_mappings:
          Total time for 10 records 48.129 secs

SQLA ORM bulk_insert_mappings, return_defaults:
          Total time for 10 records 22.078 secs

SQLA Core:
          Total time for 10 records 48.751 secs

BQ Client:
          Total time for 10 records 0.727 secs
```

This is the code used to test the different possibilities:

```python
# -*- coding: utf-8 -*-
from datetime import datetime
from pprint import pprint
from typing import Dict, List
from uuid import uuid4
import contextlib
import time
import sys

from google.cloud.bigquery import Client, Table
from google.api_core import retry
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy import __version__, Column, DateTime, String, create_engine, insert


def make_str_id() -> str:
    return uuid4().hex


Base = declarative_base()


class Customer(Base):
    __tablename__ = "customer"

    id: str = Column('id', String, primary_key=True, default=make_str_id)
    name: str = Column('name', String)
    created_at: datetime = Column('created_at', DateTime, default=datetime.now)

    __mapper_args__ = {
        "version_id_col": created_at,
        "version_id_generator": datetime.now,
        "eager_defaults": True
    }


@contextlib.contextmanager
def sqlalchemy_session(project: str, future: bool):
    uri = f'bigquery://{project}/sandBox'
    engine = create_engine(uri, future=True)
    Base.metadata.create_all(engine, checkfirst=True)
    session = Session(
        bind=engine, future=future,
        autoflush=False, expire_on_commit=False
    )
    yield session
    session.close()
    Base.metadata.drop_all(engine, checkfirst=True)
    engine.dispose()


def print_result(name, nrows, seconds):
    print(f"{name}:\n{' '*10}Total time for {nrows} records {seconds:.3f} secs\n")


def test_sqlalchemy_orm(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer()
            customer.name = "NAME " + str(i)
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM", n, time.time() - t0)


def test_sqlalchemy_orm_pk_given(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for i in range(n):
            customer = Customer(id=make_str_id(), name="NAME " + str(i))
            session.add(customer)
            if i % 1000 == 0:
                session.flush()
        session.commit()
        print_result("SQLA ORM pk given", n, time.time() - t0)


def test_sqlalchemy_orm_enabled_insert(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        session.execute(
            insert(Customer),
            [
                {"name": f"NAME{i}"}
                for i in range(n)
            ]
        )
        session.commit()
        print_result("SQLA ORM-Enabled INSERT", n, time.time() - t0)


def test_sqlalchemy_orm_bulk_save_objects(project: str, n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_save_objects(
                [
                    Customer(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_save_objects{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_orm_bulk_insert(project: str, n=100000, future=True, return_defaults=False):
    with sqlalchemy_session(project, future) as session:
        t0 = time.time()
        for chunk in range(0, n, 10000):
            session.bulk_insert_mappings(
                Customer,
                [
                    dict(name="NAME " + str(i))
                    for i in range(chunk, min(chunk + 10000, n))
                ],
                return_defaults=return_defaults,
            )
        session.commit()
        print_result(
            f"SQLA ORM bulk_insert_mappings{', return_defaults' if return_defaults else ''}",
            n,
            time.time() - t0,
        )


def test_sqlalchemy_core(project: str, n=100000, future=True):
    with sqlalchemy_session(project, future) as session:
        with session.bind.begin() as conn:
            t0 = time.time()
            conn.execute(
                insert(Customer.__table__),
                [{"name": "NAME " + str(i)} for i in range(n)],
            )
            conn.commit()
            print_result("SQLA Core", n, time.time() - t0)


@contextlib.contextmanager
def bigquery_client(project: str) -> Client:
    schema = [
        {"name": "id",
         "type": "STRING",
         "mode": "REQUIRED"},
        {"name": "name",
         "type": "STRING",
         "mode": "NULLABLE"},
        {"name": "created_at",
         "type": "DATETIME",
         "mode": "NULLABLE"},
    ]
    table_path = f"{project}.sandBox.{Customer.__tablename__}"
    with Client(project) as client:
        customer = Table(table_path, schema)
        client.create_table(customer, exists_ok=True)
        yield client
        client.delete_table(table_path)


def test_bigquery(project: str, n=100000):
    retry_ = retry.Retry((lambda e: True), 0.2, 10., 5, 10.)
    with bigquery_client(project) as client:
        client: Client
        t0 = time.time()
        rows = [
            {"id": make_str_id(),
             "name": "NAME " + str(i),
             "created_at": datetime.now()}
            for i in range(n)
        ]
        table_path = f"{project}.sandBox.{Customer.__tablename__}"
        customer = client.get_table(table_path)
        client.insert_rows(customer, rows, retry=retry_)
        print_result("BQ Client", n, time.time() - t0)


if __name__ == "__main__":
    rows = 10
    _future = True
    project_id = 'YOUR_PROJECT_ID'
    print(f"Python: {' '.join(sys.version.splitlines())}")
    print(f"sqlalchemy v{__version__} (future={_future})\n")
    test_sqlalchemy_orm(project_id, rows, _future)
    test_sqlalchemy_orm_pk_given(project_id, rows, _future)
    test_sqlalchemy_orm_enabled_insert(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_save_objects(project_id, rows, _future, True)
    test_sqlalchemy_orm_bulk_insert(project_id, rows, _future)
    test_sqlalchemy_orm_bulk_insert(project_id, rows, _future, True)
    test_sqlalchemy_core(project_id, rows, _future)
    test_bigquery(project_id, rows)
```
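A side note on the `BQ Client` variant above: as written, `insert_rows` sends the whole row list in a single `tabledata.insertAll` request, which has payload limits, so larger runs typically need client-side batching. A minimal sketch of that batching (the 500-row batch size is an assumption, not a documented optimum, and the commented usage assumes a `client` and `table` like those in the script above):

```python
from typing import Dict, Iterator, List


def chunked(rows: List[Dict], size: int = 500) -> Iterator[List[Dict]]:
    """Yield successive batches of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


# Hypothetical usage against a google.cloud.bigquery.Client `client`
# and a Table `table` (not created here):
#     for batch in chunked(rows, 500):
#         errors = client.insert_rows(table, batch)
#         assert not errors, errors
```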
I am in the same boat: I have to operate on large datasets and use SQLAlchemy to format my BigQuery queries. Since I also use pandas when operating on my data locally, I find I have to use a mixture of these functions and modules for the best performance: Upload:
Download:
So it appears all these methods (mostly) bypass raw SQL data streams and instead convert data to compressed binary files for bulk data transfer. It would be convenient if all this could be accomplished via python-bigquery-sqlalchemy, but I am not sure whether SQLAlchemy supports underlying dialects using their own "conversion engines" (let's call it that). I do see in the documentation for this library that there is a way to supply your own custom BQ client. Maybe there is a way to configure that client to use the faster methods to interact with BigQuery?
BQ's high-throughput ingestion is entirely divorced from the query engine, so mapping it into SQLAlchemy already feels unusual. There is work, unrelated to SQLAlchemy, to improve ingestion, but it will take time for that to propagate into consumers like this project and other connectors and tools, if it makes sense at all.
What do the first two sentences mean, and what is the logical connection between them? I am also trying to understand why the throughput is so low. Performance this poor makes the library non-viable for most use cases: it is hard to think of a production scenario where averaging a minute to insert 10 to 20 rows into a table would be acceptable.
The BigQuery Write API supports high throughput ingestion, but it is not exposed as part of the SQL query surface. This is a case of SQLAlchemy not being the right tool for an ingestion-based workload into BigQuery. |
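The gap in the benchmark above is mostly per-statement overhead rather than raw data volume: the ORM paths issue INSERT statements through the query surface, and each statement carries a fixed cost that dwarfs the payload at 10 rows. A rough local illustration of the same shape of difference, using SQLite rather than BigQuery (absolute times are meaningless here, since SQLite has no network round trip; only the per-statement vs. batched contrast matters):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer (id INTEGER PRIMARY KEY, name TEXT)")
rows = [(f"NAME {i}",) for i in range(10_000)]

# One INSERT statement executed per row
t0 = time.perf_counter()
for row in rows:
    conn.execute("INSERT INTO customer (name) VALUES (?)", row)
per_row = time.perf_counter() - t0

# The same rows submitted as a single batched call
t0 = time.perf_counter()
conn.executemany("INSERT INTO customer (name) VALUES (?)", rows)
batched = time.perf_counter() - t0

total = conn.execute("SELECT COUNT(*) FROM customer").fetchone()[0]
print(f"per-row: {per_row:.3f}s, batched: {batched:.3f}s, rows: {total}")
conn.close()
```

With a remote engine the per-statement cost includes a network round trip (or, for BigQuery, a query job), so the same contrast is amplified by orders of magnitude.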
Is your feature request related to a problem? Please describe.
I need to work with large numbers of rows in an ETL service and wanted to use the SQLAlchemy ORM backed by the BigQuery dialect to write more Pythonic code, but it is extremely slow to upload more than a few tens of rows.
Describe the solution you'd like
Be able to use ORM-enabled INSERT as the SQLAlchemy docs suggest, or at least the soon-to-be-legacy Session.bulk_insert_mappings, and get better performance.
Describe alternatives you've considered
I've tried all the methods provided by this SQLA FAQ, and none took less than about 900 seconds for as few as 200 rows.
Additional context
While the examples show that methods like bulk_insert_mappings can insert hundreds of thousands of rows in about 0.4 seconds, using the BigQuery dialect I've seen 2 h 30 min to upload about 4k rows. I don't expect BQ to come anywhere near 0.4 seconds for 100k rows, but something around 5 or 6 minutes does seem possible, since that is roughly the average time the BigQuery client from google.cloud.bigquery takes to upload about 90k rows.