Navigation Menu

Skip to content

Commit

Permalink
feat: add progress bar to QueryJob.to_dataframe and to_arrow (#352)
Browse files Browse the repository at this point in the history
* feat: add progress bar for to_arrow method

* feat: add progress bar for to_dataframe

* feat: add default progress bar and unit test

* feat: nit

* feat: result timout for without queryplan
  • Loading branch information
HemangChothani committed Nov 16, 2020
1 parent b899ad1 commit dc78edd
Show file tree
Hide file tree
Showing 5 changed files with 367 additions and 40 deletions.
94 changes: 94 additions & 0 deletions google/cloud/bigquery/_tqdm_helpers.py
@@ -0,0 +1,94 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Shared helper functions for tqdm progress bar."""

import concurrent.futures
import time
import warnings

try:
import tqdm
except ImportError: # pragma: NO COVER
tqdm = None

_NO_TQDM_ERROR = (
"A progress bar was requested, but there was an error loading the tqdm "
"library. Please install tqdm to use the progress bar functionality."
)

_PROGRESS_BAR_UPDATE_INTERVAL = 0.5


def get_progress_bar(progress_bar_type, description, total, unit):
"""Construct a tqdm progress bar object, if tqdm is ."""
if tqdm is None:
if progress_bar_type is not None:
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

try:
if progress_bar_type == "tqdm":
return tqdm.tqdm(desc=description, total=total, unit=unit)
elif progress_bar_type == "tqdm_notebook":
return tqdm.tqdm_notebook(desc=description, total=total, unit=unit)
elif progress_bar_type == "tqdm_gui":
return tqdm.tqdm_gui(desc=description, total=total, unit=unit)
except (KeyError, TypeError):
# Protect ourselves from any tqdm errors. In case of
# unexpected tqdm behavior, just fall back to showing
# no progress bar.
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None


def wait_for_query(query_job, progress_bar_type=None):
"""Return query result and display a progress bar while the query running, if tqdm is installed."""
if progress_bar_type is None:
return query_job.result()

default_total = 1
current_stage = None
start_time = time.time()
progress_bar = get_progress_bar(
progress_bar_type, "Query is running", default_total, "query"
)
i = 0
while True:
if query_job.query_plan:
default_total = len(query_job.query_plan)
current_stage = query_job.query_plan[i]
progress_bar.total = len(query_job.query_plan)
progress_bar.set_description(
"Query executing stage {} and status {} : {:0.2f}s".format(
current_stage.name, current_stage.status, time.time() - start_time,
),
)
try:
query_result = query_job.result(timeout=_PROGRESS_BAR_UPDATE_INTERVAL)
progress_bar.update(default_total)
progress_bar.set_description(
"Query complete after {:0.2f}s".format(time.time() - start_time),
)
break
except concurrent.futures.TimeoutError:
query_job.reload() # Refreshes the state via a GET request.
if current_stage:
if current_stage.status == "COMPLETE":
if i < default_total - 1:
progress_bar.update(i + 1)
i += 1
continue
progress_bar.close()
return query_result
7 changes: 5 additions & 2 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -40,6 +40,7 @@
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioning
from google.cloud.bigquery._tqdm_helpers import wait_for_query

from google.cloud.bigquery.job.base import _AsyncJob
from google.cloud.bigquery.job.base import _DONE_STATE
Expand Down Expand Up @@ -1259,7 +1260,8 @@ def to_arrow(
..versionadded:: 1.17.0
"""
return self.result().to_arrow(
query_result = wait_for_query(self, progress_bar_type)
return query_result.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
Expand Down Expand Up @@ -1328,7 +1330,8 @@ def to_dataframe(
Raises:
ValueError: If the `pandas` library cannot be imported.
"""
return self.result().to_dataframe(
query_result = wait_for_query(self, progress_bar_type)
return query_result.to_dataframe(
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
Expand Down
41 changes: 5 additions & 36 deletions google/cloud/bigquery/table.py
Expand Up @@ -36,11 +36,6 @@
except ImportError: # pragma: NO COVER
pyarrow = None

try:
import tqdm
except ImportError: # pragma: NO COVER
tqdm = None

import google.api_core.exceptions
from google.api_core.page_iterator import HTTPIterator

Expand All @@ -50,6 +45,7 @@
from google.cloud.bigquery.schema import _build_schema_resource
from google.cloud.bigquery.schema import _parse_schema_resource
from google.cloud.bigquery.schema import _to_schema_fields
from google.cloud.bigquery._tqdm_helpers import get_progress_bar
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration

Expand All @@ -68,10 +64,7 @@
"The pyarrow library is not installed, please install "
"pyarrow to use the to_arrow() function."
)
_NO_TQDM_ERROR = (
"A progress bar was requested, but there was an error loading the tqdm "
"library. Please install tqdm to use the progress bar functionality."
)

_TABLE_HAS_NO_SCHEMA = 'Table has no schema: call "client.get_table()"'


Expand Down Expand Up @@ -1418,32 +1411,6 @@ def total_rows(self):
"""int: The total number of rows in the table."""
return self._total_rows

def _get_progress_bar(self, progress_bar_type):
"""Construct a tqdm progress bar object, if tqdm is installed."""
if tqdm is None:
if progress_bar_type is not None:
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

description = "Downloading"
unit = "rows"

try:
if progress_bar_type == "tqdm":
return tqdm.tqdm(desc=description, total=self.total_rows, unit=unit)
elif progress_bar_type == "tqdm_notebook":
return tqdm.tqdm_notebook(
desc=description, total=self.total_rows, unit=unit
)
elif progress_bar_type == "tqdm_gui":
return tqdm.tqdm_gui(desc=description, total=self.total_rows, unit=unit)
except (KeyError, TypeError):
# Protect ourselves from any tqdm errors. In case of
# unexpected tqdm behavior, just fall back to showing
# no progress bar.
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

def _to_page_iterable(
self, bqstorage_download, tabledata_list_download, bqstorage_client=None
):
Expand Down Expand Up @@ -1551,7 +1518,9 @@ def to_arrow(
owns_bqstorage_client = bqstorage_client is not None

try:
progress_bar = self._get_progress_bar(progress_bar_type)
progress_bar = get_progress_bar(
progress_bar_type, "Downloading", self.total_rows, "rows"
)

record_batches = []
for record_batch in self._to_arrow_iterable(
Expand Down

0 comments on commit dc78edd

Please sign in to comment.