A faster pandas materializer #2212
Conversation
Important: Auto Review Skipped. Auto reviews are disabled on this repository; check the settings in the CodeRabbit UI, or trigger a single review manually.

Walkthrough: The recent changes enhance the Pandas materializer in the ZenML framework. They introduce environment variables specific to the Pandas materializer, add support for chunked parquet file handling, and manage metadata for large dataframes. There is also a note on configuring these variables in Docker for remote orchestration, and a minor update to a documentation link.
@coderabbitai review
Review Status
Actionable comments generated: 2
Configuration used: .coderabbit.yaml
Files selected for processing (2)
- docs/book/user-guide/advanced-guide/environment-management/environment-variables.md (2 hunks)
- src/zenml/materializers/pandas_materializer.py (6 hunks)
Files skipped from review due to trivial changes (1)
- docs/book/user-guide/advanced-guide/environment-management/environment-variables.md
Additional comments: 5
src/zenml/materializers/pandas_materializer.py (5)
- 1-1: The copyright year has been updated to 2024, which is correct assuming the file was modified in 2024.
- 26-26: The import of `io_utils` is added, likely used for the new chunking and compression functionality.
- 69-69: The addition of `metadata_path` is consistent with the PR's objective to handle metadata for large dataframes.
- 126-129: The handling of `.csv` files remains unchanged, ensuring backward compatibility.
- 164-177: The `save` method now writes the length of the dataframe to a metadata file and saves the dataframe in chunks. Ensure that `CHUNK_SIZE` is cast to an integer before use, as previously mentioned.

Verification successful

The output from the shell script provides the context of the `save` method in the `pandas_materializer.py` file. It shows the use of `CHUNK_SIZE` in the for loop as part of the logic to write chunked parquet files. `CHUNK_SIZE` is used in the expression `i // CHUNK_SIZE`, which is an integer division operation, implying that `CHUNK_SIZE` is indeed being used as an integer. Based on the provided context, there is no need to cast `CHUNK_SIZE` to an integer before its use in the for loop, as it is already being used in contexts that require an integer (the third argument of the `range` function and in integer division). Therefore, the review comment is addressed, and the use of `CHUNK_SIZE` in the `save` method is verified to be appropriate.

Scripts Executed

The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Verify that CHUNK_SIZE is used as an integer in the save method.
ast-grep --lang python --pattern $'for i in range(0, $_, CHUNK_SIZE)'
```

Length of output: 415

```shell
#!/bin/bash
# Search for the 'save' method in the 'pandas_materializer.py' file to inspect the usage of CHUNK_SIZE.
rg --context 10 'def save' src/zenml/materializers/pandas_materializer.py
```

Length of output: 1548
```python
COMPRESSION_TYPE = os.getenv("ZENML_PANDAS_COMPRESSION_TYPE", "snappy")
# Get the chunk size from the environment variable
CHUNK_SIZE = os.getenv("ZENML_PANDAS_CHUNK_SIZE", 100000)
```
The use of environment variables for setting the compression type and chunk size is good practice for configurability. However, `os.getenv` returns a string whenever the variable is set, so `CHUNK_SIZE` should be cast to an integer (with the default given as a string for consistency):

```diff
- CHUNK_SIZE = os.getenv("ZENML_PANDAS_CHUNK_SIZE", 100000)
+ CHUNK_SIZE = int(os.getenv("ZENML_PANDAS_CHUNK_SIZE", "100000"))
```
```python
# If the metadata file exists, then the data is stored as a chunked
# parquet file in the latest version of the materializer
if fileio.exists(self.metadata_path):
    if self.pyarrow_exists:
        dfs = []

        # Read the length of the dataframe from the metadata file
        len_df = int(
            io_utils.read_file_contents_as_string(self.metadata_path)
        )

        for i in range(0, len_df, CHUNK_SIZE):
            with fileio.open(
                f"{self.parquet_path}_{i//CHUNK_SIZE}", mode="rb"
            ) as f:
                dfs.append(pd.read_parquet(f))

        # concatenate all the dataframes to one
        df = pd.concat(dfs)
    else:
        raise ImportError(
            "You have an old version of a `PandasMaterializer` "
            "data artifact stored in the artifact store "
            "as a `.parquet` file, which requires `pyarrow` "
            "for reading, You can install `pyarrow` by running "
            "'`pip install pyarrow fastparquet`'."
        )
```
The `load` method has been updated to handle chunked parquet files. The logic checks for the existence of a metadata file to determine if the data is chunked. The error message in lines 105-110 suggests installing `pyarrow` and `fastparquet`, but `fastparquet` is not used in the code. This could be misleading.

```diff
- "'`pip install pyarrow fastparquet`'."
+ "'`pip install pyarrow`'."
```
```python
dfs.append(pd.read_parquet(f))

# concatenate all the dataframes to one
df = pd.concat(dfs)
```
Does `pd.concat(dfs)` use a lot of memory? Will it be a problem in the case of a very large dataframe?
Hmm, it wouldn't be more of a problem than the current status quo :)
@coderabbitai what do you think? is there a better way of doing this?
@strickvl, using `pd.concat(dfs)` does require memory to hold all the DataFrames in the list `dfs` and the resulting concatenated DataFrame. For very large dataframes, this could lead to memory issues. A more memory-efficient approach could be to use the `iterator` parameter of the `pd.read_parquet` function to load and concatenate chunks of the dataframe lazily. This way, you only have one chunk of the dataframe in memory at a time. Here's a code snippet demonstrating this approach:

```python
dfs = pd.read_parquet(f"{self.parquet_path}_*", iterator=True)
df = pd.concat(dfs, ignore_index=True)
```

This requires that `pyarrow` supports the `iterator` parameter, which should be verified. If `pyarrow` does not support this, another option could be to process the data in chunks and perform necessary operations chunk by chunk, which avoids loading the entire dataframe into memory at once.
No guys, we don't need to touch this; of course we have to hold the dataframe in memory... that's what materialization means, right?
Left a comment regarding the backward compatibility.
```diff
 logger = get_logger(__name__)

-PARQUET_FILENAME = "df.parquet.gzip"
-COMPRESSION_TYPE = "gzip"
+PARQUET_FILENAME = "df.parquet"
```
Doesn't this break the functionality where there is an artifact saved as a parquet file with the old materializer, and people upgrade their ZenML version?

In more detail, in this case, the `load` function would:
- check if the metadata file exists, and skip,
- then check whether the parquet file exists, and since the filename has changed, skip again,
- finally, try to load a csv file and fail.
> then it would check whether the parquet file exists and since the filename has changed would skip again,

It would check if the parquet file exists and would not skip, as it would exist.
Since it was generated by the old materializer, the filepath of the old artifact would be `self.uri/df.parquet.gzip`, but the new materializer will look for it at `self.uri/df.parquet`. Or am I missing something?
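One way to preserve the backward compatibility discussed here, sketched under assumptions (the `resolve_parquet_path` helper and the plain-filesystem check are hypothetical stand-ins for the materializer's `fileio` logic), is to probe the legacy filename before falling through to CSV:

```python
import os
import tempfile
from typing import Optional

LEGACY_PARQUET = "df.parquet.gzip"  # written by the old materializer
NEW_PARQUET = "df.parquet"          # written by the new one

def resolve_parquet_path(uri: str) -> Optional[str]:
    """Return whichever parquet artifact exists under `uri`, newest layout first."""
    for name in (NEW_PARQUET, LEGACY_PARQUET):
        candidate = os.path.join(uri, name)
        if os.path.exists(candidate):
            return candidate
    return None

# Simulate an artifact written by the old materializer
uri = tempfile.mkdtemp()
open(os.path.join(uri, LEGACY_PARQUET), "wb").close()
print(os.path.basename(resolve_parquet_path(uri)))  # df.parquet.gzip
```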
@htahir1 My bigger question is around the specific use case that triggered this PR (~28m-row dataframe etc.). In some tests I ran, materializing (saving and loading) was twice as slow on this branch compared to what we already have. (Details in the table, but the key comparison is a 28-million-row dataframe materialized to a GCP artifact store.) It works a little faster in the local artifact store case, which is what we'd want/expect. Probably the repeated saving (at the 100K chunk size that we specify here) causes the slowdown. In any case, presumably we want this to work faster in the remote artifact store case?

The code used for these tests:

```python
from zenml import step, pipeline
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


@step(enable_cache=False)
def save_df(num_rows: int = 10000) -> pd.DataFrame:
    """Reads the data from the source."""
    data = {
        "ID": range(1, num_rows + 1),
        "Value": np.random.rand(num_rows),  # Random float values
        # Create dates cycling through a year to avoid 'date value out of range' error
        "Date": [datetime.now() - timedelta(days=x % 365) for x in range(num_rows)],
    }
    return pd.DataFrame(data)


@step(enable_cache=False)
def read_df(df: pd.DataFrame) -> pd.DataFrame:
    """Reads the data from the source."""
    return df


@pipeline(enable_cache=False)
def my_pipeline():
    # df = save_df()  # 10k rows
    # df = save_df(num_rows=1000000)  # 1M rows
    df = save_df(num_rows=28000000)  # 28M rows
    reloaded_df = read_df(df=df)


my_pipeline()
```
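For reference, save/load timings behind results like these can be captured with a generic helper such as the one below (an illustrative sketch, not the harness used for the table):

```python
import time

def time_call(fn, *args, **kwargs):
    """Return (result, elapsed_seconds) for a single call."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start

# Example with a stand-in workload; in practice, wrap the pipeline run
result, elapsed = time_call(sum, range(1_000_000))
print(result, f"{elapsed:.4f}s")
```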
Parallelising the chunk saving (i.e. fanning out the saving of the artifacts to multiple threads) brings a decent improvement to the process, though then we run the risk of overwhelming the users' machines. WDYT?

```python
def save_chunk(self, df, start, chunk_size, path, compression):
    """Save a chunk of the dataframe to a parquet file."""
    chunk = df.iloc[start : start + chunk_size]
    with fileio.open(path, mode="wb") as f:
        chunk.to_parquet(f, compression=compression)

def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename in parallel chunks."""
    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    if self.pyarrow_exists:
        # Write the length of the dataframe to a file for later use
        io_utils.write_file_contents_as_string(
            self.metadata_path, str(len(df))
        )

        # Define the maximum number of threads
        # Fallback to 10 if cpu_count is None
        max_workers = os.cpu_count() or 10

        # Prepare the thread pool for parallel writes
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as executor:
            futures = []
            for i in range(0, len(df), CHUNK_SIZE):
                chunk_path = f"{self.parquet_path}_{i//CHUNK_SIZE}"
                # Submit the chunk-saving task to the executor
                futures.append(
                    executor.submit(
                        self.save_chunk,
                        df,
                        i,
                        CHUNK_SIZE,
                        chunk_path,
                        COMPRESSION_TYPE,
                    )
                )
            # Wait for all futures to complete
            for future in concurrent.futures.as_completed(futures):
                # Check for exceptions and handle them if necessary
                future.result()
    else:
        with fileio.open(self.csv_path, mode="wb") as f:
            df.to_csv(f, index=True)
```
Closing due to @strickvl's results; let's table it.
Describe changes

I created a faster pandas materializer that uses `snappy` compression and chunks to write to disk. This works better in cases where the dataframe has >10000 rows of data. I have ensured that it is backwards compatible and not a breaking change; however, I have not tested the backwards-compatibility case myself (some help here is appreciated). This materializer has been tested manually on an S3 artifact store.
Pre-requisites

Please ensure you have done the following:

- My branch is based on `develop` and the open PR is targeting `develop`. If your branch wasn't based on `develop`, read the Contribution guide on rebasing a branch to `develop`.

Types of changes
Summary by CodeRabbit

- New Features
- Documentation
- Bug Fixes