
Pandas materializer is slow when using large datasets with remote artifact stores #2213

Open
strickvl opened this issue Jan 3, 2024 · 25 comments
Labels: enhancement (New feature or request), good first issue (Good for newcomers)

Comments

@strickvl
Contributor

strickvl commented Jan 3, 2024

Open Source Contributors Welcomed!

Please comment below if you would like to work on this issue!

Contact Details [Optional]

support@zenml.io

What happened?

There is a performance issue with the Pandas materializer in ZenML, particularly when dealing with large datasets materialized on remote artifact stores such as S3. The loading process is extremely slow, which becomes evident when handling datasets of significant size.

Steps to Reproduce

The issue was encountered when trying to load a large dataset (~30M rows) from a Snowflake database. Initially, the pipeline was tested with a limit of 100 on the SQL query, which worked fine. However, removing the limit to fetch the entire dataset leads to out-of-memory errors and very slow performance. Here's one way you might reproduce this:

import pandas as pd
from zenml import step, pipeline

def create_large_dataset(num_rows: int = 30_000_000) -> pd.DataFrame:
    # Create a DataFrame with 30 million rows and a few columns
    df = pd.DataFrame({
        'id': range(num_rows),
        'value': [f'value_{i % 1000}' for i in range(num_rows)],  # Cyclic repetition of values
        'number': range(num_rows),
        'date': pd.date_range(start='2020-01-01', periods=num_rows, freq='T')  # Minute frequency
    })
    return df

@step
def my_step() -> pd.DataFrame:
    df = create_large_dataset()
    return df

# Run this using a remote artifact store
@pipeline
def my_pipeline():
    my_step()

my_pipeline()

This code generates a DataFrame with 30 million rows, each row containing an ID, a cyclically repeating string value, a number, and a timestamp. You can adjust the create_large_dataset function to tailor the dataset to your specific needs.

Expected Behavior

The expectation is for the Pandas materializer to efficiently handle large datasets, even when they are stored on remote artifact stores. The materializer should be able to load the entire dataset without significant performance degradation or memory issues.

Potential Solutions

• Benchmark the current implementation to identify bottlenecks.
• Investigate optimizations in data loading, possibly through chunking or more efficient memory management (see the sketch after this list).
• Consider alternative approaches or tools that are better suited for handling large datasets.
• Explore the possibility of improving the integration with the remote artifact stores to optimize data transfer and loading.
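
To make the chunking idea concrete, here is a rough sketch. This is not the current materializer code; pyarrow's batch reader is just one possible mechanism, and the function name and batch size are placeholders:

# Hypothetical sketch: stream a Parquet artifact in batches instead of loading it in one read.
import pyarrow.parquet as pq

def iter_dataframe_chunks(parquet_path, batch_size=1_000_000):
    """Yield the artifact as a sequence of smaller pandas DataFrames."""
    parquet_file = pq.ParquetFile(parquet_path)
    for batch in parquet_file.iter_batches(batch_size=batch_size):
        yield batch.to_pandas()

# Example: count rows without ever holding the full dataset in memory.
total_rows = sum(len(chunk) for chunk in iter_dataframe_chunks("my_data.parquet"))

Something along these lines would cap peak memory at roughly one batch rather than the whole dataset, at the cost of handing callers an iterator instead of a single DataFrame.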

Additional Context

This issue is critical for users who work with large-scale data in ZenML, as it affects the efficiency and feasibility of data pipelines.

@strickvl strickvl added bug Something isn't working good first issue Good for newcomers labels Jan 3, 2024
@strickvl
Contributor Author

strickvl commented Jan 3, 2024

Potentially fixed/addressed by #2212.

@strickvl strickvl added enhancement New feature or request and removed bug Something isn't working labels Jan 3, 2024
@htahir1 htahir1 linked a pull request Jan 3, 2024 that will close this issue
@htahir1
Contributor

htahir1 commented Jan 5, 2024

#2212 was not a good solution, as evidenced by tests, so this issue is still open for thoughts!

@benitomartin

I would like to work on this issue. Is it still open, or is anyone already working on it?

@htahir1
Contributor

htahir1 commented Jan 9, 2024

@benitomartin no, it's still open. You might want to check out the conversation at #2212 before starting, though.

@benitomartin

OK, I will check. I was thinking of using Modin. Modin is a DataFrame library for datasets from 1 MB to 1 TB+ (here is a quick summary of Modin):

[Screenshot: Modin summary]

It would require only
pip install modin

and changing the pandas import to
import modin.pandas as pd

Is this a possibility?

@htahir1
Contributor

htahir1 commented Jan 9, 2024

@benitomartin yes, it's a possibility. We just need to decide which backend (Dask, Ray, etc.) to support.

If we do support Modin, maybe we can ask our users to return a Modin DataFrame instead and have a Modin materializer as part of ZenML? That way, we can just give a warning in the pandas materializer if the performance is low and ask the user to return a Modin DataFrame instead. I prefer this to doing a magical back-and-forth conversion under the hood. What do you think?

@benitomartin

benitomartin commented Jan 10, 2024

If the idea is just to save and load the data, I think Modin (or maybe Polars) would do it faster, and they are very user friendly. Anyone working with large datasets needs a tool that can handle them, and you can always convert back to pandas if you want.

So it is, in my opinion, a user-friendly solution.

Running pip install "modin[all]" installs Modin with both the Ray and Dask engines.

I'm not familiar with Ray or Dask, and I don't know how this will affect or interact with ZenML, but I guess it could work.
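
For reference, here is a minimal sketch of how the engine could be pinned explicitly instead of relying on auto-detection (this assumes modin[all] is installed; the choice of Dask here is arbitrary):

# Sketch: pin Modin's execution engine explicitly rather than relying on auto-detection.
import os

# Option 1: environment variable, set before importing modin.pandas
os.environ["MODIN_ENGINE"] = "dask"

# Option 2: Modin's configuration API
import modin.config as modin_cfg
modin_cfg.Engine.put("dask")

import modin.pandas as mpd

df = mpd.DataFrame({"a": range(10)})
print(df.sum())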

@htahir1
Contributor

htahir1 commented Jan 10, 2024

I think it makes sense to build a Modin integration for ZenML with a Modin DataFrame/Series materializer. To do this, you can follow the integrations guide from step 3.

The materializer itself can be in src/zenml/integrations/modin/materializers/modin_materializer.py, and can be something like:

from typing import Any, ClassVar, Tuple, Type

import modin.pandas as pd

from zenml.enums import ArtifactType
from zenml.materializers.pandas_materializer import PandasMaterializer

class ModinMaterializer(PandasMaterializer):
    """Materializer to read data to and from modin.pandas."""

    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = (
        pd.DataFrame,
        pd.Series,
    )
    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.DATA

    # Rest can be the same as the pandas materializer: https://github.com/zenml-io/zenml/blob/main/src/zenml/materializers/pandas_materializer.py

And in the pandas materializer, you can check in save and load whether the data is above a threshold (let's say >1M rows) and then raise a warning that the user should switch to Modin by running zenml integration install modin or pip install modin[all] and returning a Modin DataFrame instead.

WDYT about this plan?

@benitomartin

I think it sounds good. Allow me a few days to play around with ZenML and Modin, as I have not tested anything so far. Then I will open a PR, and if I'm stuck I will come back to you.

@benitomartin

benitomartin commented Jan 11, 2024

I made a first test running the pipeline file with pandas and Modin. I attach a comparison table.

Mainly, using Modin requires Ray (or another framework) to be initialized. In my test this means that saving the DataFrame takes longer with Modin compared to pandas, while reading the file is faster with Modin. However, from 5'000'000 rows (the threshold) Modin starts beating pandas (26 vs 25 seconds), and for 28'000'000 rows it is 2 minutes vs 1 minute 10 seconds. Details attached.

[Attachment: Modin vs. pandas performance comparison table]

In case you want me to prepare the integration file, I guess I have to create a folder and an __init__.py file with the ModinMaterializer, and include the PandasMaterializer code, right?

I can include this in the load method, after the raise ImportError and the else statement:

                raise ImportError(
                    "You have an old version of a `PandasMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` "
                    "for reading, You can install `pyarrow` by running "
                    "'`pip install pyarrow fastparquet`'."
                )
        else:
            with fileio.open(self.csv_path, mode="rb") as f:
                df = pd.read_csv(f, index_col=0, parse_dates=True)

        # Check if the DataFrame is too large and suggest using modin
        if len(df) > YOUR_THRESHOLD_SIZE:
            warning_message = (
                "Warning: The size of the DataFrame is large. "
                "Consider using the `modin` library for more efficient loading. "
                "You can install it with '`pip install modin`' or '`pip install modin[all]`'."
            )
            logger.warning(warning_message)

@htahir1
Contributor

htahir1 commented Jan 11, 2024

@benitomartin great job! Great results so far. I would ask the following:

  • Currently, you are trying this on the local file system, right? If so, it would be interesting to see the same on the S3 and/or GCP artifact stores. This guide will help to set that up.
  • I would make YOUR_THRESHOLD_SIZE an env variable called `ZENML_PANDAS_THRESHOLD` (see the sketch after this list) and add it to this docs page.
  • Is there a difference between Ray and Dask in these tests? If Ray is the way to go, we could also follow the ray.init suggestion from their warning at the top of the materializer, I suppose?
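
As a rough sketch of how that threshold could be read and applied (the helper name and the 1,000,000-row default are placeholders, not existing ZenML code):

# Hypothetical sketch: read the row threshold from an environment variable,
# falling back to 1,000,000 rows when it is not set.
import logging
import os

import pandas as pd

logger = logging.getLogger(__name__)

PANDAS_THRESHOLD = int(os.environ.get("ZENML_PANDAS_THRESHOLD", "1000000"))

def warn_if_large(df: pd.DataFrame) -> pd.DataFrame:
    """Warn when a DataFrame exceeds the configured row threshold."""
    if len(df) > PANDAS_THRESHOLD:
        logger.warning(
            "The DataFrame has %d rows; consider returning a Modin DataFrame instead.",
            len(df),
        )
    return df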

@benitomartin

Yes, I can go through the same process with AWS/GCP; I just need a few days for it. But I wanted to share the first insights with you.

The Ray initialization happens automatically, so I guess this is the default, as I did not install Ray myself; according to the Modin docs, the engine is detected automatically. I do not know Dask, but I can try to set it up and check (after the bucket load check).

Let me first see how it performs with AWS/GCP, and then I will check the rest.

Once everything is clear, I can create the materializer integration and update the docs 💪

@htahir1
Contributor

htahir1 commented Jan 12, 2024

@benitomartin Thank you! Let me know if I can support you in any way.

@benitomartin

I will test GCP first, today or tomorrow at the latest, and attach the tables with the timings here, like I did locally last time.

@benitomartin

benitomartin commented Jan 14, 2024

I ran some tests today using GCP; see the attachment.

[Attachment: GCP performance comparison table]

At 5'000'000 rows, pandas and Modin perform about equally overall. However, at 28'000'000 rows Modin beats pandas overall, but the reading time is longer. This is probably due to the missing materializer. If not, maybe saving with Modin and reading with Modin could be combined?

I also tried to initialize Ray in advance, but whenever I run the pipeline, another initialization takes place.

This is the code I used:

from google.cloud import storage
from zenml import step, pipeline
import pandas as pd
# import modin.pandas as pd

import numpy as np
from datetime import datetime, timedelta
import time

# Set your GCS bucket name
GCS_BUCKET_NAME = "zenml-big-dataframe"
FILE_NAME = "my_data.csv"
FILE_PATH = "data/my_data.csv"

@step(enable_cache=False)
def save_df_to_gcs(num_rows: int = 10000) -> str:
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    df = pd.DataFrame(data)
    
    csv_file_path = FILE_NAME 

    df.to_csv(csv_file_path, index=False)
    
    # Initialize a client
    client = storage.Client()

    # Reference to the target bucket
    bucket = client.get_bucket(GCS_BUCKET_NAME)

    # Define the object name in the bucket (destination)
    blob = bucket.blob(FILE_PATH)

    # Upload the local file to GCS
    blob.upload_from_filename(csv_file_path)

    # Return the GCS path to the saved CSV file
    return f"gs://{GCS_BUCKET_NAME}/{FILE_PATH}"


@step(enable_cache=False)
def read_df_from_gcs(gcs_path: str) -> pd.DataFrame:

    # Read the CSV directly from GCS (pandas needs gcsfs installed for gs:// paths)
    return pd.read_csv(gcs_path)


@pipeline(enable_cache=False)
def my_pipeline_gcs():
    start_time = time.time()


    gcs_path = save_df_to_gcs()  # 10k rows
    # gcs_path = save_df_to_gcs(num_rows=1000000)  # 1M rows
    # gcs_path = save_df_to_gcs(num_rows=5000000)  # 5M rows
    # gcs_path = save_df_to_gcs(num_rows=28000000)   # 28M rows
    read_df_from_gcs(gcs_path=gcs_path)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Pipeline execution time: {elapsed_time} seconds")

# Run the pipeline when the script is executed directly
if __name__ == "__main__":
    my_pipeline_gcs()

In the local tests I ran this code (I forgot to share it last time):

from zenml import step, pipeline
# import pandas as pd
import modin.pandas as pd
import time
import numpy as np
from datetime import datetime, timedelta

@step(enable_cache=False)
def save_df(num_rows: int = 10000) -> pd.DataFrame:
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    return pd.DataFrame(data)

@step(enable_cache=False)
def read_df(df: pd.DataFrame) -> pd.DataFrame:
    return df

@pipeline(enable_cache=False)
def my_pipeline():
    start_time = time.time()

    df = save_df()  # 10k rows
    # df = save_df(num_rows=1000000)  # 1M rows
    # df = save_df(num_rows=28000000)   # 28M rows
    reloaded_df = read_df(df=df)

    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Pipeline execution time: {elapsed_time} seconds")
    
# Run the pipeline when the script is executed directly
if __name__ == "__main__":
    my_pipeline()

Let me know the next steps; I can try AWS and see what happens. But I think you need to decide how to implement the materializer. We can have a quick Zoom call if you want to discuss the next steps.

Files for the local and GCS tests can be seen here:

Local: https://docs.google.com/spreadsheets/d/12qRGGBPdHGARav5z_1PoA1U7NrTECqdgZPcXOqbhr4I/edit?usp=sharing

GCS: https://docs.google.com/spreadsheets/d/17CzytX9Mp4A7GLb65LQ-mFC6vW0-XOrNrs0fA588izU/edit?usp=sharing

@htahir1
Contributor

htahir1 commented Jan 14, 2024

Wow @benitomartin, this is great! I am on the move right now, but I can give some quick feedback (I'll add to this later tomorrow) -> It looks like you're testing reading/writing in a standalone test, but you should probably already do this in a materializer with a remote artifact store (these two links will help):

@benitomartin

Yes, actually I was wondering if I have to connect GCS with ZenML somehow, but I could not manage to get through the documentation. This way I would not need to download the files.

I will try to check tomorrow or Tuesday and test with the GCS/ZenML integration.

One remark: when I run the pipelines I get the message that I should run zenml up. It took me some time to find the login credentials, just "default" as the username and no password. It would be good to add this to the last output of the pipeline, so that new users like me do not need to search for it.

@htahir1
Contributor

htahir1 commented Jan 15, 2024

@benitomartin Good remark; the username and password are actually printed, but only after the user is redirected! Maybe @strickvl has more ideas on how to improve this.

Regarding the task at hand, I think you are almost there. You just need to create the materializer and register the artifact store in your ZenML server. I think the two docs linked above will give you enough info on how to do it. Good luck, and ping here if you need us!

@strickvl
Contributor Author

One remark: when I run the pipelines I get the message that I should run zenml up. It took me some time to find the login credentials, just "default" as the username and no password. It would be good to add this to the last output of the pipeline, so that new users like me do not need to search for it.

Hi @benitomartin. I just wanted to check exactly when you're not seeing the information about using default as the username etc., since when you run zenml up, exactly this instruction gets printed to the terminal.

[Screenshot: terminal output of zenml up showing the default username instructions]

@benitomartin

benitomartin commented Jan 15, 2024

Hi @strickvl

I do not get this message, maybe because I am using --blocking on Windows. See below for the pipeline warnings when running my code above with GCloud and Modin.

[Screenshot: pipeline warnings when running with Modin on GCloud]

When I run zenml up --blocking, I do not get the username and password notification. So maybe you need to configure it for the --blocking flag and Windows users, by adding the Windows option for the pipeline run and printing the notification after running zenml up --blocking?

@strickvl
Contributor Author

Ah got it. I opened a PR that will fix this issue going forward.

@htahir1
Contributor

htahir1 commented Jan 19, 2024

@benitomartin I just remembered that another community member, @christianversloot, already added Polars support in this PR: #2229! So we're actually quite ahead of the curve already. Maybe we just need the warning now?

@benitomartin

Ah ok. Sorry I have been busy these days and could not manage to move forward. @strickvl already added the warning in #2290, so I guess this issue is finished?

@htahir1
Contributor

htahir1 commented Jan 19, 2024

Yes! Your tests were very useful, though. Thank you. I'd leave it up to @strickvl to close this.

@benitomartin

OK! I will have a look to see if I can move forward with another contribution.
