[#20234] Metacache improvement: Refresh Meta-cache when successful RPC responds
Summary:
Background:
As detailed in https://phorge.dev.yugabyte.com/D33533, the YBClient currently fails to update its meta-cache when a raft group's configuration changes. This can lead to inefficiencies such as follower reads never noticing that closer followers have been added: as long as RPCs keep succeeding, the client continues to use the more distant followers it already knows about.

Solution:
This update enhances the Read, Write, GetChanges, and GetTransactionStatus RPCs by appending the client's current raft_config_opid_index to each request. Upon receiving a request, if that raft_config_opid_index is older than the committed raft config opid index on the TabletPeer handling the request, the Tablet Server includes updated raft consensus state information in its response. This keeps the meta-cache current, so the client recognizes and uses the optimal server configuration when processing requests. The change is part of a series of updates (alongside D33197 and D33598) designed to keep the meta-cache sufficiently current and prevent the inefficiencies previously caused by stale cache information. A flag, enable_metacache_partial_refresh, is added to turn the feature on; it is off by default for now.
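The server-side decision reduces to comparing the opid index piggybacked on the request against the committed Raft config's opid index on the serving TabletPeer. Below is a minimal, self-contained sketch of that check in plain C++; the struct and field names are illustrative stand-ins for the protobuf messages, not the actual FillTabletConsensusInfoIfRequestOpIdStale implementation.

// Toy model of the staleness check: piggyback consensus info only when the
// client's cached raft_config_opid_index is behind the committed config.
#include <cstdint>
#include <optional>
#include <string>

struct TabletConsensusInfo {
  std::string leader_uuid;
  int64_t committed_config_opid_index = 0;
};

struct ReadRequest {
  std::optional<int64_t> raft_config_opid_index;  // Absent for old clients.
};

struct ReadResponse {
  std::optional<TabletConsensusInfo> tablet_consensus_info;
};

void MaybePiggybackConsensusInfo(const ReadRequest& req,
                                 const TabletConsensusInfo& current,
                                 ReadResponse* resp) {
  // Old clients never sent the index; leave their responses untouched.
  if (!req.raft_config_opid_index.has_value()) return;
  // The client already knows the committed config (or newer); keep the response small.
  if (*req.raft_config_opid_index >= current.committed_config_opid_index) return;
  // The client is stale: attach the current consensus state so its meta-cache
  // can be refreshed without an extra lookup RPC.
  resp->tablet_consensus_info = current;
}

On the client side, TabletInvoker::Done feeds any piggybacked tablet_consensus_info back into the meta-cache through RefreshMetaCacheWithResponse when enable_metacache_partial_refresh is set, as the tablet_rpc.cc hunk below shows.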

Upgrade/Rollback safety:
The additional fields in the request and response protobufs are transient and are not stored on disk, so compatibility and safety are maintained during system upgrades and rollbacks.
Jira: DB-9194

Test Plan:
Jenkins: urgent
Full Coverage Testing:
Added a test flag, FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc, which is turned on in debug mode. When set, the meta-cache's GetRaftConfigOpidIndex method always returns an OpId index of -2, so when the Tablet Server is about to send back a successful response it finds the request's piggybacked OpId index stale and piggybacks a TabletConsensusInfo onto the response. When the client receives the response in the aforementioned RPCs and this flag is on, a DCHECK verifies that if the RPC response can carry a TabletConsensusInfo and the response was successful, then the TabletConsensusInfo is present in the response. This lets us leverage all the existing tests in the code base that exercise these RPCs to DCHECK our code path.
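The invariant enforced by that DCHECK can be restated compactly. The following is a toy restatement in plain C++ with illustrative field names; it mirrors the intent of CheckIfConsensusInfoUnexpectedlyMissing in tablet_rpc.h rather than reproducing its template machinery.

// Toy restatement: when the test flag forces every request to look stale, any
// successful response whose type can carry consensus info must carry it.
struct RpcOutcome {
  bool response_type_supports_consensus_info = false;
  bool always_return_consensus_info_flag = false;
  bool response_has_error = false;
  bool response_has_tablet_consensus_info = false;
};

bool ConsensusInfoPresentWhenExpected(const RpcOutcome& o) {
  if (o.response_type_supports_consensus_info &&
      o.always_return_consensus_info_flag &&
      !o.response_has_error) {
    return o.response_has_tablet_consensus_info;
  }
  return true;  // Nothing to verify in all other cases.
}

// Client RPC callbacks then assert: DCHECK(ConsensusInfoPresentWhenExpected(outcome));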

Unit testing:
Added metacache_refresh_itest.cc, which contains the following tests:
TestMetacacheRefreshFromFollowerRead:
1. Sets up an external mini-cluster.
2. Fills in the meta-cache by issuing a write op.
3. Changes the raft configuration of the tablet group by blacklisting a node and adding a node.
4. Verifies that the next ConsistentPrefix read successfully refreshes the meta-cache, using a sync point (see the sketch after this list).
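Step 4 hinges on the TabletInvoker::RefreshFinishedWithOkRPCResponse sync point added in this diff. A hedged sketch of how a test can observe it follows; the yb::SyncPoint and WaitFor spellings are assumptions based on the usual test utilities rather than an excerpt from metacache_refresh_itest.cc.

// Sketch only: watch the sync point that fires after a successful RPC response
// is processed, then issue the ConsistentPrefix read and wait for the callback
// to report a successful meta-cache refresh.
std::atomic<bool> refresh_succeeded{false};
yb::SyncPoint::GetInstance()->SetCallBack(
    "TabletInvoker::RefreshFinishedWithOkRPCResponse", [&](void* arg) {
      // The invoker passes a pointer to its consensus_info_refresh_succeeded flag.
      refresh_succeeded = *static_cast<bool*>(arg);
    });
yb::SyncPoint::GetInstance()->EnableProcessing();

// ... issue the follower (ConsistentPrefix) read against the tablet here ...

ASSERT_OK(WaitFor(
    [&]() -> Result<bool> { return refresh_succeeded.load(); },
    MonoDelta::FromSeconds(10), "meta-cache refreshed from follower read"));
yb::SyncPoint::GetInstance()->DisableProcessing();
yb::SyncPoint::GetInstance()->ClearAllCallBacks();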

TestMetacacheNoRefreshFromWrite:
1. Turns off FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc.
2. Fills in the meta-cache by issuing a write op.
3. Issues another write op and observes that no refresh happens.

Reviewers: mlillibridge, xCluster, hsunder

Reviewed By: mlillibridge

Subscribers: bogdan, ybase, ycdcxcluster

Differential Revision: https://phorge.dev.yugabyte.com/D34272
SeanSong25 committed Apr 26, 2024
1 parent b64cac3 commit 1fa2ee4
Showing 23 changed files with 452 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/yb/cdc/cdc_service.cc
@@ -1582,7 +1582,8 @@ void CDCServiceImpl::GetChanges(
resp->mutable_error(),
CDCErrorPB::LEADER_NOT_READY,
context);

// Fill in tablet consensus info now we know that the tablet exists and is leader.
tserver::FillTabletConsensusInfoIfRequestOpIdStale(tablet_peer, req, resp);
auto stream_meta_ptr = RPC_VERIFY_RESULT(
GetStream(stream_id, RefreshStreamMapOption::kIfInitiatedState), resp->mutable_error(),
CDCErrorPB::INTERNAL_ERROR, context);
@@ -1615,7 +1616,6 @@ void CDCServiceImpl::GetChanges(
streaming_checkpoint_pb.set_key("");
streaming_checkpoint_pb.set_write_id(0);
resp->mutable_cdc_sdk_checkpoint()->CopyFrom(streaming_checkpoint_pb);

context.RespondSuccess();
return;
}
2 changes: 2 additions & 0 deletions src/yb/cdc/cdc_service.proto
@@ -328,6 +328,8 @@ message GetChangesRequestPB {
optional uint32 auto_flags_config_version = 13;

optional CDCSDKRequestSource cdcsdk_request_source = 14 [default = DEBEZIUM];

optional int64 raft_config_opid_index = 15;
}

message KeyValuePairPB {
10 changes: 10 additions & 0 deletions src/yb/cdc/xcluster_rpc.cc
@@ -178,6 +178,7 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
}

bool RefreshMetaCacheWithResponse() override {
DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_));
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for XClusterWrite RPC failed because the response did "
"not have a tablet_consensus_info";
@@ -187,6 +188,10 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

void SetRequestRaftConfigOpidIndex(int64_t opid_index) override {
req_.set_raft_config_opid_index(opid_index);
}

private:
void SendRpcToTserver(int attempt_num) override {
InvokeAsync(
@@ -323,6 +328,7 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc {
}

bool RefreshMetaCacheWithResponse() override {
DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_));
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for GetChanges RPC failed because the response did not "
"have a tablet_consensus_info";
@@ -332,6 +338,10 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc {
return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

void SetRequestRaftConfigOpidIndex(int64_t opid_index) override {
req_.set_raft_config_opid_index(opid_index);
}

private:
const std::string &tablet_id() const { return req_.tablet_id(); }

6 changes: 6 additions & 0 deletions src/yb/client/async_rpc.cc
@@ -488,6 +488,7 @@ void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {

template <class Req, class Resp>
bool AsyncRpcBase<Req, Resp>::RefreshMetaCacheWithResponse() {
DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_));
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for " << GetRpcName()
<< " RPC failed because the response did not have a tablet_consensus_info";
@@ -497,6 +498,11 @@ bool AsyncRpcBase<Req, Resp>::RefreshMetaCacheWithResponse() {
return tablet_invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

template <class Req, class Resp>
void AsyncRpcBase<Req, Resp>::SetRequestRaftConfigOpidIndex(int64_t opid_index) {
req_.set_raft_config_opid_index(opid_index);
}

template <class Req, class Resp>
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
return {GetPropagatedHybridTime(resp_),
2 changes: 2 additions & 0 deletions src/yb/client/async_rpc.h
@@ -149,6 +149,8 @@ class AsyncRpcBase : public AsyncRpc {

bool RefreshMetaCacheWithResponse() override;

void SetRequestRaftConfigOpidIndex(int64_t opid_index) override;

virtual std::string GetRpcName() = 0;

protected:
6 changes: 6 additions & 0 deletions src/yb/client/client-test.cc
@@ -140,6 +140,8 @@ DECLARE_int32(rocksdb_level0_file_num_compaction_trigger);

METRIC_DECLARE_counter(rpcs_queue_overflow);

DECLARE_bool(enable_metacache_partial_refresh);

using namespace std::literals; // NOLINT
using namespace std::placeholders;

@@ -2800,6 +2802,8 @@ Result<client::internal::RemoteTabletPtr> GetRemoteTablet(
// tablet server, thus it will have the latest information about who the leader really is, and
// thus should always succeed with exactly 2 RPCs.
TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = true;
shared_ptr<YBTable> pgsql_table;
EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table));
std::shared_ptr<YBSession> session = CreateSession(client_.get());
@@ -2878,6 +2882,8 @@ TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) {
const auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(master_proxy, proxy_cache_.get()));
auto target_details = ts_map.at(tserver_uuids[target_index]).get();
auto leader_ts = ts_map.at(leader_server_uuid).get();
// Step down to target replica and wait until the leader is elected on the target.
// Wait for all peers to get ready.
ASSERT_OK(
(itest::WaitForAllPeersToCatchup(tablet.tablet_id(), TServerDetailsVector(ts_map), timeout)));
ASSERT_OK(itest::LeaderStepDown(
4 changes: 4 additions & 0 deletions src/yb/client/client.cc
@@ -2958,5 +2958,9 @@ bool YBClient::RefreshTabletInfoWithConsensusInfo(
return true;
}

int64_t YBClient::GetRaftConfigOpidIndex(const TabletId& tablet_id) {
return data_->meta_cache_->GetRaftConfigOpidIndex(tablet_id);
}

} // namespace client
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/client/client.h
@@ -1036,6 +1036,8 @@ class YBClient {
bool RefreshTabletInfoWithConsensusInfo(
const tserver::TabletConsensusInfoPB& newly_received_info);

int64_t GetRaftConfigOpidIndex(const TabletId& tablet_id);

private:
class Data;

13 changes: 13 additions & 0 deletions src/yb/client/meta_cache.cc
@@ -125,6 +125,7 @@ METRIC_DEFINE_event_stats(

DECLARE_string(placement_cloud);
DECLARE_string(placement_region);
DECLARE_bool(TEST_always_return_consensus_info_for_succeeded_rpc);

namespace yb {

@@ -1255,6 +1256,18 @@ Status MetaCache::RefreshTabletInfoWithConsensusInfo(
}
}

int64_t MetaCache::GetRaftConfigOpidIndex(const TabletId& tablet_id) {
SharedLock lock(mutex_);
if (GetAtomicFlag(&FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc)) {
return RemoteTablet::kUnknownOpIdIndex;
}
auto remote_tablet = FindPtrOrNull(tablets_by_id_, tablet_id);
if (remote_tablet == nullptr) {
return RemoteTablet::kUnknownOpIdIndex;
}
return remote_tablet->raft_config_opid_index();
}

std::unordered_map<TableId, TableData>::iterator MetaCache::InitTableDataUnlocked(
const TableId& table_id, const VersionedTablePartitionListPtr& partitions) {
VLOG_WITH_PREFIX_AND_FUNC(4) << Format(
2 changes: 2 additions & 0 deletions src/yb/client/meta_cache.h
@@ -629,6 +629,8 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
Status RefreshTabletInfoWithConsensusInfo(
const tserver::TabletConsensusInfoPB& tablet_consensus_info);

int64_t GetRaftConfigOpidIndex(const TabletId& tablet_id);

private:
friend class LookupRpc;
friend class LookupByKeyRpc;
34 changes: 27 additions & 7 deletions src/yb/client/tablet_rpc.cc
@@ -29,6 +29,7 @@
#include "yb/tserver/tserver_error.h"
#include "yb/tserver/tserver_service.proxy.h"

#include "yb/util/debug-util.h"
#include "yb/util/flags.h"
#include "yb/util/logging.h"
#include "yb/util/result.h"
@@ -55,6 +56,16 @@ DEFINE_UNKNOWN_int32(lookup_cache_refresh_secs, 60, "When non-zero, specifies ho
DEFINE_test_flag(int32, assert_failed_replicas_less_than, 0,
"If greater than 0, this process will crash if the number of failed replicas for "
"a RemoteTabletServer is greater than the specified number.");
DEFINE_test_flag(
bool, always_return_consensus_info_for_succeeded_rpc, false,
"If set to true, we will always pass a stale raft_config_opid_index to the request when it is "
"possible for the request. This is turned on in debug mode to test that our metacache "
"will always be refreshed when a successful Write/Read/TransactionStatus/GetChanges RPC "
"responds.");
DEFINE_RUNTIME_bool(
enable_metacache_partial_refresh, false,
"If set, we will attempt to refresh the tablet metadata cache with a TabletConsensusInfoPB in "
"the tablet invoker.");

using namespace std::placeholders;

@@ -264,11 +275,14 @@ void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) {

VLOG(2) << "Tablet " << tablet_id_ << ": Sending " << command_->ToString() << " to replica "
<< current_ts_->ToString();
int64_t opid_index = client_->GetRaftConfigOpidIndex(tablet_id_);
rpc_->SetRequestRaftConfigOpidIndex(opid_index);
rpc_->SendRpcToTserver(retrier_->attempt_num());
}

Status TabletInvoker::FailToNewReplica(const Status& reason,
const tserver::TabletServerErrorPB* error_code) {
const tserver::TabletServerErrorPB* error_code,
bool consensus_info_refresh_succeeded) {
TRACE_TO(trace_, "FailToNewReplica($0)", reason.ToString());
if (ErrorCode(error_code) == tserver::TabletServerErrorPB::STALE_FOLLOWER) {
VLOG(1) << "Stale follower for " << command_->ToString() << " just retry";
@@ -289,10 +303,9 @@ Status TabletInvoker::FailToNewReplica(const Status& reason,
// tablet is successfully refreshed using the tablet_consensus_info, so we need to clear the
// followers set to let tablet invoker use our latest leader tablet peer that has just been
// refreshed.

if (rpc_->RefreshMetaCacheWithResponse()) {
bool refresh_succeeded = true;
TEST_SYNC_POINT_CALLBACK("CDCSDKMetaCacheRefreshTest::Refresh", &refresh_succeeded);
if (consensus_info_refresh_succeeded) {
TEST_SYNC_POINT_CALLBACK(
"CDCSDKMetaCacheRefreshTest::Refresh", &consensus_info_refresh_succeeded);
followers_.clear();
} else {
followers_.emplace(
@@ -333,7 +346,14 @@ bool TabletInvoker::Done(Status* status) {

bool assign_new_leader = assign_new_leader_;
assign_new_leader_ = false;

bool consensus_info_refresh_succeeded = false;
if (GetAtomicFlag(&FLAGS_enable_metacache_partial_refresh)) {
consensus_info_refresh_succeeded = rpc_->RefreshMetaCacheWithResponse();
if (status->ok()) {
TEST_SYNC_POINT_CALLBACK(
"TabletInvoker::RefreshFinishedWithOkRPCResponse", &consensus_info_refresh_succeeded);
}
}
if (status->IsAborted() || retrier_->finished()) {
if (status->ok()) {
*status = retrier_->controller().status();
@@ -438,7 +458,7 @@ bool TabletInvoker::Done(Status* status) {
if (status->IsIllegalState() || IsTabletConsideredNotFound(rsp_err, *status) ||
IsTabletConsideredNonLeader(rsp_err, *status)) {
// The whole operation is completed if we can't schedule a retry.
return !FailToNewReplica(*status, rsp_err).ok();
return !FailToNewReplica(*status, rsp_err, consensus_info_refresh_succeeded).ok();
} else {
tserver::TabletServerDelay delay(*status);
auto retry_status = delay.value().Initialized()
21 changes: 19 additions & 2 deletions src/yb/client/tablet_rpc.h
@@ -34,9 +34,12 @@
#include "yb/tserver/tserver_fwd.h"
#include "yb/tserver/tserver_types.pb.h"

#include "yb/util/atomic.h"
#include "yb/util/status_fwd.h"
#include "yb/util/net/net_fwd.h"

DECLARE_bool(TEST_always_return_consensus_info_for_succeeded_rpc);

namespace yb {

namespace tserver {
@@ -62,12 +65,25 @@ class TabletRpc {
// Returns true if we successfully updated the metacache, otherwise false.
virtual bool RefreshMetaCacheWithResponse() { return false; }

virtual void SetRequestRaftConfigOpidIndex(int64_t opid_index) {}

protected:
~TabletRpc() {}
};

tserver::TabletServerErrorPB_Code ErrorCode(const tserver::TabletServerErrorPB* error);

template <class Resp, class Req>
inline bool CheckIfConsensusInfoUnexpectedlyMissing(const Req& request, const Resp& response) {
if (tserver::HasTabletConsensusInfo<Resp>::value) {
if (GetAtomicFlag(&FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc) &&
!response.has_error()) {
return response.has_tablet_consensus_info();
}
}
return true;
}

class TabletInvoker {
public:
// If table is specified, TabletInvoker can detect that table partitions are stale in case tablet
@@ -122,7 +138,8 @@ class TabletInvoker {
// Marks all replicas on current_ts_ as failed and retries the write on a
// new replica.
Status FailToNewReplica(const Status& reason,
const tserver::TabletServerErrorPB* error_code = nullptr);
const tserver::TabletServerErrorPB* error_code = nullptr,
bool consensus_info_refresh_succeeded = false);

// Called when we finish a lookup (to find the new consensus leader). Retries
// the rpc after a short delay.
@@ -150,7 +167,7 @@ class TabletInvoker {
if (ErrorCode(error_code) == tserver::TabletServerErrorPB::NOT_THE_LEADER &&
current_ts_ != nullptr) {
return status.IsNotFound() || status.IsIllegalState();
}
}
return false;
}

7 changes: 7 additions & 0 deletions src/yb/client/transaction_rpc.cc
@@ -118,6 +118,7 @@ class TransactionRpc : public TransactionRpcBase {

bool RefreshMetaCacheWithResponse() override {
if constexpr (tserver::HasTabletConsensusInfo<typename Traits::Response>::value) {
DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_));
if (resp_.has_tablet_consensus_info()) {
return GetInvoker().RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}
@@ -128,6 +129,12 @@
return false;
}

void SetRequestRaftConfigOpidIndex(int64_t opid_index) override {
if constexpr (tserver::HasRaftConfigOpidIndex<typename Traits::Request>::value) {
req_.set_raft_config_opid_index(opid_index);
}
}

private:
const std::string& tablet_id() const override {
return req_.tablet_id();
1 change: 1 addition & 0 deletions src/yb/integration-tests/CMakeLists.txt
@@ -166,6 +166,7 @@ ADD_YB_TEST(tserver_path_handlers-itest)
ADD_YB_TEST(master_failover-itest)
ADD_YB_TEST(master_config-itest)
ADD_YB_TEST(master_tasks-test)
ADD_YB_TEST(metacache_refresh-itest)
ADD_YB_TEST(network_failure-test)
ADD_YB_TEST(system_table_fault_tolerance)
ADD_YB_TEST(raft_consensus-itest)
3 changes: 3 additions & 0 deletions src/yb/integration-tests/cdc_service-int-test.cc
@@ -102,6 +102,7 @@ DECLARE_string(vmodule);
METRIC_DECLARE_entity(cdc);
METRIC_DECLARE_gauge_int64(last_read_opid_index);

DECLARE_bool(enable_metacache_partial_refresh);
namespace yb {

namespace log {
@@ -1037,6 +1038,8 @@ class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest {
// serving as a proxy, the proxy should receive a TabletConsensusInfo that
// it can use to refresh its metacache.
TEST_F(CDCServiceTestMultipleServersOneTablet, TestGetChangesRpcTabletConsensusInfo) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = true;
// Find the leader and followers for our tablet.
const MonoDelta timeout = MonoDelta::FromSeconds(10);
const auto proxy_cache_ = std::make_unique<rpc::ProxyCache>(client_->messenger());