Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add progress bar for to_arrow method #352

Merged
merged 10 commits into from Nov 16, 2020
79 changes: 78 additions & 1 deletion google/cloud/bigquery/job.py
Expand Up @@ -20,6 +20,9 @@
import copy
import re
import threading
import tqdm
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be an optional import. See:

try:
import tqdm
except ImportError: # pragma: NO COVER
tqdm = None

import time
import warnings

import requests
import six
Expand Down Expand Up @@ -79,6 +82,11 @@
"tableUnavailable": http_client.BAD_REQUEST,
}

_NO_TQDM_ERROR = (
"A progress bar was requested, but there was an error loading the tqdm "
"library. Please install tqdm to use the progress bar functionality."
)


def _error_result_to_exception(error_result):
"""Maps BigQuery error reasons to an exception.
Expand Down Expand Up @@ -3275,6 +3283,27 @@ def result(
rows._preserve_order = _contains_order_by(self.query)
return rows

def _get_progress_bar(self, progress_bar_type, description, total, unit):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this is very close to the logic in table.py

def _get_progress_bar(self, progress_bar_type):

I'd suggest creating a _tqdm_helpers.py module similar to our pandas helpers to hold this logic for both Table and Job logic.

"""Construct a tqdm progress bar object, if tqdm is installed."""
if tqdm is None:
if progress_bar_type is not None:
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

try:
if progress_bar_type == "tqdm":
return tqdm.tqdm(desc=description, total=total, unit=unit)
elif progress_bar_type == "tqdm_notebook":
return tqdm.tqdm_notebook(desc=description, total=total, unit=unit)
elif progress_bar_type == "tqdm_gui":
return tqdm.tqdm_gui(desc=description, total=total, unit=unit)
except (KeyError, TypeError):
# Protect ourselves from any tqdm errors. In case of
# unexpected tqdm behavior, just fall back to showing
# no progress bar.
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3)
return None

# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_arrow()
def to_arrow(
Expand Down Expand Up @@ -3337,7 +3366,55 @@ def to_arrow(

..versionadded:: 1.17.0
"""
return self.result().to_arrow(
query_plan = self.query_plan
if query_plan and progress_bar_type:
start_time = time.time()
i = 0
progress_bar = self._get_progress_bar(
progress_bar_type,
"Query executing stage {}".format(query_plan[i].name),
len(query_plan),
"query",
)
while True:
total = len(query_plan)
self.reload() # Refreshes the state via a GET request.

current_stage = query_plan[i]
progress_bar.set_description(
"Query executing stage {} and status {} : {:0.2f}s".format(
current_stage.name,
current_stage.status,
time.time() - start_time,
),
refresh=True,
)
from concurrent import futures

try:
query_result = self.result(timeout=0.5)
if current_stage.status == "COMPLETE":
progress_bar.update(total)
progress_bar.set_description(
"Query complete after {:0.2f}s".format(
time.time() - start_time
),
refresh=True,
)
break
except futures.TimeoutError:
if current_stage.status == "COMPLETE":
if i < total - 1:
progress_bar.update(i + 1)
i += 1
continue

if progress_bar is not None:
progress_bar.close()
else:
query_result = self.result()

return query_result.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
Expand Down