New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add progress bar for to_arrow method #352
Changes from 1 commit
05f101e
288739f
d94894c
fa1bb49
485bcf5
f9b2ff7
481fa05
c1ee80e
47d393c
e1d017c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -20,6 +20,9 @@ | |||
import copy | ||||
import re | ||||
import threading | ||||
import tqdm | ||||
import time | ||||
import warnings | ||||
|
||||
import requests | ||||
import six | ||||
|
@@ -79,6 +82,11 @@ | |||
"tableUnavailable": http_client.BAD_REQUEST, | ||||
} | ||||
|
||||
_NO_TQDM_ERROR = ( | ||||
"A progress bar was requested, but there was an error loading the tqdm " | ||||
"library. Please install tqdm to use the progress bar functionality." | ||||
) | ||||
|
||||
|
||||
def _error_result_to_exception(error_result): | ||||
"""Maps BigQuery error reasons to an exception. | ||||
|
@@ -3275,6 +3283,27 @@ def result( | |||
rows._preserve_order = _contains_order_by(self.query) | ||||
return rows | ||||
|
||||
def _get_progress_bar(self, progress_bar_type, description, total, unit): | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like this is very close to the logic in python-bigquery/google/cloud/bigquery/table.py Line 1374 in 20f473b
I'd suggest creating a |
||||
"""Construct a tqdm progress bar object, if tqdm is installed.""" | ||||
if tqdm is None: | ||||
if progress_bar_type is not None: | ||||
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) | ||||
return None | ||||
|
||||
try: | ||||
if progress_bar_type == "tqdm": | ||||
return tqdm.tqdm(desc=description, total=total, unit=unit) | ||||
elif progress_bar_type == "tqdm_notebook": | ||||
return tqdm.tqdm_notebook(desc=description, total=total, unit=unit) | ||||
elif progress_bar_type == "tqdm_gui": | ||||
return tqdm.tqdm_gui(desc=description, total=total, unit=unit) | ||||
except (KeyError, TypeError): | ||||
# Protect ourselves from any tqdm errors. In case of | ||||
# unexpected tqdm behavior, just fall back to showing | ||||
# no progress bar. | ||||
warnings.warn(_NO_TQDM_ERROR, UserWarning, stacklevel=3) | ||||
return None | ||||
|
||||
# If changing the signature of this method, make sure to apply the same | ||||
# changes to table.RowIterator.to_arrow() | ||||
def to_arrow( | ||||
|
@@ -3337,7 +3366,55 @@ def to_arrow( | |||
|
||||
..versionadded:: 1.17.0 | ||||
""" | ||||
return self.result().to_arrow( | ||||
query_plan = self.query_plan | ||||
if query_plan and progress_bar_type: | ||||
start_time = time.time() | ||||
i = 0 | ||||
progress_bar = self._get_progress_bar( | ||||
progress_bar_type, | ||||
"Query executing stage {}".format(query_plan[i].name), | ||||
len(query_plan), | ||||
"query", | ||||
) | ||||
while True: | ||||
total = len(query_plan) | ||||
self.reload() # Refreshes the state via a GET request. | ||||
|
||||
current_stage = query_plan[i] | ||||
progress_bar.set_description( | ||||
"Query executing stage {} and status {} : {:0.2f}s".format( | ||||
current_stage.name, | ||||
current_stage.status, | ||||
time.time() - start_time, | ||||
), | ||||
refresh=True, | ||||
) | ||||
from concurrent import futures | ||||
|
||||
try: | ||||
query_result = self.result(timeout=0.5) | ||||
if current_stage.status == "COMPLETE": | ||||
progress_bar.update(total) | ||||
progress_bar.set_description( | ||||
"Query complete after {:0.2f}s".format( | ||||
time.time() - start_time | ||||
), | ||||
refresh=True, | ||||
) | ||||
break | ||||
except futures.TimeoutError: | ||||
if current_stage.status == "COMPLETE": | ||||
if i < total - 1: | ||||
progress_bar.update(i + 1) | ||||
i += 1 | ||||
continue | ||||
|
||||
if progress_bar is not None: | ||||
progress_bar.close() | ||||
else: | ||||
query_result = self.result() | ||||
|
||||
return query_result.to_arrow( | ||||
progress_bar_type=progress_bar_type, | ||||
bqstorage_client=bqstorage_client, | ||||
create_bqstorage_client=create_bqstorage_client, | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be an optional import. See:
python-bigquery/google/cloud/bigquery/table.py
Lines 39 to 42 in 20f473b