
Issue repartitioning a time series by frequency when loaded from parquet file #10949

Open
pvaezi opened this issue Feb 24, 2024 · 5 comments

pvaezi commented Feb 24, 2024

Describe the issue:

When loading a parquet file that has a datetime index, I can't repartition based on frequency; I get the following error:

Traceback (most recent call last):
  File "/Users/.../gitrepos/dask-exp/test_dask_issue.py", line 19, in <module>
    df2 = df2.repartition(freq="1D")
  File "/Users/.../miniconda3/envs/dask/lib/python3.10/site-packages/dask_expr/_collection.py", line 1184, in repartition
    raise TypeError("Can only repartition on frequency for timeseries")
TypeError: Can only repartition on frequency for timeseries

This happens despite the dataframe loaded from the parquet file having a datetime64[ns] index dtype.

Note that a dataframe created with the time series generator can be repartitioned by frequency.

Minimal Complete Verifiable Example:

import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

# Generate an hourly time series for the year 2000, repartition it to
# month-end frequency, and write it to parquet.
df1 = dask.datasets.timeseries(
    start="2000",
    end="2001",
    freq="1h",
    seed=1,
)
df1 = df1.repartition(freq="1ME")
df1.to_parquet("test")

# Read the data back; the index dtype is datetime64[ns], but repartitioning
# by frequency raises the TypeError shown above.
df2 = dd.read_parquet(
    "test/*parquet",
    index="timestamp",
    columns=["x", "y"]
)
print(df2.index.dtype)
df2 = df2.repartition(freq="1D")
print(df2.compute())

Anything else we need to know?:

I'm looking to repartition data loaded from parquet for efficient time-series queries. The default partitioning results in larger-than-needed memory usage.

Environment:

  • Dask version: 2024.2.1
  • Python version: 3.10.13
  • Operating System: Mac OSX
  • Install method (conda, pip, source): pip

phofl commented Feb 24, 2024

Hi,

thanks for your report. This doesn't work because the divisions of the DataFrame are unknown after read_parquet. That means we can't efficiently repartition by frequency without scanning the whole Index. I am not against making this work more reliably in the future, though.
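
For illustration, this can be seen directly on the reproducer above (a minimal sketch, reusing the same "test" dataset): the divisions are unreported after read_parquet.

import dask.dataframe as dd

df2 = dd.read_parquet("test/*parquet", index="timestamp", columns=["x", "y"])
# Without a statistics scan the index divisions are unknown, so there is no
# boundary information for repartition(freq=...) to work with.
print(df2.known_divisions)  # False
print(df2.divisions)        # (None, None, ..., None)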

I have a question though: this doesn't work for me with the current dask.dataframe implementation either if I disable query planning. Does it work for you?


pvaezi commented Feb 24, 2024

Thanks for the response. I tried without query planning and still hit the same issue.

It does make sense that the index needs to be scanned fully to make repartitioning effective. Can you suggest workarounds? For context, I'm trying to resample the dataframe by day and aggregate; if the partitions are divided by day, my desired resampling and aggregations become much easier and less memory intensive.


phofl commented Feb 24, 2024

The most effective way depends a little on where you are reading the data from. You can set calculate_divisions=True in the read_parquet call; this will populate the divisions and enable your repartitioning.

The scan can be expensive, though; that depends on how many files are in your dataset and where your data is stored (e.g. local or remote like S3). But it will get you there.
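
Applied to the reproducer above, that looks roughly like this (a sketch reusing the same "test" path and columns):

import dask.dataframe as dd

df2 = dd.read_parquet(
    "test/*parquet",
    index="timestamp",
    columns=["x", "y"],
    # Read the parquet statistics so the index min/max of each file is known.
    calculate_divisions=True,
)
print(df2.known_divisions)  # True once the statistics have been read
df2 = df2.repartition(freq="1D")  # now allowed, since the divisions are known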


pvaezi commented Feb 29, 2024

Thanks. In general I'm looking to replicate a SQL query like the one below with Dask, and I've had trouble with the memory consumption of the Dask workers. If you can guide me on how to properly use resampling with a timestamp column, that would be great:

   SELECT
        TIME_BUCKET(INTERVAL 1 DAY, timestamp) AS timestamp,
        col1,
        col2,
        col3,
        sum(col4),
        avg(col5)
    FROM df
    GROUP BY TIME_BUCKET(INTERVAL 1 DAY, timestamp), col1, col2, col3;


phofl commented Apr 4, 2024

I think you have to calculate the divisions at some point if you want to resample by day; there isn't really a way around this, since we need that information.

I'm not sure you have to resample, though: you could use the dt accessor on your timestamp column and round it to day accuracy. Then you can do a regular groupby instead of resampling, which doesn't need to know anything about the divisions; see the sketch below.
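
A minimal sketch of that approach against the SQL above (column names taken from the query; df is assumed to be a Dask DataFrame with a timestamp column, and the exact truncation call is illustrative):

# Truncate the timestamp to day precision with the dt accessor, then run a
# plain groupby aggregation; this does not require known divisions.
df["day"] = df["timestamp"].dt.floor("1D")
result = (
    df.groupby(["day", "col1", "col2", "col3"])
    .agg({"col4": "sum", "col5": "mean"})
    .compute()
)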

Is that helpful?
