I would love snowflake to support supports_multivalues_insert #55

Closed
wxianxin opened this issue Aug 14, 2018 · 10 comments

@wxianxin

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using (python --version)?
    python 3.7

  2. What operating system and processor architecture are you using (python -c 'import platform; print(platform.platform())')?
    Mac OS. X86

  3. What are the component versions in the environment (pip list)?
    All latest as of today: 20180814

  4. What did you do?

    dialect = getattr(connection, 'dialect', None)
    if dialect and getattr(dialect, 'supports_multivalues_insert', False):
        print('YESSSS!!!!!!!!!!!!!!!!!')
    else:
        print('NOOOOOOO!')

I got No. There is no support for supports_multivalues_insert. I feel heartbroken.


  5. What did you expect to see?
    Support for pandas to_sql to quickly upload a dataframe to a Snowflake table.

  6. What did you see instead?
    Slow ORM unit of work.

  7. Can you set logging to DEBUG and collect the logs?

Not necessary

@smtakeda
Contributor

I think Snowflake supports it, but the flag is not enabled for some reason. Trying to add the flag.

@smtakeda smtakeda self-assigned this Aug 22, 2018
@smtakeda
Contributor

smtakeda commented Aug 22, 2018

@wxianxin can you please explain how this flag impacts the functionality? I'm fairly sure I can add

supports_multivalues_insert = True

to the base class, but I'm not clear what test case I should add to verify the impact.

In my understanding, this flag indicates that an INSERT statement can have multiple rows of values, e.g.,

INSERT INTO t VALUES(1,'test1'),(2,'test2');

My question is how SQLAlchemy transforms the objects into the above multi-values INSERT statement.
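
For context, here is a minimal sketch of the transformation in question (not from this thread): SQLAlchemy compiles Insert.values() called with a list of dicts into a single multi-values statement, and supports_multivalues_insert is the dialect flag that gates that compile path. SQLite is used below only as a stand-in dialect that already enables the flag.

from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy.dialects import sqlite  # stand-in dialect with the flag enabled

metadata = MetaData()
t = Table('t', metadata, Column('c1', Integer), Column('c2', String))

# A list of dicts passed to values() produces one multi-values INSERT,
# rather than an executemany() loop over a single-row statement.
stmt = t.insert().values([{'c1': 1, 'c2': 'test1'},
                          {'c1': 2, 'c2': 'test2'}])
print(stmt.compile(dialect=sqlite.dialect()))
# INSERT INTO t (c1, c2) VALUES (?, ?), (?, ?)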

@wxianxin
Author

@smtakeda Thanks for your attention. I am afraid that the answer to your question is: I don't know. I am not familiar with the design of SQLAlchemy.

Why did I raise this issue? I am just a pandas and Snowflake user. I often need to upload large dataframes to Snowflake. Ideally I would like to use pandas.DataFrame.to_sql(). However, it is slow and sometimes unresponsive if the dataframe is too large.

I have been tracking pandas development and I think they will add a feature to speed up the upload, which is related to supports_multivalues_insert. The feature was included in pandas 0.23.0 but was reverted shortly after; I think they are now aiming for 0.24.0.

I can point you to related issues in pandas repo.
pandas-dev/pandas#21401
pandas-dev/pandas#8953

As for the flag, I think at one point I saw it used in pandas to check whether the dialect supports this feature and behave accordingly, but now I can't find the specific code anymore. Maybe it doesn't really matter.
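
For reference, the in-progress pandas API tracked in pandas-dev/pandas#21401 is expected to surface this as a method argument on to_sql; a minimal sketch under that assumption, with placeholder connection details:

import pandas as pd
from sqlalchemy import create_engine

# Placeholder URL -- replace with real Snowflake credentials.
engine = create_engine('snowflake://user:password@account/db/schema?warehouse=wh')
df = pd.DataFrame({'c1': range(10), 'c2': ['test%d' % i for i in range(10)]})

# method='multi' asks to_sql to emit multi-values INSERTs, the code path
# gated by the dialect's supports_multivalues_insert flag; chunksize bounds
# the number of rows per statement.
df.to_sql('bulkload_test', engine, if_exists='append', index=False,
          method='multi', chunksize=1000)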

As I am not an expert in this, feel free to close this issue in any way you see fit. Thanks!

@smtakeda
Contributor

Actually, to_sql already uses the multi-value insert via executemany in the Python connector.

Can you try adding the following two lines before the connection?

import snowflake.connector
snowflake.connector.paramstyle = 'qmark'

For example:

import snowflake.connector
snowflake.connector.paramstyle = 'qmark'

import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(
    account='...',
    ...
))
con = engine.connect()
try:
    con.execute("create or replace table bulkload_test(c1 number, c2 string)")
    c1data = []
    c2data = []
    for i in range(1, 1000000):
        c1data.append(i)
        c2data.append('test' + str(i) + 'test' + str(i))
    d = {'c1': c1data, 'c2': c2data}
    df = pd.DataFrame(data=d)
    df.to_sql("bulkload_test", con, if_exists='append', index=False, index_label=None)
finally:
    con.close()
    engine.dispose()

paramstyle='qmark' will use ? for server-side binding instead of %s for client-side binding.
In the above example, this SQL is executed with the data binding:

INSERT INTO bulkload_test (c1, c2) VALUES (?, ?)

instead of:

INSERT INTO bulkload_test (c1, c2) VALUES (1, 'test1test1'),(2, 'test2test2'),(3, 'test3test3'),(4, 'test4test4'),(5, 'test5test5'),(6, 'test6test6'),(7, 'test7test7'),(8, 'test8test8'),(9, 'test9test9'),(10, 'test10test10'),(11, 'test11test11'),(12, 'test12test12'),(13, 'test13test13'),(14, 'test14test14'),(15, 'test15test15'),(16, 'test16test16'),(17, 'test17test17'),(18, 'test18test18'),(19, 'test19test19'),(20, 'test20test20'),(21, 'test21test21'),(22, 'test22test22')...
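
For completeness, a minimal sketch of the same idea taken directly through the Python connector (placeholder credentials; the table name reuses the example above): with paramstyle set to 'qmark', a single parameterized statement is sent and the rows are bound to it via executemany(), instead of interpolating literal values client-side.

import snowflake.connector

snowflake.connector.paramstyle = 'qmark'

# Placeholder credentials -- fill in real account details.
con = snowflake.connector.connect(account='...', user='...', password='...')
try:
    cur = con.cursor()
    cur.execute("create or replace table bulkload_test(c1 number, c2 string)")
    # One parameterized INSERT; all rows are bound to it through executemany().
    cur.executemany(
        "INSERT INTO bulkload_test (c1, c2) VALUES (?, ?)",
        [(i, 'test{0}test{0}'.format(i)) for i in range(1, 1001)],
    )
finally:
    con.close()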

@wxianxin
Author

wxianxin commented Aug 22, 2018

Thanks. I have noticed a 25% improvement in speed.
I would be glad to know if there is any additional way to improve the speed.
It is still kind of slow. My dataframes are around 200+ columns and 100,000 rows. I have never successfully uploaded the whole dataframe using to_sql; Python would just hang. I don't know if it is because of a server limitation. I am able to upload a dataframe with 200 columns and 10,000 rows, though.

@smtakeda
Contributor

In terms of performance, there is not much we can do at the moment. We are experimenting with a scalable bulk bind, but the performance is not good enough to enable it for general use.

About the hang, can you please file a ticket with support, including the log? This code snippet can generate python_connector.log:

import logging

for logger_name in ['snowflake.connector','botocore']:
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.FileHandler('python_connector.log')
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

@wxianxin
Author

wxianxin commented Aug 24, 2018

Thanks for the update.

I did 3 test runs. I will separate the test info into multiple messages:

  1. Successfully uploaded a 200 (row) by 200 (column) dataframe in 6 seconds.

  2. Successfully uploaded a 10000 by 200 dataframe in 78.4 seconds.

  3. Failed to upload a 20000 by 200 dataframe; I killed my Python process at the 10-minute mark.

  • The system is responsive; I am able to stop Python with KeyboardInterrupt.

  • The RAM usage is very high, the highest in the system.

  • I will add my code in the next message.

  • Log file for the successfully uploaded 200 by 200 dataframe is in the 3rd message.

  • Log file for the failed 20000 by 200 dataframe upload is in the 4th message.

  • I replaced the username and warehouse information with dummy values in the log file.

My company's IT security setup is pretty complicated; that could also contribute to the failure, so this is just for your reference.
Have a good weekend!

@wxianxin
Author

import pandas as pd
from time import time
from sqlalchemy import create_engine
import snowflake.connector
snowflake.connector.paramstyle = 'qmark'


def to_sql_df(df: pd.DataFrame, table_name: str, username: str, password: str, db: str, schema: str, warehouse: str='CMCL_Q_BUS_ANALYTICAL', timeout: int=10):
    """Wrapper function to upload pandas.DataFrame to snowflake tables
    Args:
        df (pd.DataFrame): Pandas.DataFrame to be uploaded to snowflake.
        table_name (str): Table name in snowflake.
        username (str): Snowflake username.
        password (str): Snowflake password.
        db (str): Snowflake database.
        schema (str): Snowflake schema.
        warehouse (str): Snowflake warehouse.
        timeout (int): Connection timeout value.

    Returns:
        None

    """
    start = time()
    engine = create_engine(
        f'snowflake://{username}:{password}@prod.us-east-1.capitalone/{db}/{schema}?warehouse={warehouse}',
        connect_args={'connect_timeout': timeout}
    )
    connection = engine.connect()
    print("Connected to snowflake.\nUploading data...")
    df.to_sql(table_name, connection, index=False)
    connection.close()
    engine.dispose()
    print('Snowflake time: ' + '{0:.1f}s'.format(time() - start))

if __name__ == '__main__':
    import logging

    for logger_name in ['snowflake.connector','botocore']:
        logger = logging.getLogger(logger_name)
        logger.setLevel(logging.DEBUG)
        ch = logging.FileHandler('python_connector.log')
        ch.setLevel(logging.DEBUG)
        ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
        logger.addHandler(ch)

    print('test')
    import numpy as np
    import os
    df = pd.DataFrame([np.random.random(200) for x in range(20000)])

    to_sql_df(df, 'test_table', os.environ['USER'], os.environ['PASSWORD'], 'xxx', 'xxxx', 'xxxxxx')

@smtakeda
Contributor

@wxianxin thanks for the perf test. Will look into it when I have a chance.

I would recommend communicating with support, especially if we need to share the logs. We do our best not to include sensitive information in the logs, but we cannot guarantee it never happens. It would be safer to upload the logs directly to the support ticket. Thanks.

@wxianxin
Author

I agree. Thanks for the help!
