[#20234] DocDB: Improved metacache to refresh using TabletConsensusInfo when receiving a NOT_THE_LEADER error

Summary:
Problem Background:
In our system, when a client needs to perform an operation on a specific tablet, it first has to determine which tablet server can serve that operation; if the operation is a WriteRpc, for example, it must find the tablet's leader. The current method of locating the leader is inefficient: the client essentially guesses, working through a list of potential servers (peers), and this guessing can be slow, especially when there are many servers or when they are geographically far apart. The inefficiency can cause operations to fail because the leader was not found quickly enough.

Additionally, the system does not handle server failures well. If a server is down, the client can waste seconds on every attempt before it stops trying to connect to it. There is a mechanism that avoids retrying a failed server for 60 seconds, but it is not very effective when a server is permanently out of service. One reason for this is that the client's knowledge of who the leaders are (stored in the meta-cache) can become outdated, and it is not refreshed as long as operations still eventually complete with the stale information, even if getting there involves repeated connection failures.

Solution Introduction:
This ticket introduces a preliminary change to improve how the client tracks the current leader of each tablet. The idea is to add a new piece of information to the meta-cache, "raft_config_opid", which records the latest confirmed Raft configuration known for each tablet. When the client receives newer configuration information (which can arrive from other servers during normal operation), it compares it against what it already has; if the incoming information is more recent, the meta-cache is updated, avoiding wasted attempts against servers that are no longer leaders or are down.
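A rough sketch of that guard is shown below (illustrative only; the types and names are simplified stand-ins rather than the actual MetaCache API):

#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins for the real yb::client::internal::MetaCache structures.
struct ReplicaInfo {
  std::string ts_uuid;
  bool is_leader = false;
};

struct CachedTablet {
  // OpId index of the Raft config this cache entry reflects (the "raft_config_opid"
  // described above); -1 means it has never been set.
  int64_t raft_config_opid_index = -1;
  std::vector<ReplicaInfo> replicas;
};

// Accept a piggybacked consensus update only if it is strictly newer than the cached one.
bool MaybeRefresh(CachedTablet& cached, int64_t incoming_opid_index,
                  std::vector<ReplicaInfo> incoming_replicas) {
  if (incoming_opid_index <= cached.raft_config_opid_index) {
    return false;  // Stale or duplicate config: keep the existing cache entry.
  }
  cached.raft_config_opid_index = incoming_opid_index;
  cached.replicas = std::move(incoming_replicas);
  return true;
}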
This diff, combined with D33197 and D33598, updates the meta-cache using the TabletConsensusInfo that is piggybacked on a Write/Read/GetChanges/GetTransactionStatus ResponsePB whenever we send a request that requires a leader to a non-leader. These RPCs are frequent enough that they should keep the meta-cache sufficiently up to date to avoid the situation that caused the CE.
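Conceptually, the client-side flow this enables is sketched below (schematic only; every name is a hypothetical stand-in for the real TabletInvoker/TabletRpc machinery, compressed into a single function):

#include <iostream>
#include <string>

// Hypothetical stand-ins; they only illustrate "refresh from the response, then retry".
struct FakeResponse {
  bool not_the_leader = false;
  bool has_tablet_consensus_info = false;
  std::string reported_leader_uuid;  // Leader according to the responder's Raft config.
};

struct FakeMetaCache {
  std::string leader_hint = "ts-1";  // Possibly stale.
};

int main() {
  FakeMetaCache cache;
  // Attempt 1: the cached "leader" is actually a follower now. It rejects the
  // leader-only RPC but piggybacks its current consensus info on the response.
  FakeResponse attempt1{true, true, "ts-3"};
  if (attempt1.not_the_leader && attempt1.has_tablet_consensus_info) {
    cache.leader_hint = attempt1.reported_leader_uuid;  // Partial refresh, no master lookup.
  }
  // Attempt 2 goes straight to the refreshed leader hint and succeeds.
  std::cout << "retrying against " << cache.leader_hint << "\n";
  return 0;
}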

Upgrade/Rollback safety:
The added field in the ResponsePBs is not persisted on disk and is covered by protobuf's backward-compatibility guarantees for optional fields: an older server simply never sets it, and a newer client checks for its presence before using it.
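A small illustration of that presence check is given below (hypothetical types; the real checks are the has_tablet_consensus_info() guards in the RPC classes changed by this diff):

#include <optional>
#include <string>

// Hypothetical stand-in for a ResponsePB carrying the new optional field.
struct Response {
  std::optional<std::string> tablet_consensus_info;  // Never set by an older tserver.
};

// Old tservers never set the field and old clients ignore fields they do not know about,
// so the only requirement on new clients is to tolerate its absence.
bool MaybeRefreshMetaCache(const Response& resp) {
  if (!resp.tablet_consensus_info.has_value()) {
    return false;  // Older peer, or nothing to report: fall back to the existing retry path.
  }
  // Newer peer: apply the piggybacked consensus info to the meta-cache (omitted here).
  return true;
}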
Jira: DB-9194

Test Plan:
Unit Testing:
1. ClientTest.TestMetacacheRefreshWhenSentToWrongLeader: Changes the leadership of a Raft group after the meta-cache has already been filled in. This introduces a discrepancy between the information in the meta-cache and the actual cluster configuration, so the stale leader returns a NOT_THE_LEADER error to the caller. Normally this prompts the TabletInvoker to try the next-in-line replica's tablet server, and with our test setup that guarantees the TabletInvoker retries the RPC at least 3 times. Because this diff refreshes the meta-cache immediately after a NOT_THE_LEADER error, we should instead observe the RPC succeed in exactly 2 attempts: the first attempt piggybacks the TabletConsensusInfo and updates the meta-cache, and the second attempt uses the refreshed meta-cache to find the correct leader to send the request to.
2. CDCServiceTestMultipleServersOneTablet.TestGetChangesRpcTabletConsensusInfo: Since the GetChanges code path for updating the meta-cache diverges significantly from the other RPC types, this test explicitly checks that when a CDC proxy receives a NOT_THE_LEADER error, its meta-cache is refreshed.

Reviewers: mlillibridge, xCluster, hsunder

Reviewed By: mlillibridge

Subscribers: yql, jason, ycdcxcluster, hsunder, ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D33533
SeanSong25 committed Apr 25, 2024
1 parent ee9321d commit 942de8f
Showing 38 changed files with 901 additions and 117 deletions.
21 changes: 16 additions & 5 deletions src/yb/cdc/cdc_service.cc
@@ -69,6 +69,8 @@
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/transaction_participant.h"

#include "yb/tserver/service_util.h"

#include "yb/util/flags.h"
#include "yb/util/format.h"
#include "yb/util/logging.h"
@@ -1546,11 +1548,19 @@ void CDCServiceImpl::GetChanges(

auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm;

if ((!tablet_peer || tablet_peer->IsNotLeader()) && req->serve_as_proxy()) {
// Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups.
auto context_ptr = std::make_shared<RpcContext>(std::move(context));
TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer);
return;
bool isNotLeader = tablet_peer && tablet_peer->IsNotLeader();
if (!tablet_peer || isNotLeader) {
if (req->serve_as_proxy()) {
// Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups.
auto context_ptr = std::make_shared<RpcContext>(std::move(context));
VLOG(2)
<< "GetChangesRpc::TabletLeaderGetChanges is called because serve_as_proxy is turned on";
TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer);
return;
}
}
if (isNotLeader) {
tserver::FillTabletConsensusInfo(resp, tablet_peer->tablet_id(), tablet_peer);
}

// If we can't serve this tablet...
@@ -3210,6 +3220,7 @@ void CDCServiceImpl::TabletLeaderGetChanges(
[this, resp, context, rpc_handle](const Status& status, GetChangesResponsePB&& new_resp) {
auto retained = rpcs_.Unregister(rpc_handle);
*resp = std::move(new_resp);
VLOG(1) << "GetChangesRPC TabletLeaderGetChanges finished with status: " << status;
RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), resp->error().code(), *context);
context->RespondSuccess();
});
2 changes: 2 additions & 0 deletions src/yb/cdc/cdc_service.proto
@@ -545,6 +545,8 @@ message GetChangesResponsePB {

// Only set in xCluster streams when error is AUTO_FLAGS_CONFIG_VERSION_MISMATCH.
optional uint32 auto_flags_config_version = 12;

optional yb.tserver.TabletConsensusInfoPB tablet_consensus_info = 13;
}

message GetCheckpointRequestPB {
37 changes: 36 additions & 1 deletion src/yb/cdc/xcluster_rpc.cc
@@ -38,6 +38,7 @@ DEFINE_test_flag(bool, xcluster_print_write_request, false,

using namespace std::literals;

using yb::tserver::TabletConsensusInfoPB;
using yb::tserver::TabletServerErrorPB;
using yb::tserver::TabletServerServiceProxy;
using yb::tserver::WriteRequestPB;
@@ -176,6 +177,16 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc {
return resp_.has_error() ? &resp_.error() : nullptr;
}

bool RefreshMetaCacheWithResponse() override {
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for XClusterWrite RPC failed because the response did "
"not have a tablet_consensus_info";
return false;
}

return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

private:
void SendRpcToTserver(int attempt_num) override {
InvokeAsync(
@@ -266,7 +277,21 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc {
// Map CDC Errors to TabletServer Errors.
switch (resp_.error().code()) {
case cdc::CDCErrorPB::TABLET_NOT_FOUND:
last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND);
// If tablet_consensus_info is present, we know the problem is actually
// due to making the request to a follower instead of the leader so
// return NOT_THE_LEADER instead of TABLET_NOT_FOUND, so the tablet invoker
// knows the tablet is present on this node but its leader is not.
//
// The invoker needs to know the difference because in the not-a-leader case it just
// tries a different participant when looking for a leader, possibly updating its
// MetaCache using any returned TabletConsensusInfo first, while in the other case it
// marks the node as not having the tablet, blocking future follower reads to that
// tablet.
if (resp_.has_tablet_consensus_info()) {
last_error_.set_code(tserver::TabletServerErrorPB::NOT_THE_LEADER);
} else {
last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND);
}
if (resp_.error().has_status()) {
last_error_.mutable_status()->CopyFrom(resp_.error().status());
}
@@ -297,6 +322,16 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc {
std::bind(&GetChangesRpc::Finished, self, Status::OK()));
}

bool RefreshMetaCacheWithResponse() override {
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for GetChanges RPC failed because the response did not "
"have a tablet_consensus_info";
return false;
}

return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

private:
const std::string &tablet_id() const { return req_.tablet_id(); }

10 changes: 10 additions & 0 deletions src/yb/client/async_rpc.cc
@@ -486,6 +486,16 @@ void AsyncRpcBase<Req, Resp>::ProcessResponseFromTserver(const Status& status) {
}
}

template <class Req, class Resp>
bool AsyncRpcBase<Req, Resp>::RefreshMetaCacheWithResponse() {
if (!resp_.has_tablet_consensus_info()) {
VLOG(1) << "Partial refresh of tablet for " << GetRpcName()
<< " RPC failed because the response did not have a tablet_consensus_info";
return false;
}

return tablet_invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info());
}

template <class Req, class Resp>
FlushExtraResult AsyncRpcBase<Req, Resp>::MakeFlushExtraResult() {
8 changes: 8 additions & 0 deletions src/yb/client/async_rpc.h
@@ -147,6 +147,10 @@ class AsyncRpcBase : public AsyncRpc {
const Resp& resp() const { return resp_; }
Resp& resp() { return resp_; }

bool RefreshMetaCacheWithResponse() override;

virtual std::string GetRpcName() = 0;

protected:
// Returns `true` if caller should continue processing response, `false` otherwise.
bool CommonResponseCheck(const Status& status);
@@ -174,6 +178,8 @@ class WriteRpc : public AsyncRpcBase<tserver::WriteRequestPB, tserver::WriteResp

virtual ~WriteRpc();

std::string GetRpcName() override { return "Write"; }

private:
Status SwapResponses() override;
void CallRemoteMethod() override;
@@ -187,6 +193,8 @@ class ReadRpc : public AsyncRpcBase<tserver::ReadRequestPB, tserver::ReadRespons

virtual ~ReadRpc();

std::string GetRpcName() override { return "Read"; }

private:
Status SwapResponses() override;
void CallRemoteMethod() override;
181 changes: 181 additions & 0 deletions src/yb/client/client-test.cc
@@ -69,12 +69,14 @@
#include "yb/gutil/stl_util.h"
#include "yb/gutil/strings/substitute.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

#include "yb/master/catalog_manager_if.h"
#include "yb/master/master.h"
#include "yb/master/master_client.pb.h"
#include "yb/master/master_cluster.proxy.h"
#include "yb/master/master_ddl.pb.h"
#include "yb/master/master_error.h"
#include "yb/master/mini_master.h"
@@ -103,6 +105,7 @@
#include "yb/util/status.h"
#include "yb/util/status_log.h"
#include "yb/util/stopwatch.h"
#include "yb/util/sync_point.h"
#include "yb/util/test_thread_holder.h"
#include "yb/util/thread.h"
#include "yb/util/tostring.h"
@@ -168,6 +171,9 @@ constexpr int kNumTablets = 2;
const std::string kKeyspaceName = "my_keyspace";
const std::string kPgsqlKeyspaceName = "psql" + kKeyspaceName;
const std::string kPgsqlSchemaName = "my_schema";
const std::string kPgsqlTableName = "table";
const std::string kPgsqlTableId = "tableid";
const std::string kPgsqlNamespaceName = "test_namespace";

} // namespace

@@ -205,6 +211,7 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {

ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_));
ASSERT_NO_FATALS(CreateTable(kTable2Name, 1, &client_table2_));
ASSERT_NO_FATALS(CreatePgSqlTable());
}

void DoTearDown() override {
@@ -398,6 +405,42 @@ class ClientTest: public YBMiniClusterTestBase<MiniCluster> {
ASSERT_OK(table->Create(table_name, num_tablets, schema_, client_.get()));
}

void CreatePgSqlTable() {
std::unique_ptr<YBTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(client_->CreateNamespace(
kPgsqlNamespaceName, YQL_DATABASE_PGSQL, "" /* creator */, "" /* ns_id */,
"" /* src_ns_id */, boost::none /* next_pg_oid */, nullptr /* txn */, false));
std::string kNamespaceId;
{
auto namespaces = ASSERT_RESULT(client_->ListNamespaces());
for (const auto& ns : namespaces) {
if (ns.id.name() == kPgsqlNamespaceName) {
kNamespaceId = ns.id.id();
break;
}
}
}
auto pgsql_table_name =
YBTableName(YQL_DATABASE_PGSQL, kNamespaceId, kPgsqlNamespaceName, kPgsqlTableName);

YBSchemaBuilder schema_builder;
schema_builder.AddColumn("key")->PrimaryKey()->Type(DataType::STRING)->NotNull();
schema_builder.AddColumn("value")->Type(DataType::INT64)->NotNull();
schema_builder.SetSchemaName(kPgsqlSchemaName);
EXPECT_OK(client_->CreateNamespaceIfNotExists(
kPgsqlNamespaceName, YQLDatabase::YQL_DATABASE_PGSQL, "" /* creator_role_name */,
kNamespaceId));
YBSchema schema;
EXPECT_OK(schema_builder.Build(&schema));
EXPECT_OK(table_creator->table_name(pgsql_table_name)
.table_id(kPgsqlTableId)
.schema(&schema)
.table_type(YBTableType::PGSQL_TABLE_TYPE)
.set_range_partition_columns({"key"})
.num_tablets(1)
.Create());
}

// Kills a tablet server.
// Boolean flags control whether to restart the tserver, and if so, whether to wait for it to
// finish bootstrapping.
@@ -2724,6 +2767,144 @@ TEST_F(ClientTest, RefreshPartitions) {
LOG(INFO) << "num_lookups_done: " << num_lookups_done;
}

Result<client::internal::RemoteTabletPtr> GetRemoteTablet(
const TabletId& tablet_id, bool use_cache, YBClient* client) {
std::promise<Result<client::internal::RemoteTabletPtr>> tablet_lookup_promise;
auto future = tablet_lookup_promise.get_future();
client->LookupTabletById(
tablet_id, /* table =*/ nullptr, master::IncludeInactive::kTrue,
master::IncludeDeleted::kFalse, CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(1000),
[&tablet_lookup_promise](const Result<client::internal::RemoteTabletPtr>& result) {
tablet_lookup_promise.set_value(result);
},
client::UseCache(use_cache));
return VERIFY_RESULT(future.get());
}

// Without the ability to refresh the metacache using piggybacked TabletConsensusInfo from
// an RPC sent to the wrong leader, the logic for the tablet invoker to find the next valid tablet
// server to send a write request to is as follows:
// 1. Select the leader, provided:
// a. One exists, and
// b. It hasn't failed, and
// c. It isn't currently marked as a follower.
// 2. If there's no good leader select another replica, provided:
// a. It hasn't failed, and
// b. It hasn't rejected our write due to being a follower.
// 3. If we're out of appropriate replicas, force a lookup to the master
// to fetch new consensus configuration information.
// The tablet invoker always tries the replicas in the order that they are stored inside the
// replicas_ field of RemoteTablet. With the current logic, what should happen is that whenever
// the tablet_invoker sees that a tablet server it selected for a leader_only request was not the
// leader, it will refresh its metacache using the piggybacked TabletConsensusInfo from that
// tablet server, thus it will have the latest information about who the leader really is, and
// thus should always succeed with exactly 2 RPCs.
TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) {
shared_ptr<YBTable> pgsql_table;
EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table));
std::shared_ptr<YBSession> session = CreateSession(client_.get());
rpc::Sidecars sidecars;
auto create_insert_pgsql_row = [&](const std::string& key) -> YBPgsqlWriteOpPtr {
auto pgsql_write_op = client::YBPgsqlWriteOp::NewInsert(pgsql_table, &sidecars);
PgsqlWriteRequestPB* psql_write_request = pgsql_write_op->mutable_request();
psql_write_request->add_range_column_values()->mutable_value()->set_string_value(key);
PgsqlColumnValuePB* pgsql_column = psql_write_request->add_column_values();
pgsql_column->set_column_id(pgsql_table->schema().ColumnId(1));
pgsql_column->mutable_expr()->mutable_value()->set_int64_value(3);
return pgsql_write_op;
};

// Create a Write Op to write a row to pgsql_table with key pgsql_key1
auto write_op = create_insert_pgsql_row("pgsql_key1");
session->Apply(write_op);
FlushSessionOrDie(session);

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(client_->GetTabletsFromTableId(kPgsqlTableId, 0, &tablets));
const auto& tablet = tablets.Get(0);

// The metacache is populated after the row is written, so we can directly
// get the remote tablet from the metacache of our current client.
auto remote_tablet = ASSERT_RESULT(GetRemoteTablet(tablet.tablet_id(), true, client_.get()));

// Step 1: Find the leader's uuid from the list of replicas that came back.
int leader_index = -1;
auto replicas = remote_tablet->replicas_;
std::vector<std::string> tserver_uuids;
std::string leader_server_uuid;
for (unsigned int i = 0; i < replicas.size(); i++) {
tserver_uuids.push_back(replicas[i]->ts->permanent_uuid());
if (replicas[i]->role == PeerRole::LEADER) {
leader_server_uuid = replicas[i]->ts->permanent_uuid();
leader_index = i;
}
}
ASSERT_NE(leader_index, -1);

// Step 2: Change the leadership of the tablet replicas using leader step down, so that without
// the ability of refreshing metacache using piggybacked TabletConsensusInfo that was sent back
// from a wrong leader, the tablet invoker will always try at 3 times to correctly finish the
// write request.
int target_index = -1;
switch (leader_index) {
case 0:
case 1:
// If the current leader is the first or second replica, make the third replica the new
// leader. With the previous logic, if the previous leader is the first replica, Tablet
// Invoker will try the previous leader, fail, second replica, fail and try the third replica
// and succeed. If the previous leader is the second replica, the invoker will try the second
// replica, fail, then go back to the first replica, fail, then since the second replica is
// marked as follower, it will skip and try the third and succeed. After the ability of
// partially refreshing the meta-cache using piggybacked TabletConsensusInfo is added, the
// invoker should go directly to the third replica after the first failure.
target_index = 2;
break;
case 2:
// If the previous leader is the third replica, make the second replica the new leader.
// Previously, invoker will try previous leader, then try the first replica, then try the
// second replica. The current code will allow it to try the second replica directly after the
// NOT_THE_LEADER error returned from the request to the third replica.
target_index = 1;
break;
default:
break;
}
ASSERT_NE(target_index, -1);

const MonoDelta timeout = MonoDelta::FromSeconds(10);
const auto proxy_cache_ = std::make_unique<rpc::ProxyCache>(client_->messenger());
const master::MasterClusterProxy master_proxy(
proxy_cache_.get(), cluster_->mini_master()->bound_rpc_addr());
const auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(master_proxy, proxy_cache_.get()));
auto target_details = ts_map.at(tserver_uuids[target_index]).get();
auto leader_ts = ts_map.at(leader_server_uuid).get();
ASSERT_OK(
(itest::WaitForAllPeersToCatchup(tablet.tablet_id(), TServerDetailsVector(ts_map), timeout)));
ASSERT_OK(itest::LeaderStepDown(
leader_ts, tablet.tablet_id(), target_details, timeout, false, nullptr));
ASSERT_OK(itest::WaitUntilLeader(target_details, tablet.tablet_id(), timeout));

// Step 3: Send another write RPC, this time the metacache will have stale leader, thus
// it will get a NOT_THE_LEADER error and refresh itself with the attached config, and
// succeed on the second try.
write_op = create_insert_pgsql_row("pgsql_key2");
auto* sync_point_instance = yb::SyncPoint::GetInstance();
Synchronizer sync;
int attempt_num = 0;
sync_point_instance->SetCallBack(
"TabletInvoker::Done",
[sync_point_instance, callback = sync.AsStdStatusCallback(), &attempt_num](void* arg) {
attempt_num = *reinterpret_cast<int*>(arg);
sync_point_instance->DisableProcessing();
callback(Status::OK());
});
sync_point_instance->EnableProcessing();
session->Apply(write_op);
FlushSessionOrDie(session);
ASSERT_OK(sync.Wait());
ASSERT_EQ(attempt_num, 2);
}

class ColocationClientTest: public ClientTest {
public:
void SetUp() override {
