Skip to content

Commit

Permalink
Log test-result message sizes (#87)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayazhafiz committed Jan 12, 2024
1 parent df44b47 commit 39ddb53
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 14 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.7.3

ABQ 1.7.3 is a patch release.

This release adds additional info logging for message sizes.

# 1.7.2

ABQ 1.7.2 is a patch release.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/abq_cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "abq"
version = "1.7.2"
version = "1.7.3"
edition = "2021"

[dependencies]
Expand Down
7 changes: 7 additions & 0 deletions crates/abq_queue/src/persistence/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ pub enum EligibleForRemoteDump {
No,
}

impl EligibleForRemoteDump {
fn bool(self) -> bool {
self == Self::Yes
}
}

impl ResultsPersistedCell {
pub fn new(run_id: RunId) -> Self {
Self(Arc::new(CellInner {
Expand Down Expand Up @@ -183,6 +189,7 @@ pub struct PersistencePlan<'a> {
}

impl<'a> PersistencePlan<'a> {
#[tracing::instrument(level = "info", skip_all, fields(run_id = %self.cell.run_id, eligible_for_remote_dump = %self.eligible_for_remote_dump.bool()))]
pub async fn execute(self) -> Result<()> {
let result = self
.persist_results
Expand Down
21 changes: 15 additions & 6 deletions crates/abq_queue/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1921,10 +1921,11 @@ impl QueueServer {
.await
.located(here!())
.no_entity()?;
let Request { entity, message } = net_protocol::async_read(&mut stream)
.await
.located(here!())
.no_entity()?;
let (Request { entity, message }, request_message_size) =
net_protocol::async_read_with_size(&mut stream)
.await
.located(here!())
.no_entity()?;

let result: OpaqueResult<()> = match message {
Message::HealthCheck => Self::handle_healthcheck(entity, stream).await,
Expand Down Expand Up @@ -1972,6 +1973,7 @@ impl QueueServer {
run_id,
entity,
results,
request_message_size,
stream,
)
.await
Expand Down Expand Up @@ -2277,16 +2279,17 @@ impl QueueServer {
Ok(())
}

#[instrument(level = "trace", skip(queues, persist_results))]
#[instrument(level = "info", skip_all, fields(run_id, entity, results_message_size))]
async fn handle_worker_results(
queues: SharedRuns,
persist_results: SharedPersistResults,
run_id: RunId,
entity: Entity,
results: Vec<AssociatedTestResults>,
results_message_size: usize,
mut stream: Box<dyn net_async::ServerStream>,
) -> OpaqueResult<()> {
tracing::debug!(?entity, ?run_id, "got result");
tracing::debug!(?entity, ?run_id, ?results_message_size, "got result");

// Build a plan for persistence of the result, then immediately chuck the test result ACK
// over the wire before executing the plan.
Expand Down Expand Up @@ -3703,6 +3706,7 @@ mod test {
run_id.clone(),
entity,
vec![result],
0,
server_conn,
);
let cancellation_fut =
Expand Down Expand Up @@ -4787,6 +4791,7 @@ mod persist_results {
run_id.clone(),
Entity::runner(0, 1),
results.clone(),
0,
server_conn,
)
.await
Expand Down Expand Up @@ -4834,6 +4839,7 @@ mod persist_results {
run_id.clone(),
Entity::runner(0, 1),
results.clone(),
0,
server_conn,
)
.await
Expand Down Expand Up @@ -4876,6 +4882,7 @@ mod persist_results {
run_id.clone(),
Entity::runner(0, 1),
results.clone(),
0,
server_conn,
)
.await;
Expand Down Expand Up @@ -5038,6 +5045,7 @@ mod persist_results {
run_id.clone(),
Entity::runner(0, 1),
results,
0,
server_conn,
)
.await
Expand Down Expand Up @@ -5154,6 +5162,7 @@ mod persist_results {
run_id.clone(),
Entity::runner(1, 1),
results3.clone(),
0,
server_conn,
)
.await
Expand Down
27 changes: 21 additions & 6 deletions crates/abq_utils/src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,10 @@ impl<const COMPRESS_LARGE: bool> AsyncReader<COMPRESS_LARGE> {
///
/// Cancellation-safe, but the same `reader` must be provided between cancellable calls.
/// If errors, not resumable.
pub async fn next<R, T: serde::de::DeserializeOwned>(&mut self, reader: &mut R) -> io::Result<T>
pub async fn next<R, T: serde::de::DeserializeOwned>(
&mut self,
reader: &mut R,
) -> io::Result<(T, usize)>
where
R: tokio::io::AsyncReadExt + Unpin,
{
Expand Down Expand Up @@ -1178,7 +1181,7 @@ impl<const COMPRESS_LARGE: bool> AsyncReader<COMPRESS_LARGE> {
self.msg_buf.clear();
self.next_expiration = None;

return Ok(msg);
return Ok((msg, size));
}
}
}
Expand All @@ -1192,6 +1195,16 @@ impl<const COMPRESS_LARGE: bool> AsyncReader<COMPRESS_LARGE> {
pub async fn async_read<R, T: serde::de::DeserializeOwned>(
reader: &mut R,
) -> Result<T, std::io::Error>
where
R: tokio::io::AsyncReadExt + Unpin,
{
let (msg, _) = AsyncReader::<true>::new(READ_TIMEOUT).next(reader).await?;
Ok(msg)
}

pub async fn async_read_with_size<R, T: serde::de::DeserializeOwned>(
reader: &mut R,
) -> Result<(T, usize), std::io::Error>
where
R: tokio::io::AsyncReadExt + Unpin,
{
Expand All @@ -1210,7 +1223,8 @@ pub async fn async_read_local<R, T: serde::de::DeserializeOwned>(
where
R: tokio::io::AsyncReadExt + Unpin,
{
AsyncReader::<false>::new(READ_TIMEOUT).next(reader).await
let (msg, _) = AsyncReader::<false>::new(READ_TIMEOUT).next(reader).await?;
Ok(msg)
}

/// Like [write], but async.
Expand Down Expand Up @@ -1388,7 +1402,7 @@ mod test {
let msg_size = 10_u32.to_be_bytes();
client_conn.write_all(&msg_size).await.unwrap();

let read_result: Result<(), _> = AsyncReader::<true>::new(Duration::from_secs(0))
let read_result: Result<((), _), _> = AsyncReader::<true>::new(Duration::from_secs(0))
.next(&mut server_conn)
.await;
assert!(read_result.is_err());
Expand Down Expand Up @@ -1440,8 +1454,9 @@ mod test {
} else {
// After we read the last message, we should in fact end up with the message we
// expect.
let msg = handle.await.unwrap().unwrap();
let (msg, size) = handle.await.unwrap().unwrap();
assert_eq!(msg, "11111111");
assert_eq!(size, 10);
}
}
}
Expand Down Expand Up @@ -1525,7 +1540,7 @@ mod test {
} else {
// After we read the last message, we should in fact end up with the message we
// expect.
let msg = handle.await.unwrap().unwrap();
let (msg, _) = handle.await.unwrap().unwrap();
assert_eq!(msg, expected_str);
}
}
Expand Down

0 comments on commit 39ddb53

Please sign in to comment.