
A faster pandas materializer #2212

Closed (wants to merge 6 commits)

Conversation

htahir1 (Contributor) commented Jan 3, 2024

Describe changes

I created a faster pandas materializer that uses snappy compression and chunked writes to disk. This works better when the dataframe has more than 10,000 rows of data. I have ensured that it is backwards compatible and not a breaking change; however, I have not tested the backwards-compatibility case myself (some help here is appreciated).

This materializer has been tested manually on an S3 artifact store.
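For orientation, here is a minimal standalone sketch of the approach described above (plain pandas and local files only, leaving out ZenML's fileio/artifact-store layer; the environment variable names match the diff discussed below, the helper names are illustrative):

import os

import pandas as pd

CHUNK_SIZE = int(os.getenv("ZENML_PANDAS_CHUNK_SIZE", "100000"))
COMPRESSION_TYPE = os.getenv("ZENML_PANDAS_COMPRESSION_TYPE", "snappy")


def save_chunked(df: pd.DataFrame, base_path: str) -> None:
    """Write the dataframe as a series of snappy-compressed parquet chunks."""
    # Record the total row count so the loader knows how many chunk files exist.
    with open(f"{base_path}.metadata", "w") as f:
        f.write(str(len(df)))
    for i in range(0, len(df), CHUNK_SIZE):
        df.iloc[i : i + CHUNK_SIZE].to_parquet(
            f"{base_path}_{i // CHUNK_SIZE}", compression=COMPRESSION_TYPE
        )


def load_chunked(base_path: str) -> pd.DataFrame:
    """Read the chunk files back and concatenate them into one dataframe."""
    with open(f"{base_path}.metadata") as f:
        len_df = int(f.read())
    chunks = [
        pd.read_parquet(f"{base_path}_{i // CHUNK_SIZE}")
        for i in range(0, len_df, CHUNK_SIZE)
    ]
    return pd.concat(chunks, ignore_index=True)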

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • If my change requires a change to docs, I have updated the documentation accordingly.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop, read the contribution guide on rebasing a branch to develop.
  • If my changes require changes to the dashboard, these changes are communicated/requested.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

Summary by CodeRabbit

  • New Features

    • Introduced environment variables specific to the Pandas materializer for enhanced configuration.
    • Added support for chunked parquet file handling in the Pandas materializer, improving performance for large datasets.
  • Documentation

    • Updated the advanced guide with instructions for setting environment variables in DockerSettings.
    • Corrected documentation links for better user navigation.
  • Bug Fixes

    • Adjusted default compression settings for parquet files to optimize storage and access efficiency.

Copy link
Contributor

coderabbitai bot commented Jan 3, 2024

Important

Auto Review Skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository.

To trigger a single review, invoke the @coderabbitai review command.

Walkthrough

The recent changes involve enhancing the Pandas materializer in the ZenML framework. These updates include the introduction of environment variables specific to the Pandas materializer, adjustments to support chunked parquet file handling, and metadata management. Additionally, there is a note for configuring these variables in Docker for remote orchestration, and a minor update to the documentation link.
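For example, passing these variables to a remote orchestration environment might look roughly like this (a sketch based on the updated docs; it assumes DockerSettings accepts an environment mapping, and the chunk-size value shown is arbitrary):

from zenml import pipeline
from zenml.config import DockerSettings

# Bake the materializer settings into the Docker image used for remote runs.
docker_settings = DockerSettings(
    environment={
        "ZENML_PANDAS_COMPRESSION_TYPE": "snappy",
        "ZENML_PANDAS_CHUNK_SIZE": "500000",
    }
)


@pipeline(settings={"docker": docker_settings})
def my_pipeline() -> None:
    ...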

Changes

  • docs/.../environment-variables.md: added details on environment variables for the Pandas materializer and a note on DockerSettings.
  • src/zenml/.../pandas_materializer.py: updated the copyright year; changed the default compression and chunk size for parquet; added CHUNK_SIZE, an io_utils import, and a metadata_path attribute; enhanced the load and save methods for chunked parquet and metadata handling; revised the docstring.


htahir1 requested a review from safoinme on January 3, 2024 10:52
htahir1 (Contributor, Author) commented Jan 3, 2024

@coderabbitai review

github-actions bot added the labels internal (To filter out internal PRs and issues) and enhancement (New feature or request) on Jan 3, 2024
coderabbitai bot (Contributor) left a comment

Review Status

Actionable comments generated: 2

Configuration used: .coderabbit.yaml

Commits: files that changed from the base of the PR and between 79d967e and 4718f5a.
Files selected for processing (2)
  • docs/book/user-guide/advanced-guide/environment-management/environment-variables.md (2 hunks)
  • src/zenml/materializers/pandas_materializer.py (6 hunks)
Files skipped from review due to trivial changes (1)
  • docs/book/user-guide/advanced-guide/environment-management/environment-variables.md
Additional comments: 5
src/zenml/materializers/pandas_materializer.py (5)
  • 1-1: The copyright year has been updated to 2024, which is correct assuming the file has been modified in 2024.

  • 26-26: The import of io_utils is added, which is likely used for the new chunking and compression functionality.

  • 69-69: The addition of metadata_path is consistent with the PR's objective to handle metadata for large dataframes.

  • 126-129: The handling of .csv files remains unchanged, ensuring backward compatibility.

  • 164-177: The save method now writes the length of the dataframe to a metadata file and saves the dataframe in chunks. Ensure that the CHUNK_SIZE is cast to an integer before use, as previously mentioned.

Verification successful

The output from the shell script provides the context of the save method in the pandas_materializer.py file. It shows the use of CHUNK_SIZE in the for loop as part of the logic to write chunked parquet files. The CHUNK_SIZE is used in the expression i // CHUNK_SIZE which is an integer division operation, implying that CHUNK_SIZE is indeed being used as an integer.

Based on the provided context, there is no need to cast CHUNK_SIZE to an integer before its use in the for loop, as it is already being used in a context that requires an integer (the third argument of the range function and in integer division).

Therefore, the review comment is addressed correctly, and the use of CHUNK_SIZE in the save method is verified to be appropriate.

Scripts Executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Verify that CHUNK_SIZE is used as an integer in the save method.
ast-grep --lang python --pattern $'for i in range(0, $_, CHUNK_SIZE)'

Length of output: 415



Script:

#!/bin/bash
# Search for the 'save' method in the 'pandas_materializer.py' file to inspect the usage of CHUNK_SIZE.
rg --context 10 'def save' src/zenml/materializers/pandas_materializer.py

Length of output: 1548

Comment on lines 32 to 34
COMPRESSION_TYPE = os.getenv("ZENML_PANDAS_COMPRESSION_TYPE", "snappy")
# Get the chunk size from the environment variable
CHUNK_SIZE = os.getenv("ZENML_PANDAS_CHUNK_SIZE", 100000)
coderabbitai bot (Contributor) commented:

The use of environment variables for setting the compression type and chunk size is a good practice for configurability. However, os.getenv returns a string whenever the variable is set, so CHUNK_SIZE can end up as a string rather than an integer; the default should also be passed as a string and the result cast to int.

- CHUNK_SIZE = os.getenv("ZENML_PANDAS_CHUNK_SIZE", 100000)
+ CHUNK_SIZE = int(os.getenv("ZENML_PANDAS_CHUNK_SIZE", "100000"))

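The underlying point: os.getenv always returns a string once the environment variable is set, so the value needs an explicit cast before being used in range() or arithmetic. A quick illustration:

import os

os.environ["ZENML_PANDAS_CHUNK_SIZE"] = "200000"
chunk_size = os.getenv("ZENML_PANDAS_CHUNK_SIZE", 100000)
print(type(chunk_size))  # <class 'str'> once the variable is set
# range(0, len_df, chunk_size) would raise TypeError here, hence the int(...) cast.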

Comment on lines +85 to +111
# If the metadata file exists, then the data is stored as a chunked
# parquet file in the latest version of the materializer
if fileio.exists(self.metadata_path):
    if self.pyarrow_exists:
        dfs = []

        # Read the length of the dataframe from the metadata file
        len_df = int(
            io_utils.read_file_contents_as_string(self.metadata_path)
        )

        for i in range(0, len_df, CHUNK_SIZE):
            with fileio.open(
                f"{self.parquet_path}_{i//CHUNK_SIZE}", mode="rb"
            ) as f:
                dfs.append(pd.read_parquet(f))

        # concatenate all the dataframes to one
        df = pd.concat(dfs)
    else:
        raise ImportError(
            "You have an old version of a `PandasMaterializer` "
            "data artifact stored in the artifact store "
            "as a `.parquet` file, which requires `pyarrow` "
            "for reading, You can install `pyarrow` by running "
            "'`pip install pyarrow fastparquet`'."
        )
coderabbitai bot (Contributor) commented:

The load method has been updated to handle chunked parquet files. The logic checks for the existence of a metadata file to determine if the data is chunked. The error message in lines 105-110 suggests installing pyarrow and fastparquet, but fastparquet is not used in the code. This could be misleading.

- "'`pip install pyarrow fastparquet`'."
+ "'`pip install pyarrow`'."


                dfs.append(pd.read_parquet(f))

        # concatenate all the dataframes to one
        df = pd.concat(dfs)
A reviewer (Contributor) asked:

Does pd.concat(dfs) use memory? Will it be a problem in the case of a very large dataframe?

htahir1 (Contributor, Author) replied:

Hmm, it wouldn't be more of a problem than the current status quo :)

strickvl (Contributor) asked:

@coderabbitai what do you think? Is there a better way of doing this?

coderabbitai bot (Contributor) replied:
@strickvl, using pd.concat(dfs) does require memory to hold all the DataFrames in the list dfs and the resulting concatenated DataFrame. For very large dataframes, this could lead to memory issues. A more memory-efficient approach could be to use the iterator parameter of the pd.read_parquet function to load and concatenate chunks of the dataframe lazily. This way, you only have one chunk of the dataframe in memory at a time. Here's a code snippet demonstrating this approach:

dfs = pd.read_parquet(f"{self.parquet_path}_*", iterator=True)
df = pd.concat(dfs, ignore_index=True)

This requires that pyarrow supports the iterator parameter, which should be verified. If pyarrow does not support this, another option could be to process the data in chunks and perform necessary operations chunk by chunk, which avoids loading the entire dataframe into memory at once.
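As an aside: the iterator argument above does need verifying, since plain pd.read_parquet does not document one. If loading memory ever became a real concern, one option with the same chunk-file naming would be to concatenate at the Arrow level and convert to pandas once, for example (a sketch only, ignoring ZenML's fileio layer):

import pyarrow as pa
import pyarrow.parquet as pq


def load_chunks_via_arrow(parquet_path: str, len_df: int, chunk_size: int):
    # Read each chunk file as an Arrow table (columnar and typically more
    # compact than the equivalent pandas frames), concatenate once at the
    # Arrow level, and convert to pandas a single time.
    tables = [
        pq.read_table(f"{parquet_path}_{i // chunk_size}")
        for i in range(0, len_df, chunk_size)
    ]
    return pa.concat_tables(tables).to_pandas()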

htahir1 (Contributor, Author) replied:

No guys, we don't need to touch this. Of course we have to hold the dataframe in memory... that's what materialization means, right?

bcdurak (Contributor) left a review:

Left a comment regarding backward compatibility.


  logger = get_logger(__name__)

- PARQUET_FILENAME = "df.parquet.gzip"
- COMPRESSION_TYPE = "gzip"
+ PARQUET_FILENAME = "df.parquet"
bcdurak (Contributor) commented:
Doesn't this break the functionality where there is an artifact saved as a parquet file with the old materializer, and people upgrade their ZenML version?

In more detail, in this case, the load function would check:

  • if the metadata file exists and would skip,
  • then it would check whether the parquet file exists and, since the filename has changed, would skip again,
  • finally, would try to load a csv file and fail.

htahir1 (Contributor, Author) replied:

> then it would check whether the parquet file exists and, since the filename has changed, would skip again,

It would check if the parquet file exists and would not skip, as it would exist.

bcdurak (Contributor) replied:
Since it was generated by the old materializer, the filepath of the old artifact would be self.uri/df.parquet.gzip, but the new materializer will look for it at self.uri/df.parquet. Or am I missing something?
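One way to keep those old artifacts readable would be a legacy-filename fallback in load(), roughly like the sketch below (illustrative only; legacy_parquet_path is introduced here, the other attribute names mirror the snippets quoted above, and os is assumed to be imported at module level):

# Hypothetical fallback order inside PandasMaterializer.load():
legacy_parquet_path = os.path.join(self.uri, "df.parquet.gzip")

if fileio.exists(self.metadata_path):
    ...  # new chunked-parquet branch from this PR, unchanged
elif self.pyarrow_exists and fileio.exists(legacy_parquet_path):
    # Artifact written by the old materializer before the filename change.
    with fileio.open(legacy_parquet_path, mode="rb") as f:
        df = pd.read_parquet(f)
elif self.pyarrow_exists and fileio.exists(self.parquet_path):
    with fileio.open(self.parquet_path, mode="rb") as f:
        df = pd.read_parquet(f)
else:
    # Fall back to CSV, mirroring whatever arguments the existing branch uses.
    with fileio.open(self.csv_path, mode="rb") as f:
        df = pd.read_csv(f)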

zenml-io deleted a comment from coderabbitai bot on Jan 4, 2024
strickvl (Contributor) commented Jan 4, 2024

@htahir1 My bigger question is around the specific use case that triggered this PR (a ~28M-row dataframe, etc.). In some tests I ran, materializing (saving and loading) was twice as slow on this branch compared to what we already have (details in the table, but the key comparison is the 28-million-row dataframe materialized to a GCP artifact store). It works a little faster in the local artifact store case, which is what we'd want/expect.

[Screenshot: table of materialization timings comparing this branch with the existing implementation across dataframe sizes and artifact stores]

It's probably the repeated saving (at the 100K chunk size we specify here) that causes the slowdown. In any case, presumably we want this to work faster in the remote artifact store case?

The code used for these tests:

from zenml import step, pipeline
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


@step(enable_cache=False)
def save_df(num_rows: int = 10000) -> pd.DataFrame:
    """Reads the data from the source."""
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),  # Random float values
        # Create dates cycling through a year to avoid 'date value out of range' error
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    return pd.DataFrame(data)


@step(enable_cache=False)
def read_df(df: pd.DataFrame) -> pd.DataFrame:
    """Reads the data from the source."""
    return df


@pipeline(enable_cache=False)
def my_pipeline():
    # df = save_df()  # 10k rows
    # df = save_df(num_rows=1000000)  # 1M rows
    df = save_df(num_rows=28000000)  # 28M rows
    reloaded_df = read_df(df=df)


my_pipeline()

strickvl (Contributor) commented Jan 4, 2024

Parallelising the chunk saving (i.e. fanning out the saving of the artifacts to multiple threads) brings a decent improvement to the process, though then we run the risk of overwhelming the users' machines. WDYT?

[Screenshot: timings with parallelised chunk saving]

    # Note: this snippet assumes `import os` and `import concurrent.futures`
    # at module level, alongside the existing materializer imports.
    def save_chunk(self, df, start, chunk_size, path, compression):
        """Save a chunk of the dataframe to a parquet file."""
        chunk = df.iloc[start : start + chunk_size]
        with fileio.open(path, mode="wb") as f:
            chunk.to_parquet(f, compression=compression)

    def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename in parallel chunks."""
        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        if self.pyarrow_exists:
            # Write the length of the dataframe to a file for later use
            io_utils.write_file_contents_as_string(
                self.metadata_path, str(len(df))
            )

            # Define the maximum number of threads
            # Fallback to 10 if cpu_count is None
            max_workers = os.cpu_count() or 10

            # Prepare the thread pool for parallel writes
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                futures = []
                for i in range(0, len(df), CHUNK_SIZE):
                    chunk_path = f"{self.parquet_path}_{i//CHUNK_SIZE}"
                    # Submit the chunk-saving task to the executor
                    futures.append(
                        executor.submit(
                            self.save_chunk,
                            df,
                            i,
                            CHUNK_SIZE,
                            chunk_path,
                            COMPRESSION_TYPE,
                        )
                    )

                # Wait for all futures to complete
                for future in concurrent.futures.as_completed(futures):
                    # Check for exceptions and handle them if necessary
                    future.result()
        else:
            with fileio.open(self.csv_path, mode="wb") as f:
                df.to_csv(f, index=True)

strickvl (Contributor) commented Jan 4, 2024

Reducing the number of workers to no more than the number of CPU cores on the machine (16 on mine) made it even faster: almost twice as fast as the original implementation on this PR, but still quite a bit slower than the original gzip implementation.

[Screenshot: timings with the thread pool capped at the CPU core count]

htahir1 (Contributor, Author) commented Jan 4, 2024

Closing due to @strickvl's results; let's table it.

@htahir1 htahir1 closed this Jan 4, 2024
Labels: enhancement (New feature or request), internal (To filter out internal PRs and issues)

Linked issue (may be closed by merging this pull request): Pandas materializer is slow when using large datasets with remote artifact stores

5 participants