[QST] How to omit the Dataset.shuffle_by_keys step when exporting data from BigQuery to parquet #1862

Open
piojanu opened this issue Sep 19, 2023 · 9 comments
Labels
question Further information is requested

Comments

@piojanu

piojanu commented Sep 19, 2023

Hi!

Level of expertise

Note that I'm not a Parquet/Dask expert. I DO know what the hybrid layout is (row groups etc.) but I DON'T really know the internals of CUDA Dask (how data is loaded from parquet into partitions, how scheduling is done, etc.)

Context

I do some data transformations in BigQuery before exporting the data to Parquet using this query:

EXPORT DATA OPTIONS(
    uri='{output_path}/*.parquet',
    format='PARQUET',
    overwrite=true) AS
SELECT session_id, timestamp ... FROM ...

Then, in NVTabular, I run the GroupBy op which groups by the session_id column and sorts by the timestamp column. However, I need to do Dataset.shuffle_by_keys first, because I ran into the problem described in your docs (data not moved between partitions etc.).
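To make the problem concrete, here is a toy sketch in plain Python (made-up rows, not NVTabular code, and the helper name is hypothetical). It shows what happens when the rows of one session_id land in two partitions and each partition is grouped on its own: the session comes out as two partial groups.

```python
from collections import defaultdict

def groupby_per_partition(partitions):
    """Group (session_id, timestamp) rows independently inside each partition."""
    results = []
    for rows in partitions:
        groups = defaultdict(list)
        for session_id, timestamp in rows:
            groups[session_id].append(timestamp)
        # Sort the timestamps within each group.
        results.append({key: sorted(ts) for key, ts in groups.items()})
    return results

# Hypothetical data: session "b" is split across two partitions (files).
partitions = [
    [("a", 1), ("a", 2), ("b", 5)],
    [("b", 3), ("c", 7)],
]
per_partition = groupby_per_partition(partitions)

# Sessions that appear in both partitions, each time with only part of their rows.
split_sessions = [k for k in per_partition[0] if k in per_partition[1]]
```

Shuffling by session_id first moves all rows of "b" into one partition, which is why the step is needed here.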

Question

This shuffling by keys takes time. What can I do so it isn't required? My ideas:

  • Can I specify some parameters when exporting the data in BigQuery?
    I know you're not Google, but maybe you can direct me further, I wasn't able to find appropriate parameters.
  • Can I specify some parameters during the Dataset creation?
    Again, I even dug into the source code, but I really got lost in e.g. what the ParquetDatasetEngine params are doing.

Thanks!
Piotr

@piojanu
Author

piojanu commented Sep 19, 2023

Moreover, I don't understand why "The row group memory size of the parquet files should be smaller than the part_size that you set for the NVTabular dataset such as nvt.Dataset(TRAIN_DIR, engine="parquet", part_size="1000MB")". This is mentioned in the Troubleshooting section of the docs. The ParquetDatasetEngine seems to ignore the part_size (unless you regenerate the dataset) and reads one file per partition.

@rjzamora
Collaborator

Can I specify some parameters when exporting the data in BigQuery?

Unfortunately, I cannot help much with BigQuery. There may be a way to use ORDER BY session_id before exporting. However, (1) that may be slow, and (2) there is still no guarantee that BigQuery will shard your data so that two adjacent files will never contain the same session_id value.
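If you do try the ORDER BY route, one way to check the second point is to compare the minimum and maximum session_id of consecutive files. The sketch below is plain Python under stated assumptions: the file names and key ranges are made up, the helper is hypothetical, and you would have to collect the real min/max values yourself (for example from the Parquet column statistics). It is only meaningful when the files are sorted by the key.

```python
def files_sharing_keys(file_ranges):
    """Return pairs of adjacent files whose key ranges touch or overlap.

    `file_ranges` is a list of (file_name, min_key, max_key) tuples,
    listed in the order the files would be read.
    """
    shared = []
    for (name_a, _, max_a), (name_b, min_b, _) in zip(file_ranges, file_ranges[1:]):
        # If the next file starts at or before the previous file's last key,
        # at least one key may be present in both files.
        if min_b <= max_a:
            shared.append((name_a, name_b))
    return shared

# Hypothetical key ranges for three exported shards.
file_ranges = [
    ("000.parquet", 1, 40),
    ("001.parquet", 40, 75),  # session 40 continues from the previous file
    ("002.parquet", 76, 99),
]
overlaps = files_sharing_keys(file_ranges)
```

An empty result would suggest that a 1:1 file-to-partition mapping is safe for the group-by; any reported pair means the shuffle (or a re-export) is still needed.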

Can I specify some parameters during the Dataset creation?

There is not much you can do in Dataset besides calling the shuffle_by_keys method that you are already familiar with. It might be slightly faster to read in the data with dask.dataframe and use sort_values to adjust the partition boundaries:

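import dask
import dask.dataframe as dd
from merlin.io import Dataset

# `path` is the directory holding the exported Parquet files.
# Use the cuDF backend so that partitions are GPU DataFrames.
with dask.config.set({"dataframe.backend": "cudf"}):
    # Sorting by the grouping key re-partitions the data by session_id value.
    ddf = dd.read_parquet(
        path, blocksize="512MB", aggregate_files=True
    ).sort_values("session_id")

dataset = Dataset(ddf)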

Note that sort_values is usually slower than shuffling, but may be slightly faster when the data is already sorted. In the future, we could probably add a utility to simply adjust partition boundaries when the data is already sorted. However, such an API doesn't exist yet.
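To illustrate what such a boundary-adjusting utility might do, here is a sketch in plain Python. It is not an existing Dask or Merlin API; the function name and the inputs are hypothetical. Given keys that are already sorted and a list of nominal partition start offsets, it pushes each offset forward until it no longer cuts through a run of equal keys.

```python
def align_boundaries(sorted_keys, boundaries):
    """Move each boundary forward until it no longer splits a run of equal keys.

    `boundaries` are row offsets at which a new partition would start.
    """
    aligned = []
    for start in boundaries:
        # Advance while the row just before the boundary has the same key.
        while 0 < start < len(sorted_keys) and sorted_keys[start] == sorted_keys[start - 1]:
            start += 1
        # Drop boundaries that collapsed onto an earlier one or onto the end.
        if start < len(sorted_keys) and (not aligned or start > aligned[-1]):
            aligned.append(start)
    return aligned

keys = ["a", "a", "b", "b", "b", "c", "d", "d"]
# The nominal boundary at row 3 would split the "b" run, so it moves to row 5.
aligned = align_boundaries(keys, [0, 3, 6])
```

The appeal of this approach is that only the rows near each boundary would have to move, rather than the whole dataset.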

The ParquetDatasetEngine does seem to ignore the part_size (unless you regenerate the dataset) and reads one file per partition.

The part_size argument will be ignored if it is not possible to satisfy. Currently, the minimum size of an NVT/Merlin partition is a single parquet row-group, and the maximum size of a partition is usually a single parquet file. In upstream dask-dataframe, the aggregate_files option can be used to map multiple files to each partition. However, I'm not sure if this behavior is supported in NVT/Merlin yet (will need to check).

@piojanu
Author

piojanu commented Sep 20, 2023

Thank you! Let me repeat in my own words (bullet points 😃) and ask some clarification questions to make sure I understand correctly:

  • When loading multiple parquet files, one partition is one file and nothing is moved around by default.
  • What happens when I load parquets from e.g. Google Storage, are they downloaded immediately and stored in some temporary DASK space? Or are they lazy-loaded?
  • shuffle_by_keys has some crazy logic in it, but if I understand correctly, if my data is NOT hive partitioned (it has multiple parquet files in a flat structure and NOT in multiple directories), then the code simply falls back to calling dask.dataframe.DataFrame.shuffle.
  • Are there any utility functions in your code that I could use to specify npartitions during the shuffling in such a way, that the part_size would be satisfied?

EDIT:

  • And one more, ParquetDatasetEngine isn't simply using dask.DataFrame to run the operations on the data, right? It has its own logic built around the dask cluster, if I understand correctly.
  • More general, could you briefly describe what's the high-level architecture of how the data is processed by the workflow or direct me to a place that describes it?

Thanks a lot!

@rjzamora
Collaborator

These are good questions @piojanu. They are actually a bit tricky to answer completely, but I'll do my best :)

Some general background (I'm sorry if I am mostly repeating things that you already know):

Dataset is Merlin's "universal dataset adapter". Its purpose is to convert an arbitrary dataset into a dask.dataframe.DataFrame object if and when NVT/Merlin needs one. NVTabular is essentially a high-level API for dask.dataframe, and so most operations internally operate on a dask.dataframe.DataFrame object.

A dask.dataframe.DataFrame (dd.DataFrame) object is typically referred to as a "dask collection" because it represents multiple lazy cudf.DataFrame or pd.DataFrame objects. Each of these lazy DataFrame objects is called a "partition." When you define a dask collection (e.g. dd.read_parquet(...)), dask does not eagerly read the parquet data into memory. Instead, dask defines the various compute tasks needed to materialize each of the partitions in the collection. If you do a follow-up operation on that collection (e.g. add two columns), you will still not read the parquet data into memory, but will define further compute tasks for each partition in your collection. Since these compute tasks can depend on the output of other compute tasks, a dask collection is really just a task graph at its core (plus a bit of metadata to track the current column names and types).

When you (or NVTabular) need the real data for a specific partition, the underlying task graph needed to generate that partition will be executed by scheduling the necessary compute tasks on worker processes (or threads). It is only when one of these tasks needs to move remote parquet data into local memory that dask will actually do so. In this sense, dask does not copy GCS data into a local dask space for later processing, but will attempt to stream the data to the worker processes as it is needed.
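The lazy behaviour described above can be sketched with a toy task graph in plain Python. This is an illustration only, not dask's scheduler: the `compute` helper, the `load` stand-in, and the key names are all made up. Building the graph runs nothing, and asking for one partition only executes the tasks that partition depends on.

```python
calls = []

def load(name):
    # Stand-in for reading one Parquet file; records when it really runs.
    calls.append(name)
    return len(name)

def inc(x):
    return x + 1

# A graph in a dict-of-tasks style: key -> (function, *arguments).
# String arguments that match a key refer to the output of that task.
graph = {
    "part-0": (load, "file-0"),
    "part-1": (load, "file-11"),
    "inc-0": (inc, "part-0"),
    "inc-1": (inc, "part-1"),
}

def compute(graph, key):
    """Recursively run only the tasks needed to produce `key`."""
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        resolved = [
            compute(graph, a) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*resolved)
    return task

calls_before = list(calls)         # nothing has been "read" yet
result = compute(graph, "inc-1")   # triggers load("file-11") only
```

After the call, only "file-11" has been loaded; the data behind "part-0" was never touched.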

How is Dataset/ParquetDatasetEngine different from dd.read_parquet?

If we were to implement ParquetDatasetEngine today, it would probably do little more than call dd.read_parquet. However, when the Dataset API was designed a few years ago, dd.read_parquet did not meet all the needs of NVTabular. For example, the only real options were to map an individual file to each output partition or to map a constant number of row-groups to each output partition. It was harder to say “I don’t want my partitions to be larger than 1GB”.

In order to support the part_size option, we needed to add extra logic to ParquetDatasetEngine to eagerly read the very first row group of the dataset, and calculate its real in-memory size. With that information in hand, we assume that all row-groups in the dataset will be a similar size (not always true), and calculate the exact number of row-groups to map to each dask partition.
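The calculation described above can be sketched roughly as follows. This is a simplified stand-in, not the actual ParquetDatasetEngine code: both function names are hypothetical, the size parser only handles a few decimal suffixes, and the row-group sizes are made-up numbers.

```python
def parse_bytes(size):
    """Convert strings like '1GB' or '512MB' to a byte count (decimal units)."""
    units = {"KB": 10**3, "MB": 10**6, "GB": 10**9}
    size = size.strip().upper()
    for suffix, factor in units.items():
        if size.endswith(suffix):
            return int(float(size[: -len(suffix)]) * factor)
    return int(size)

def row_groups_per_partition(part_size, first_row_group_bytes):
    """Estimate how many row-groups fit into one partition of `part_size`.

    Assumes every row-group is about as large as the first one.
    """
    # A partition can never be smaller than a single row-group.
    return max(1, parse_bytes(part_size) // first_row_group_bytes)

# Typical case: 128 MB row-groups in memory, 1 GB target partitions.
n_typical = row_groups_per_partition("1GB", 128 * 10**6)

# Pathological case: one giant 3 GB row-group cannot be split further.
n_giant = row_groups_per_partition("1GB", 3 * 10**9)
```

In the second case the result is clamped to one row-group per partition, so the partitions end up larger than the requested part_size.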

For datasets containing large parquet files with many uniformly-sized row-groups per file, the above approach tends to work just fine. However, there are certainly cases where the partition sizes will end up being much larger or much smaller than part_size. For example, some systems will write out parquet files using one giant row-group. In these cases, we will never be able to generate partitions that are <=part_size, because we cannot place partition boundaries within a single row-group. There are also workflows/systems that produce many small files. In this case, you need to use aggregate_files=True to allow multiple files to be mapped to the same output partition in dask (you will get a warning when you do this, but the current plan is to revert that warning).

The other problem with dd.read_parquet is that dask does not keep track of the parquet metadata after it is used to set partition boundaries. This means that the parquet metadata cannot be used later on to calculate the length of your dataset without needing to read the real data into memory. This is still a problem today, and is one of the main reasons that ParquetDatasetEngine (which does save a reference to the metadata) hasn’t been overhauled.

So, now that we have some background out of the way:

When loading multiple parquet files, one partition is one file and nothing is moved around by default.


Neither ParquetDatasetEngine nor dd.read_parquet will ever move data around. Most workflows get best performance by defining “embarrassingly parallel” IO tasks for each partition you need to create. All we can do is inspect the dataset, and map specific row-group or file ranges to each of our collection partitions. If part_size is smaller than your first row-group, then every dask partition will correspond to a single row-group of your dataset. If part_size is larger than your files, each file will be mapped to a distinct partition (unless you specify aggregate_files=True, which is not documented in Merlin, because it is not well tested yet).
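As a rough picture of this mapping, the sketch below plans partitions from row-group counts alone, assuming no file aggregation. It is plain Python with a hypothetical helper and made-up file names, not the engine's real planning code.

```python
def plan_partitions(row_groups_per_file, row_groups_per_partition):
    """Return (file_name, first_row_group, last_row_group_exclusive) per partition.

    Partitions never cross a file boundary (no file aggregation).
    """
    plan = []
    for file_name, n_row_groups in row_groups_per_file:
        for start in range(0, n_row_groups, row_groups_per_partition):
            stop = min(start + row_groups_per_partition, n_row_groups)
            plan.append((file_name, start, stop))
    return plan

# Hypothetical dataset: one file with 5 row-groups, one with 2.
files = [("a.parquet", 5), ("b.parquet", 2)]

# Small target: files are split into several partitions.
plan_small = plan_partitions(files, 2)

# Oversized target: each file becomes exactly one partition.
plan_large = plan_partitions(files, 100)
```

Note that no rows are moved in either case; each partition is just a range of row-groups inside one file.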


What happens when I load parquets from e.g. Google Storage, are they downloaded immediately and stored in some temporary DASK space? Or are they lazy-loaded?


They are loaded lazily. The data needed for a specific partition is moved into memory when the corresponding compute task is executing on a worker.


shuffle_by_keys has some crazy logic in it, but if I understand correctly, if my data is NOT hive partitioned (it has multiple parquet files in a flat structure and NOT in multiple directories), then the code simply falls back to calling dask.dataframe.DataFrame.shuffle.

Correct. Shuffling is well-known to be slow and memory intensive, and so it is something you want to avoid at all costs. The crazy hive-partitioning logic is just Merlin jumping through hoops to avoid a conventional shuffle.


Are there any utility functions in your code that I could use to specify npartitions during the shuffling in such a way, that the part_size would be satisfied?

Before shuffling your data, you can always call dataset = dataset.repartition(partition_size=part_size) to adjust your partitions. You can also utilize the npartitions argument in shuffle_by_keys to increase or decrease the partition size. However, there is no way to specify a specific byte-size limit for the output of a shuffle operation.
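Since there is no byte-size limit for the shuffle output, one workaround is to estimate npartitions yourself from the total in-memory size of the data. The helper below is a hypothetical sketch in plain Python; the 50 GB total is a made-up number that you would have to measure or estimate for your own dataset.

```python
import math

def estimate_npartitions(total_bytes, target_partition_bytes):
    """Number of output partitions whose average size meets the target."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))

# Hypothetical: about 50 GB of data in memory, aiming for 1 GB partitions.
npartitions = estimate_npartitions(total_bytes=50 * 10**9, target_partition_bytes=10**9)
```

The result could then be passed as the npartitions argument of shuffle_by_keys. This only controls the average partition size: if a few session_id values own a large share of the rows, some partitions will still come out larger than the target.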


And one more, ParquetDatasetEngine isn't simply using dask.DataFrame to run the operations on the data, right? It has its own logic built around the dask cluster, if I understand correctly.

It is mostly a wrapper around the Engine classes used by dd.read_parquet, but includes extra logic needed by Merlin/NVT in the past. The Dataset class does work with dask.dataframe.DataFrame directly in most cases.


More general, could you briefly describe what's the high-level architecture of how the data is processed by the workflow or direct me to a place that describes it?


Hopefully I’ve provided some background here, but feel free to ask further questions. Since this is mostly a high-level interface to Dask-DataFrame, it may be useful to look through the dask documentation a bit.

@piojanu
Author

piojanu commented Sep 22, 2023

This is an amazing explanation, just what I needed to understand how the Dataset parameters influence what is going on under the hood! Thank you 🙏

I think this explanation should be a part of the troubleshooting section of the docs. To fix problems described there, the user shall know how this stuff works. Do you want me to prepare a PR that adds your explanation there?

@rjzamora
Collaborator

Do you want me to prepare a PR that adds your explanation there?

I always welcome contributions to the documentation :) Thanks for engaging @piojanu !

@piojanu
Author

piojanu commented Nov 6, 2023

Hi!

After some time of working around my problems described in this and other issues (#1863 and #1864), I've come to the conclusion that I still don't understand: What is the proper way to save the dataset before feature engineering so that I don't have to call shuffle_by_keys? I couldn't find the answer in the docs or examples. What am I missing?

@rjzamora
Collaborator

rjzamora commented Nov 6, 2023

@piojanu - Thank you for sharing your experiences and raising issues!
I think I can respond with both good news and bad news.

First the bad news:

You are absolutely correct that the Merlin/NVTabular Dataset API is not convenient when you want to ensure that your dask partitions contain unique values for a specific set of columns. This is because (1) there is no clear option to say "please map each input file to a distinct dask partition" in Dataset, (2) there is no clear option to say "please map each dask partition to a distinct output file" in Dataset.to_parquet, and (3) the underlying algorithm used by Dataset.shuffle_by_keys is both slow and bad at avoiding memory pressure.

Historical Context: Early NVT/Merlin use cases did not require this kind of shuffling support, and the general assumption was that some other ETL engine or pure Dask/Dask-cuDF code would be a better fit for those kinds of workflows anyway. The early motivation for NVTabular was to provide a high-level API for common pre-processing and feature-engineering tasks that are "embarrassingly parallel" at transform time (even if we need to do things like tree reductions to gather dataset statistics at fit time). The groupby wrinkle motivated the introduction of shuffle_by_keys, but it seems pretty clear that we neglected to clearly document its proper usage and limitations!

The good (ish?) news:

It is possible to use Dataset.shuffle_by_keys to shuffle your data so that your dask partitions contain unique values for a specific column set. However, I strongly suggest that you write out your data to disk after performing this kind of shuffle operation (or after doing it with some other ETL engine). You should avoid mixing shuffle_by_keys with other NVTabular operations whenever possible!

Your best bet is to write your data in such a way that each Parquet file already contains unique values for the columns you need. If you used Dataset.shuffle_by_keys to shuffle your data, then you would want to immediately write out the shuffled data using to_parquet:

# If `output_files` is set to the original partition count, Merlin
# will map each dask partition to a distinct Parquet file.
ds.to_parquet("/shuffled/path", output_files=ds.npartitions)

I realize that the documentation doesn't make it clear that you can use output_files=ds.npartitions to map dask partitions to output files. For now, it probably makes sense for us to add a note about this in the documentation. However, the current Dataset-API is probably in need of an overhaul in the future anyway (i.e. a dramatic simplification).

Once your data is written out so that each Parquet file already contains unique values for the grouping columns of interest, you can specify an unrealistically large part_size when you read back your data using Dataset:

# Using an oversized `part_size` value will give you a 1:1 file-to-partition mapping
shuffled_ds = Dataset("/shuffled/path", engine="parquet", part_size="100GB")

Since your "shuffled" files should each be much smaller than "100GB", Dataset will simply map each of the Parquet files in your shuffled dataset to a distinct dask partition. Note that Dataset will aggregate the input files to try and produce a smaller number of 100 GB partitions if you pass in aggregate_files=True (but this is not what you want in this case).

Again, I realize that the documentation does not establish a clear mechanism to achieve a 1:1 file-to-partition mapping. In hindsight, this is very unfortunate!

What about hive partitioning?:

You may be wondering if you can use to_parquet(..., partition_on=...) as an alternative to the ds=ds.shuffle_by_keys(...); ds.to_parquet(..., output_files=ds.npartitions) approach. The answer is "probably yes, but be careful". If you skip the explicit shuffling step and use to_parquet(..., partition_on=...) to shuffle your data, you will almost certainly end up with many tiny files. This in turn means that you will get many tiny partitions when you read back in the data with Dataset(...).

In order to avoid the small-partition problem, I suggest that you pass in aggregate_files=<partitioning-column-name> to your Dataset command. For example, if you wrote your data with partition_on=["year", "month"] to shuffle your data to disk, then using Dataset(..., part_size="100GB", aggregate_files="month") will tell the underlying dd.read_parquet logic that files within the same year/month/ directory may be combined into the same dask partition as long as they stay below the "100GB" target.
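The grouping rule can be pictured with a toy packing routine in plain Python. This is not how dask implements aggregate_files; the two helpers, the file paths, and the sizes are hypothetical and only illustrate the idea that files are combined within a year/month/ directory but never across directories.

```python
from collections import defaultdict

def group_key(path, aggregate_on):
    """Return the directory prefix of `path` up to and including `aggregate_on`."""
    parts = path.split("/")
    for i, part in enumerate(parts):
        if part.startswith(aggregate_on + "="):
            return "/".join(parts[: i + 1])
    raise ValueError(f"{aggregate_on!r} is not a partitioning column of {path!r}")

def aggregate(files, aggregate_on, max_bytes):
    """Pack (path, size) pairs into partitions without crossing directories."""
    by_dir = defaultdict(list)
    for path, size in files:
        by_dir[group_key(path, aggregate_on)].append((path, size))
    partitions = []
    for members in by_dir.values():
        current, current_bytes = [], 0
        for path, size in members:
            # Start a new partition when the size target would be exceeded.
            if current and current_bytes + size > max_bytes:
                partitions.append(current)
                current, current_bytes = [], 0
            current.append(path)
            current_bytes += size
        partitions.append(current)
    return partitions

# Hypothetical hive-partitioned layout with made-up sizes.
files = [
    ("year=2023/month=1/part.0.parquet", 40),
    ("year=2023/month=1/part.1.parquet", 40),
    ("year=2023/month=1/part.2.parquet", 40),
    ("year=2023/month=2/part.0.parquet", 40),
]
partitions = aggregate(files, aggregate_on="month", max_bytes=100)
```

With these numbers the three month=1 files are packed into two partitions, and the month=2 file stays in a partition of its own, so every partition still holds a single year/month combination.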

Why do I need to write my shuffled data back to disk before applying Categorify, etc?:

Although you don't need to write your shuffled data back to disk, you are much more likely to see OOM errors if you don't. Recall that NVTabular was designed with an "embarrassingly parallel" transform step in mind. In-memory shuffling is very far from that original design goal.

@piojanu
Author

piojanu commented Nov 13, 2023

Thank you for your detailed answer! One thing I don't understand: If we set "an unrealistically large part_size when you read back", won't we get OOM because Dask will try to load possibly bigger than memory data into one DataFrame? AFAIK Dask under the hood works in such a way that one partition is one Pandas/cuDF DataFrame. What am I missing?
