diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 3c7bb621..bc37b498 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -148,7 +148,7 @@ Running System Tests .. note:: - System tests are only configured to run under Python 3.8 and 3.9. + System tests are only configured to run under Python 3.7, 3.8 and 3.9. For expediency, we do not run them in older versions of Python 3. This alone will not run the tests. You'll need to change some local diff --git a/ci/requirements-3.7-0.23.2.conda b/ci/requirements-3.7-0.23.2.conda index af4768ab..1da6d226 100644 --- a/ci/requirements-3.7-0.23.2.conda +++ b/ci/requirements-3.7-0.23.2.conda @@ -2,8 +2,9 @@ codecov coverage fastavro flake8 -numpy==1.14.5 +numpy==1.16.6 google-cloud-bigquery==1.11.1 +pyarrow==3.0.0 pydata-google-auth pytest pytest-cov diff --git a/ci/requirements-3.9-NIGHTLY.conda b/ci/requirements-3.9-NIGHTLY.conda index 9dfe3f6b..ccaa87e5 100644 --- a/ci/requirements-3.9-NIGHTLY.conda +++ b/ci/requirements-3.9-NIGHTLY.conda @@ -1,6 +1,7 @@ pydata-google-auth google-cloud-bigquery google-cloud-bigquery-storage +pyarrow pytest pytest-cov codecov diff --git a/docs/install.rst b/docs/install.rst index 6810a44a..9887c799 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -29,7 +29,7 @@ Install from Source .. code-block:: shell - $ pip install git+https://github.com/pydata/pandas-gbq.git + $ pip install git+https://github.com/googleapis/python-bigquery-pandas.git Dependencies @@ -38,6 +38,7 @@ Dependencies This module requires following additional dependencies: - `pydata-google-auth `__: Helpers for authentication to Google's API +- `pyarrow `__: Format for getting data to/from Google BigQuery - `google-auth `__: authentication and authorization for Google's API - `google-auth-oauthlib `__: integration with `oauthlib `__ for end-user authentication - `google-cloud-bigquery `__: Google Cloud client library for BigQuery diff --git a/noxfile.py b/noxfile.py index 2542369d..ed88b094 100644 --- a/noxfile.py +++ b/noxfile.py @@ -28,7 +28,7 @@ BLACK_PATHS = ["docs", "pandas_gbq", "tests", "noxfile.py", "setup.py"] DEFAULT_PYTHON_VERSION = "3.8" -SYSTEM_TEST_PYTHON_VERSIONS = ["3.8", "3.9"] +SYSTEM_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] UNIT_TEST_PYTHON_VERSIONS = ["3.7", "3.8", "3.9"] CURRENT_DIRECTORY = pathlib.Path(__file__).parent.absolute() diff --git a/owlbot.py b/owlbot.py index d382cf66..76a17e40 100644 --- a/owlbot.py +++ b/owlbot.py @@ -31,7 +31,7 @@ extras = ["tqdm"] templated_files = common.py_library( unit_test_python_versions=["3.7", "3.8", "3.9"], - system_test_python_versions=["3.8", "3.9"], + system_test_python_versions=["3.7", "3.8", "3.9"], cov_level=86, unit_test_extras=extras, system_test_extras=extras, diff --git a/pandas_gbq/exceptions.py b/pandas_gbq/exceptions.py index aec0ea1a..1b4f6925 100644 --- a/pandas_gbq/exceptions.py +++ b/pandas_gbq/exceptions.py @@ -3,12 +3,23 @@ # license that can be found in the LICENSE file. +class GenericGBQException(ValueError): + """ + Raised when an unrecognized Google API Error occurs. + """ + + class AccessDenied(ValueError): """ Raised when invalid credentials are provided, or tokens have expired. """ - pass + +class ConversionError(GenericGBQException): + """ + Raised when there is a problem converting the DataFrame to a format + required to upload it to BigQuery. + """ class InvalidPrivateKeyFormat(ValueError): @@ -16,8 +27,6 @@ class InvalidPrivateKeyFormat(ValueError): Raised when provided private key has invalid format. """ - pass - class PerformanceWarning(RuntimeWarning): """ diff --git a/pandas_gbq/features.py b/pandas_gbq/features.py index ef1969fd..fc8ef568 100644 --- a/pandas_gbq/features.py +++ b/pandas_gbq/features.py @@ -10,6 +10,7 @@ BIGQUERY_BQSTORAGE_VERSION = "1.24.0" BIGQUERY_FROM_DATAFRAME_CSV_VERSION = "2.6.0" PANDAS_VERBOSITY_DEPRECATION_VERSION = "0.23.0" +PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION = "1.1.0" class Features: @@ -89,5 +90,14 @@ def pandas_has_deprecated_verbose(self): ) return self.pandas_installed_version >= pandas_verbosity_deprecation + @property + def pandas_has_parquet_with_lossless_timestamp(self): + import pkg_resources + + desired_version = pkg_resources.parse_version( + PANDAS_PARQUET_LOSSLESS_TIMESTAMP_VERSION + ) + return self.pandas_installed_version >= desired_version + FEATURES = Features() diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 856c1285..5c6ae457 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -18,8 +18,11 @@ bigquery = None google_exceptions = None -from pandas_gbq.exceptions import AccessDenied -from pandas_gbq.exceptions import PerformanceWarning +from pandas_gbq.exceptions import ( + AccessDenied, + GenericGBQException, + PerformanceWarning, +) from pandas_gbq import features from pandas_gbq.features import FEATURES import pandas_gbq.schema @@ -69,14 +72,6 @@ class DatasetCreationError(ValueError): pass -class GenericGBQException(ValueError): - """ - Raised when an unrecognized Google API Error occurs. - """ - - pass - - class InvalidColumnOrder(ValueError): """ Raised when the provided column order for output @@ -520,7 +515,7 @@ def _download_results( df = rows_iter.to_dataframe( dtypes=conversion_dtypes, progress_bar_type=progress_bar_type, - **to_dataframe_kwargs + **to_dataframe_kwargs, ) except self.http_error as ex: self.process_http_error(ex) @@ -541,6 +536,7 @@ def load_data( chunksize=None, schema=None, progress_bar=True, + api_method: str = "load_parquet", ): from pandas_gbq import load @@ -554,6 +550,7 @@ def load_data( chunksize=chunksize, schema=schema, location=self.location, + api_method=api_method, ) if progress_bar and tqdm: chunks = tqdm.tqdm(chunks) @@ -876,6 +873,7 @@ def to_gbq( location=None, progress_bar=True, credentials=None, + api_method: str = "default", verbose=None, private_key=None, ): @@ -964,6 +962,12 @@ def to_gbq( :class:`google.oauth2.service_account.Credentials` directly. .. versionadded:: 0.8.0 + api_method : str, optional + API method used to upload DataFrame to BigQuery. One of "load_parquet", + "load_csv". Default "load_parquet" if pandas is version 1.1.0+, + otherwise "load_csv". + + .. versionadded:: 0.16.0 verbose : bool, deprecated Deprecated in Pandas-GBQ 0.4.0. Use the `logging module to adjust verbosity instead @@ -988,6 +992,28 @@ def to_gbq( stacklevel=1, ) + if api_method == "default": + # Avoid using parquet if pandas doesn't support lossless conversions to + # parquet timestamp. See: https://stackoverflow.com/a/69758676/101923 + if FEATURES.pandas_has_parquet_with_lossless_timestamp: + api_method = "load_parquet" + else: + api_method = "load_csv" + + if chunksize is not None: + if api_method == "load_parquet": + warnings.warn( + "chunksize is ignored when using api_method='load_parquet'", + DeprecationWarning, + stacklevel=2, + ) + elif api_method == "load_csv": + warnings.warn( + "chunksize will be ignored when using api_method='load_csv' in a future version of pandas-gbq", + PendingDeprecationWarning, + stacklevel=2, + ) + if if_exists not in ("fail", "replace", "append"): raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) @@ -1071,6 +1097,7 @@ def to_gbq( chunksize=chunksize, schema=table_schema, progress_bar=progress_bar, + api_method=api_method, ) diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index faa674c2..69210e41 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -5,9 +5,13 @@ """Helper methods for loading data into BigQuery""" import io +from typing import Any, Callable, Dict, List, Optional +import pandas +import pyarrow.lib from google.cloud import bigquery +from pandas_gbq import exceptions from pandas_gbq.features import FEATURES import pandas_gbq.schema @@ -52,45 +56,126 @@ def split_dataframe(dataframe, chunksize=None): yield remaining_rows, chunk -def load_chunks( - client, - dataframe, - destination_table_ref, - chunksize=None, - schema=None, - location=None, +def load_parquet( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + schema: Optional[Dict[str, Any]], ): job_config = bigquery.LoadJobConfig() job_config.write_disposition = "WRITE_APPEND" - job_config.source_format = "CSV" - job_config.allow_quoted_newlines = True + job_config.source_format = "PARQUET" - # Explicit schema? Use that! if schema is not None: schema = pandas_gbq.schema.remove_policy_tags(schema) job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) - # If not, let BigQuery determine schema unless we are encoding the CSV files ourselves. - elif not FEATURES.bigquery_has_from_dataframe_with_csv: - schema = pandas_gbq.schema.generate_bq_schema(dataframe) - schema = pandas_gbq.schema.remove_policy_tags(schema) - job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + try: + client.load_table_from_dataframe( + dataframe, destination_table_ref, job_config=job_config, location=location, + ).result() + except pyarrow.lib.ArrowInvalid as exc: + raise exceptions.ConversionError( + "Could not convert DataFrame to Parquet." + ) from exc + + +def load_csv( + dataframe: pandas.DataFrame, + chunksize: Optional[int], + bq_schema: Optional[List[bigquery.SchemaField]], + load_chunk: Callable, +): + job_config = bigquery.LoadJobConfig() + job_config.write_disposition = "WRITE_APPEND" + job_config.source_format = "CSV" + job_config.allow_quoted_newlines = True + + if bq_schema is not None: + job_config.schema = bq_schema + + # TODO: Remove chunking feature for load jobs. Deprecated in 0.16.0. chunks = split_dataframe(dataframe, chunksize=chunksize) for remaining_rows, chunk in chunks: yield remaining_rows + load_chunk(chunk, job_config) - if FEATURES.bigquery_has_from_dataframe_with_csv: - client.load_table_from_dataframe( - chunk, destination_table_ref, job_config=job_config, location=location, + +def load_csv_from_dataframe( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + chunksize: Optional[int], + schema: Optional[Dict[str, Any]], +): + bq_schema = None + + if schema is not None: + schema = pandas_gbq.schema.remove_policy_tags(schema) + bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + + def load_chunk(chunk, job_config): + client.load_table_from_dataframe( + chunk, destination_table_ref, job_config=job_config, location=location, + ).result() + + return load_csv(dataframe, chunksize, bq_schema, load_chunk) + + +def load_csv_from_file( + client: bigquery.Client, + dataframe: pandas.DataFrame, + destination_table_ref: bigquery.TableReference, + location: Optional[str], + chunksize: Optional[int], + schema: Optional[Dict[str, Any]], +): + if schema is None: + schema = pandas_gbq.schema.generate_bq_schema(dataframe) + + schema = pandas_gbq.schema.remove_policy_tags(schema) + bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) + + def load_chunk(chunk, job_config): + try: + chunk_buffer = encode_chunk(chunk) + client.load_table_from_file( + chunk_buffer, + destination_table_ref, + job_config=job_config, + location=location, ).result() + finally: + chunk_buffer.close() + + return load_csv(dataframe, chunksize, bq_schema, load_chunk,) + + +def load_chunks( + client, + dataframe, + destination_table_ref, + chunksize=None, + schema=None, + location=None, + api_method="load_parquet", +): + if api_method == "load_parquet": + load_parquet(client, dataframe, destination_table_ref, location, schema) + # TODO: yield progress depending on result() with timeout + return [0] + elif api_method == "load_csv": + if FEATURES.bigquery_has_from_dataframe_with_csv: + return load_csv_from_dataframe( + client, dataframe, destination_table_ref, location, chunksize, schema + ) else: - try: - chunk_buffer = encode_chunk(chunk) - client.load_table_from_file( - chunk_buffer, - destination_table_ref, - job_config=job_config, - location=location, - ).result() - finally: - chunk_buffer.close() + return load_csv_from_file( + client, dataframe, destination_table_ref, location, chunksize, schema + ) + else: + raise ValueError( + f"Got unexpected api_method: {api_method!r}, expected one of 'load_parquet', 'load_csv'." + ) diff --git a/setup.py b/setup.py index a7e23eec..b1169cef 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,9 @@ release_status = "Development Status :: 4 - Beta" dependencies = [ "setuptools", + "numpy>=1.16.6", "pandas>=0.23.2", + "pyarrow >=3.0.0, <7.0dev", "pydata-google-auth", "google-auth", "google-auth-oauthlib", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 251c81b4..7c67d275 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,11 +5,12 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -numpy==1.14.5 -pandas==0.23.2 google-auth==1.4.1 google-auth-oauthlib==0.0.1 google-cloud-bigquery==1.11.1 google-cloud-bigquery-storage==1.1.0 +numpy==1.16.6 +pandas==0.23.2 +pyarrow==3.0.0 pydata-google-auth==0.1.2 tqdm==4.23.0 diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index 00bbd3d6..a8d6bd0d 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -1185,77 +1185,6 @@ def test_google_upload_errors_should_raise_exception(self, project_id): credentials=self.credentials, ) - def test_upload_chinese_unicode_data(self, project_id): - test_id = "2" - test_size = 6 - df = DataFrame(np.random.randn(6, 4), index=range(6), columns=list("ABCD")) - df["s"] = u"信用卡" - - gbq.to_gbq( - df, - self.destination_table + test_id, - project_id, - credentials=self.credentials, - chunksize=10000, - ) - - result_df = gbq.read_gbq( - "SELECT * FROM {0}".format(self.destination_table + test_id), - project_id=project_id, - credentials=self.credentials, - dialect="legacy", - ) - - assert len(result_df) == test_size - - if sys.version_info.major < 3: - pytest.skip(msg="Unicode comparison in Py2 not working") - - result = result_df["s"].sort_values() - expected = df["s"].sort_values() - - tm.assert_numpy_array_equal(expected.values, result.values) - - def test_upload_other_unicode_data(self, project_id): - test_id = "3" - test_size = 3 - df = DataFrame( - { - "s": ["Skywalker™", "lego", "hülle"], - "i": [200, 300, 400], - "d": [ - "2017-12-13 17:40:39", - "2017-12-13 17:40:39", - "2017-12-13 17:40:39", - ], - } - ) - - gbq.to_gbq( - df, - self.destination_table + test_id, - project_id=project_id, - credentials=self.credentials, - chunksize=10000, - ) - - result_df = gbq.read_gbq( - "SELECT * FROM {0}".format(self.destination_table + test_id), - project_id=project_id, - credentials=self.credentials, - dialect="legacy", - ) - - assert len(result_df) == test_size - - if sys.version_info.major < 3: - pytest.skip(msg="Unicode comparison in Py2 not working") - - result = result_df["s"].sort_values() - expected = df["s"].sort_values() - - tm.assert_numpy_array_equal(expected.values, result.values) - def test_upload_mixed_float_and_int(self, project_id): """Test that we can upload a dataframe containing an int64 and float64 column. See: https://github.com/pydata/pandas-gbq/issues/116 @@ -1454,6 +1383,9 @@ def test_upload_data_with_different_df_and_user_schema(self, project_id): project_id, credentials=self.credentials, table_schema=test_schema, + # Loading string pandas series to FLOAT column not supported with + # Parquet. + api_method="load_csv", ) dataset, table = destination_table.split(".") assert verify_schema( diff --git a/tests/system/test_to_gbq.py b/tests/system/test_to_gbq.py index b0d2d031..d16997fd 100644 --- a/tests/system/test_to_gbq.py +++ b/tests/system/test_to_gbq.py @@ -3,9 +3,10 @@ # license that can be found in the LICENSE file. import functools +import random + import pandas import pandas.testing - import pytest @@ -21,31 +22,61 @@ def method_under_test(credentials, project_id): ) -def test_float_round_trip(method_under_test, random_dataset_id, bigquery_client): - """Ensure that 64-bit floating point numbers are unchanged. - - See: https://github.com/pydata/pandas-gbq/issues/326 - """ - - table_id = "{}.float_round_trip".format(random_dataset_id) - input_floats = pandas.Series( - [ - 0.14285714285714285, - 0.4406779661016949, - 1.05148, - 1.05153, - 1.8571428571428572, - 2.718281828459045, - 3.141592653589793, - 2.0988936657440586e43, - ], - name="float_col", +@pytest.mark.parametrize( + ["input_series"], + [ + # Ensure that 64-bit floating point numbers are unchanged. + # See: https://github.com/pydata/pandas-gbq/issues/326 + ( + pandas.Series( + [ + 0.14285714285714285, + 0.4406779661016949, + 1.05148, + 1.05153, + 1.8571428571428572, + 2.718281828459045, + 3.141592653589793, + 2.0988936657440586e43, + ], + name="test_col", + ), + ), + ( + pandas.Series( + [ + "abc", + "defg", + # Ensure that empty strings are written as empty string, + # not NULL. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/366 + "", + None, + # Ensure that unicode characters are encoded. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/106 + "信用卡", + "Skywalker™", + "hülle", + ], + name="test_col", + ), + ), + ], +) +def test_series_round_trip( + method_under_test, random_dataset_id, bigquery_client, input_series +): + table_id = f"{random_dataset_id}.round_trip_{random.randrange(1_000_000)}" + input_series = input_series.sort_values().reset_index(drop=True) + df = pandas.DataFrame( + # Some errors only occur in multi-column dataframes. See: + # https://github.com/googleapis/python-bigquery-pandas/issues/366 + {"test_col": input_series, "test_col2": input_series} ) - df = pandas.DataFrame({"float_col": input_floats}) method_under_test(df, table_id) round_trip = bigquery_client.list_rows(table_id).to_dataframe() - round_trip_floats = round_trip["float_col"].sort_values() + round_trip_series = round_trip["test_col"].sort_values().reset_index(drop=True) pandas.testing.assert_series_equal( - round_trip_floats, input_floats, check_exact=True, + round_trip_series, input_series, check_exact=True, ) diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py index 3b603412..0a5ecad2 100644 --- a/tests/unit/test_gbq.py +++ b/tests/unit/test_gbq.py @@ -124,6 +124,29 @@ def test_to_gbq_with_no_project_id_given_should_fail(monkeypatch): gbq.to_gbq(DataFrame([[1]]), "dataset.tablename") +@pytest.mark.parametrize( + ["api_method", "warning_message", "warning_type"], + [ + ("load_parquet", "chunksize is ignored", DeprecationWarning), + ("load_csv", "chunksize will be ignored", PendingDeprecationWarning), + ], +) +def test_to_gbq_with_chunksize_warns_deprecation( + api_method, warning_message, warning_type +): + with pytest.warns(warning_type, match=warning_message): + try: + gbq.to_gbq( + DataFrame([[1]]), + "dataset.tablename", + project_id="my-project", + api_method=api_method, + chunksize=100, + ) + except gbq.TableCreationError: + pass + + @pytest.mark.parametrize(["verbose"], [(True,), (False,)]) def test_to_gbq_with_verbose_new_pandas_warns_deprecation(monkeypatch, verbose): monkeypatch.setattr( diff --git a/tests/unit/test_load.py b/tests/unit/test_load.py index 2ddb4f50..a32d2d9e 100644 --- a/tests/unit/test_load.py +++ b/tests/unit/test_load.py @@ -16,10 +16,10 @@ from pandas_gbq import load -def load_method(bqclient): - if FEATURES.bigquery_has_from_dataframe_with_csv: - return bqclient.load_table_from_dataframe - return bqclient.load_table_from_file +def load_method(bqclient, api_method): + if not FEATURES.bigquery_has_from_dataframe_with_csv and api_method == "load_csv": + return bqclient.load_table_from_file + return bqclient.load_table_from_dataframe def test_encode_chunk_with_unicode(): @@ -91,9 +91,12 @@ def test_encode_chunks_with_chunksize_none(): assert len(chunk.index) == 6 -@pytest.mark.parametrize(["bigquery_has_from_dataframe_with_csv"], [(True,), (False,)]) +@pytest.mark.parametrize( + ["bigquery_has_from_dataframe_with_csv", "api_method"], + [(True, "load_parquet"), (True, "load_csv"), (False, "load_csv")], +) def test_load_chunks_omits_policy_tags( - monkeypatch, mock_bigquery_client, bigquery_has_from_dataframe_with_csv + monkeypatch, mock_bigquery_client, bigquery_has_from_dataframe_with_csv, api_method ): """Ensure that policyTags are omitted. @@ -117,11 +120,20 @@ def test_load_chunks_omits_policy_tags( ] } - _ = list(load.load_chunks(mock_bigquery_client, df, destination, schema=schema)) + _ = list( + load.load_chunks( + mock_bigquery_client, df, destination, schema=schema, api_method=api_method + ) + ) - mock_load = load_method(mock_bigquery_client) + mock_load = load_method(mock_bigquery_client, api_method=api_method) assert mock_load.called _, kwargs = mock_load.call_args assert "job_config" in kwargs sent_field = kwargs["job_config"].schema[0].to_api_repr() assert "policyTags" not in sent_field + + +def test_load_chunks_with_invalid_api_method(): + with pytest.raises(ValueError, match="Got unexpected api_method:"): + load.load_chunks(None, None, None, api_method="not_a_thing")