
Bug in Reading Compressed String Column in Parquet Dataset #1962

Open
cmgreen210 opened this issue Mar 15, 2024 · 2 comments


cmgreen210 commented Mar 15, 2024

I have a parquet dataset with a column consisting of serialized tf.Example protobufs. When I write the dataset without compression and read it back, I have no problems deserializing the protos; when I write it with compression, I get errors. On further inspection it's clear that TFIO does not read the correct strings from the compressed dataset.

A reproducible example can be found here: https://gist.github.com/cmgreen210/639ab8ea1102c22f67db60c95a8653f5

@kochhar-mw

I've looked into this a bit more and found that TFIO can read some parquet files with compression.

Specifically, it is able to read parquet files created by fastparquet v2024.02.0.
However, it is unable to read parquet files with large compressed byte columns created by pyarrow or via Spark.

I say "large" because I don't know the exact point where the data corruption starts: with 10K rows of 50 bytes each the read succeeds, but with 10K rows of 500 bytes each it fails.

import random
import string

import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

NUM_EXAMPLES = 1024 * 10


def make_bytes_list(num, length=None, maxlen=100):
    # Fixed-length byte strings when `length` is given, otherwise random lengths up to `maxlen`.
    lengths = [length] * num if length is not None else [random.randint(0, maxlen) for _ in range(num)]
    return [''.join(random.choices(string.ascii_lowercase, k=n)).encode() for n in lengths]


strings = make_bytes_list(NUM_EXAMPLES, length=492)

out_path_pa_compress = "/tmp/out.pa.gz"
df = pd.DataFrame(strings, columns=['str_f'])
df.to_parquet(out_path_pa_compress, compression='gzip', engine='pyarrow')

ds_pa_recover = tfio.IODataset.from_parquet(out_path_pa_compress, columns={"str_f": tf.string})
# Materialize as Python bytes so the comparisons below are plain bytes == bytes.
str_pa_recover = [ex['str_f'].numpy() for ex in ds_pa_recover]

print("Recovered all rows:", len(str_pa_recover) == len(strings))
print("Recovered equal bytes size:", sum(len(rec) for rec in str_pa_recover) == sum(len(s) for s in strings))
print("All strings equal:", str_pa_recover == strings)
print("First mismatch at:", [rec == s for (rec, s) in zip(str_pa_recover, strings)].index(False))

With the following output:

Recovered all rows: True
Recovered equal bytes size: True
All strings equal: False
First mismatch at: 4096

@kochhar-mw

The data read error doesn't seem to be in the tensorflow_io Python layer: the call goes down to the C++ parquet reader, which returns the erroneous data.

To step through the C++ code, I was able to attach gdb to my Python process and set a breakpoint inside ParquetReadableResource::Read, but the debugger could not step through the function -- it complains of missing line number information.

Any notes or pointers on how to get gdb working with the tensorflow_io C++ core_ops would be really appreciated. I've tried the development build instructions with --compilation_mode=dbg added to the bazel invocation to build a .so, followed by building and installing the Python wheel. After installing the built-from-source wheel, gdb can no longer find the ParquetReadableResource class to set the breakpoint on. I have verified that the mangled class name is present in the symbols of the compiled .so, so I'm not sure why gdb is unable to find it.
