
Initial read_gbq support #4

Merged: 46 commits merged into main on Sep 23, 2021

Conversation

@ncclementi (Contributor) commented Aug 19, 2021

Attempt to implement an HLG-based read_gbq for the partition_field=None case. I'm still having problems due to the format of the function bigquery_arrow_read, and I'm not sure how to solve them.

[FIXED] 3070ae3 and b43daf6
Currently, this implementation fails when running test_read_gbq with the following error:

FAILED test_core.py::test_read_gbq - NotImplementedError: Please specify either `partition_field` or `stream_name`.

I also tried specifying partition_field=None when creating the layer, like

layer = DataFrameIOLayer(
    output_name,
    meta.columns,
    [stream.name for stream in session.streams],
    bigquery_arrow_read(
        make_create_read_session_request=make_create_read_session_request,
        project_id=project_id,
        timeout=read_timeout,
        partition_field=None,
    ),
    label=label,
)

but this still gives

FAILED test_core.py::test_read_gbq - NotImplementedError: Please specify either `partition_field` or `stream_name`.
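
For reference, a hypothetical sketch of the direction that seems to resolve this (not necessarily the exact code in the fix commits): wrap the read function with functools.partial instead of calling it, assuming the read function takes the stream name as its final positional parameter, so DataFrameIOLayer can supply each stream name per partition. Names below are the ones from the snippet above.

from functools import partial

# Hypothetical sketch: DataFrameIOLayer calls its io_func once per input, so the
# per-stream read function is partially applied here and receives each stream
# name as its remaining positional argument.
layer = DataFrameIOLayer(
    output_name,
    meta.columns,
    [stream.name for stream in session.streams],
    partial(
        bigquery_arrow_read,
        make_create_read_session_request,
        project_id,
        read_timeout,
    ),
    label=label,
)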

cc: @jrbourbeau @bnaul @mrocklin for visibility

@ncclementi (Contributor, Author) commented Aug 20, 2021

Thank you @jrbourbeau for the tweaks. I took this for a spin and things work. I read a copy of the table bigquery-public-data.covid19_public_forecasts.county_14d, which for reference has 132,888 rows × 20 columns and is a partitioned table on BigQuery.

ddf = read_gbq(
        project_id="dask-bigquery",
        dataset_id="covid19_public_forecasts",
        table_id="county_14d",
    )
%%time
ddf.compute()
CPU times: user 641 ms, sys: 90.1 ms, total: 731 ms
Wall time: 5.86 s

The task stream looks like:

[screenshot: Dask dashboard task stream]

I also experimented with partition_field and partitions, and this works, but it is a bit slower for the case I tried compared to reading the whole dataset. For example:

partitions = ["Teton", "Loudoun"]

ddf = read_gbq(
        project_id="dask-bigquery",
        dataset_id="covid19_public_forecasts",
        table_id="county_14d",
        partition_field="county_name",
        partitions=partitions,
        fields=[],
    )
%%time
ddf.compute()
CPU times: user 1.11 s, sys: 102 ms, total: 1.22 s
Wall time: 10.5 s

This has only two partitions (one per county_name value passed in partitions):

>>> ddf.map_partitions(len).compute()
0     42
1    126
dtype: int64

and results in this dataframe when computed:

[screenshot: resulting dataframe after compute()]

The resulting dataframe has as its index the entries passed in partitions; I'm not sure this is the behavior we want. I'd be curious what @bnaul had in mind with this approach.

One last thing I tested was using partitions, partition_field, and fields with data that I pushed with pandas, and this works too (see the sketch below), which makes me think we can modify test_read_gbq_partitioning to use data that we push with pandas rather than relying on a copy of a public dataset. But we might want to wait on this until we decide whether the behavior shown in the dataframe screenshot is what we are looking for.
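
A rough sketch of what pushing test data with pandas could look like (the dataset/table names and columns here are purely illustrative, not the actual test fixture):

import pandas as pd
from google.cloud import bigquery

# Illustrative test data with a column that can later be used as partition_field.
df = pd.DataFrame(
    {
        "county_name": ["Teton", "Loudoun", "Teton"],
        "value": [1, 2, 3],
    }
)

with bigquery.Client(project="dask-bigquery") as client:
    # Load the DataFrame into a (hypothetical) test table and wait for the load job.
    job = client.load_table_from_dataframe(df, "dask-bigquery.test_dataset.test_partitioning")
    job.result()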

@ncclementi changed the title from "HLG read_gbq WIP (test_read_gbq failing)" to "HLG read_gbq" on Aug 20, 2021
"""

bq_storage_client = None
bq_client = bigquery.Client(project_id)
Member:

Starting with version 2.11.0 of bigquery we can use bigquery.Client as a context manager. Given that there have been many bigquery releases since then (the latest is 2.26.0), I think it's safe to use 2.11.0 as the minimum supported version.

Contributor Author:

From what I understand, we now have a context manager for the bigquery client but not for the bigquery_storage client. Would you suggest using at least the one we have? Something like:

bq_storage_client = None
with bigquery.Client(project_id) as bq_client:
    try:
        if with_storage_api:
            bq_storage_client = bigquery_storage.BigQueryReadClient(
                credentials=bq_client._credentials
            )
            yield bq_client, bq_storage_client
        else:
            yield bq_client
    finally:
        bq_client.close()

Member:

Yep, that's the right direction. Though since bq_client.close() is all that was being called in the finally block before, we can remove the try/finally blocks; bq_client.close() will be called automatically when we exit the scope of the bq_client context manager.

Contributor Author:

Since we have to include the closing of the storage client, maybe we should keep the try/finally, but in the finally call bigquery_storage.transport.channel.close().

        else:
            yield bq_client
    finally:
        bq_client.close()
Member:

Do we also need to close bq_storage_client?

Contributor Author:

From what I understand, the BQ storage client doesn't have a .close() method. But there is a workaround explained in this comment googleapis/gapic-generator-python#575 (comment), and there are some active discussions about this here: googleapis/gapic-generator-python#987

We can probably get away with doing bqs.transport.channel.close() as recommended here: dask/dask#3121 (comment)

Member:

Thanks for investigating. What you proposed sounds good



@contextmanager
def bigquery_client(project_id=None, with_storage_api=False):
Member:

It appears we only ever use this context manager with with_storage_api=True. If that's the case (I could be missing something), I'd suggest we remove with_storage_api as an option and always yield both a bigquery.Client and a bigquery_storage.BigQueryReadClient.

Contributor Author:

From what I see, every time we use the created context manager we use it with with_storage_api=True, which enables the use of the Storage API. My understanding is that we want to use it based on these two comments: dask/dask#3121 (comment) and dask/dask#3121 (comment)

Member:

That makes sense. I'm suggesting that since we never use with_storage_api=False, we remove it as an option from our custom bigquery_client context manager. We can always add it back in the future if needed, but right now it's just an unnecessary keyword argument (since we always call it with with_storage_api=True)
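
A rough sketch of where this discussion lands, assuming the workaround from the linked comments (the exact transport attribute may differ between bigquery_storage releases):

from contextlib import contextmanager

from google.cloud import bigquery, bigquery_storage


@contextmanager
def bigquery_client(project_id=None):
    # bigquery.Client is a context manager as of 2.11.0 and closes itself on exit.
    with bigquery.Client(project_id) as bq_client:
        bq_storage_client = bigquery_storage.BigQueryReadClient(
            credentials=bq_client._credentials
        )
        try:
            # Always yield both clients, since with_storage_api=False was never used.
            yield bq_client, bq_storage_client
        finally:
            # BigQueryReadClient has no .close(); close its gRPC channel instead.
            # NOTE: newer releases may expose this as transport.grpc_channel.
            bq_storage_client.transport.channel.close()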

def make_create_read_session_request(row_filter=""):
return bigquery_storage.types.CreateReadSessionRequest(
max_stream_count=100, # 0 -> use as many streams as BQ Storage will provide
parent=f"projects/{project_id}",
Member:

Should projects be hardcoded here?

Nvm, looking at the docstring for CreateReadSessionRequest, it appears the answer is "yes, projects should be hardcoded"

requirements.txt: outdated review thread (resolved)
Comment on lines +188 to +190
session = bqs_client.create_read_session(
    make_create_read_session_request(row_filter=row_filter)
)
Member:

Does session have a close method, or some other cleanup method, we should call?


dask_bigquery/core.py: two outdated review threads (resolved)
Comment on lines 141 to 142
partition_field: to specify filters of form "WHERE {partition_field} = ..."
partitions: all values to select of `partition_field`
Member:

Specifying the index and specific values of the index in this way seems unusual to me (at least compared to other Dask I/O functions). @bnaul I'm wondering if this API will be familiar to BigQuery users, or if these options were useful for a specific use case when this prototype was initially developed?

dask_bigquery/tests/test_core.py: outdated review thread (resolved)
]


def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    read_kwargs: int,
Contributor Author:

@jrbourbeau I was going over the docs and realized that this still shows as an int. Shouldn't it go as a keyword argument at the end, read_kwargs: dict = None? And I wonder where the * should go; right before it? Like:

def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    stream_name: str, 
    *, 
    read_kwargs: dict=None)

If this is correct I can modify it, and we should probably add a test that this works, although I'm not sure what's the easiest way to test these kwargs. Any ideas?

Member:

Since bigquery_read is only ever called internally in read_gbq, I don't think it matters whether read_kwargs is a positional or keyword argument to bigquery_read. Though you bring up a good point that the type annotation is now incorrect and should be updated to dict instead of int.
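
For reference, a sketch of the signature with the corrected annotation (read_kwargs typed as dict; parameter order here is illustrative, since bigquery_read is only called internally by read_gbq):

# Sketch only: the annotation fix discussed above.
def bigquery_read(
    make_create_read_session_request: callable,
    project_id: str,
    timeout: int,
    read_kwargs: dict,
    stream_name: str,
):
    ...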

        read_session=bigquery_storage.types.ReadSession(
            data_format=bigquery_storage.types.DataFormat.ARROW,
            read_options=bigquery_storage.types.ReadSession.TableReadOptions(
                row_restriction=row_filter,
Contributor Author:

@jrbourbeau I'm not quite sure this works if we don't use the default. Should we remove this, since we removed the partition_field options? We seem to always leave it as "".
Here is some documentation to review: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1beta2/types.html#google.cloud.bigquery_storage_v1beta2.types.ReadSession.TableReadOptions

Contributor:

row_filter is quite useful, as it performs the filtering server-side and can avoid a lot of extraneous IO. I don't see any reason to remove it, but it should probably be made into a more generic TableReadOptions object like the one you linked to, so that it can be used for column selection as well.

One other small note: the doc you linked is for the beta API; v1 has been released since our original implementation: https://googleapis.dev/python/bigquerystorage/latest/bigquery_storage_v1/types.html#google.cloud.bigquery_storage_v1.types.ReadSession.TableReadOptions. There are a couple of other references to the beta docs throughout as well.
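
A sketch of the more generic options using the v1 (non-beta) types; the filter value, column names, and table path below are illustrative only:

from google.cloud import bigquery_storage

# Server-side row filtering plus column selection in one TableReadOptions object.
read_options = bigquery_storage.types.ReadSession.TableReadOptions(
    row_restriction='county_name = "Teton"',            # illustrative filter
    selected_fields=["county_name", "forecast_date"],   # illustrative columns
)

request = bigquery_storage.types.CreateReadSessionRequest(
    parent="projects/dask-bigquery",
    read_session=bigquery_storage.types.ReadSession(
        table="projects/dask-bigquery/datasets/covid19_public_forecasts/tables/county_14d",
        data_format=bigquery_storage.types.DataFormat.ARROW,
        read_options=read_options,
    ),
    max_stream_count=100,
)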

@jrbourbeau (Member) left a comment

Thanks for the updates @ncclementi! I left a few small comments, but after those are addressed I think this PR is at a good place to merge in as a checkpoint. @bnaul @tswast have left some nice comments that we can spin out into separate issues and address in follow-up PRs.

dask_bigquery/tests/test_core.py: five outdated review threads (resolved)
read_kwargs={"timeout": 1e-12},
)

with pytest.raises(Exception, match="504 Deadline Exceeded"):
Member:

Nice use of match=! I'm curious if we can catch a more specific exception here (e.g. ValueError)? What type of error is raised when a timeout occurs?

Contributor Author:

This was the traceback of the error I got; it's an actual Exception. Do you think we can write this better?

distributed.worker - WARNING - Compute Failed
Function:  subgraph_callable-4f693566-2e95-419b-8783-b36b04b3
args:      ('projects/dask-bigquery/locations/us/sessions/CAISDFRjM3NzSmxoZm1BMxoCanEaAmpk/streams/GgJqcRoCamQoAg')
kwargs:    {}
Exception: Exception('504 Deadline Exceeded')

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/var/folders/1y/ydztfpnd11b6qmvbb8_x56jh0000gn/T/ipykernel_51820/2349744101.py in <module>
----> 1 test_kwargs.compute()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    284         dask.base.compute
    285         """
--> 286         (result,) = compute(self, traverse=False, **kwargs)
    287         return result
    288 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/optimization.py in __call__()
    967         if not len(args) == len(self.inkeys):
    968             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    970 
    971     def __reduce__(self):

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/Documents/git/dask-bigquery/dask_bigquery/core.py in bigquery_read()
     65             pyarrow.py_buffer(session.arrow_schema.serialized_schema)
     66         )
---> 67         shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs)
     68         # NOTE: BQ Storage API can return empty streams
     69         if len(shards) == 0:

~/Documents/git/dask-bigquery/dask_bigquery/core.py in _stream_to_dfs()
     35             schema,
     36         ).to_pandas()
---> 37         for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs)
     38     ]
     39 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/client.py in read_rows()
    122         """
    123         gapic_client = super(BigQueryReadClient, self)
--> 124         stream = gapic_client.read_rows(
    125             read_stream=name,
    126             offset=offset,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/cloud/bigquery_storage_v1/services/big_query_read/client.py in read_rows()
    596 
    597         # Send the request.
--> 598         response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
    599 
    600         # Done; return the response.

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/gapic_v1/method.py in __call__()
    143             kwargs["metadata"] = metadata
    144 
--> 145         return wrapped_func(*args, **kwargs)
    146 
    147 

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_wrapped_func()
    284                 self._initial, self._maximum, multiplier=self._multiplier
    285             )
--> 286             return retry_target(
    287                 target,
    288                 self._predicate,

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/retry.py in retry_target()
    187     for sleep in sleep_generator:
    188         try:
--> 189             return target()
    190 
    191         # pylint: disable=broad-except

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/timeout.py in func_with_timeout()
    100             """Wrapped function that adds timeout."""
    101             kwargs["timeout"] = self._timeout
--> 102             return func(*args, **kwargs)
    103 
    104         return func_with_timeout

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/google/api_core/grpc_helpers.py in error_remapped_callable()
    162             )
    163         except grpc.RpcError as exc:
--> 164             six.raise_from(exceptions.from_grpc_error(exc), exc)
    165 
    166     return error_remapped_callable

~/mambaforge/envs/test_gbq/lib/python3.8/site-packages/six.py in raise_from()

Exception: 504 Deadline Exceeded

Member:

Ah, I see. Since an Exception is being raised, I don't think we can do any better than what you've already got here
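
For reference, a sketch of the full test discussed in this thread (the test name, import path, and fixture-free form are assumptions based on the snippet and comments above, not the exact test in the repo):

import pytest

from dask_bigquery import read_gbq


def test_read_gbq_custom_read_kwargs():
    # An absurdly small timeout in read_kwargs should make the Storage API read fail.
    ddf = read_gbq(
        project_id="dask-bigquery",
        dataset_id="covid19_public_forecasts",
        table_id="county_14d",
        read_kwargs={"timeout": 1e-12},
    )
    with pytest.raises(Exception, match="504 Deadline Exceeded"):
        ddf.compute()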

dask_bigquery/tests/test_core.py: outdated review thread (resolved)
@jrbourbeau changed the title from "HLG read_gbq" to "Initial read_gbq support" on Sep 21, 2021
ncclementi and others added 6 commits September 21, 2021 19:30
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
See googleapis/google-cloud-python#9457
and googleapis/gapic-generator-python#575 for reference.
"""
with bigquery.Client(project_id) as bq_client:

Doesn't have to be this PR, but it would be really helpful if we could attribute these requests to Dask/Dask-BigQuery. #6



@contextmanager
def bigquery_client(project_id=None):
Contributor:

now that this always returns both BQ and BQS, should it be called bigquery_clients? 🙂

@jrbourbeau (Member) left a comment

Thanks for all your work here @ncclementi!

Also, thank you @bnaul @tswast for reviewing -- your feedback is much appreciated

@jrbourbeau merged commit 149d2c0 into main on Sep 23, 2021