
Pagination should allow fetching pages in batch of N pages. #112

Closed
yantrikskaran opened this issue May 17, 2020 · 20 comments
Labels: api: bigquery (Issues related to the googleapis/python-bigquery API.), type: question (Request for information or clarification. Not an issue.)

Comments

@yantrikskaran

Currently I am facing an issue where I have to process a large number of records coming as part of a query result. The results are available in multiple pages.

When I process the rows from the result, I notice that it takes around 10 seconds to load the next page. I am able to process one page in a second or two, and then I have to wait for 10 seconds for the next page. It is time consuming.

I cannot load the entire result into memory, as the response can be more than 10 GB and we start hitting various QUOTA limits.

Is there a way I can load a batch of 100 pages into memory at once, start my processing, and have the next batch of 100 pages loaded in the background while the first batch is being processed?

@plamut
Contributor

plamut commented May 18, 2020

Just to clarify, which client library is this question about? BigQuery?

@yantrikskaran
Author

yantrikskaran commented May 18, 2020

> Just to clarify, which client library is this question about? BigQuery?

Yes, it is regarding BigQuery, and I am using google.cloud.bigquery.client.

Just to add more details - below is a snippet of my code.

from google.cloud import bigquery

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result()
for row in result:
    print(row)

After printing around 1000 records, I can see that the next record comes after a gap of 10 seconds. It looks like each page has around 1000 records, and fetching the next page is what takes the time.

LOGS - see below.

2020-05-18 03:33:36,380 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(78) Completed writing to file
2020-05-18 03:33:36,380 MainThread (ProcessId : 7056) INFO util.py:(86) Memory Used By Process : 1000.03 MB
2020-05-18 03:33:47,691 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(150) Start : 157053
2020-05-18 03:33:47,691 MainThread (ProcessId : 7056) INFO create_tfRecords.py:(210) year_week : 201931

@plamut plamut transferred this issue from googleapis/google-cloud-python May 19, 2020
@product-auto-label product-auto-label bot added the api: bigquery (Issues related to the googleapis/python-bigquery API.) label May 19, 2020
@plamut plamut added the type: feature request ('Nice-to-have' improvement, new feature or different behavior or design.) and type: question (Request for information or clarification. Not an issue.) labels, and removed the type: feature request label, May 19, 2020
@plamut
Contributor

plamut commented May 19, 2020

@yantrikskaran Thanks for the report, I transferred it to the corresponding repository.

client.query() returns a QueryJob instance and the latter's result() method accepts a page_size argument.

Would increasing the page size to N * 1000 help? After all, that's effectively the same as fetching N pages of 1000 rows in a single batch.
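
For illustration, a minimal sketch of passing page_size (assuming query holds the SQL string from your snippet above):

from google.cloud import bigquery

client = bigquery.Client()
query_job = client.query(query)
# Request pages of 10,000 rows instead of the server-chosen default page size.
result = query_job.result(page_size=10 * 1000)
for row in result:
    print(row)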

@yantrikskaran
Author

@plamut Thanks a lot for the response. Let me try page_size and update this thread.

One doubt - when I increase the page size, I think it will add delay in loading the next page.

Example -
Suppose I have to process 500 pages, each with 1000 rows, and each row is around x MB.
Currently, with default pagination, after the initial page the next page takes 10 seconds.
If I change the page size to 10*1000 records per page, won't the next page take even more time after the initial page, since it has to load more rows/data at a time?

@plamut
Contributor

plamut commented May 19, 2020

> If I change the page size to 10*1000 records per page, won't the next page take even more time after the initial page, since it has to load more rows/data at a time?

@yantrikskaran If the delay is due to a network bottleneck, then yes, fetching a bigger page will take longer, although the total delay might be lower than individually fetching 10 pages of 1000 records.

Since fetching the data is an I/O operation, you can also parallelize it and start fetching the next page while processing the current page by moving either the processing or fetching the data to its own thread.

You might also have a look at the new (much) faster BigQuery Storage API, which will become the default in one of the next releases. For now, if it's OK to receive data in a pandas DataFrame and you have the required dependencies, you can try processing table data as a stream of DataFrames:

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

# Requires pandas; the bqstorage_client path also needs the google-cloud-bigquery-storage package.
dataframes = client.query(query).result().to_dataframe_iterable(bqstorage_client=bqstorage_client)
for df in dataframes:
    ...  # process the dataframe

@yantrikskaran
Author

@plamut - Yes, I tried changing the page size, and yes, it adds delay for the next page.

I had also tried using to_dataframe_iterable(), but I had an issue where I cannot convert the result to a DataFrame, as it has a type which is not supported. I don't want to add more logic to my SQL to create a DataFrame-compatible result.

You mentioned: "Since fetching the data is an I/O operation, you can also parallelize it and start fetching the next page while processing the current page by moving either the processing or fetching the data to its own thread."

Do you have sample code which can help me try this parallelization?

@yantrikskaran
Author

yantrikskaran commented May 19, 2020

@plamut - I think there is some major issue either at the library or BigQuery level.

My query is supposed to return 4500 records.

The query.result() implementation doesn't help. I can still see it loading around 730 records per page.

from google.cloud import bigquery

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result(page_size=10000, max_results=10000)

for row in result:
    print(row)

I also tried list_rows on the destination table, but that implementation doesn't help either.

from google.cloud import bigquery

client = bigquery.Client()
select_query = client.query(query)
result = select_query.result(page_size=10000, max_results=10000)

destination_table = select_query.destination

result = client.list_rows(destination_table, start_index=0, max_results=10000, page_size=10000)

for row in result:
    print(row)

It also shows the same behaviour, around 730 records per page.

@plamut
Contributor

plamut commented May 20, 2020

@yantrikskaran 4500 records indeed doesn't sound like that much, unless the rows are excessively large. What's the typical row size in your data, and what's the internet connection speed on the machine downloading the data?

Can you maybe also try reproducing the issue by fetching something from one of the public datasets? If yes, I can try fetching the same data and see how it behaves on my local machine.

@yantrikskaran
Author

yantrikskaran commented May 20, 2020

> @yantrikskaran 4500 records indeed doesn't sound like that much, unless the rows are excessively large. What's the typical row size in your data, and what's the internet connection speed on the machine downloading the data?
>
> Can you maybe also try reproducing the issue by fetching something from one of the public datasets? If yes, I can try fetching the same data and see how it behaves on my local machine.

@plamut Yes, the number is not high, but I am sure the row size is very large. Each row may be around 2 MB or a little more. It is a JSON row which I am constructing as part of my SQL.

Internet speed should not be an issue; I am facing the problem when running on Google Compute Engine.
My local connection speed is 100 Mbps.

@plamut
Contributor

plamut commented May 20, 2020

@yantrikskaran I took a closer look at this and the following seems to be happening:

QueryJob.result() returns a RowIterator instance, a subclass of the Iterator from api_core that implements the iteration logic.

When iterating over it, the first page is fetched and iterated over, yielding individual rows. The next page is only requested when all the rows of the current page have been yielded, causing the delay you are observing.

This can also be reproduced with much smaller result sets and small page sizes:

import logging
import time

from google.cloud import bigquery

TABLE_ID = "bigquery-public-data.stackoverflow.tags"
PAGE_SIZE = 5
MAX_RESULTS = 100
ROW_PROCESSING_TIME = 0.1  # seconds of simulated work per row

# Minimal logging setup (the actual run used a customized log format).
logging.basicConfig(level=logging.INFO)

query = f"""
    SELECT id, REVERSE(tag_name) as tag_name, count, excerpt_post_id, wiki_post_id
    FROM `{TABLE_ID}`
"""

client = bigquery.Client()

logging.info("START")

query_job = client.query(query)
row_iterator = query_job.result(page_size=PAGE_SIZE, max_results=MAX_RESULTS)

for row in row_iterator:
    logging.info("Processing row: %s", row)
    time.sleep(ROW_PROCESSING_TIME)

logging.info("DONE.")

I customized the log output format and added a few extra log messages to the iterator to track page fetching, and this was the output:

INFO     [2020-05-20 11:51:14,856] MainThread    [root] [issue_112.py:35][iterate_rows] START
INFO     [2020-05-20 11:51:16,586] MainThread    [google.api_core.page_iterator] [page_iterator.py:249][_page_iter]  Fetching first page...
INFO     [2020-05-20 11:51:16,892] MainThread    [google.api_core.page_iterator] [page_iterator.py:251][_page_iter] ... fetched first page <google.api_core.page_iterator.Page object at 0x7f21a4bcb208>.
INFO     [2020-05-20 11:51:16,892] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((1255, 'aiw', 256, 8843419, 8843418), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:16,993] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((26602, 'birtnoc-tna', 256, 8924398, 8924397), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,094] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((35050, 'gnidaolerp', 256, 41349882, 41349881), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,194] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((37223, 'ecapselbat', 256, 20811171, 20811170), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,295] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((39137, 'edulcni-php', 256, 10065834, 10065833), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,396] MainThread    [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO     [2020-05-20 11:51:17,712] MainThread    [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f21a4bf2a90>.
INFO     [2020-05-20 11:51:17,712] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((44801, 'revres-neg', 256, 10314429, 10314428), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,812] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((46127, 'msaf', 256, 5845134, 5845133), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:17,913] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((46246, 'enigneyeknomj', 256, 7040300, 7040299), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:18,014] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((46708, 'xum', 256, 47876370, 47876369), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:18,114] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((56649, 'noitamotua-iu-tfosorcim', 256, 8706533, 8706532), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:18,215] MainThread    [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO     [2020-05-20 11:51:19,146] MainThread    [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f21a4bf25c0>.
INFO     [2020-05-20 11:51:19,147] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((59146, 'stroperelcaro', 256, 7732834, 7732833), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 11:51:19,247] MainThread    [root] [issue_112.py:41][iterate_rows] Processing row: Row((59415, 'reweiv-fdp', 256, 31914620, 31914619), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
...
INFO     [2020-05-20 11:51:38,697] MainThread    [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO     [2020-05-20 11:51:38,697] MainThread    [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page None.
INFO     [2020-05-20 11:51:38,698] MainThread    [root] [issue_112.py:44][iterate_rows] DONE.

It's clear that the processing gets blocked every time a new result page needs to be fetched, which takes around 0.3 - 1.0 seconds on my machine. This delay is quite significant when considering the fact that processing a single page of 5 rows takes around half a second in this setup.

The entire script runs for well over 20 seconds, and it gets even worse if the page size is further reduced, because of the per-page fetch overhead.

The solution to this is to increase the page size closer to what the memory and various quota limits allow, and to fetch data in parallel with its processing. I will come up with a sketch of this idea shortly.

@plamut
Contributor

plamut commented May 20, 2020

@yantrikskaran
Here's the idea - data is fetched in a background thread, and it's fetched by iterating over result pages to have control over when the next page is requested. Every fetched page is put into out_queue where it is grabbed by the main thread processing the rows.

At the beginning, and before the processing of each new page starts, an item is put into in_queue. This is a signal to the data-fetching thread to start fetching the next page in the background. That queue can also be used to shut down the thread by putting None into it.

# Uses client, query, PAGE_SIZE, MAX_RESULTS, and ROW_PROCESSING_TIME from the previous snippet.
import logging
import queue
import threading
import time


def fetch_data(in_queue, out_queue):
    query_job = client.query(query)
    row_iterator = query_job.result(page_size=PAGE_SIZE, max_results=MAX_RESULTS)

    page_iterator = row_iterator.pages

    while True:
        item = in_queue.get()
        if item is None:
            logging.info("Poison pill, aborting")
            break

        logging.info("Requesting next page")
        try:
            page = next(page_iterator)
        except Exception as exc:
            logging.info("Error getting next page: %s", repr(exc))
            out_queue.put(None)
            break
        else:
            logging.info("Got next page, putting in out queue: %s", page)
            out_queue.put(page)


def process_parallel():
    logging.info("START")

    in_queue = queue.Queue()
    out_queue = queue.Queue()

    fetcher_thread = threading.Thread(
        target=fetch_data,
        args=(in_queue, out_queue),
        name="Thread-Fetcher",
        daemon=True,
    )
    fetcher_thread.start()

    in_queue.put("fetch it!")

    while True:
        page = out_queue.get()
        if page is None:
            in_queue.put(None)
            break

        in_queue.put("fetch it!")
        for row in page:
            logging.info("Processing row: %s", row)
            time.sleep(ROW_PROCESSING_TIME)

    fetcher_thread.join(timeout=1.0)
    if fetcher_thread.is_alive():
        logging.warning(f"{fetcher_thread.name} did not terminate.")

    logging.info("DONE")


process_parallel()

Running this version of the script with the same parameters (PAGE_SIZE, etc.) reduces the total time to sub 20 seconds on my machine, and the logs show that processing and fetching are indeed done in parallel:

...
INFO     [2020-05-20 12:31:13,254] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87248, 'tikloot-gc', 1, 14102253, 14102252), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,254] Thread-Fetcher [root] [issue_112.py:76][fetch_data] Requesting next page
INFO     [2020-05-20 12:31:13,254] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO     [2020-05-20 12:31:13,354] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87277, 'ypocoidua-ipam', 1, 14117646, 14117645), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,455] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87278, 'etsapoidua-ipam', 1, 14117652, 14117651), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,556] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87904, 'zyr', 1, 14427246, 14427245), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,560] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:259][_page_iter] ... fetched next page <google.api_core.page_iterator.Page object at 0x7f51a1375668>.
INFO     [2020-05-20 12:31:13,560] Thread-Fetcher [root] [issue_112.py:84][fetch_data] Got next page, putting in out queue: <google.api_core.page_iterator.Page object at 0x7f51a1375668>
INFO     [2020-05-20 12:31:13,656] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87955, 'sjemarf', 1, 14450032, 14450031), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,757] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((87989, 'desacpot', 1, 14497586, 14497585), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
INFO     [2020-05-20 12:31:13,757] Thread-Fetcher [root] [issue_112.py:76][fetch_data] Requesting next page
INFO     [2020-05-20 12:31:13,757] Thread-Fetcher [google.api_core.page_iterator] [page_iterator.py:257][_page_iter] Fetching next page...
INFO     [2020-05-20 12:31:13,857] MainThread    [root] [issue_112.py:112][process_parallel] Processing row: Row((88060, 'stneve-noidirt', 1, 14520018, 14520017), {'id': 0, 'tag_name': 1, 'count': 2, 'excerpt_post_id': 3, 'wiki_post_id': 4})
...

The actual implementation would likely require more advanced error handling and/or more sophisticated logic for deciding when to start page transfers, but you should get the idea. Or give the BQ Storage API a shot, if that's feasible.
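
(For completeness, here is a variant sketch of the same idea: let the background thread iterate row_iterator.pages on its own and push pages into a bounded queue, so the queue's maxsize acts as the prefetch depth and provides backpressure. The process_row function and the PREFETCH_PAGES constant are placeholders; client, query, and PAGE_SIZE are as in the snippets above.)

import queue
import threading

PREFETCH_PAGES = 3  # hypothetical prefetch depth: at most this many pages wait in the queue


def prefetch_pages(row_iterator, page_queue):
    # Runs in a background thread; put() blocks once the queue is full (backpressure).
    for page in row_iterator.pages:
        page_queue.put(page)
    page_queue.put(None)  # sentinel: no more pages


def process_prefetched(process_row):
    row_iterator = client.query(query).result(page_size=PAGE_SIZE)
    page_queue = queue.Queue(maxsize=PREFETCH_PAGES)
    threading.Thread(
        target=prefetch_pages, args=(row_iterator, page_queue), daemon=True
    ).start()

    while True:
        page = page_queue.get()
        if page is None:
            break
        for row in page:
            process_row(row)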

@yantrikskaran
Author

@plamut Thanks a lot for confirming the issue.
Yes, I am trying to implement the same logic, where multiple threads fetch the data from the result and push it to a queue, and the main thread consumes from the queue.

Thanks for sharing the snippet to implement the same.

Yes, I am trying to use the BQ Storage API as well. Hope that solves the issue.

Can we implement fetching pages in the background as part of the library, so that it is taken care of there?
It looks like the BQ Storage API does this?

@emkornfield

> I had also tried using to_dataframe_iterable(), but I had an issue where I cannot convert the result to a DataFrame, as it has a type which is not supported.

@yantrikskaran would it be possible to provide more details on which type isn't supported? A recent release of pyarrow (0.17.0) fixed some conversion issues.

@yantrikskaran
Author

> I had also tried using to_dataframe_iterable(), but I had an issue where I cannot convert the result to a DataFrame, as it has a type which is not supported.
>
> @yantrikskaran would it be possible to provide more details on which type isn't supported? A recent release of pyarrow (0.17.0) fixed some conversion issues.

My SQL has columns aggregated with ARRAY_AGG and STRUCT. When I try to get the output as a DataFrame, I get an error.

I will share the actual error and data in a day or two.

@plamut
Contributor

plamut commented May 22, 2020

> Can we implement fetching pages in the background as part of the library, so that it is taken care of there?
> It looks like the BQ Storage API does this?

@yantrikskaran The REST API is generally more suitable for small to medium result sets. While it would be possible to implement automatic pre-fetching of pages in the background, it's really a problem that is already better addressed by the BQ Storage API. The latter streams the rows to the client and also supports multiple streams that can be read concurrently.

Generally, it shouldn't take too much code for somebody to implement pre-fetching of the REST pages on their own, if desired, but since it's probably not a typical use case, I don't think it will be added to the client library.
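
For reference, a rough sketch of the BQ Storage API read-session pattern with multiple streams read concurrently (the project/dataset/table path is a placeholder, and reading AVRO rows this way additionally requires the fastavro package):

from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

# Placeholder table path - e.g. the temporary destination table of a query job.
table = "projects/my-project/datasets/my_dataset/tables/my_table"

requested_session = types.ReadSession(table=table, data_format=types.DataFormat.AVRO)
session = bqstorage_client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=4,  # the server may return fewer streams than requested
)


def consume(stream):
    reader = bqstorage_client.read_rows(stream.name)
    for row in reader.rows(session):
        pass  # process the row (a mapping of column name to value)


with ThreadPoolExecutor(max_workers=len(session.streams) or 1) as executor:
    list(executor.map(consume, session.streams))  # iterate results so any exceptions surface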

@yantrikskaran
Author

> Can we implement fetching pages in the background as part of the library, so that it is taken care of there?
> It looks like the BQ Storage API does this?
>
> @yantrikskaran The REST API is generally more suitable for small to medium result sets. While it would be possible to implement automatic pre-fetching of pages in the background, it's really a problem that is already better addressed by the BQ Storage API. The latter streams the rows to the client and also supports multiple streams that can be read concurrently.
>
> Generally, it shouldn't take too much code for somebody to implement pre-fetching of the REST pages on their own, if desired, but since it's probably not a typical use case, I don't think it will be added to the client library.

@plamut Yes, it makes sense. Thanks a lot!

@plamut
Contributor

plamut commented Jun 2, 2020

Since the original issue has been analyzed and two decent (IMHO) solutions proposed, I will now close this.

@yantrikskaran Feel free to re-open if you think more information is needed. As for the issue with some data types, I suggest opening a separate issue, if necessary, since that is not directly related to result pagination.

@mrn-aglic

mrn-aglic commented Apr 17, 2023

If I may ask, when running:

# (wrapped in a generator function, since the yield implies one)
def stream_rows(client, query):
    job = client.query(query)
    res = job.result()
    for row in res:
        yield dict(row)

Let's say the query is SELECT * FROM table.

Is the entire data from the table loaded into memory?

@Tunneller

@plamut I tried implementing something similar to your post on background fetches and it didn't work. Each background fetch gets a page worth of data, but it is full of duplicates, so I end up with massive data loss. I was using start_index=K*Page_size for each thread.

@plamut
Contributor

plamut commented Apr 8, 2024

@Tunneller If you are sure that each fetch requested a different (sub)set of the results, I suggest opening a new issue describing a possible problem with start_index.

Please also note that I am no longer a maintainer of this library (since early 2022), thus I am not up to date with any internal changes that have been made to the library in the last two years or so.

I do hope that you will get to the bottom of this!

@plamut plamut removed their assignment Apr 8, 2024