Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(buck2_common): don't overload the user with HTTP retry backoff warnings #342

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions app/buck2_client_ctx/src/manifold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use buck2_common::http::counting_client::CountingHttpClient;
use buck2_common::http::find_certs::find_tls_cert;
use buck2_common::http::retries::http_retry;
use buck2_common::http::retries::AsHttpError;
use buck2_common::http::retries::DispatchableHttpRetryWarning;
use buck2_common::http::retries::HttpError;
use buck2_common::http::HttpClient;
use buck2_core::fs::paths::abs_path::AbsPath;
Expand Down Expand Up @@ -428,6 +429,25 @@ pub struct ManifoldClient {
manifold_url: Option<String>,
}

struct ManifoldDispatchableHttpRetryWarning {}

impl ManifoldDispatchableHttpRetryWarning {
pub fn new() -> Self {
Self {}
}
}

Comment on lines +432 to +439
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
struct ManifoldDispatchableHttpRetryWarning {}
impl ManifoldDispatchableHttpRetryWarning {
pub fn new() -> Self {
Self {}
}
}
struct ManifoldDispatchableHttpRetryWarning;

And then just do ManifoldDispatchableHttpRetryWarning instead of ManifoldDispatchableHttpRetryWarning::new ?

impl DispatchableHttpRetryWarning for ManifoldDispatchableHttpRetryWarning {
fn dispatch(&self, backoff: &Duration, retries: usize, url: &str) {
tracing::warn!(
"HTTP request to {} failed; {} retries. Retrying in {} seconds",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(see my comment below, it's not actually retries, it's attempts)

url,
retries,
&backoff.as_secs(),
);
}
}

impl ManifoldClient {
pub fn new() -> anyhow::Result<Self> {
Ok(Self {
Expand Down Expand Up @@ -467,6 +487,8 @@ impl ManifoldClient {
}

let res = http_retry(
&url,
ManifoldDispatchableHttpRetryWarning::new(),
|| async {
self.client
.put(&url, buf.clone(), headers.clone())
Expand Down Expand Up @@ -499,6 +521,8 @@ impl ManifoldClient {
);

let res = http_retry(
&url,
ManifoldDispatchableHttpRetryWarning::new(),
|| async {
self.client
.post(&url, buf.clone(), vec![])
Expand Down
56 changes: 46 additions & 10 deletions app/buck2_common/src/http/retries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,46 @@ pub trait AsHttpError {
fn as_http_error(&self) -> Option<&HttpError>;
}

pub async fn http_retry<Exec, F, T, E>(exec: Exec, mut intervals: Vec<Duration>) -> Result<T, E>
/// Used to dispatch warnings to the event log in the case in the case of a
/// retryable HTTP error. This often occurs when some level of exponential
/// backoff is happening due to a lot of concurrent http_archive() requests for
/// a single host; see GH-316 and GH-321 for background.
pub trait DispatchableHttpRetryWarning {
/// Fire off a warning. This might go to the console, event log, or
/// some other place.
fn dispatch(&self, dur: &Duration, retries: usize, url: &str);
Comment on lines +53 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename retries -> attempts? The first time this function is called the value will be 1 so it's more accurately attempts than retries

}

/// No-op implementation of DispatchableHttpRetryWarning. This does nothing at
/// all when a retry occurs; useful for mock tests.
pub struct NoopDispatchableHttpRetryWarning {}

impl NoopDispatchableHttpRetryWarning {
pub fn new() -> Self {
Self {}
}
}
Comment on lines +58 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(same as above)


impl DispatchableHttpRetryWarning for NoopDispatchableHttpRetryWarning {
fn dispatch(&self, _backoff: &Duration, _retries: usize, _url: &str) {}
}

pub async fn http_retry<Exec, F, T, E, R>(
url: &str,
dispatch_retry_warning: R,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) passing &R might make a bit more sense considering that' all the API requires

exec: Exec,
mut intervals: Vec<Duration>,
) -> Result<T, E>
where
Exec: Fn() -> F,
E: AsHttpError + std::fmt::Display,
F: Future<Output = Result<T, E>>,
R: DispatchableHttpRetryWarning,
{
intervals.insert(0, Duration::from_secs(0));
let mut backoff = intervals.into_iter().peekable();

let mut retries = 0;
while let Some(duration) = backoff.next() {
tokio::time::sleep(duration).await;

Expand All @@ -63,12 +94,9 @@ where

if let Some(http_error) = http_error {
if http_error.is_retryable() {
retries += 1;
if let Some(b) = backoff.peek() {
tracing::warn!(
"Retrying a HTTP error after {} seconds: {:#}",
b.as_secs(),
http_error
);
dispatch_retry_warning.dispatch(&b, retries, url);
continue;
}
}
Expand Down Expand Up @@ -143,29 +171,37 @@ mod tests {

#[tokio::test]
async fn test_http_retry_success() {
let url = "http://example.com";
let dispatch_retry_warning = NoopDispatchableHttpRetryWarning::new();
let mock = Mock::new(vec![ok_response()]);
let result = http_retry(|| mock.exec(), retries(0)).await;
let result = http_retry(url, dispatch_retry_warning, || mock.exec(), retries(0)).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn test_http_retry_retryable() {
let url = "http://example.com";
let dispatch_retry_warning = NoopDispatchableHttpRetryWarning::new();
let mock = Mock::new(vec![retryable(), ok_response()]);
let result = http_retry(|| mock.exec(), retries(1)).await;
let result = http_retry(url, dispatch_retry_warning, || mock.exec(), retries(1)).await;
assert!(result.is_ok());
}

#[tokio::test]
async fn test_http_retry_exhaust_retries() {
let url = "http://example.com";
let dispatch_retry_warning = NoopDispatchableHttpRetryWarning::new();
let mock = Mock::new(vec![retryable(), ok_response()]);
let result = http_retry(|| mock.exec(), retries(0)).await;
let result = http_retry(url, dispatch_retry_warning, || mock.exec(), retries(0)).await;
assert!(result.is_err());
}

#[tokio::test]
async fn test_http_retry_non_retryable() {
let url = "http://example.com";
let dispatch_retry_warning = NoopDispatchableHttpRetryWarning::new();
let mock = Mock::new(vec![non_retryable(), ok_response()]);
let result = http_retry(|| mock.exec(), retries(1)).await;
let result = http_retry(url, dispatch_retry_warning, || mock.exec(), retries(1)).await;
assert!(result.is_err());
}
}
8 changes: 8 additions & 0 deletions app/buck2_data/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ message CommandOptions {
uint64 concurrency = 1;
}

message HttpRequestRetried {
uint64 backoff_duration_secs = 1;
string url = 2;
}

// An event that represents a single point in time.
message InstantEvent {
reserved 8, 9, 13, 22;
Expand Down Expand Up @@ -236,6 +241,9 @@ message InstantEvent {

// Log options that are command-level and processed by the daemon.
CommandOptions comand_options = 31;

// Warnings of HTTP retries and exponential backoff
HttpRequestRetried http_request_retried = 32;
}

reserved 12; // Log
Expand Down
32 changes: 32 additions & 0 deletions app/buck2_execute/src/materialize/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ use buck2_common::file_ops::TrackedFileDigest;
use buck2_common::http::counting_client::CountingHttpClient;
use buck2_common::http::retries::http_retry;
use buck2_common::http::retries::AsHttpError;
use buck2_common::http::retries::DispatchableHttpRetryWarning;
use buck2_common::http::retries::HttpError;
use buck2_common::http::HttpClient;
use buck2_core::fs::fs_util;
use buck2_core::fs::project::ProjectRoot;
use buck2_core::fs::project_rel_path::ProjectRelativePath;
use buck2_data::HttpRequestRetried;
use bytes::Bytes;
use digest::DynDigest;
use dupe::Dupe;
Expand Down Expand Up @@ -99,8 +101,36 @@ impl AsHttpError for HttpDownloadError {
}
}

struct MaterializerDispatchableHttpRetryWarning {}

impl MaterializerDispatchableHttpRetryWarning {
fn new() -> Self {
Self {}
}
}

impl DispatchableHttpRetryWarning for MaterializerDispatchableHttpRetryWarning {
fn dispatch(&self, dur: &Duration, _retries: usize, url: &str) {
let event: HttpRequestRetried = HttpRequestRetried {
backoff_duration_secs: dur.as_secs(),
url: url.to_owned(),
};

match buck2_events::dispatch::get_dispatcher_opt() {
Some(dispatcher) => {
dispatcher.instant_event(event);
}
None => {
tracing::warn!("Failed to dispatch HttpRequestRetried event: {:?}", event)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So here what we actually would like to call is this function:

pub fn broadcast_instant_event<E: Into<buck2_data::instant_event::Data> + Clone>(
    event: &E,
) -> bool {
    let mut has_subscribers = false;

    for cmd in ACTIVE_COMMANDS.lock().values() {
        cmd.dispatcher.instant_event(event.clone());
        has_subscribers = true;
    }

    has_subscribers
}

(as-is unfortunately the logging wouldn't work when using the deferred materializer)

This function lives in buck2_server, so what we need to do is allow this to be injected into the callsites.

Maybe the easiest way to do this is to simply have the HttpClient provide its logging method (because this is already threaded through to all the places you need it), via a method like:

trait HttpClient {
  ..

  fn retry_logger(&self) -> &dyn DispatchableHttpRetryWarning;
}

You probably should actually keep the "log to stderr" implementation as just the default implementation for this method on HttpClient. This way all you need to do is just wrap the client being constructed for the server with the event-logging code.

So, putting it all together, what we need to do is:

  • Move this code to buck2_server
  • Have it implement the broadcast
  • When we create the server's http client (in DaemonState::init_data), we wrap it with some new struct implementing HttpClient that uses the event-logging implementation
  • When we create ManifoldClient's http client, you inject the other one in
  • When calling http_retry you get the retry logging from HttpClient

I can also make some of those changes if you'd like, just let me know

}
}
}
}

pub async fn http_head(client: &dyn HttpClient, url: &str) -> anyhow::Result<Response<()>> {
let response = http_retry(
url,
MaterializerDispatchableHttpRetryWarning::new(),
|| async {
client
.head(url)
Expand Down Expand Up @@ -128,6 +158,8 @@ pub async fn http_download(
}

Ok(http_retry(
url,
MaterializerDispatchableHttpRetryWarning::new(),
|| async {
let file = fs_util::create_file(&abs_path).map_err(HttpDownloadError::IoError)?;

Expand Down