Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial read_gbq implementation (WIP) #1

Closed
wants to merge 16 commits into from
Closed

Initial read_gbq implementation (WIP) #1

wants to merge 16 commits into from

Conversation

ncclementi
Copy link
Contributor

This PR is an implementation of read_gbq. Based on @bnaul and this gist and the comments on dask/dask#3121

There are a couple of things that I'm not sure where they come from, like where the functions full_id and execute_query come from.

            query = f"SELECT * FROM `{full_id(table_ref)}`"
            table_ref, _, _ = execute_query(query)

There are currently no tests, Do we have a suggestion on how to go about testing and dealing with credentials?

Currently, this implementation will read only in one partition unless the data on BigQuery is partitioned in which case it reads those number of partitions.

One thing I noticed is that the last partition in some cases have only 1 element. For example, if I read a copy of the public dataset I have on my project, as

test_read = gbq_as_dask_df(
                project_id="dask-bigquery",
                dataset_id="covid19_public_forecasts",
                table_id="county_14d",)

I get 43 partitions out of which 41 partitions have 3164 elements, and the last two partitions have 3163 and 1 element respectively.

Any inputs, comment are appreciated.

logging.warning(
"Materializing view in order to read into dask. This may be expensive."
)
query = f"SELECT * FROM `{full_id(table_ref)}`"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is just a shortcut we use

def full_id(table):
    return f"{table.project}.{table.dataset_id}.{table.table_id}"

your call whether to inline it or add the helper

also same comment re: this view behavior, not sure whether it'd be safer here to just raise instead of materializing



@contextmanager
def bigquery_client(project_id="dask-bigquery", with_storage_api=False):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe link to googleapis/google-cloud-python#9457 and/or googleapis/gapic-generator-python#575 for context (no pun intended)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also re project_id, you probably ought make it default to None and infer from the global context, don't remember off the top of my head where you grab that from

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe link to googleapis/google-cloud-python#9457 and/or googleapis/gapic-generator-python#575 for context (no pun intended)

Do you mean add them as a comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, just bc eventually there will probably be one upstream that could be used but right now there's not


@contextmanager
def bigquery_client(project_id="dask-bigquery", with_storage_api=False):
# Ignore google auth credentials warning
Copy link
Contributor

@bnaul bnaul Aug 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably delete this filterwarnings (although the warning is super annoying 🙃)

if (partition_field is not None) and fields and (partition_field not in fields):
fields = (partition_field, *fields)

# These read tasks seems to cause deadlocks (or at least long stuck workers out of touch with
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this annotate is maybe a bad idea, would be nice to have @jrbourbeau or someone weigh in; note that we observed this behavior with now-fairly old dask and bigquery_storage/pyarrow versions so I have no idea if it's still relevant

"Materializing view in order to read into dask. This may be expensive."
)
query = f"SELECT * FROM `{full_id(table_ref)}`"
table_ref, _, _ = execute_query(query)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnaul what about the execute_query function, is this also a shortcut?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yes, it's just a wrapper for bigquery.Client.query() that saves the result to a temporary table. but it needs a temporary dataset to store things in which not everyone would have configured, so again maybe it's better to just raise for VIEWs instead

Copy link
Contributor Author

@ncclementi ncclementi Aug 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, if we remove this part I'm assuming it'll raise an error on itself when we have a "VIEW" case, or is there a need to do a custom raise like:

if table_ref.table_type == "VIEW":
   raise TypeError('Table type VIEW not supported')

"Specified `partition_field` without `partitions`; reading full table."
)
partitions = pd.read_gbq(
f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will do a complete scan of the table. Maybe consider using something like return [p for p in bq_client.list_partitions(f'{dataset_id}.{table_id}') if p != '__NULL__']

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaayohn thanks for the comment, I got a bit confused by the return do you mean replacing this:

partitions = pd.read_gbq(
                    f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}",
                    project_id=project_id,
                )[partition_field].tolist()

For

partitions = [p for p in bq_client.list_partitions(f'{dataset_id}.{table_id}') if p != '__NULL__']

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, that's exactly what I meant! Silly copypasta :)

"Specified `partition_field` without `partitions`; reading full table."
)
partitions = pd.read_gbq(
f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shaayohn thanks for the comment, I got a bit confused by the return do you mean replacing this:

partitions = pd.read_gbq(
                    f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}",
                    project_id=project_id,
                )[partition_field].tolist()

For

partitions = [p for p in bq_client.list_partitions(f'{dataset_id}.{table_id}') if p != '__NULL__']

# TODO generalize to ranges (as opposed to discrete values)

partitions = sorted(partitions)
delayed_kwargs["divisions"] = (*partitions, partitions[-1])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnaul I noticed in the example I run, that this line causes to have the last partition to contain only 1 element, but that element could have fit into the previous to last partition. What is the reason you separate the last partition?

Copy link
Contributor

@bnaul bnaul Aug 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure why it's not working correctly for you but the idea is that you need n+1 divisions for n partitions. seems to work OK here

import dask.dataframe as dd
from dask import delayed

@delayed
def make_df(d):
    return pd.DataFrame({"date": d, "x": np.random.random(10)}).set_index("date")

dates = pd.date_range("2020-01-01", "2020-01-08")
ddf = dd.from_delayed([make_df(d) for d in dates], divisions=[*dates, dates[-1]])

ddf
Out[61]:
Dask DataFrame Structure:
                     x
npartitions=8
2020-01-01     float64
2020-01-02         ...
...                ...
2020-01-08         ...
2020-01-08         ...
Dask Name: from-delayed, 16 tasks


ddf.map_partitions(len).compute()
Out[62]:
0    10
1    10
2    10
3    10
4    10
5    10
6    10
7    10
dtype: int64

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's related to how the data is originally partitioned. For example when I read one of the tables of the covid public data set that I copied on "my_project" I see this

from dask_bigquery import read_gbq

ddf= read_gbq(
                project_id="my_project",
                dataset_id="covid19_public_forecasts",
                table_id="county_14d",)

ddf.map_partitions(len).compute()

Notice the last two partitions...

0     3164
1     3164
2     3164
3     3164
4     3164
5     3164
6     3164
7     3164
8     3164
9     3164
10    3164
11    3164
12    3164
13    3164
14    3164
15    3164
16    3164
17    3164
18    3164
19    3164
20    3164
21    3164
22    3164
23    3164
24    3164
25    3164
26    3164
27    3164
28    3164
29    3164
30    3164
31    3164
32    3164
33    3164
34    3164
35    3164
36    3164
37    3164
38    3164
39    3164
40    3164
41    3163
42       1
dtype: int64

@ncclementi
Copy link
Contributor Author

Added some tests, having issues with the linting. It's because of a flake warning, it complaining about this bare except. I wasn't sure what kind of error to put, I just need to pass, but any suggestions are welcomed.

except: # if data doesn't exisit continue is that Value Error?
pass

Copy link
Member

@mrocklin mrocklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments!

Comment on lines 45 to 57
try:
# delete data set if exists
bq_client = bigquery.Client()
bq_client.delete_dataset(
dataset="dask-bigquery.dataset_test",
delete_contents=True,
)
bq_client.close()
except: # if data doesn't exisit continue is that Value Error?
pass
# create data
df = push_data()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the kind of stuff that might be good as a fixture.

@pytest.fixture
def dataset():
    bq = bigquery.Client()
    ...
    push_data()
    yield dask-bigquery.dataset_test
    bq.delete_dataset(...)
def test_read_gbq(client, dataset):
    project_id, dataset_id = dataset.split(".")
    ddf = read_gbq(dataset_id=dataset_id, project_id=project_id, ...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look and see if we can use fixtures, but maybe not with the delete part. The purpose of the try/except here is to delete the data set if it exists before pushing the data. If I delete the dataset right after pushing it then I won't be able to read it,

@pytest.fixture
def dataset():
    bq = bigquery.Client()
    ...
    push_data()
    yield dask-bigquery.dataset_test
    bq.delete_dataset(...)

Maybe I'm missing something, but from this snippet I will be yielding the name of location of the dataset and then deleting it, by the time I get to read_gbq the data would be removed, and there would be nothing to read.
That being said, there might be a way of still using a fixture of some form.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test runs during the yield. So this test says "set up a dataset, then give the path of that dataset to the test, let the test do its thing, and then once the test is done clean up the dataset"

If you're concerned that the dataset might exist beforehand then you could add it to the part of the fixture before the yield as well.


assert ddf.columns.tolist() == ["name", "number", "idx"]
assert len(ddf) == 10
assert ddf.npartitions == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we verify that the data is actually the same as the data created in push_data instead? Pushing the fixture idea a little further

@pytest.fixture
def df():
    ...

@pytest.fixture
def dataset(df):
    ...

def test_read_gbq(client, dataset, df):
    ddf = read_gbq(...)

    assert_eq(ddf, df)

Maybe there are sorting things that get in the way (is GBQ ordered?) If so then, as you did before

assert_eq(ddf.set_index("idx"), df.set_index("idx"))

In general we want to use assert_eq if possible. It runs lots of cleanliness checks on the Dask collection, graph, metadata, and so on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I get some order issues when reading back from gbq, mainly because when I read back the default index goes from 0 to chunksize-1, where chunksize was chosen when I pushed the pandas dataframe. This was part of the reason I had as an extra column "idx".

But thanks for pointing out the assert_eq I forgot we had that. Although, in order for that line to work I had to do a compute on the dask dataframe. I'm assuming this is because I'm comparing a dask data frame with a pandas one.?

assert_eq(ddf.set_index("idx").compute(), df.set_index("idx"))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert_eq should handle comparing Dask and pandas DataFrames. What error do you get without the compute()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get this:

____________________________________________________________________ test_read_gbq _____________________________________________________________________

df =      name  number  idx
0   betty      71    0
1    fred      36    1
2   wilma      75    2
3   betty      13    3
4  ...   4
5    fred      74    5
6   wilma      69    6
7    fred      31    7
8  barney      31    8
9   betty      97    9
dataset = 'dask-bigquery.dataset_test.table_test', client = <Client: 'tcp://127.0.0.1:55212' processes=2 threads=2, memory=32.00 GiB>

    def test_read_gbq(df, dataset, client):
        """Test simple read of data pushed to BigQuery using pandas-gbq"""
        project_id, dataset_id, table_id = dataset.split(".")
    
        ddf = read_gbq(
            project_id=project_id, dataset_id=dataset_id, table_id=table_id
        )
    
        assert ddf.columns.tolist() == ["name", "number", "idx"]
        assert len(ddf) == 10
        assert ddf.npartitions == 2
        #breakpoint()
>       assert assert_eq(ddf.set_index("idx"), df.set_index("idx"))

test_core.py:67: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../../../../mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/dataframe/utils.py:541: in assert_eq
    assert_sane_keynames(a)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

ddf = Dask DataFrame Structure:
                 name number
npartitions=2               
0              object  int64
4                 ...    ...
9                 ...    ...
Dask Name: sort_index, 22 tasks

    def assert_sane_keynames(ddf):
        if not hasattr(ddf, "dask"):
            return
        for k in ddf.dask.keys():
            while isinstance(k, tuple):
                k = k[0]
            assert isinstance(k, (str, bytes))
            assert len(k) < 100
            assert " " not in k
>           assert k.split("-")[0].isidentifier()
E           AssertionError

../../../../../mambaforge/envs/test_gbq/lib/python3.8/site-packages/dask/dataframe/utils.py:621: AssertionError

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting! Do you have any additional information in the traceback? I'm wondering what k is? You could try adding --pdb to the end of your pytest command, which will drop you into a pdb session at the point the test raises an error. You can then do pp k to see what k is

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also xref dask/dask#8061

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting! Do you have any additional information in the traceback? I'm wondering what k is? You could try adding --pdb to the end of your pytest command, which will drop you into a pdb session at the point the test raises an error. You can then do pp k to see what k is

There is no additional in the traceback but pp k returns
'dataset_test.table_test--46e9ff0148164adf1b543e44137043cd'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is coming from the

delayed_kwargs = dict(prefix=f"{dataset_id}.{table_id}-")

line earlier in this function. Ultimately we'll want to move away from delayed and constructing the Dask graph ourselves, so for now I think it's okay to drop the prefix= here and use

delayed_kwargs = {}

instead. That should allow you to also drop the compute() call in assert_eq

timeout=read_timeout,
)
for row_filter in row_filters
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great for now, but at some point we may want to use raw task graphs. They're a bit cleaner in a few ways. Delayed is more designed for user code. If we have the time we prefer to use raw graphs in dev code.

For example, in some cases I wouldn't be surprised if each Delayed task produces a single TaskGroup, rather than having all of the tasks in a single TaskGroup. Sure, this will compute just fine, but other features (like the task group visualization, or coiled telemetry) may be sad.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jrbourbeau and I gave it a try to use HighLevelGraphs and we realized that this will require modifying the structure of the function _read_rows_arrow since as is now, the inputs don't match the required format asked in DataFrameIOLayer
https://github.com/dask/dask/blob/95fb60a31a87c6b94b01ed75ab6533fa04d51f19/dask/layers.py#L1159-L1166

We might want to move this to a separate PR.

with bigquery_client(project_id, with_storage_api=True) as (
bq_client,
bqs_client,
), dask.annotate(priority=1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would definitely prefer to not have this annotation if possible. Data generation tasks should be *de-*prioritized if anything



@dask.delayed
def _read_rows_arrow(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use delayed (see comment below) then this name will be the name that shows up in the task stream, progress bars, etc.. We may want to make it more clearly GBQ related, like bigquery_read

Comment on lines 34 to 43
try:
# delete data set if exists
bq_client = bigquery.Client()
bq_client.delete_dataset(
dataset="dask-bigquery.dataset_test",
delete_contents=True,
)
bq_client.close()
except: # if data doesn't exisit continue is that Value Error?
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an error is raised when we call delete_dataset, then bq_client.close() won't ever be called. Let's put bq_client.close() in a finally: block to ensure it's called, even if an exception is raised. Something like:

    try:
        # delete data set if exists
        bq_client = bigquery.Client()
        bq_client.delete_dataset(
            dataset="dask-bigquery.dataset_test",
            delete_contents=True,
        )
    except:  # if data doesn't exisit continue is that Value Error?
        pass
    finally:
        bq_client.close()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, just to confirm, we can't use bigquery.Client() as a context manager, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just double-checking this it might exists now. Haven't checked it but will give it a try
googleapis/python-bigquery#540

I had the feeling that this wasn't possible based on the code on the gist, but it seems most of this was drafted before the PR I mentioned was merged. But it seems only the storage API doesn't have the context manager nor a close method.
So just for the test we could use the bigquery context manager. Will give it a try now.

@bnaul
Copy link
Contributor

bnaul commented Sep 21, 2021

#4 is looking great, should this PR be closed in favor of that one?

@ncclementi
Copy link
Contributor Author

#4 is looking great, should this PR be closed in favor of that one?

@bnaul yes we should, thanks for the reminder. I'm closing it now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants