
Proactively upload file parts in PrestoS3FileSystem #22424

Open

ZacBlanco wants to merge 1 commit into master from upstream-presto-s3-multipart

Conversation

@ZacBlanco (Contributor) commented Apr 4, 2024

Description

This change forgoes the original method of using the AWS SDK TransferManager and instead uses the low-level AWS client API in order to support uploads to S3 of files larger than the machine's disk size.

Motivation and Context

Originally, in order to write a file to S3, the FS implementation would write the entire file to temporary storage on the local disk, then upload the file via the SDK TransferManager API. This meant that the machine was required to have disk space available up to the size of the file being written.

For creating large tables through CTAS, you can easily have a file which exceeds the disk size, or you may not have provisioned large disks. This change uploads the file in chunks, similar to a BufferedOutputStream, but uploads each part to S3 through a multipart upload, deleting the data on the local disk once each part is uploaded. This bounds the disk space required.
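For illustration, here is a minimal sketch of that chunked flow using the v1 AWS SDK's low-level multipart API (the class structure, names, and bookkeeping here are hypothetical, not the PR's actual code):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

class MultipartUploadSketch
{
    private final AmazonS3 s3;
    private final String bucket;
    private final String key;
    private final List<PartETag> partETags = new ArrayList<>();
    private String uploadId;
    private int partNumber = 1;

    MultipartUploadSketch(AmazonS3 s3, String bucket, String key)
    {
        this.s3 = s3;
        this.bucket = bucket;
        this.key = key;
    }

    // Called once, before the first part is sent
    void begin()
    {
        uploadId = s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    }

    // Called whenever the local buffer file reaches the minimum part size
    // (S3 requires every part except the last to be at least 5MB)
    void uploadPart(File bufferFile)
    {
        UploadPartRequest request = new UploadPartRequest()
                .withBucketName(bucket)
                .withKey(key)
                .withUploadId(uploadId)
                .withPartNumber(partNumber++)
                .withFile(bufferFile)
                .withPartSize(bufferFile.length());
        partETags.add(s3.uploadPart(request).getPartETag());
        // Reclaim local disk before buffering the next part; this is what
        // bounds the disk space required
        bufferFile.delete();
    }

    // Called on close(), stitching the uploaded parts into the final object
    void complete()
    {
        s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
    }
}
```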

Impact

No user-facing API impact. Users might see better performance when writing through CTAS.

Test Plan

  • Additional tests for PrestoS3FileSystem
  • Successfully wrote TPC-DS SF1k data without errors

I also ran a few benchmarks to determine how the minimum part upload size affects performance. I used TPC-H SF 100 data and wrote it to a table using `CREATE TABLE benchmark AS SELECT * FROM orders` on a 2-node cluster of r5.xlarge instances (4 vCPUs, 32GB memory). The data was read from a pre-existing table in S3. The resulting files were about 6.8GiB in size with a task.writer-count of 1.

| # | branch | task.writer-count | min part size | time | data rate (MB/s) | time (s) |
|---|--------|-------------------|---------------|------|------------------|----------|
| 0 | master | 1 | 5MB | 8:21 | 27.9 | 501 |
| 2 | PR | 1 | 5MB | 6:54 | 33.8 | 414 |
| 3 | PR | 1 | 256MB | 5:03 | 46.1 | 303 |
| 1 | master | 8 | 5MB | 5:00 | 46.6 | 300 |
| 6 | s3-write-override | 1 | 5MB | 4:42 | 49.7 | 282 |
| 7 | s3-write-override | 8 | 5MB | 3:47 | 61.7 | 227 |
| 4 | PR | 8 | 5MB | 3:33 | 65.6 | 213 |
| 5 | PR | 8 | 256MB | 3:23 | 68.8 | 203 |

The additional performance gain is due to the fact that the original implementation relied on extending FilterOutputStream. In the default implementation of FilterOutputStream, all writes end up occurring through the write(int b) method (see the JDK implementation), meaning we were writing a single byte at a time in a tight loop. This change allows multi-byte copies by properly overriding each variant of the write() method.
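To illustrate the fix (a sketch, not the PR's actual class): FilterOutputStream's default bulk write loops over the single-byte method, so forwarding the bulk variant directly to the wrapped stream removes the per-byte overhead:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class BulkWriteOutputStream extends FilterOutputStream
{
    BulkWriteOutputStream(OutputStream out)
    {
        super(out);
    }

    @Override
    public void write(byte[] buffer, int offset, int length)
            throws IOException
    {
        // One bulk copy instead of FilterOutputStream's default behavior of
        // calling write(int) once per byte
        out.write(buffer, offset, length);
    }
}
```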

The s3-write-override branch's only change was overriding the write methods of PrestoS3OutputStream. This shows the performance difference between this implementation and a fix for that issue alone.

For clarification on the master and s3-write-override minimum part size of 5MB: this part size is not necessarily the one used in the upload. The TransferManager API is supposed to calculate an optimal part size based on the file size, so the part size actually used is likely different from 5MB.
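For context, a rough sketch of how that knob is wired up in the v1 SDK (the surrounding class and the pre-built client are assumptions for illustration):

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

class TransferManagerSketch
{
    static TransferManager build(AmazonS3 s3)
    {
        return TransferManagerBuilder.standard()
                .withS3Client(s3)
                // A lower bound only: the TransferManager still computes its
                // own part size from the file size, so the parts actually
                // uploaded may be larger than 5MB
                .withMinimumUploadPartSize(5L * 1024 * 1024)
                .build();
    }
}
```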

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with their default values), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== NO RELEASE NOTE ==

@ZacBlanco ZacBlanco force-pushed the upstream-presto-s3-multipart branch 3 times, most recently from 47754cf to c184cff Compare April 4, 2024 22:55
@ZacBlanco ZacBlanco marked this pull request as ready for review April 9, 2024 15:38
@ZacBlanco ZacBlanco requested a review from a team as a code owner April 9, 2024 15:38
@ZacBlanco ZacBlanco changed the title Use multipart uploads for large writes in PrestoS3FileSystem Proactively upload file parts in PrestoS3FileSystem Apr 9, 2024
@ZacBlanco ZacBlanco force-pushed the upstream-presto-s3-multipart branch 3 times, most recently from 84926b8 to 5f50c21 Compare April 10, 2024 18:56
@ClarenceThreepwood (Contributor) left a comment

lgtm

@aaneja (Contributor) commented Apr 11, 2024

> The additional performance gain is due to the fact that the original implementation relied on extending FilterOutputStream. In the default implementation of FilterOutputStream, all writes end up occurring through the write(int b) method

Could you measure what the impact would be if we only fixed this problem, without replacing the TransferManager?

I'm also curious if you played around with task.writer-count (and task.partitioned-writer-count) to see if we can utilize the disk/network better. I found this old thread which discusses the impact of these configs


```java
private String uploadSingleFile()
{
    TransferManager transfer = TransferManagerBuilder.standard()
```
Contributor: Why are we using TransferManager here? If we are at this point, doesn't it imply that the file we want to upload is smaller than minPartSize and therefore doesn't really need a multipart upload, i.e. a simple PutObjectRequest will suffice?

Contributor Author: It could also be that the min part size was configured to a very large size. I didn't dig into what optimizations the TransferManager might make for larger files; I just deferred to that implementation since it seems robust enough to handle large file uploads.
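For reference, the single-request path being discussed would look roughly like this in the v1 SDK (bucket, key, and file are placeholders):

```java
import java.io.File;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;

class SingleRequestUploadSketch
{
    static void upload(AmazonS3 s3, String bucket, String key, File file)
    {
        // One PUT with no multipart bookkeeping; enough when the file is
        // smaller than the multipart threshold
        s3.putObject(new PutObjectRequest(bucket, key, file));
    }
}
```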

@ZacBlanco (Contributor, Author) commented Apr 12, 2024

@aaneja I re-ran the benchmarks for creating the tpch.orders table and built a branch which only overrides the output stream methods. Additionally, I tested with task.writer-count set to the default (1) and to 8. I also tested with a larger min part size for one of the runs on my PR. Here are the results:

[image: benchmark results table]

The main takeaways I see are:

  1. In all cases (PR and simple output stream override), we get better perf than master.
  2. With a small part size, the performance of the PR is worse than that of the override alone. This is expected, since uploading does not occur concurrently with writes.
  3. When the min part size is increased for this PR, performance tends toward that of the override fix, since we're doing fewer uploads.
  4. With a larger task.writer-count, the performance difference between a larger min part size and the simple override becomes negligible.

@aaneja (Contributor) left a comment

LGTM % some minor changes

@aaneja (Contributor) commented Apr 15, 2024

Aside: your experiment showed that the default task.writer-count may not be appropriate. Do you think we should set it higher? Any suggestions on how we could set it dynamically for a better OOTB experience?

@ZacBlanco (Contributor, Author) commented Apr 16, 2024

I did some digging into task.writer-count. I think the ideal value will depend on a number of hardware factors: the writer's core count, disk metrics (capacity, bandwidth, media type), and network bandwidth. Additionally, the rate at which data can be fed into a TableWriter operator will determine the write speed, and that rate could depend on a variety of non-hardware factors too, such as the file format and compression algorithm. There could be more; those are just the factors I could think of off the top of my head.

I ran these few experiments without compression and with NVMe disks to try to achieve the highest data rate possible and stress the new implementation. We found that the performance increased, but for other workloads, such as those with compression, I can't say that the gains would be as large. I could also see a situation in which, on traditional spinning-platter disks, concurrent writes would lead to abysmal performance.

I don't think there's a one-size-fits-all value where we could guarantee better performance, but I think most "standard" use cases ("standard" referring to our standard cluster types used for benchmarking) would probably benefit from a writer count at least equal to the number of threads on the machine. Most deployments nowadays probably use SSDs and have multiple cores, and NVMe SSDs are capable of a significant number of IOPS, so it's probably safe to increase the parallelism a bit since it seems to give a measurable benefit. This does affect table layouts (more files), though, so that's another thing to consider.

aaneja previously approved these changes Apr 18, 2024
@aaneja (Contributor) commented Apr 18, 2024

I think we need the task writer to be adaptive, scaling out/in based on utilization levels. I'm not sure if Prestissimo does this either; I will check. We can suggest this improvement if it does not.

@ZacBlanco (Contributor, Author) commented

@tdcmeehan do you want to review this? Or know anyone else who might want to?

@jaystarshot (Member) commented Apr 25, 2024

Nice! But can this increase S3 operation cost if the buffer size is too small?

@ZacBlanco (Contributor, Author) commented

@jaystarshot You're correct that it can increase the costs by a fair margin. The minimum part size for S3 is 5MB. I put together a spreadsheet with the cost calculations for uploading 1PB to S3 with a fixed file size of 256MB and varying min part sizes:

[image: S3 upload cost calculations]

When using a small part size, the cost of uploading to S3 can be significantly higher than with standard PUT requests. However, you also need to consider that previously we deferred to the TransferManager class, which decides whether an upload should be a PutObject or a multipart upload, so I'm not 100% sure how often the existing implementation already does multipart uploads.

Additionally, if you're uploading data at this scale, the monthly storage cost is going to far outstrip the cost of uploading the data in the first place. If you were to store 1PB in S3 and use the minimum part size of 5MB, the one-time upload cost is just 5.13% of the monthly storage cost, decreasing linearly as the part size increases.
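As a rough back-of-envelope check (assuming, for illustration, us-east-1 prices of about $0.005 per 1,000 PUT/UploadPart requests and $0.023 per GB-month of standard storage):

  • 1PiB at 256MiB per file ≈ 4.2M files
  • 256MiB at a 5MiB min part size ≈ 52 parts per file, plus an initiate and a complete request for each file
  • ≈ 4.2M × 54 ≈ 227M requests ≈ $1,130 of one-time upload cost
  • storage: ≈ 1,048,576GiB × $0.023 ≈ $24,100 per month

which puts the one-time upload cost on the order of 5% of a single month's storage, consistent with the figure above.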

The data from the chart is in the following spreadsheet: https://docs.google.com/spreadsheets/d/1DNNpfbahHAiwcrBnhxaGlgexAhraVV9_yu9VUpxez3E/edit?usp=sharing

@jaystarshot (Member) commented Apr 25, 2024

Sounds good, but for temporary tables it might make more sense to use a bigger buffer size, which is configurable anyway, I guess.

@tdcmeehan tdcmeehan self-assigned this Apr 25, 2024
This change forgoes the original method of using the AWS
SDK TransferManager and instead uses the low-level AWS client API
in order to support file uploads to S3 which are greater than the
machine's disk size.

Originally, in order to write a file to S3, the FS implementation would
write the entire file to temporary storage on the local disk, then
upload the file via the SDK TransferManager API. This meant that the
machine is required to have disk space available up to the size of the
file being written.

For creating large tables through CTAS, you can easily have a file
which exceeds the disk size, or you may not provision large disks.
This change attempts to upload the file in chunks, similar to a
BufferedOutputStream, but uploading each part to S3 through a
multipart upload, deleting the data on the local disk once each part is
uploaded. This bounds the disk space required.

Additionally, this change boosts performance when writing due to
properly overriding each method on FilterOutputStream.