Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement missing parts of OSS RE caching #477

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions app/buck2_execute/src/execute/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ use anyhow::Context;
use buck2_common::file_ops::FileDigest;
use buck2_core::execution_types::executor_config::RemoteExecutorUseCase;
use futures::future;
use remote_execution::InlinedBlobWithDigest;
use remote_execution::TDigest;

use crate::digest::CasDigestConversionResultExt;
use crate::digest::CasDigestFromReExt;
use crate::digest::CasDigestToReExt;
use crate::digest_config::DigestConfig;
use crate::re::manager::ManagedRemoteExecutionClient;
use crate::re::streams::RemoteCommandStdStreams;
Expand Down Expand Up @@ -227,12 +229,13 @@ impl CommandStdStreams {
self,
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
digest_config: DigestConfig,
) -> anyhow::Result<StdStreamPair<ReStdStream>> {
match self {
Self::Local { stdout, stderr } => {
let (stdout, stderr) = future::try_join(
maybe_upload_to_re(client, use_case, stdout),
maybe_upload_to_re(client, use_case, stderr),
maybe_upload_to_re(client, use_case, stdout, digest_config),
maybe_upload_to_re(client, use_case, stderr, digest_config),
)
.await?;

Expand Down Expand Up @@ -265,11 +268,17 @@ async fn maybe_upload_to_re(
client: &ManagedRemoteExecutionClient,
use_case: RemoteExecutorUseCase,
bytes: Vec<u8>,
digest_config: DigestConfig,
) -> anyhow::Result<ReStdStream> {
const MIN_STREAM_UPLOAD_SIZE: usize = 50 * 1024; // Same as RE
if bytes.len() < MIN_STREAM_UPLOAD_SIZE {
return Ok(ReStdStream::Raw(bytes));
}
let digest = client.upload_blob(bytes, use_case).await?;
let inline_blob = InlinedBlobWithDigest {
digest: FileDigest::from_content(&bytes, digest_config.cas_digest_config()).to_re(),
blob: bytes,
..Default::default()
};
let digest = client.upload_blob(inline_blob, use_case).await?;
Ok(ReStdStream::Digest(digest))
}
11 changes: 8 additions & 3 deletions app/buck2_execute/src/re/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl RemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
self.data
Expand Down Expand Up @@ -1051,10 +1051,12 @@ impl RemoteExecutionClientImpl {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
self.client().upload_blob(blob, use_case.metadata()).await
let digest = blob.digest.clone();
self.client().upload_blob(blob, use_case.metadata()).await?;
Ok(digest)
}

async fn materialize_files(
Expand Down Expand Up @@ -1141,6 +1143,9 @@ impl RemoteExecutionClientImpl {
..Default::default()
},
)
.inspect_err(|err| {
tracing::warn!("write_action_result failed: {err}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That line may have run, but I definitely did not see it in the terminal. Thanks for the pointer, I will have a look at why.

})
.await
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute/src/re/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl ManagedRemoteExecutionClient {

pub async fn upload_blob(
&self,
blob: Vec<u8>,
blob: InlinedBlobWithDigest,
use_case: RemoteExecutorUseCase,
) -> anyhow::Result<TDigest> {
self.lock()?.get().await?.upload_blob(blob, use_case).await
Expand Down
2 changes: 1 addition & 1 deletion app/buck2_execute_impl/src/executors/caching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ impl CacheUploader {
.report
.std_streams
.clone()
.into_re(&self.re_client, self.re_use_case)
.into_re(&self.re_client, self.re_use_case, digest_config)
.await
.context("Error accessing std_streams")
};
Expand Down
3 changes: 3 additions & 0 deletions examples/remote_execution/internal/tests/large_stdout/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
load(":defs.bzl", "tests")

tests(name = "tests")
21 changes: 21 additions & 0 deletions examples/remote_execution/internal/tests/large_stdout/defs.bzl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under both the MIT license found in the
# LICENSE-MIT file in the root directory of this source tree and the Apache
# License, Version 2.0 found in the LICENSE-APACHE file in the root directory
# of this source tree.

def _tests(ctx):
# Create a large stdout stream locally, and upload it to CAS.
# The limit for inline stdout is 50KiB. So this will force calling client.upload_blob.
stage0 = ctx.actions.declare_output("stage0")
ctx.actions.run(
["sh", "-c", 'yes abcdefghijklmnopqrstuvwxyz | head -c 65536 && echo done > "$1"', "--", stage0.as_output()],
category = "stage0",
local_only = True,
allow_cache_upload = True,
)

return [DefaultInfo(stage0)]

tests = rule(attrs = {}, impl = _tests)
171 changes: 163 additions & 8 deletions remote_execution/oss/re_grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,15 @@ use re_grpc_proto::build::bazel::remote::execution::v2::Digest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteOperationMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteRequest as GExecuteRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecuteResponse as GExecuteResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::ExecutedActionMetadata;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::FindMissingBlobsResponse;
use re_grpc_proto::build::bazel::remote::execution::v2::GetActionResultRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::GetCapabilitiesRequest;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputDirectory;
use re_grpc_proto::build::bazel::remote::execution::v2::OutputFile;
use re_grpc_proto::build::bazel::remote::execution::v2::ResultsCachePolicy;
use re_grpc_proto::build::bazel::remote::execution::v2::UpdateActionResultRequest;
use re_grpc_proto::google::bytestream::byte_stream_client::ByteStreamClient;
use re_grpc_proto::google::bytestream::ReadRequest;
use re_grpc_proto::google::bytestream::ReadResponse;
Expand Down Expand Up @@ -115,6 +119,13 @@ fn check_status(status: Status) -> Result<(), REClientError> {
})
}

fn ttimestamp_to(ts: TTimestamp) -> Option<prost_types::Timestamp> {
Some(prost_types::Timestamp {
seconds: ts.seconds,
nanos: ts.nanos,
})
}

fn ttimestamp_from(ts: Option<::prost_types::Timestamp>) -> TTimestamp {
match ts {
Some(timestamp) => TTimestamp {
Expand Down Expand Up @@ -499,10 +510,33 @@ impl REClient {

pub async fn write_action_result(
&self,
_metadata: RemoteExecutionMetadata,
_request: WriteActionResultRequest,
metadata: RemoteExecutionMetadata,
write_request: WriteActionResultRequest,
) -> anyhow::Result<WriteActionResultResponse> {
Err(anyhow::anyhow!("Not supported"))
let mut client = self.grpc_clients.action_cache_client.clone();
let action_digest = tdigest_to(write_request.action_digest.clone());
let action_result = convert_taction_result_to_rbe(write_request.action_result)?;
let request = UpdateActionResultRequest {
action_digest: Some(action_digest),
action_result: Some(action_result),
results_cache_policy: None,
instance_name: self.instance_name.as_str().to_owned(),
};

let t: ActionResult = client
.update_action_result(with_internal_metadata(request, metadata))
.await?
.into_inner();

let result = convert_action_result(t)?;
let result = WriteActionResultResponse {
actual_action_result: result,
// NOTE: This is an arbitrary number because RBE does not return information
// on the TTL of the ActionResult.
// Also buck2 does not appear to read this value anywhere.
ttl_seconds: 0,
};
Ok(result)
}

pub async fn execute_with_progress(
Expand Down Expand Up @@ -655,11 +689,21 @@ impl REClient {

pub async fn upload_blob(
&self,
_blob: Vec<u8>,
_metadata: RemoteExecutionMetadata,
) -> anyhow::Result<TDigest> {
// TODO(aloiscochard)
Err(anyhow::anyhow!("Not implemented (RE upload_blob)"))
blob: InlinedBlobWithDigest,
metadata: RemoteExecutionMetadata,
) -> anyhow::Result<()> {
self.upload(
metadata,
UploadRequest {
inlined_blobs_with_digest: Some(vec![blob]),
files_with_digest: None,
directories: None,
upload_only_missing: false,
..Default::default()
},
)
.await?;
Ok(())
}

pub async fn download(
Expand Down Expand Up @@ -771,7 +815,118 @@ impl REClient {
}
}

fn convert_execution_action_metadata_to_rbe(
execution_metadata: TExecutedActionMetadata,
) -> anyhow::Result<ExecutedActionMetadata> {
let TExecutedActionMetadata {
worker,
queued_timestamp,
worker_start_timestamp,
worker_completed_timestamp,
input_fetch_start_timestamp,
input_fetch_completed_timestamp,
execution_start_timestamp,
execution_completed_timestamp,
output_upload_start_timestamp,
output_upload_completed_timestamp,
execution_dir: _,
input_analyzing_start_timestamp: _,
input_analyzing_completed_timestamp: _,
execution_attempts: _,
last_queued_timestamp: _,
instruction_counts: _,
auxiliary_metadata: _,
_dot_dot_default,
} = execution_metadata;
Ok(ExecutedActionMetadata {
worker,
worker_start_timestamp: ttimestamp_to(worker_start_timestamp),
worker_completed_timestamp: ttimestamp_to(worker_completed_timestamp),
input_fetch_start_timestamp: ttimestamp_to(input_fetch_start_timestamp),
input_fetch_completed_timestamp: ttimestamp_to(input_fetch_completed_timestamp),
execution_start_timestamp: ttimestamp_to(execution_start_timestamp),
execution_completed_timestamp: ttimestamp_to(execution_completed_timestamp),
output_upload_start_timestamp: ttimestamp_to(output_upload_start_timestamp),
output_upload_completed_timestamp: ttimestamp_to(output_upload_completed_timestamp),
queued_timestamp: ttimestamp_to(queued_timestamp),
// TODO(cormacrelf): calculate this in a reasonable way for buck.
// see protobuf docs on virtual_execution_duration.
// May be able to use last_queued_timestamp
virtual_execution_duration: None,
// Ugh, need a routine to convert TAny to prost_type::Any...
auxiliary_metadata: vec![],
})
}

fn convert_taction_result_to_rbe(taction_result: TActionResult2) -> anyhow::Result<ActionResult> {
let TActionResult2 {
output_files,
output_directories,
exit_code,
stdout_raw,
stdout_digest,
stderr_raw,
stderr_digest,
execution_metadata,
auxiliary_metadata: _,
_dot_dot_default,
} = taction_result;

let execution_metadata = convert_execution_action_metadata_to_rbe(execution_metadata)?;
let output_files = output_files.into_try_map(|output_file| {
let TFile {
digest,
name,
executable,
..
} = output_file;
anyhow::Ok(OutputFile {
digest: Some(tdigest_to(digest.digest)),
path: name,
is_executable: executable,
// Clients SHOULD NOT populate this field when uploading to the cache.
contents: Vec::new(),
node_properties: None,
})
})?;
let output_directories = output_directories.into_try_map(|output_directory| {
let tree_digest = tdigest_to(output_directory.tree_digest);
anyhow::Ok(OutputDirectory {
path: output_directory.path,
tree_digest: Some(tree_digest.clone()),
// TODO(cormacrelf): check whether buck2_execute::directory::directory_to_re_tree
// conforms with the requirements of passing `true` here (see .proto file)
is_topologically_sorted: false,
})
})?;
anyhow::Ok(ActionResult {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to not just Ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just splatter from copy pasting the function below and inverting every operation, instead of coding it from scratch. I'll tidy this up, a few of these don't even need to be Result.

exit_code,
execution_metadata: Some(execution_metadata),
output_directories,
output_files,
// TODO: support symlinks
output_symlinks: vec![],
output_file_symlinks: vec![],
output_directory_symlinks: vec![],
// If missing, it's because we uploaded it already
// if present, it's inline
stdout_raw: stdout_raw.unwrap_or(Vec::new()),
stdout_digest: stdout_digest.map(tdigest_to),
stderr_raw: stderr_raw.unwrap_or(Vec::new()),
stderr_digest: stderr_digest.map(tdigest_to),
})
}

fn convert_action_result(action_result: ActionResult) -> anyhow::Result<TActionResult2> {
if !action_result.output_symlinks.is_empty()
|| !action_result.output_file_symlinks.is_empty()
|| !action_result.output_directory_symlinks.is_empty()
{
anyhow::bail!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend to avoid bail because it can be hard to reason about - easier to see the explicit return

"CAS ActionResult returned with symlinks in it, buck2 cannot handle these yet"
);
}

let execution_metadata = action_result
.execution_metadata
.with_context(|| "The execution metadata are not defined.")?;
Expand Down
3 changes: 3 additions & 0 deletions remote_execution/oss/re_grpc/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ pub struct TSubsysPerfCount {
pub struct TActionResult2 {
pub output_files: Vec<TFile>,
pub output_directories: Vec<TDirectory2>,
// TODO: output_symlinks (use in preference when output_paths mode is used the execution side)
// TODO: output_file_symlinks (deprecated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why mark this as a todo? Is it something we should add? Or is this just for when servers return deprecated stuff?

// TODO: output_directory_symlinks (deprecated)
pub exit_code: i32,
pub stdout_raw: Option<Vec<u8>>,
pub stdout_digest: Option<TDigest>,
Expand Down