Generic S3 error: Error after 0 retries ... Broken pipe (os error 32) #2403

Open
t1g0rz opened this issue Apr 9, 2024 · 4 comments
Labels
bug Something isn't working

Comments

t1g0rz (Contributor) commented Apr 9, 2024

Environment

Delta-rs version: 0.16.4

Binding: python

Environment:

  • Cloud provider: AWS
  • OS: ubuntu
  • Other:

Bug

What happened:
I was updating a fairly large table on S3, but during the update process, I encountered the error below. I monitored the memory, and it was sufficient. However, for some reason, Delta Lake only writes the first two files (100 MB each) to each partition. When attempting to write the third file, it crashes with the error below. The table parameters are as follows: 8 partitions, with 250 columns and 180,000 rows inside each partition. I have no idea where to start debugging; I would appreciate your help.

[2024-04-09T16:37:38Z DEBUG hyper::client::pool] pooling idle connection for ("https", s3.us-east-1.amazonaws.com)
[2024-04-09T16:37:45Z DEBUG deltalake_core::operations::writer] Writing file with estimated size 106587867 to disk.
[2024-04-09T16:37:45Z DEBUG hyper::client::pool] reuse idle connection for ("https", s3.us-east-1.amazonaws.com)
[2024-04-09T16:37:45Z DEBUG hyper::proto::h1::dispatch] error writing: Broken pipe (os error 32)
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/deltalake/table.py", line 1778, in execute
    metrics = self.table._table.merge_execute(
OSError: Generic S3 error: Error after 0 retries in 14.827719ms, max_retries:10, retry_timeout:180s, source:error sending request for url (https://s3.us-east-1.amazonaws.com/lake/table/part-00002-8444cfd5-69c9-4923-9b05-978513701d59-c000.snappy.parquet): error writing a body to connection: Broken pipe (os error 32)

What you expected to happen:
Normal completion of writing to the table
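
(Side note for anyone trying to reproduce the DEBUG output above: the hyper/deltalake lines are in env_logger's default format, so the sketch below assumes the Rust side of deltalake honors the standard RUST_LOG variable when it is set before the import. That is an assumption about this setup, not something documented in the issue.)

# Assumption: deltalake's Rust layer reads RUST_LOG (env_logger convention)
# at module init time, so the variable must be set before the import.
import os
os.environ["RUST_LOG"] = "debug"

from deltalake import DeltaTable  # imported only after RUST_LOG is set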

t1g0rz added the bug label on Apr 9, 2024
ion-elgreco (Collaborator) commented Apr 9, 2024

@t1g0rz does this happen every time? Can you see the logs in S3 during the execution?

What are the storage options you are passing?

t1g0rz (Contributor, Author) commented Apr 9, 2024

@ion-elgreco

Yes, this is a persistent problem with this table. I managed to work around it by splitting the single large update into smaller, partition-scoped updates.
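
For reference, a minimal sketch of that per-partition workaround, assuming the table is partitioned by a column named part and the source data is a pandas DataFrame called source_df (both names are hypothetical; the exact predicate depends on the real schema):

# Hedged sketch: run one merge per partition value so each commit only
# touches that partition's files. Column/variable names are illustrative.
for part_value in source_df["part"].unique():
    part_source = source_df[source_df["part"] == part_value]
    dt = DeltaTable("s3://lake/table", storage_options=storage_options)
    (
        dt.merge(
            part_source,
            # pin the target partition so the merge stays partition-scoped
            predicate=f"s.id = t.id and s.part = t.part and t.part = '{part_value}'",
            source_alias="s",
            target_alias="t",
        )
        .when_not_matched_insert_all()
        .when_matched_update_all()
        .execute()
    )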

Here are the storage options I am passing:

storage_options = {
    "AWS_S3_LOCKING_PROVIDER": "dynamodb",
    "DELTA_DYNAMO_TABLE_NAME": "delta_log",
    "AWS_REGION": "us-east-1",
    "DELTA_DYNAMO_REGION": "us-east-1",
    "AWS_ACCESS_KEY_ID": credentials.access_key,
    "AWS_SECRET_ACCESS_KEY": credentials.secret_key,
    "AWS_SESSION_TOKEN": credentials.token,
}

Let me try to find out if there are logs in S3.

t1g0rz (Contributor, Author) commented Apr 10, 2024

Can you see the logs in S3 during the execution?

No, I cannot. The folder _delta_log contains only 00000000000000000000.json

t1g0rz (Contributor, Author) commented Apr 10, 2024

I found a combination that throws this error on EC2 (r6a.2xlarge: 8 vCPUs, 64 GiB RAM):

dt = DeltaTable.create("s3://lake/test3",
                  schema=pa.schema([("id", pa.int64()), ("part", pa.string())] + [(f"c{i}", pa.float64()) for i in range(250)]),
                  storage_options=storage_options,
                  partition_by=["part"],
                 )

r = []

for p in "ABCDEFJ":
    part_df = pd.DataFrame(np.random.random((280_000, 250)))
    part_df.columns = [f"c{i}" for i in range(250)]
    part_df.insert(0, 'id', range(280_000))
    part_df.insert(1, 'part', p)
    r.append(part_df)

df = pd.concat(r)

dt = DeltaTable("s3://lake/test3", storage_options=storage_options)
dt.merge(
    df, predicate="s.id = t.id and s.part = t.part", source_alias="s", target_alias="t"
).when_not_matched_insert_all().when_matched_update_all().execute()

It seems to me like a memory problem; otherwise, I cannot explain why I had to increase the number of rows to reproduce it. However, the peak memory consumption was 38.7 GB (out of the 64 GiB available).
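
If it helps to pin down the memory theory, here is a minimal standard-library sketch for recording the process's peak RSS around the merge (on Linux, ru_maxrss is reported in kilobytes). This is only a measurement aid added for illustration, not part of the original repro:

# Measure peak RSS of the current process after the merge (Linux: KiB units).
import resource

metrics = (
    dt.merge(df, predicate="s.id = t.id and s.part = t.part",
             source_alias="s", target_alias="t")
    .when_not_matched_insert_all()
    .when_matched_update_all()
    .execute()
)
peak_gib = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024**2
print(f"peak RSS: {peak_gib:.1f} GiB")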
