Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Go/v12.0.1 patch #1

Open
wants to merge 2 commits into
base: go/v12.0.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
### Build Notes
- From inside the directory `python/examples/minimal_build`:
- Create a Docker image using
- `docker build -t arrow_ubi -f Dockerfile-ubi .`
- Run `docker run --rm -t -i -v $PWD:/io -v $PWD/../../..:/arrow arrow_ubi /io/build_venv.sh`

This will produce a `.whl` file.

<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
Expand Down
3 changes: 0 additions & 3 deletions cpp/CMakePresets.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,7 @@
{
"name": "features-python-maximal",
"inherits": [
"features-cuda",
"features-filesystems",
"features-flight",
"features-gandiva",
"features-main",
"features-python-minimal"
],
Expand Down
191 changes: 141 additions & 50 deletions cpp/src/arrow/filesystem/s3fs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include "arrow/filesystem/s3fs.h"
#include "arrow/result.h"

#include <algorithm>
#include <atomic>
Expand Down Expand Up @@ -105,6 +106,7 @@ using internal::TaskGroup;
using internal::ToChars;
using internal::Uri;
using io::internal::SubmitIO;
using internal::GetEnvVarNative;

namespace fs {

Expand Down Expand Up @@ -1155,6 +1157,7 @@ class ObjectInputFile final : public io::RandomAccessFile {
// so I chose the safer value.
// (see https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html)
static constexpr int64_t kMinimumPartUpload = 5 * 1024 * 1024;
static constexpr int64_t kMultipartThreshold = 5 * 1024 * 1024;

// An OutputStream that writes to a S3 object
class ObjectOutputStream final : public io::OutputStream {
Expand All @@ -1179,6 +1182,24 @@ class ObjectOutputStream final : public io::OutputStream {
}

Status Init() {
    // If the expected file size (from the "falkonry:write_options:file_size"
    // metadata entry) is below kMultipartThreshold * 1024 (5 GiB), skip the
    // multipart-upload path and buffer for a single PutObject instead.

int size_enforced_key = metadata_ ? metadata_->FindKey("falkonry:write_options:file_size") : -1;
ARROW_LOG(DEBUG) << "!!! size_enforced_key = " << size_enforced_key;
if (size_enforced_key > -1) {
closed_ = true;
auto maybe_value = metadata_->value(size_enforced_key);
ARROW_LOG(DEBUG) << "!!! size_detected = " << maybe_value;
if (std::stoi(maybe_value) < (kMultipartThreshold * 1024)) {
disable_mulitpart_ = true;
ARROW_LOG(DEBUG) << "!!! Non multipart upload";
closed_ = false;
return Status::OK();
}
}
ARROW_LOG(DEBUG) << "!!! multipart upload";
disable_mulitpart_ = false;

// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
Expand Down Expand Up @@ -1210,7 +1231,8 @@ class ObjectOutputStream final : public io::OutputStream {
}

Status Abort() override {
if (closed_) {
ARROW_LOG(DEBUG) << "!!! Abort upload";
if (closed_ || disable_mulitpart_) {
return Status::OK();
}

Expand All @@ -1235,18 +1257,24 @@ class ObjectOutputStream final : public io::OutputStream {
// OutputStream interface

Status Close() override {
ARROW_LOG(DEBUG) << "!!! Close upload";
auto fut = CloseAsync();
return fut.status();
}

Future<> CloseAsync() override {
ARROW_LOG(DEBUG) << "!!! Close upload";
if (closed_) return Status::OK();

if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
}

if(disable_mulitpart_)
return Status::OK();

ARROW_LOG(DEBUG) << "!!! Close Multipart upload";
// S3 mandates at least one part, upload an empty one if necessary
if (part_number_ == 1) {
RETURN_NOT_OK(UploadPart("", 0));
Expand Down Expand Up @@ -1300,6 +1328,21 @@ class ObjectOutputStream final : public io::OutputStream {

Status DoWrite(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
if(disable_mulitpart_) {
ARROW_LOG(DEBUG) << "!!! Buffering Non-Multipart upload";
if (!current_part_) {
ARROW_ASSIGN_OR_RAISE(
current_part_,
io::BufferOutputStream::Create(part_upload_threshold_, io_context_.pool()));
current_part_size_ = 0;
}
RETURN_NOT_OK(current_part_->Write(data, nbytes));
pos_ += nbytes;
current_part_size_ += nbytes;
return Status::OK();
}

ARROW_LOG(DEBUG) << "!!! Multipart upload";
if (closed_) {
return Status::Invalid("Operation on closed stream");
}
Expand Down Expand Up @@ -1359,51 +1402,67 @@ class ObjectOutputStream final : public io::OutputStream {

Status UploadPart(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetPartNumber(part_number_);
req.SetContentLength(nbytes);

if (!background_writes_) {
if(disable_mulitpart_) {
ARROW_LOG(DEBUG) << "!!! Non multipart upload finalizing";
S3Model::PutObjectRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
auto outcome = client_->UploadPart(req);
auto outcome = client_->PutObject(std::move(req));
if (!outcome.IsSuccess()) {
return UploadPartError(req, outcome);
} else {
AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
return ErrorToStatus(
std::forward_as_tuple("When uploading part for key '", req.GetKey(),
"' in bucket '", req.GetBucket(), "': "),
"PutObject", outcome.GetError());
}
} else {
// If the data isn't owned, make an immutable copy for the lifetime of the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
ARROW_LOG(DEBUG) << "!!! multipart upload intermediate";
S3Model::UploadPartRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
req.SetUploadId(upload_id_);
req.SetPartNumber(part_number_);
req.SetContentLength(nbytes);

if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
auto outcome = client_->UploadPart(req);
if (!outcome.IsSuccess()) {
return UploadPartError(req, outcome);
} else {
AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
}
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}
req.SetBody(
std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));
// If the data isn't owned, make an immutable copy for the lifetime of the closure
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}
req.SetBody(
std::make_shared<StringViewStream>(owned_buffer->data(), owned_buffer->size()));

{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
if (upload_state_->parts_in_progress++ == 0) {
upload_state_->pending_parts_completed = Future<>::Make();
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
if (upload_state_->parts_in_progress++ == 0) {
upload_state_->pending_parts_completed = Future<>::Make();
}
}
auto client = client_;
ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
return client->UploadPart(req);
}));
// The closure keeps the buffer and the upload state alive
auto state = upload_state_;
auto part_number = part_number_;
auto handler = [owned_buffer, state, part_number,
req](const Result<S3Model::UploadPartOutcome>& result) -> void {
HandleUploadOutcome(state, part_number, req, result);
};
fut.AddCallback(std::move(handler));
}
auto client = client_;
ARROW_ASSIGN_OR_RAISE(auto fut, SubmitIO(io_context_, [client, req]() {
return client->UploadPart(req);
}));
// The closure keeps the buffer and the upload state alive
auto state = upload_state_;
auto part_number = part_number_;
auto handler = [owned_buffer, state, part_number,
req](const Result<S3Model::UploadPartOutcome>& result) -> void {
HandleUploadOutcome(state, part_number, req, result);
};
fut.AddCallback(std::move(handler));
}

++part_number_;
Expand Down Expand Up @@ -1478,6 +1537,7 @@ class ObjectOutputStream final : public io::OutputStream {

Aws::String upload_id_;
bool closed_ = true;
bool disable_mulitpart_ = false;
int64_t pos_ = 0;
int32_t part_number_ = 1;
std::shared_ptr<io::BufferOutputStream> current_part_;
Expand Down Expand Up @@ -2285,20 +2345,26 @@ Result<FileInfo> S3FileSystem::GetFileInfo(const std::string& s) {
return ErrorToStatus(msg, "HeadObject", outcome.GetError(),
impl_->options().region);
}
// Not found => perhaps it's an empty "directory"
ARROW_ASSIGN_OR_RAISE(bool is_dir, impl_->IsEmptyDirectory(path, &outcome));
if (is_dir) {
info.set_type(FileType::Directory);
auto maybe_env_var = GetEnvVarNative("ARROW_S3_OPTIMIZED_KEY_LOOKUP");
if (maybe_env_var.ok()) {
info.set_type(FileType::NotFound);
return info;
}
// Not found => perhaps it's a non-empty "directory"
ARROW_ASSIGN_OR_RAISE(is_dir, impl_->IsNonEmptyDirectory(path));
if (is_dir) {
info.set_type(FileType::Directory);
} else {
info.set_type(FileType::NotFound);
// Not found => perhaps it's an empty "directory"
ARROW_ASSIGN_OR_RAISE(bool is_dir, impl_->IsEmptyDirectory(path, &outcome));
if (is_dir) {
info.set_type(FileType::Directory);
return info;
}
// Not found => perhaps it's a non-empty "directory"
ARROW_ASSIGN_OR_RAISE(is_dir, impl_->IsNonEmptyDirectory(path));
if (is_dir) {
info.set_type(FileType::Directory);
} else {
info.set_type(FileType::NotFound);
}
return info;
}
return info;
}
}

Expand Down Expand Up @@ -2691,7 +2757,32 @@ Status InitializeS3(const S3GlobalOptions& options) {
}

Status EnsureS3Initialized() {
return EnsureAwsInstanceInitialized({S3LogLevel::Fatal}).status();
auto log_level = S3LogLevel::Fatal;

auto result = arrow::internal::GetEnvVar("ARROW_S3_LOG_LEVEL");

if (result.ok()) {
// Extract, trim, and downcase the value of the enivronment variable
auto value =
arrow::internal::AsciiToLower(arrow::internal::TrimString(result.ValueUnsafe()));

if (value == "fatal") {
log_level = S3LogLevel::Fatal;
} else if (value == "error") {
log_level = S3LogLevel::Error;
} else if (value == "warn") {
log_level = S3LogLevel::Warn;
} else if (value == "info") {
log_level = S3LogLevel::Info;
} else if (value == "debug") {
log_level = S3LogLevel::Debug;
} else if (value == "trace") {
log_level = S3LogLevel::Trace;
} else if (value == "off") {
log_level = S3LogLevel::Off;
}
}
return EnsureAwsInstanceInitialized({log_level}).status();
}

Status FinalizeS3() {
Expand Down
34 changes: 34 additions & 0 deletions python/examples/minimal_build/Dockerfile-ubi
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

FROM registry.access.redhat.com/ubi9/python-39:1-143.1697559877

USER 0

RUN dnf update -y && \
dnf install -y \
autoconf \
gcc \
gcc-g++ \
git \
wget \
make \
cmake \
ninja-build \
python3-devel

RUN pip3 install -U pip setuptools