Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BigQuery: make jobs awaitable #18

Open
dkapitan opened this issue Jul 21, 2019 · 35 comments · May be fixed by #1839 or #1853
Open

BigQuery: make jobs awaitable #18

dkapitan opened this issue Jul 21, 2019 · 35 comments · May be fixed by #1839 or #1853
Assignees
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. Python 3 Only This work would involve supporting Python 3 only code paths. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@dkapitan
Copy link

I know BigQuery jobs are asynchronous by default. However, I am struggling to make my datapipeline async end-to-end.

Looking at this JS example, I thought it would be the most Pythonic to make a BigQuery job awaitable. However, I can't get that to work in Python i.e. errors when await client.query(query). Looking at the source code, I don't see which method returns an awaitable object.

I have little experience in writing async Python code and found this example that wraps jobs in a async def coroutine.

class BQApi(object):                                                                                                 
    def __init__(self):                                                                                              
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])                               

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:                                       
        job = self.api.query(query, **kwargs)                                                                        
        task = asyncio.create_task(self.coroutine_job(job))                                                          
        return await task                                                                                            

    @staticmethod                                                                                                    
    async def coroutine_job(job):                                                                                    
        return job.result()   

The google.api_core.operation.Operation shows how to use add_done_callback to asynchronously wait for long-running operations. I have tried that, but the following yields AttributeError: 'QueryJob' object has no attribute '_condition' :

from concurrent.futures import ThreadPoolExecutor, as_completed
query1 = 'SELECT 1'
query2 = 'SELECT 2'

def my_callback(future):
    result = future.result()

operations = [bq.query(query1), bq.query(query2)]
[operation.add_done_callback(my_callback) for operation in operations]
results2 = []
for future in as_completed(operations):
  results2.append(list(future.result()))

Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable?

Or am I missing something and is there an Pythonic way to use the BigQuery client with the async/await pattern?

@tseaver
Copy link
Contributor

tseaver commented Jul 22, 2019

@dkapitan Re: the async await / async def keywords: our codebase straddles both Python2 and Python3 until at least the end of this year, and thus cannot adopt the syntax directly in the codebase until we drop Python2 support.

We might be able to tweak the google.api_core.operation.Operation class to support that usage at a low level. Can you please post the traceback for the AttributeError?

Also, for your example above, your callback isn't doing anything with result: it is being called from a helper thread, and would need to do something to signal the main thread

@dkapitan
Copy link
Author

@tseaver thanks for clarifying. I have read up more on concurrent.futures and asyncio. I understand the latter is indeed quite new and Python 3 only. Will investigate the sample code a bit more, and get to you with the results and/or the traceback.

@dkapitan
Copy link
Author

@tseaver here's traceback from the AttributeError

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-19-e9025f68a561> in <module>
      9 [operation.add_done_callback(my_callback) for operation in operations]
     10 results2 = []
---> 11 for future in as_completed(operations):
     12   results2.append(list(future.result()))

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in as_completed(fs, timeout)
    217     fs = set(fs)
    218     total_futures = len(fs)
--> 219     with _AcquireFutures(fs):
    220         finished = set(
    221                 f for f in fs

/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/concurrent/futures/_base.py in __enter__(self)
    144     def __enter__(self):
    145         for future in self.futures:
--> 146             future._condition.acquire()
    147 
    148     def __exit__(self, *args):

AttributeError: 'QueryJob' object has no attribute '_condition'

@dkapitan
Copy link
Author

@tseaver
For now, I have decided to do the following:

import numpy as np
from time import sleep


query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'


def dummy_callback(future):
    global jobs_done
    jobs_done[future.job_id] = True


jobs = [bq.query(query1), bq.query(query2)]
jobs_done = {job.job_id: False for job in jobs}
[job.add_done_callback(dummy_callback) for job in jobs]

# blocking loop to wait for jobs to finish
while not (np.all(list(jobs_done.values()))):
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

Rather than using as_completed I prefer to use the built-in async functionality from the bigquery jobs themselves. This also makes it possible for me to decompose the datapipeline into separate Cloud Functions, without having to keep the main ThreadPoolExecutor live for the duration of the whole pipeline. Incidentally, this was the reason why I was looking into this: my pipelines are longer than the max timeout of 9 minutes for Cloud Functions (or even 15 minutes for Cloud Run).

Downside is I need to keep track of all the job_ids across the various functions, but that is relatively easy to solve when configuring the pipeline by specifying inputs and outputs such that they form a directed acyclic graph.

Any suggestions are welcome. Would be nice if at some point in the future, Google's api future plays nicely with Python's future (no pun intended).

@tseaver
Copy link
Contributor

tseaver commented Jul 24, 2019

@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.

You could simplify your example code using a set, e.g.:

from time import sleep

query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""

query2 = 'SELECT 2'

queries = [query_1, query_2]

awaiting_jobs = set()

def callback(future):
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

@dkapitan
Copy link
Author

@tseaver
Nice, thanks. Never thought of using a set to do a kind of countdown ... 😄

@northtree
Copy link

northtree commented Aug 22, 2019

@tseaver Thanks for the simplify example. Could you fix two typos?

awaiting_jobs.discard(future.job_id)

job = bq.query(query)

@tseaver
Copy link
Contributor

tseaver commented Aug 27, 2019

@northtree Thanks for catching those: updated.

@sambcom
Copy link

sambcom commented Sep 6, 2019

How do you get the rows from your different queries? Won't this block on job.result() What am I missing?

@dkapitan
Copy link
Author

@sambcom
apologies for the late response; forgot to subscribe to this issue.
In my use case, the results of the query are explicitly written to new tables, as part of an ELT pipeline. So that's no issue.

I understand that generically speaking, you could write query results to (temporary) tables in BigQuery, so this should not be a blocking issue.

@zbarnett
Copy link

I just ran into this problem myself and found out that even though the job itself is "asynchronous" there are actually two places where synchronous I/O is happening.

From the example usage on https://googleapis.dev/python/bigquery/latest/index.html

from google.cloud import bigquery

client = bigquery.Client()

# Perform a query.
QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)  # API request
rows = query_job.result()  # Waits for query to finish

for row in rows:
    print(row.name)

client.query(QUERY) makes a synchronous network request as does query_job.result()

Usually, blocking on result() takes the majority of the time, but I've seen cases where query() can take over a second to complete by itself.

As a workaround for now, I'm running all the BigQuery data fetching code in a separate thread to free up the event loop but it would be fantastic if the BigQuery API supported async I/O.

Example workaround (code fragment):

def get_data(client, query):
    return client.query(query).result()

loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, get_data, client, query)

@yan-hic
Copy link

yan-hic commented Nov 12, 2019

We too have hundreds of load and dedupe queries running async. We do use futures but only to submit the jobs at once and get the id's.

From there we poke regularly the Stackdriver logging (using airflow) for the status of all in one API call.
Waiting for blocking result() is inefficient if you only want to know when jobs are done.

I wish get_job() could support multiple jobids but the API doesn't and an enhancement request has stalled.

@tswast
Copy link
Contributor

tswast commented Nov 12, 2019

We've identified EOL for new versions of the BigQuery client library as January 1, 2020 in googleapis/google-cloud-python#9036 We can revisit this request after that date.

@plamut plamut transferred this issue from googleapis/google-cloud-python Feb 4, 2020
@product-auto-label product-auto-label bot added the api: bigquery Issues related to the googleapis/python-bigquery API. label Feb 4, 2020
@plamut plamut added Python 3 Only This work would involve supporting Python 3 only code paths. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. labels Feb 4, 2020
@jsigee87
Copy link

Wanted to bump this thread, I am currently trying to write async BigQuery jobs myself and it would be great if the job were awaitable.

@vgrabovets
Copy link

Any news here? You've already dropped support for Python 2.

@yanhong-zhao-ef
Copy link

still waiting here... do you have an approximate timeline for this?

@tswast
Copy link
Contributor

tswast commented Mar 11, 2021

@yanhong-zhao-ef I'd love to have this feature, but it's going to take some design work to get right. In other Google Cloud libraries, asynchronous support is provided by a completely separate "AsyncClient". Since this BigQuery library is handwritten, I'd like to avoid having to maintain two copies of the same methods.

@tswast
Copy link
Contributor

tswast commented Mar 22, 2021

I talked with some folks on Firestore who've implemented this. They suggest a trio of classes: async_batch, base_batch, batch https://github.com/googleapis/python-firestore/tree/master/google/cloud/firestore_v1 in order to avoid too much duplicated effort.

@jarednash0
Copy link

@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited.

You could simplify your example code using a set, e.g.:

from time import sleep

query1 = """
SELECT
  language.name,
  average(language.bytes)
FROM `bigquery-public-data.github_repos.languages` 
, UNNEST(language) AS language
GROUP BY language.name"""

query2 = 'SELECT 2'

queries = [query_1, query_2]

awaiting_jobs = set()

def callback(future):
    awaiting_jobs.discard(future.job_id)

for query in queries:
    job = bq.query(query)
    awaiting_jobs.add(job.job_id)
    job.add_done_callback(callback)

while awaiting_jobs:
    print('waiting for jobs to finish ... sleeping for 1s')
    sleep(1)

print('all jobs done, do your stuff')

I tried implementing this with copy_table, but I added "assert future.done()" as the first line in callback() and found that the assertion was failing. This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

@tswast
Copy link
Contributor

tswast commented Mar 24, 2021

This must mean that the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?

@jarednash0 Definitely not intended. It'd be worth filing a separate issue for this so that we can address it, ideally with the code example that reproduces it.

@theophile-dh
Copy link

Any news on this issue? It would be a very useful feature

@eboddington
Copy link

Would definitely appreciate this as well

@adamserafini
Copy link

Any update from Google on asyncio compatibility for BigQuery lib?

@jasonwillschiu
Copy link

jasonwillschiu commented Feb 10, 2022

This is probably not the best, but right now a simple way to get queries running synchronously is to chain the queries together in one mega(big)query haha.

My clunky example:

Q1 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  SELECT * EXCEPT({currency_pair}) 
  FROM `{project}.{dataset}.{table}`;
  """ 

Q2 = f"""
  CREATE OR REPLACE TABLE `{project}.{dataset}.{table}`
  AS
  (SELECT *
  FROM `{project}.{dataset}.{table}` tab1
  JOIN (
  SELECT {date_col}, {currency_pair}
  FROM `{project}.currency_conversion.{base_currency}_conversion_table` 
  ) currency
  ON tab1.date=currency.{date_col}
  );
  """

Chained_Query1 = Q1 + Q2
query_job = client.query(Chained_Query1)

@adamserafini
Copy link

AFAICT the above proposal isn't actually making the query awaitable though, is it? we want to run the query asynchronously, not synchronously.

@jasonwillschiu
Copy link

My bad, I thought my data was getting stuffed around because these bigquery processes were asynchronous already

@chalmerlowe chalmerlowe added the wontfix This will not be worked on label Nov 2, 2022
@chalmerlowe
Copy link
Contributor

Gonna close this out as "Will not work", due to conflicting priorities.

@tswast tswast reopened this Dec 19, 2023
@tswast tswast removed the wontfix This will not be worked on label Dec 19, 2023
@tswast tswast assigned tswast and kiraksi and unassigned tswast Dec 19, 2023
@dkapitan
Copy link
Author

+1 for reopening!

@kiraksi
Copy link
Contributor

kiraksi commented Mar 4, 2024

For awareness: This is an internship project and my internship ends in 2 weeks. I plan on implementing an AsyncClient for async_query_and_wait rpc, there has been other rpc's requested to be made asynchronous which I may not have time for. For future maintainers I will document my progress here, but please refer to the internal design doc on this feature which can be found at my internship documentation website go/kirnendra-internship

RPCs to make async:

  • query_and_wait - in progress
  • query
  • get_job
  • get_query_results
  • get_table
  • list_partitions
  • get_partition

@adamserafini
Copy link

adamserafini commented Mar 5, 2024

Amazing @kiraksi, that is awesome.

But,
@google leadership: I find it sad that a 5 year old issue to move the library to properly support asyncio Python has taken this long and treated as a side project for the organisation.

The message I take from it is that the Google leadership simply doesn't care about supporting Python as a first-class citizen of the Google Cloud Developer Experience.

@chalmerlowe
Copy link
Contributor

@adamserafini Thank you for your input.
We will take it into consideration.

@kiraksi
Copy link
Contributor

kiraksi commented Mar 11, 2024

New updates: there are some blocking issues in that in order to make a completely asynchronous RowIterator object for the query and query_and_wait methods, which is a child of HTTPIterator in google-api-core library, I would need an AsyncHTTPIterator, I have optioned this issue here: googleapis/python-api-core#627 and I may make this PR myself. However until then I will be focusing on the get_ methods that won't have this blocking issue. Additionally, there is current work on this but I would like this method exposed: googleapis/google-auth-library-python#613

@Tunneller
Copy link

Any news kiraksi? I guess that your internship has ended. Will the PR still go forward? Can you elaborate more on what we will see? You mention that your solution will not be completely asynchronous?

@tommyhutcheson
Copy link

Hey @tswast @chalmerlowe
Is there an update for this issue and @kiraksi PR #1853 the python / BigQuery communities would love this feature to be available.

@chalmerlowe
Copy link
Contributor

At this moment, this task is on hold. We will revisit it when we have manpower. I am not gonna close it so that it remains on the radar.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: bigquery Issues related to the googleapis/python-bigquery API. Python 3 Only This work would involve supporting Python 3 only code paths. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet