Skip to content

Commit

Permalink
[#22267] ASH: Export Wait state as string when viewing it in /rpcz
Browse files Browse the repository at this point in the history
Summary:
Export Wait state as string when viewing it in /rpcz
Jira: DB-11184

Upgrade/Downgrade safety:
  Ok to downgrade. Only changes to proto are a) renaming fields, and b) introducing a new flag which is used within the TServer process.

Test Plan: ybd --cxx-test wait_states-itest

Reviewers: asaha

Reviewed By: asaha

Subscribers: esheng, ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34769
  • Loading branch information
amitanandaiyer committed May 7, 2024
1 parent 1fbe96e commit de9fd5d
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 61 deletions.
9 changes: 4 additions & 5 deletions src/yb/ash/wait_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "yb/util/uuid.h"

DECLARE_bool(ysql_yb_enable_ash);
DECLARE_bool(TEST_export_wait_state_names);

#define SET_WAIT_STATUS_TO(ptr, code) \
if ((ptr)) (ptr)->set_code(BOOST_PP_CAT(yb::ash::WaitStateCode::k, code))
Expand Down Expand Up @@ -368,13 +367,13 @@ class WaitStateInfo {
}

template <class PB>
void ToPB(PB* pb) EXCLUDES(mutex_) {
void ToPB(PB* pb, bool export_wait_state_names) EXCLUDES(mutex_) {
std::lock_guard lock(mutex_);
metadata_.ToPB(pb->mutable_metadata());
WaitStateCode code = this->code();
pb->set_wait_status_code(yb::to_underlying(code));
if (FLAGS_TEST_export_wait_state_names) {
pb->set_wait_status_code_as_string(yb::ToString(code));
pb->set_wait_state_code(yb::to_underlying(code));
if (export_wait_state_names) {
pb->set_wait_state_code_as_string(yb::ToString(code));
}
aux_info_.ToPB(pb->mutable_aux_info());
}
Expand Down
6 changes: 3 additions & 3 deletions src/yb/common/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -664,10 +664,10 @@ message WaitStateInfoPB {
// The metadata is rarely modified.
optional AshMetadataPB metadata = 1;
// Represents the state of the current rpc/request when it was polled.
optional uint32 wait_status_code = 2;
optional uint32 wait_state_code = 2;
// Some additional information that may be persisted as
// yb_active_universe_history.wait_event_aux
optional AshAuxDataPB aux_info = 3;
// A string representation of wait_status_code for easy human consumption.
optional string wait_status_code_as_string = 4;
// A string representation of wait_state_code for easy human consumption.
optional string wait_state_code_as_string = 4;
}
8 changes: 4 additions & 4 deletions src/yb/integration-tests/wait_states-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,18 +335,18 @@ void WaitStateTestCheckMethodCounts::UpdateCounts(
VLOG(2) << "Entry " << ++idx << " : " << yb::ToString(entry);
const auto& method =
(entry.has_aux_info() && entry.aux_info().has_method() ? entry.aux_info().method() : "");
const auto& wait_state_code = entry.wait_status_code_as_string();
const auto& wait_state_code = entry.wait_state_code_as_string();
++method_counts_[method];
++wait_state_code_counts_[wait_state_code];
++wait_state_code_counts_by_method_[method][wait_state_code];

if (method.empty()) {
LOG(ERROR) << "Found entry without AuxInfo/method." << entry.DebugString();
// If an RPC does not have the aux/method information, it shouldn't have progressed much.
if (entry.has_wait_status_code_as_string()) {
if (entry.has_wait_state_code_as_string()) {
EXPECT_TRUE(
entry.wait_status_code_as_string() == "kOnCpu_Passive" ||
entry.wait_status_code_as_string() == "kOnCpu_Active");
entry.wait_state_code_as_string() == "kOnCpu_Passive" ||
entry.wait_state_code_as_string() == "kOnCpu_Active");
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/rpc/rpc_introspection.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ message DumpRunningRpcsRequestPB {
optional bool dump_timed_out = 2;
optional bool get_wait_state = 3;
optional bool get_local_calls = 4;
optional bool export_wait_state_code_as_string = 5;
}

message DumpRunningRpcsResponsePB {
Expand Down
5 changes: 2 additions & 3 deletions src/yb/rpc/yb_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,10 @@ bool YBInboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
}
if (req.get_wait_state()) {
if (const auto& wait_state = this->wait_state()) {
// TBD: Add WaitStateInfoPB to the response instead of the string.
wait_state->ToPB(resp->mutable_wait_state());
wait_state->ToPB(resp->mutable_wait_state(), req.export_wait_state_code_as_string());
TRACE_TO(
trace(), "Pulled $0",
yb::ToString(ash::WaitStateCode(resp->wait_state().wait_status_code())));
yb::ToString(ash::WaitStateCode(resp->wait_state().wait_state_code())));
}
}
resp->set_elapsed_millis(MonoTime::Now().GetDeltaSince(timing_.time_received)
Expand Down
1 change: 1 addition & 0 deletions src/yb/server/rpcz-path-handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ void RpczPathHandler(Messenger* messenger,
DumpRunningRpcsResponsePB dump_resp;

dump_req.set_get_wait_state(GetBool(req.parsed_args, "get_wait_state", false));
dump_req.set_export_wait_state_code_as_string(true);
dump_req.set_include_traces(GetBool(req.parsed_args, "include_traces", false));
dump_req.set_dump_timed_out(GetBool(req.parsed_args, "timed_out", false));
dump_req.set_get_local_calls(true);
Expand Down
1 change: 1 addition & 0 deletions src/yb/tserver/pg_client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ message PgActiveSessionHistoryRequestPB {
bool fetch_cql_states = 3;
bool ignore_ash_and_perform_calls = 4;
bool fetch_raft_log_appender_states = 5;
bool export_wait_state_code_as_string = 6;
}

message WaitStatesPB {
Expand Down
72 changes: 30 additions & 42 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1385,14 +1385,26 @@ class PgClientServiceImpl::Impl {
return (
!call.has_wait_state() ||
// Ignore log-appenders which are just Idle
call.wait_state().wait_status_code() == yb::to_underlying(ash::WaitStateCode::kIdle) ||
call.wait_state().wait_state_code() == yb::to_underlying(ash::WaitStateCode::kIdle) ||
// Ignore ActiveSessionHistory/Perform calls, if desired.
(req.ignore_ash_and_perform_calls() && call.wait_state().has_aux_info() &&
call.wait_state().aux_info().has_method() &&
(call.wait_state().aux_info().method() == "ActiveSessionHistory" ||
call.wait_state().aux_info().method() == "Perform")));
}

// Appends the wait state of each in-flight call on `conn` to `resp`.
//
// Calls for which ShouldIgnoreCall(req, call) returns true are logged at VLOG(3)
// and skipped, so they do not show up in the response.
void PopulateWaitStates(
    const PgActiveSessionHistoryRequestPB& req, const yb::rpc::RpcConnectionPB& conn,
    tserver::WaitStatesPB* resp) {
  for (const auto& call : conn.calls_in_flight()) {
    if (ShouldIgnoreCall(req, call)) {
      VLOG(3) << "Ignoring " << call.wait_state().DebugString();
      // Skip this call; without the `continue` it would fall through and still be
      // copied into the response below.
      continue;
    }
    auto* wait_state = resp->add_wait_states();
    wait_state->CopyFrom(call.wait_state());
  }
}

void GetRpcsWaitStates(
const PgActiveSessionHistoryRequestPB& req, ash::Component component,
tserver::WaitStatesPB* resp) {
Expand All @@ -1410,62 +1422,36 @@ class PgClientServiceImpl::Impl {
dump_req.set_get_wait_state(true);
dump_req.set_dump_timed_out(false);
dump_req.set_get_local_calls(true);
dump_req.set_export_wait_state_code_as_string(req.export_wait_state_code_as_string());

WARN_NOT_OK(messenger->DumpRunningRpcs(dump_req, &dump_resp), "DumpRunningRpcs failed");

size_t ignored_calls = 0;
size_t ignored_calls_no_wait_state = 0;
size_t cntr = 0;
for (const auto& conns : dump_resp.inbound_connections()) {
for (const auto& call : conns.calls_in_flight()) {
if (ShouldIgnoreCall(req, call)) {
ignored_calls++;
if (!call.has_wait_state()) {
ignored_calls_no_wait_state++;
}
continue;
}
resp->add_wait_states()->CopyFrom(call.wait_state());
VLOG(2) << cntr++ << " Inbound call sending " << call.wait_state().DebugString();
}
for (const auto& conn : dump_resp.inbound_connections()) {
PopulateWaitStates(req, conn, resp);
}

if (dump_resp.has_local_calls()) {
cntr = 0;
for (const auto& call : dump_resp.local_calls().calls_in_flight()) {
if (ShouldIgnoreCall(req, call)) {
ignored_calls++;
if (!call.has_wait_state()) {
ignored_calls_no_wait_state++;
}
continue;
}
resp->add_wait_states()->CopyFrom(call.wait_state());
VLOG(2) << cntr++ << " Local call sending " << call.wait_state().DebugString();
}
PopulateWaitStates(req, dump_resp.local_calls(), resp);
}
LOG_IF(INFO, VLOG_IS_ON(1) || ignored_calls_no_wait_state > 0)
<< "Ignored " << ignored_calls << " calls. " << ignored_calls_no_wait_state
<< " without wait state";
VLOG(3) << __PRETTY_FUNCTION__
<< " wait-states: " << yb::ToString(resp->wait_states());

VLOG(3) << __PRETTY_FUNCTION__ << " wait-states: " << yb::ToString(resp->wait_states());
}

void AddWaitStatesToResponse(const ash::WaitStateTracker& tracker, tserver::WaitStatesPB* resp) {
void AddWaitStatesToResponse(
const ash::WaitStateTracker& tracker, bool export_wait_state_names,
tserver::WaitStatesPB* resp) {
Result<Uuid> local_uuid = Uuid::FromHexStringBigEndian(instance_id_);
DCHECK_OK(local_uuid);
resp->set_component(yb::to_underlying(ash::Component::kTServer));
size_t cntr = 0;
for (auto& wait_state_ptr : tracker.GetWaitStates()) {
yb::WaitStateInfoPB pb;
if (wait_state_ptr && wait_state_ptr->code() != ash::WaitStateCode::kIdle) {
if (local_uuid) {
wait_state_ptr->set_yql_endpoint_tserver_uuid(*local_uuid);
}
wait_state_ptr->ToPB(&pb);
resp->add_wait_states()->CopyFrom(pb);
VLOG(2) << cntr++ << " Tracker call sending " << pb.DebugString();
wait_state_ptr->ToPB(resp->add_wait_states(), export_wait_state_names);
}
}
VLOG(2) << "Tracker call sending " << resp->DebugString();
}

Status ActiveSessionHistory(
Expand All @@ -1474,16 +1460,18 @@ class PgClientServiceImpl::Impl {
if (req.fetch_tserver_states()) {
GetRpcsWaitStates(req, ash::Component::kTServer, resp->mutable_tserver_wait_states());
AddWaitStatesToResponse(
ash::SharedMemoryPgPerformTracker(), resp->mutable_tserver_wait_states());
ash::SharedMemoryPgPerformTracker(), req.export_wait_state_code_as_string(),
resp->mutable_tserver_wait_states());
}
if (req.fetch_flush_and_compaction_states()) {
AddWaitStatesToResponse(
ash::FlushAndCompactionWaitStatesTracker(),
ash::FlushAndCompactionWaitStatesTracker(), req.export_wait_state_code_as_string(),
resp->mutable_flush_and_compaction_wait_states());
}
if (req.fetch_raft_log_appender_states()) {
AddWaitStatesToResponse(
ash::RaftLogAppenderWaitStatesTracker(), resp->mutable_raft_log_appender_wait_states());
ash::RaftLogAppenderWaitStatesTracker(), req.export_wait_state_code_as_string(),
resp->mutable_raft_log_appender_wait_states());
}
if (req.fetch_cql_states()) {
GetRpcsWaitStates(req, ash::Component::kYCQL, resp->mutable_cql_wait_states());
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/cql/cqlserver/cql_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ bool CQLInboundCall::DumpPB(const rpc::DumpRunningRpcsRequestPB& req,
}
if (req.get_wait_state()) {
if (wait_state()) {
wait_state()->ToPB(resp->mutable_wait_state());
wait_state()->ToPB(resp->mutable_wait_state(), req.export_wait_state_code_as_string());
}
}
resp->set_elapsed_millis(
Expand Down
2 changes: 2 additions & 0 deletions src/yb/yql/pggate/pg_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ DEFINE_NON_RUNTIME_int32(pg_client_extra_timeout_ms, 2000,
DECLARE_bool(TEST_index_read_multiple_partitions);
DECLARE_bool(TEST_ash_fetch_wait_states_for_raft_log);
DECLARE_bool(TEST_ash_fetch_wait_states_for_rocksdb_flush_and_compaction);
DECLARE_bool(TEST_export_wait_state_names);
DECLARE_bool(ysql_enable_db_catalog_version_mode);

extern int yb_locks_min_txn_age;
Expand Down Expand Up @@ -1129,6 +1130,7 @@ class PgClient::Impl : public BigDataFetcher {
FLAGS_TEST_ash_fetch_wait_states_for_rocksdb_flush_and_compaction);
req.set_fetch_cql_states(true);
req.set_ignore_ash_and_perform_calls(true);
req.set_export_wait_state_code_as_string(FLAGS_TEST_export_wait_state_names);
tserver::PgActiveSessionHistoryResponsePB resp;

RETURN_NOT_OK(proxy_->ActiveSessionHistory(req, &resp, PrepareController()));
Expand Down
2 changes: 1 addition & 1 deletion src/yb/yql/pggate/ybc_pggate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ void AshCopyTServerSample(
cb_metadata->session_id = tserver_metadata.session_id();
cb_sample->rpc_request_id = tserver_metadata.rpc_request_id();
cb_sample->encoded_wait_event_code =
AshEncodeWaitStateCodeWithComponent(component, tserver_sample.wait_status_code());
AshEncodeWaitStateCodeWithComponent(component, tserver_sample.wait_state_code());
cb_sample->sample_weight = 1; // TODO: Change this once sampling is done at tserver side
cb_sample->sample_time = sample_time;

Expand Down
4 changes: 2 additions & 2 deletions src/yb/yql/pgwrapper/pg_mini-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ TEST_F_EX(PgMiniTest, YB_DISABLE_TEST_IN_TSAN(Ash), PgMiniAshTest) {
} else {
LOG(ERROR) << "Found entry without AuxInfo/method." << entry.DebugString();
// If an RPC does not have the aux/method information, it shouldn't have progressed much.
if (entry.has_wait_status_code_as_string()) {
ASSERT_EQ(entry.wait_status_code_as_string(), "OnCpu_Passive");
if (entry.has_wait_state_code_as_string()) {
ASSERT_EQ(entry.wait_state_code_as_string(), "OnCpu_Passive");
}
++calls_without_aux_info_details;
}
Expand Down

0 comments on commit de9fd5d

Please sign in to comment.