
FileExpired exception when reading parquet from a Minio bucket using Dask #11044

Open
PietroSpalluto opened this issue Apr 11, 2024 · 6 comments

PietroSpalluto commented Apr 11, 2024

Describe the issue:

I have a list of dataframes in a Minio bucket that are updated every 15 minutes. My script runs inside a Docker container in a loop, and every 15 minutes a list of futures is created to read and preprocess every dataframe in the bucket. When computing the result, the following exception is sometimes triggered:

s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename filename.part.0.parquet and Etag "76b9a0ed3044b29e8a326d6f4ade2036" no longer exists

which in turn triggers TypeError: __init__() missing 1 required positional argument: 'e_tag'. Even catching the exception does not help, since it is raised again on the next iteration. I checked on Minio, and the ETag there indeed does not match the one in the exception, but I do not know how to fix this. The code that reads the data is:

import dask.dataframe as dd

df = dd.read_parquet('s3://{}/{}/{}'.format(bucket, path, filename),
                     storage_options={
                         "key": key,
                         "secret": secret,
                         "client_kwargs": {'endpoint_url': 'http://{}'.format(minio_endpoint)}
                     })

Minimal Complete Verifiable Example:
Providing a verifiable example is difficult since it runs on Docker and is the result of several interacting scripts. I tried to replicate it by running a simple script outside Docker, but the problem did not appear. This is the script I used, which is similar to what the original script does.

import logging

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from minio import Minio


def load_user(bucket, path, file, minio_endpoint, key, secret):
    print(f'Reading {file}...')
    df = dd.read_parquet('s3://{}/{}/{}'.format(bucket, path, hash_filename(file)),
                         storage_options={
                             "key": key,
                             "secret": secret,
                             "client_kwargs": {'endpoint_url': 'http://{}'.format(minio_endpoint)}
                         },
                         ignore_metadata_file=True)

    return df


if __name__ == "__main__":
    bucket = '<bucket name>'    # placeholder
    path = '<path>'             # placeholder
    filenames = ['...']         # list of filenames

    cluster = LocalCluster(silence_logs=logging.CRITICAL)
    dask_client = Client(cluster)
    minio_client = Minio('<minio endpoint>', '<access key>', '<secret key>', secure=False)

    minio_endpoint = minio_client._base_url.host
    key = minio_client._provider._credentials.access_key
    secret = minio_client._provider._credentials.secret_key

    while True:
        futures = []
        for file in filenames:
            future = dask_client.submit(load_user,
                                        bucket,
                                        path,
                                        file,
                                        minio_endpoint,
                                        key,
                                        secret)
            futures.append(future)

        for user, future in zip(filenames, futures):
            df = future.result()

Anything else we need to know?:

I have tried using s3fs's invalidate_cache() function and passing ignore_metadata_file=True when reading data, but neither worked. Catching the exception works, but the problem recurs on the following iteration.
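For what it's worth, the retry logic I tried looks roughly like this (a library-agnostic sketch: read_fn and reset_fn are hypothetical stand-ins for the dd.read_parquet call and for invalidate_cache() on the s3fs instance; s3fs.utils.FileExpired subclasses OSError, so catching OSError covers it):

```python
def read_with_cache_reset(read_fn, reset_fn, retries=3):
    """Retry a read, resetting the filesystem's listing cache between
    attempts so a stale cached ETag is not reused on the next try."""
    last_exc = None
    for _ in range(retries):
        try:
            return read_fn()
        except OSError as exc:  # s3fs.utils.FileExpired subclasses OSError
            last_exc = exc
            reset_fn()  # e.g. fs.invalidate_cache() on the s3fs instance
    raise last_exc
```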

Here is the complete traceback, in case it is useful:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/aiobotocore/client.py", line 408, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (PreconditionFailed) when calling the GetObject operation: At least one of the pre-conditions you specified did not hold

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 2300, in _fetch_range
    return _fetch_range(
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 2462, in _fetch_range
    return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
  File "/usr/local/lib/python3.9/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/lib/python3.9/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 2480, in _inner_fetch
    return await _error_wrapper(_call_and_read, retries=fs.retries)
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 142, in _error_wrapper
    raise err
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 2467, in _call_and_read
    resp = await fs._call_s3(
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 362, in _call_s3
    return await _error_wrapper(
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 142, in _error_wrapper
    raise err
OSError: [Errno 22] At least one of the pre-conditions you specified did not hold

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/dask/backends.py", line 141, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py", line 529, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py", line 546, in read_metadata
    dataset_info = cls._collect_dataset_info(
  File "/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py", line 1061, in _collect_dataset_info
    ds = pa_ds.dataset(
  File "/usr/local/lib/python3.9/site-packages/pyarrow/dataset.py", line 785, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pyarrow/dataset.py", line 475, in _filesystem_dataset
    return factory.finish(schema)
  File "pyarrow/_dataset.pyx", line 3025, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 88, in pyarrow.lib.check_status
  File "/usr/local/lib/python3.9/site-packages/fsspec/spec.py", line 1846, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/usr/local/lib/python3.9/site-packages/fsspec/caching.py", line 189, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
  File "/usr/local/lib/python3.9/site-packages/s3fs/core.py", line 2312, in _fetch_range
    raise FileExpired(
s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename filename.part.0.parquet and Etag "76b9a0ed3044b29e8a326d6f4ade2036" no longer exists.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/python-docker/run.py", line 19, in <module>
    service.start()
  File "/python-docker/chimera_ml_app/services.py", line 130, in start
    self.channel.start_consuming()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "/python-docker/chimera_ml_app/services.py", line 157, in on_request
    response = chimera.ml.predict(body=request)
  File "src/dependency_injector/_cwiring.pyx", line 28, in dependency_injector._cwiring._get_sync_patched._patched
  File "/python-docker/chimera_ml_app/ml.py", line 20, in predict
    predictions = predictor.predict()
  File "/usr/local/lib/python3.9/site-packages/hijacking_detection/pipelines/PipelinePredictorKATANA.py", line 125, in predict
    preprocessing.preprocess(path, save_path)
  File "/usr/local/lib/python3.9/site-packages/hijacking_detection/preprocessing/PreprocessingTestResample.py", line 55, in preprocess
    dataframe = future.result()
  File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 323, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/hijacking_detection/preprocessing/PreprocessingTestResample.py", line 91, in load_user
    df = dd.read_parquet('s3://{}/{}'.format(bucket, path),
  File "/usr/local/lib/python3.9/site-packages/dask/backends.py", line 143, in wrapper
    raise type(e)(
TypeError: __init__() missing 1 required positional argument: 'e_tag'

Environment:

  • Dask version: 2024.2.1
  • Python version: 3.9.13
  • Operating System: Docker running python:3.9.17-slim-buster
  • Install method (conda, pip, source): pip

fjetter commented Apr 11, 2024

Thanks for your bug report. Without a reproducer, we'll likely have a hard time helping out.

I noticed that the traceback you posted is incomplete. Can you please verify whether something is missing? It may point to the problem. In particular, the "and triggers TypeError: __init__() missing 1 required positional argument: 'e_tag'" part at the very end looks like something was cut off.

@PietroSpalluto (Author)

Yes, I know it is difficult to reproduce this error without a working example; that is also my main problem when trying to fix it. I posted the complete traceback at the end and only modified sensitive information such as the filename. The TypeError is the last exception raised, and the output ends exactly like this: TypeError: __init__() missing 1 required positional argument: 'e_tag'. I can provide more information if needed.


PietroSpalluto commented Apr 12, 2024

I was able to reproduce the error using the following script

import logging
import time

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from minio import Minio


def load_file(bucket, path, filename, minio_endpoint, key, secret):
    print(f'Reading {filename}...')
    df = dd.read_parquet('s3://{}/{}/{}'.format(bucket, path, hash_name(filename)),
                         storage_options={
                             "key": key,
                             "secret": secret,
                             "client_kwargs": {'endpoint_url': 'http://{}'.format(minio_endpoint)}
                         },
                         ignore_metadata_file=True)

    return df


if __name__ == "__main__":
    bucket = '<bucket name>'    # placeholder
    path = '<path>'             # placeholder
    files = ['...']             # list of filenames

    cluster = LocalCluster(silence_logs=logging.CRITICAL)
    dask_client = Client(cluster)
    minio_client = Minio('<minio endpoint>', '<access key>', '<secret key>', secure=False)

    minio_endpoint = minio_client._base_url.host
    key = minio_client._provider._credentials.access_key
    secret = minio_client._provider._credentials.secret_key

    while True:
        futures = []
        for filename in files:
            future = dask_client.submit(load_file,
                                        bucket,
                                        path,
                                        filename,
                                        minio_endpoint,
                                        key,
                                        secret)
            futures.append(future)

        for filename, future in zip(files, futures):
            df = future.result()

        dask_client.cancel(futures)

        print('Sleeping...')
        time.sleep(3)

and after about an hour the exception was triggered. I was also able to retrieve the complete traceback:

Traceback (most recent call last):
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\aiobotocore\client.py", line 408, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (PreconditionFailed) when calling the GetObject operation: At least one of the pre-conditions you specified did not hold

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 2300, in _fetch_range
    return _fetch_range(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 2462, in _fetch_range
    return sync(fs.loop, _inner_fetch, fs, bucket, key, version_id, start, end, req_kw)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\fsspec\asyn.py", line 103, in sync
    raise return_result
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\fsspec\asyn.py", line 56, in _runner
    result[0] = await coro
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 2480, in _inner_fetch
    return await _error_wrapper(_call_and_read, retries=fs.retries)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 142, in _error_wrapper
    raise err
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 113, in _error_wrapper
    return await func(*args, **kwargs)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 2467, in _call_and_read
    resp = await fs._call_s3(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 362, in _call_s3
    return await _error_wrapper(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 142, in _error_wrapper
    raise err
OSError: [Errno 22] At least one of the pre-conditions you specified did not hold

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\dask\backends.py", line 141, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\dask\dataframe\io\parquet\core.py", line 529, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\dask\dataframe\io\parquet\arrow.py", line 546, in read_metadata
    dataset_info = cls._collect_dataset_info(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\dask\dataframe\io\parquet\arrow.py", line 1061, in _collect_dataset_info
    ds = pa_ds.dataset(
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\pyarrow\dataset.py", line 785, in dataset
    return _filesystem_dataset(source, **kwargs)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\pyarrow\dataset.py", line 475, in _filesystem_dataset
    return factory.finish(schema)
  File "pyarrow\_dataset.pyx", line 3025, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow\error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow\error.pxi", line 88, in pyarrow.lib.check_status
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\fsspec\spec.py", line 1846, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\fsspec\caching.py", line 189, in _fetch
    self.cache = self.fetcher(start, end)  # new block replaces old
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\s3fs\core.py", line 2312, in _fetch_range
    raise FileExpired(
s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename "filename" and Etag "981e2d1ced3426d4dda10e9e81da6647" no longer exists.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\dask_test\example_etag.py", line 54, in <module>
    df = future.result()
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\distributed\client.py", line 323, in result
    return self.client.sync(self._result, callback_timeout=timeout)
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\dask_test\example_etag.py", line 14, in load_file
    df = dd.read_parquet('s3://{}/{}/{}'.format(bucket, path, hash_name(filename)),
  File "C:\Users\Pietro\PycharmProjects\hijacking_detection\venv\lib\site-packages\dask\backends.py", line 143, in wrapper
    raise type(e)(
TypeError: __init__() missing 1 required positional argument: 'e_tag'

Process finished with exit code 1

For this example I used Windows. If there is any other information that would be useful, let me know.


fjetter commented Apr 12, 2024

@martindurant are you familiar with this kind of error?

@martindurant (Member)

This means that the entry for the file contained in the directory listings and held in the file-like instance no longer matches the remote store, because the file has been overwritten. This is intentional: an open file should become invalid if it changed while being read.

I'm not sure of any specifics of Minio, but since this appears to be a time-related effect, you may wish to add listings_expiry_time=... to your storage options (value in seconds). It might have something to do with files being migrated between storage classes by some bucket lifecycle policy.
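As an illustration (the helper and all credential/endpoint values below are placeholders, not part of the original report), the suggestion amounts to passing the fsspec listings-cache expiry through storage_options so cached directory listings, and the ETags they hold, are refreshed after a few seconds:

```python
def make_storage_options(key, secret, endpoint, expiry_seconds=10):
    """Build storage_options for dd.read_parquet: listings_expiry_time
    is forwarded to s3fs/fsspec and bounds how long a cached directory
    listing (including its ETags) is trusted before being re-fetched."""
    return {
        "key": key,
        "secret": secret,
        "client_kwargs": {"endpoint_url": f"http://{endpoint}"},
        "listings_expiry_time": expiry_seconds,  # seconds
    }

opts = make_storage_options("<key>", "<secret>", "minio:9000", expiry_seconds=5)
# df = dd.read_parquet(f"s3://{bucket}/{path}/{filename}", storage_options=opts)
```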

@PietroSpalluto (Author)

Thank you for your answer, and sorry for the delay.

I cannot understand why this happens only sometimes: files are overwritten every 15 minutes, but not all of them raise this exception. I will try adding listings_expiry_time to the storage options when reading my files. For now I have worked around it by saving the updated files to a different folder each time, to avoid overwriting them.
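For reference, a minimal sketch of that workaround (the timestamped prefix scheme is hypothetical; any naming that avoids overwriting an object in place works):

```python
from datetime import datetime, timezone

def versioned_prefix(base_path):
    """Build a unique folder name per write cycle so that existing
    objects (and the ETags cached for them) are never overwritten
    in place; readers always see immutable objects."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    return f"{base_path}/{stamp}"

# e.g. df.to_parquet(f"s3://{bucket}/{versioned_prefix(path)}/{filename}", ...)
```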
