Skip to content

Commit

Permalink
Use BQ Storage API by default in cell magic
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed Apr 14, 2020
1 parent f3b7a4f commit c01648d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 92 deletions.
58 changes: 30 additions & 28 deletions google/cloud/bigquery/magics.py
Expand Up @@ -39,10 +39,9 @@
Project to use for running the query. Defaults to the context
:attr:`~google.cloud.bigquery.magics.Context.project`.
* ``--use_bqstorage_api`` (optional, line argument):
Downloads the DataFrame using the BigQuery Storage API. To use this
option, install the ``google-cloud-bigquery-storage`` and ``fastavro``
packages, and `enable the BigQuery Storage API
<https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`_.
[Deprecated] Not used anymore, as the BigQuery Storage API is used by default.
* ``--use_rest_api`` (optional, line argument):
Use the BigQuery REST API instead of the Storage API.
* ``--use_legacy_sql`` (optional, line argument):
Runs the query using Legacy SQL syntax. Defaults to Standard SQL if
this argument is not used.
Expand Down Expand Up @@ -142,6 +141,7 @@
import functools
import sys
import time
import warnings
from concurrent import futures

try:
Expand Down Expand Up @@ -174,7 +174,6 @@ def __init__(self):
self._credentials = None
self._project = None
self._connection = None
self._use_bqstorage_api = None
self._default_query_job_config = bigquery.QueryJobConfig()

@property
Expand Down Expand Up @@ -237,21 +236,6 @@ def project(self):
def project(self, value):
self._project = value

@property
def use_bqstorage_api(self):
"""bool: [Beta] Set to True to use the BigQuery Storage API to
download query results
To use this option, install the ``google-cloud-bigquery-storage`` and
``fastavro`` packages, and `enable the BigQuery Storage API
<https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`_.
"""
return self._use_bqstorage_api

@use_bqstorage_api.setter
def use_bqstorage_api(self, value):
self._use_bqstorage_api = value

@property
def default_query_job_config(self):
"""google.cloud.bigquery.job.QueryJobConfig: Default job
Expand Down Expand Up @@ -426,11 +410,21 @@ def _create_dataset_if_necessary(client, dataset_id):
@magic_arguments.argument(
"--use_bqstorage_api",
action="store_true",
default=None,
help=(
"[Deprecated] The BigQuery Storage API is already used by default to "
"download large query results, and this option has no effect. "
"If you want to switch to the classic REST API instead, use the "
"--use_rest_api option."
),
)
@magic_arguments.argument(
"--use_rest_api",
action="store_true",
default=False,
help=(
"[Beta] Use the BigQuery Storage API to download large query results. "
"To use this option, install the google-cloud-bigquery-storage and "
"fastavro packages, and enable the BigQuery Storage API."
"Use the classic REST API instead of the BigQuery Storage API to "
"download query results."
),
)
@magic_arguments.argument(
Expand Down Expand Up @@ -473,6 +467,14 @@ def _cell_magic(line, query):
"""
args = magic_arguments.parse_argstring(_cell_magic, line)

if args.use_bqstorage_api is not None:
warnings.warn(
"Deprecated option --use_bqstorage_api, the BigQuery "
"Storage API is already used by default.",
category=DeprecationWarning,
)
use_bqstorage_api = not args.use_rest_api

params = []
if args.params is not None:
try:
Expand All @@ -494,9 +496,7 @@ def _cell_magic(line, query):
)
if context._connection:
client._connection = context._connection
bqstorage_client = _make_bqstorage_client(
args.use_bqstorage_api or context.use_bqstorage_api, context.credentials
)
bqstorage_client = _make_bqstorage_client(use_bqstorage_api, context.credentials)

close_transports = functools.partial(_close_transports, client, bqstorage_client)

Expand Down Expand Up @@ -598,8 +598,10 @@ def _make_bqstorage_client(use_bqstorage_api, credentials):
from google.cloud import bigquery_storage_v1beta1
except ImportError as err:
customized_error = ImportError(
"Install the google-cloud-bigquery-storage and pyarrow packages "
"to use the BigQuery Storage API."
"The default BigQuery Storage API client cannot be used, install "
"the missing google-cloud-bigquery-storage and pyarrow packages "
"to use it. Alternatively, use the classic REST API by specifying "
"the --use_rest_api magic option."
)
six.raise_from(customized_error, err)

Expand Down
101 changes: 37 additions & 64 deletions tests/unit/test_magics.py
Expand Up @@ -15,6 +15,7 @@
import copy
import re
from concurrent import futures
import warnings

import mock
import pytest
Expand Down Expand Up @@ -386,13 +387,33 @@ def test_extension_load():

@pytest.mark.usefixtures("ipython_interactive")
@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
def test_bigquery_magic_without_optional_arguments(missing_bq_storage):
@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_bigquery_magic_without_optional_arguments(monkeypatch):
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
magics.context.credentials = mock.create_autospec(
mock_credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

# Set up the context with monkeypatch so that it's reset for subsequent
# tests.
monkeypatch.setattr(magics.context, "credentials", mock_credentials)

# Mock out the BigQuery Storage API.
bqstorage_mock = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
bqstorage_instance_mock = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient, instance=True
)
bqstorage_instance_mock.transport = mock.Mock()
bqstorage_mock.return_value = bqstorage_instance_mock
bqstorage_client_patch = mock.patch(
"google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock
)

sql = "SELECT 17 AS num"
result = pandas.DataFrame([17], columns=["num"])
run_query_patch = mock.patch(
Expand All @@ -403,11 +424,11 @@ def test_bigquery_magic_without_optional_arguments(missing_bq_storage):
)
query_job_mock.to_dataframe.return_value = result

# Shouldn't fail when BigQuery Storage client isn't installed.
with run_query_patch as run_query_mock, missing_bq_storage:
with run_query_patch as run_query_mock, bqstorage_client_patch:
run_query_mock.return_value = query_job_mock
return_value = ip.run_cell_magic("bigquery", "", sql)

assert bqstorage_mock.called # BQ storage client was used
assert isinstance(return_value, pandas.DataFrame)
assert len(return_value) == len(result) # verify row count
assert list(return_value) == list(result) # verify column names
Expand Down Expand Up @@ -542,7 +563,6 @@ def test_bigquery_magic_with_bqstorage_from_argument(monkeypatch):
# Set up the context with monkeypatch so that it's reset for subsequent
# tests.
monkeypatch.setattr(magics.context, "credentials", mock_credentials)
monkeypatch.setattr(magics.context, "use_bqstorage_api", False)

# Mock out the BigQuery Storage API.
bqstorage_mock = mock.create_autospec(
Expand All @@ -566,67 +586,20 @@ def test_bigquery_magic_with_bqstorage_from_argument(monkeypatch):
google.cloud.bigquery.job.QueryJob, instance=True
)
query_job_mock.to_dataframe.return_value = result
with run_query_patch as run_query_mock, bqstorage_client_patch:
with run_query_patch as run_query_mock, bqstorage_client_patch, warnings.catch_warnings(
record=True
) as warned:
run_query_mock.return_value = query_job_mock

return_value = ip.run_cell_magic("bigquery", "--use_bqstorage_api", sql)

assert len(bqstorage_mock.call_args_list) == 1
kwargs = bqstorage_mock.call_args_list[0].kwargs
assert kwargs.get("credentials") is mock_credentials
client_info = kwargs.get("client_info")
assert client_info is not None
assert client_info.user_agent == "ipython-" + IPython.__version__

query_job_mock.to_dataframe.assert_called_once_with(
bqstorage_client=bqstorage_instance_mock
)

assert isinstance(return_value, pandas.DataFrame)


@pytest.mark.usefixtures("ipython_interactive")
@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_bigquery_magic_with_bqstorage_from_context(monkeypatch):
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
mock_credentials = mock.create_autospec(
google.auth.credentials.Credentials, instance=True
)

# Set up the context with monkeypatch so that it's reset for subsequent
# tests.
monkeypatch.setattr(magics.context, "credentials", mock_credentials)
monkeypatch.setattr(magics.context, "use_bqstorage_api", True)
# Deprecation warning should have been issued.
def warning_match(warning):
message = str(warning).lower()
return "deprecated" in message and "use_bqstorage_api" in message

# Mock out the BigQuery Storage API.
bqstorage_mock = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient
)
bqstorage_instance_mock = mock.create_autospec(
bigquery_storage_v1beta1.BigQueryStorageClient, instance=True
)
bqstorage_instance_mock.transport = mock.Mock()
bqstorage_mock.return_value = bqstorage_instance_mock
bqstorage_client_patch = mock.patch(
"google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient", bqstorage_mock
)

sql = "SELECT 17 AS num"
result = pandas.DataFrame([17], columns=["num"])
run_query_patch = mock.patch(
"google.cloud.bigquery.magics._run_query", autospec=True
)
query_job_mock = mock.create_autospec(
google.cloud.bigquery.job.QueryJob, instance=True
)
query_job_mock.to_dataframe.return_value = result
with run_query_patch as run_query_mock, bqstorage_client_patch:
run_query_mock.return_value = query_job_mock

return_value = ip.run_cell_magic("bigquery", "", sql)
expected_warnings = list(filter(warning_match, warned))
assert len(expected_warnings) == 1

assert len(bqstorage_mock.call_args_list) == 1
kwargs = bqstorage_mock.call_args_list[0].kwargs
Expand All @@ -646,7 +619,7 @@ def test_bigquery_magic_with_bqstorage_from_context(monkeypatch):
@pytest.mark.skipif(
bigquery_storage_v1beta1 is None, reason="Requires `google-cloud-bigquery-storage`"
)
def test_bigquery_magic_without_bqstorage(monkeypatch):
def test_bigquery_magic_with_rest_client_requested(monkeypatch):
ip = IPython.get_ipython()
ip.extension_manager.load_extension("google.cloud.bigquery")
mock_credentials = mock.create_autospec(
Expand Down Expand Up @@ -677,7 +650,7 @@ def test_bigquery_magic_without_bqstorage(monkeypatch):
with run_query_patch as run_query_mock, bqstorage_client_patch:
run_query_mock.return_value = query_job_mock

return_value = ip.run_cell_magic("bigquery", "", sql)
return_value = ip.run_cell_magic("bigquery", "--use_rest_api", sql)

bqstorage_mock.assert_not_called()
query_job_mock.to_dataframe.assert_called_once_with(bqstorage_client=None)
Expand Down Expand Up @@ -895,7 +868,7 @@ def test_bigquery_magic_w_table_id_and_bqstorage_client():
with default_patch, client_patch as client_mock, bqstorage_client_patch:
client_mock().list_rows.return_value = row_iterator_mock

ip.run_cell_magic("bigquery", "--use_bqstorage_api --max_results=5", table_id)
ip.run_cell_magic("bigquery", "--max_results=5", table_id)
row_iterator_mock.to_dataframe.assert_called_once_with(
bqstorage_client=bqstorage_instance_mock
)
Expand Down

0 comments on commit c01648d

Please sign in to comment.