
Validate_schema keyword not supported yet #758

Open
kisel4363 opened this issue Jul 3, 2022 · 6 comments

Comments

@kisel4363

Hi, I'm using petastorm to feed TensorFlow models launched with Spark on an EMR cluster. The code is just the basics for reading Parquet files on S3:

from pyarrow import fs
from petastorm.reader import Reader
from petastorm.tf_utils import make_petastorm_dataset

ratings_uri = "s3://path/to/parquet/file"

s3, path = fs.FileSystem.from_uri(ratings_uri)
with Reader(pyarrow_filesystem=s3, dataset_path=path) as ratings_r:
    r = make_petastorm_dataset(ratings_r)

It throws the following error:

Traceback (most recent call last):
  File "/home/hadoop/ai_script.py", line 105, in <module>
    with Reader(pyarrow_filesystem= s3, dataset_path=path) as ratings_r:
  File "/home/hadoop/.local/lib/python3.7/site-packages/petastorm/reader.py", line 406, in __init__
    filters=filters)
  File "/home/hadoop/.local/lib/python3.7/site-packages/pyarrow/parquet.py", line 1213, in __new__
    metadata_nthreads=metadata_nthreads)
  File "/home/hadoop/.local/lib/python3.7/site-packages/pyarrow/parquet.py", line 1466, in __init__
    "Dataset API".format(keyword))
ValueError: Keyword 'validate_schema' is not yet supported with the new Dataset API

How can this issue be solved? Thanks

@selitvin
Collaborator

Which version of pyarrow are you using?

@dsenica

dsenica commented Aug 17, 2022

Hi

I'm having the same problem when using the Reader class.
I'm using petastorm version 0.11.5 and pyarrow version 9.0.0.

@selitvin
Collaborator

Please try using fs = FilesystemResolver(cache_dir_url).filesystem() to get a filesystem that should be compatible with the Reader. A pyarrow.fs filesystem instance cannot currently be used with petastorm, because that would require support for pyarrow's ParquetDataset v2, which petastorm does not support just yet. See: #613.
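
Applied to the snippet from the original post, a minimal sketch of that workaround could look like the following (untested; it assumes FilesystemResolver exposes a get_dataset_path() helper in your petastorm version, otherwise pass the bucket-relative path yourself):

from petastorm.fs_utils import FilesystemResolver
from petastorm.reader import Reader
from petastorm.tf_utils import make_petastorm_dataset

ratings_uri = "s3://path/to/parquet/file"

# Resolve a petastorm-compatible (legacy pyarrow) filesystem instead of pyarrow.fs
resolver = FilesystemResolver(ratings_uri)

with Reader(pyarrow_filesystem=resolver.filesystem(),
            dataset_path=resolver.get_dataset_path()) as ratings_r:  # assumed helper
    dataset = make_petastorm_dataset(ratings_r)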

@UpstatePedro

Hi @selitvin, I'm getting the same error when trying to write a petastorm dataset to cloud storage (ValueError: Keyword 'validate_schema' is not yet supported with the new Dataset API), and this workaround doesn't seem to work for me, the only difference being that I'm using resolver.filesystem_factory() instead of resolver.filesystem() to fit the materialize_dataset() API.

I am using

from petastorm.fs_utils import FilesystemResolver
from petastorm.etl.dataset_metadata import materialize_dataset

dataset_url = "gs://bucket/dataset-name"
resolver = FilesystemResolver(dataset_url)

with materialize_dataset(spark, dataset_url, schema, filesystem_factory=resolver.filesystem_factory()):
    # spark logic here
    # essentially the same as the example in the docs

I'm not sure whether your suggestion isn't working for me because I'm writing rather than reading, because of the time that has passed since you originally posted, or because I'm doing something stupid! Do you know how I can resolve this?

@nateyoder

I'm also getting the same error despite using the filesystem resolver (which returns an s3fs filesystem).

from petastorm import make_batch_reader
from petastorm.tf_utils import make_petastorm_dataset
from petastorm.fs_utils import FilesystemResolver
s3_url = f"s3://{S3_BASE}"
fs = FilesystemResolver(f"s3://{S3_BASE}").filesystem()
print(type(fs))

with make_batch_reader(s3_url, filesystem=fs) as reader:
    dataset = make_petastorm_dataset(reader)
    iterator = dataset.make_one_shot_iterator()
    tensor = iterator.get_next()

Output:

<class 's3fs.core.S3FileSystem'>

Trace:

File ~/pln_bin/quickstart/py_env/quickstart/lib/python3.11/site-packages/petastorm/reader.py:300, in make_batch_reader(dataset_url_or_urls, schema_fields, reader_pool_type, workers_count, results_queue_size, seed, shuffle_rows, shuffle_row_groups, shuffle_row_drop_partitions, predicate, rowgroup_selector, num_epochs, cur_shard, shard_count, shard_seed, cache_type, cache_location, cache_size_limit, cache_row_size_estimate, cache_extra_settings, hdfs_driver, transform_spec, filters, storage_options, zmq_copy_buffers, filesystem)
    292 filesystem, dataset_path_or_paths = get_filesystem_and_path_or_paths(
    293     dataset_url_or_urls,
    294     hdfs_driver,
    295     storage_options=storage_options,
    296     filesystem=filesystem
    297 )
    299 try:
--> 300     dataset_metadata.get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver=hdfs_driver,
    301                                                  storage_options=storage_options, filesystem=filesystem)
    302     warnings.warn('Please use make_reader (instead of \'make_batch_dataset\' function to read this dataset. '
    303                   'You may get unexpected results. '
    304                   'Currently make_batch_reader supports reading only Parquet stores that contain '
    305                   'standard Parquet data types and do not require petastorm decoding.')
    306 except PetastormMetadataError:

File ~/pln_bin/quickstart/py_env/quickstart/lib/python3.11/site-packages/petastorm/etl/dataset_metadata.py:402, in get_schema_from_dataset_url(dataset_url_or_urls, hdfs_driver, storage_options, filesystem)
    389 """Returns a :class:`petastorm.unischema.Unischema` object loaded from a dataset specified by a url.
    390 
    391 :param dataset_url_or_urls: a url to a parquet directory or a url list (with the same scheme) to parquet files.
   (...)
    396 :return: A :class:`petastorm.unischema.Unischema` object
    397 """
    398 fs, path_or_paths = get_filesystem_and_path_or_paths(dataset_url_or_urls, hdfs_driver,
    399                                                      storage_options=storage_options,
    400                                                      filesystem=filesystem)
--> 402 dataset = pq.ParquetDataset(path_or_paths, filesystem=fs, validate_schema=False, metadata_nthreads=10)
    404 # Get a unischema stored in the dataset metadata.
    405 stored_schema = get_schema(dataset)

File ~/pln_bin/quickstart/py_env/quickstart/lib/python3.11/site-packages/pyarrow/parquet/core.py:1776, in ParquetDataset.__new__(cls, path_or_paths, filesystem, schema, metadata, split_row_groups, validate_schema, filters, metadata_nthreads, read_dictionary, memory_map, buffer_size, partitioning, use_legacy_dataset, pre_buffer, coerce_int96_timestamp_unit, thrift_string_size_limit, thrift_container_size_limit)
   1773         use_legacy_dataset = False
   1775 if not use_legacy_dataset:
-> 1776     return _ParquetDatasetV2(
   1777         path_or_paths, filesystem=filesystem,
   1778         filters=filters,
   1779         partitioning=partitioning,
   1780         read_dictionary=read_dictionary,
   1781         memory_map=memory_map,
   1782         buffer_size=buffer_size,
   1783         pre_buffer=pre_buffer,
   1784         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
   1785         # unsupported keywords
   1786         schema=schema, metadata=metadata,
   1787         split_row_groups=split_row_groups,
   1788         validate_schema=validate_schema,
   1789         metadata_nthreads=metadata_nthreads,
   1790         thrift_string_size_limit=thrift_string_size_limit,
   1791         thrift_container_size_limit=thrift_container_size_limit,
   1792     )
   1793 warnings.warn(
   1794     "Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
   1795     "deprecated as of pyarrow 11.0.0, and the legacy implementation "
   1796     "will be removed in a future version." + extra_msg,
   1797     FutureWarning, stacklevel=2)
   1798 self = object.__new__(cls)

File ~/pln_bin/quickstart/py_env/quickstart/lib/python3.11/site-packages/pyarrow/parquet/core.py:2407, in _ParquetDatasetV2.__init__(self, path_or_paths, filesystem, filters, partitioning, read_dictionary, buffer_size, memory_map, ignore_prefixes, pre_buffer, coerce_int96_timestamp_unit, schema, decryption_properties, thrift_string_size_limit, thrift_container_size_limit, **kwargs)
   2403 for keyword, default in [
   2404         ("metadata", None), ("split_row_groups", False),
   2405         ("validate_schema", True), ("metadata_nthreads", None)]:
   2406     if keyword in kwargs and kwargs[keyword] is not default:
-> 2407         raise ValueError(
   2408             "Keyword '{0}' is not yet supported with the new "
   2409             "Dataset API".format(keyword))
   2411 # map format arguments
   2412 read_options = {
   2413     "pre_buffer": pre_buffer,
   2414     "coerce_int96_timestamp_unit": coerce_int96_timestamp_unit,
   2415     "thrift_string_size_limit": thrift_string_size_limit,
   2416     "thrift_container_size_limit": thrift_container_size_limit,
   2417 }

ValueError: Keyword 'validate_schema' is not yet supported with the new Dataset API

@KumudKoirala

KumudKoirala commented Oct 30, 2023

Here is how I fixed it. I am using S3 (MinIO) as the cache directory for petastorm. The objective was to create a tf dataset from the petastorm cache data in MinIO. Since petastorm uses pyarrow underneath, there have been dependency issues.
Follow these steps:

  • Write your AWS credentials to the file ~/.aws/credentials # use the default profile

  • pip install pyarrow==10.0.1
    Don't use any version greater than this one. Versions >10.0.1 do not support the validate_schema keyword; I do not know why!

  • pip install petastorm==0.12.1

from petastorm.fs_utils import FilesystemResolver
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import pyarrow.parquet as pq

path_or_paths = 's3a://bucket/key'  # don't mention the parquet file
pq.ParquetDataset(path_or_paths=path_or_paths,
                  filesystem=FilesystemResolver(path_or_paths).filesystem(),
                  validate_schema=False).read().to_pandas()  # do whatever you want to do after this
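
Since the goal above is a tf dataset, a minimal follow-on sketch (not part of the original fix; it assumes the frame is captured in a df variable and that all columns hold plain numeric or string values) could be:

import tensorflow as tf

# Capture the frame produced by the call above
df = pq.ParquetDataset(path_or_paths=path_or_paths,
                       filesystem=FilesystemResolver(path_or_paths).filesystem(),
                       validate_schema=False).read().to_pandas()

# Build a tf.data.Dataset directly from the columns (column names depend on your store)
dataset = tf.data.Dataset.from_tensor_slices(dict(df)).shuffle(10_000).batch(1024)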
