From 942de8fb3db8eac24802808ce9544a55485e1163 Mon Sep 17 00:00:00 2001 From: Sean Song Date: Thu, 18 Apr 2024 00:12:04 -0700 Subject: [PATCH] [#20234] DocDB: Improved metacache to refresh using TabletConsensusInfo when received NOT_THE_LEADER error Summary: Problem Background: In our system, when a client needs to perform an operation on a specific tablet, it first needs to find out which server is currently responsible for that operation. If the operation is a WriteRpc for example, it must find the tablet leader server. However, the system's current method of figuring out the tablet leader is not very efficient. It tries to guess the leader based on a list of potential servers (peers), but this guessing game can be slow, especially when there are many servers or when the servers are located far apart geographically. This inefficiency can lead to operations failing because the leader wasn't found quickly enough. Additionally, the system doesn't handle server failures well. If a server is down, it might take a long time for the system to stop trying to connect to it, wasting valuable seconds on each attempt. While there's a mechanism to avoid retrying a failed server for 60 seconds, it's not very effective when a server is permanently out of service. One reason for this inefficiency is that the system's information about who the leaders are (stored in something called the meta cache) can become outdated, and it doesn't get updated if the system can still perform its tasks with the outdated information, even if doing so results in repeated connection failures. Solution Introduction: This ticket introduces a preliminary change aimed at improving how the system tracks the current leader for each piece of data. The idea is to add a new piece of information to the meta cache called "raft_config_opid," which records the latest confirmed leadership configuration for each tablet. This way, when the system receives new information about the leadership configuration (which can happen during normal operations from other servers), it can check this new information against what it already knows. If the new information is more up-to-date, the system can update its meta cache, potentially avoiding wasted efforts on trying to connect to servers that are no longer leaders or are down. This diff, combined with D33197 and D33598, updates the meta-cache using TabletConsensusInfo that is piggybacked by a Write/Read/GetChanges/GetTransactionStatus ResponsePB when we sent a request to a non-leader but requires a leader to receive our request. These frequent RPC requests should be able to keep our meta-cache sufficiently up to date to avoid the situation that caused the CE. Upgrade/Rollback safety: The added field in the ResponsePBs is not to be persisted on disk, it is guarded by protobuf's backward compatibility Jira: DB-9194 Test Plan: Unit Testing: 1. ClientTest.TestMetacacheRefreshWhenSentToWrongLeader: Changes leadership of a RaftGroup after meta-cache is already filled in. This introduces a discrepancy between the information available in the meta-cache and the actual cluster configuration, and should return back a NOT_THE_LEADER error for our caller. Normally, this will prompt the TabletInvoker to try the next-in-line replica's Tablet Server, and using our test set up, this will guarantee that the TabletInvoker will retry the RPC at least 3 times. However, because this diff introduces the mechanism to refresh the meta-cache right away after a NOT_THE_LEADER error, we should observe that the RPC will succeed in 2 tries instead of one, the first attempt will piggyback the TabletConsensusInfo and update the meta-cache, while the other attempt will use that newest meta-cache and find the correct leader to send the request to. 2. CDCServiceTestMultipleServersOneTablet.TestGetChangesRpcTabletConsensusInfo: Since the GetChanges code path for updating meta-cache is sufficiently diverged from other RPC types, this test is introduced to explicitly check that when a cdc proxy receives a not the leader error message, its meta-cache should be refreshed. Reviewers: mlillibridge, xCluster, hsunder Reviewed By: mlillibridge Subscribers: yql, jason, ycdcxcluster, hsunder, ybase, bogdan Differential Revision: https://phorge.dev.yugabyte.com/D33533 --- src/yb/cdc/cdc_service.cc | 21 +- src/yb/cdc/cdc_service.proto | 2 + src/yb/cdc/xcluster_rpc.cc | 37 +++- src/yb/client/async_rpc.cc | 10 + src/yb/client/async_rpc.h | 8 + src/yb/client/client-test.cc | 181 ++++++++++++++++++ src/yb/client/client.cc | 11 ++ src/yb/client/client.h | 8 + src/yb/client/meta_cache.cc | 121 +++++++++++- src/yb/client/meta_cache.h | 43 ++++- src/yb/client/tablet_rpc.cc | 42 ++-- src/yb/client/tablet_rpc.h | 24 +++ src/yb/client/transaction_rpc.cc | 16 ++ src/yb/common/CMakeLists.txt | 1 + src/yb/common/common_consensus_util.cc | 85 ++++++++ src/yb/common/common_consensus_util.h | 52 +++++ src/yb/consensus/consensus_meta.cc | 20 ++ src/yb/consensus/consensus_meta.h | 59 ++++++ src/yb/consensus/quorum_util.cc | 48 ----- src/yb/consensus/quorum_util.h | 9 +- src/yb/consensus/raft_consensus.cc | 4 + src/yb/consensus/raft_consensus.h | 4 + src/yb/consensus/replica_state.cc | 4 + src/yb/consensus/replica_state.h | 4 + .../integration-tests/cdc_service-int-test.cc | 66 +++++++ src/yb/master/master_tablet_service.cc | 3 +- src/yb/master/master_tablet_service.h | 3 +- src/yb/tablet/tablet_peer.h | 4 + src/yb/tserver/heartbeater.cc | 1 + src/yb/tserver/read_query.cc | 4 +- src/yb/tserver/read_query.h | 5 +- src/yb/tserver/service_util.cc | 35 ++-- src/yb/tserver/service_util.h | 52 ++++- src/yb/tserver/tablet_service.cc | 8 +- src/yb/tserver/tablet_service.h | 3 +- src/yb/tserver/tserver.proto | 10 + src/yb/tserver/tserver_fwd.h | 8 + src/yb/tserver/tserver_service.proto | 2 + 38 files changed, 901 insertions(+), 117 deletions(-) create mode 100644 src/yb/common/common_consensus_util.cc create mode 100644 src/yb/common/common_consensus_util.h diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index e537951f9e9..1b70aea7a70 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -69,6 +69,8 @@ #include "yb/tablet/tablet_peer.h" #include "yb/tablet/transaction_participant.h" +#include "yb/tserver/service_util.h" + #include "yb/util/flags.h" #include "yb/util/format.h" #include "yb/util/logging.h" @@ -1546,11 +1548,19 @@ void CDCServiceImpl::GetChanges( auto original_leader_term = tablet_peer ? tablet_peer->LeaderTerm() : OpId::kUnknownTerm; - if ((!tablet_peer || tablet_peer->IsNotLeader()) && req->serve_as_proxy()) { - // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups. - auto context_ptr = std::make_shared(std::move(context)); - TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer); - return; + bool isNotLeader = tablet_peer && tablet_peer->IsNotLeader(); + if (!tablet_peer || isNotLeader) { + if (req->serve_as_proxy()) { + // Forward GetChanges() to tablet leader. This commonly happens in Kubernetes setups. + auto context_ptr = std::make_shared(std::move(context)); + VLOG(2) + << "GetChangesRpc::TabletLeaderGetChanges is called because serve_as_proxy is turned on"; + TabletLeaderGetChanges(req, resp, context_ptr, tablet_peer); + return; + } + } + if (isNotLeader) { + tserver::FillTabletConsensusInfo(resp, tablet_peer->tablet_id(), tablet_peer); } // If we can't serve this tablet... @@ -3210,6 +3220,7 @@ void CDCServiceImpl::TabletLeaderGetChanges( [this, resp, context, rpc_handle](const Status& status, GetChangesResponsePB&& new_resp) { auto retained = rpcs_.Unregister(rpc_handle); *resp = std::move(new_resp); + VLOG(1) << "GetChangesRPC TabletLeaderGetChanges finished with status: " << status; RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), resp->error().code(), *context); context->RespondSuccess(); }); diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto index fb71fdfb917..5c92186e8f9 100644 --- a/src/yb/cdc/cdc_service.proto +++ b/src/yb/cdc/cdc_service.proto @@ -545,6 +545,8 @@ message GetChangesResponsePB { // Only set in xCluster streams when error is AUTO_FLAGS_CONFIG_VERSION_MISMATCH. optional uint32 auto_flags_config_version = 12; + + optional yb.tserver.TabletConsensusInfoPB tablet_consensus_info = 13; } message GetCheckpointRequestPB { diff --git a/src/yb/cdc/xcluster_rpc.cc b/src/yb/cdc/xcluster_rpc.cc index 0783eedb2d8..767a02b379c 100644 --- a/src/yb/cdc/xcluster_rpc.cc +++ b/src/yb/cdc/xcluster_rpc.cc @@ -38,6 +38,7 @@ DEFINE_test_flag(bool, xcluster_print_write_request, false, using namespace std::literals; +using yb::tserver::TabletConsensusInfoPB; using yb::tserver::TabletServerErrorPB; using yb::tserver::TabletServerServiceProxy; using yb::tserver::WriteRequestPB; @@ -176,6 +177,16 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc { return resp_.has_error() ? &resp_.error() : nullptr; } + bool RefreshMetaCacheWithResponse() override { + if (!resp_.has_tablet_consensus_info()) { + VLOG(1) << "Partial refresh of tablet for XClusterWrite RPC failed because the response did " + "not have a tablet_consensus_info"; + return false; + } + + return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); + } + private: void SendRpcToTserver(int attempt_num) override { InvokeAsync( @@ -266,7 +277,21 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc { // Map CDC Errors to TabletServer Errors. switch (resp_.error().code()) { case cdc::CDCErrorPB::TABLET_NOT_FOUND: - last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND); + // If tablet_consensus_info is present, we know the problem is actually + // due to making the request to a follower instead of the leader so + // return NOT_THE_LEADER instead of TABLET_NOT_FOUND, so the tablet invoker + // knows the tablet is present on this node but its leader is not. + // + // The invoker needs to know the difference because in the not-a-leader case it just + // tries a different participant when looking for a leader, possibly updating its + // MetaCache using any returned TabletConsensusInfo first, while in the other case it + // marks the node as not having the tablet, blocking future follower reads to that + // tablet. + if (resp_.has_tablet_consensus_info()) { + last_error_.set_code(tserver::TabletServerErrorPB::NOT_THE_LEADER); + } else { + last_error_.set_code(tserver::TabletServerErrorPB::TABLET_NOT_FOUND); + } if (resp_.error().has_status()) { last_error_.mutable_status()->CopyFrom(resp_.error().status()); } @@ -297,6 +322,16 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc { std::bind(&GetChangesRpc::Finished, self, Status::OK())); } + bool RefreshMetaCacheWithResponse() override { + if (!resp_.has_tablet_consensus_info()) { + VLOG(1) << "Partial refresh of tablet for GetChanges RPC failed because the response did not " + "have a tablet_consensus_info"; + return false; + } + + return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); + } + private: const std::string &tablet_id() const { return req_.tablet_id(); } diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index 5e91b347055..d1ff0798c87 100644 --- a/src/yb/client/async_rpc.cc +++ b/src/yb/client/async_rpc.cc @@ -486,6 +486,16 @@ void AsyncRpcBase::ProcessResponseFromTserver(const Status& status) { } } +template +bool AsyncRpcBase::RefreshMetaCacheWithResponse() { + if (!resp_.has_tablet_consensus_info()) { + VLOG(1) << "Partial refresh of tablet for " << GetRpcName() + << " RPC failed because the response did not have a tablet_consensus_info"; + return false; + } + + return tablet_invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); +} template FlushExtraResult AsyncRpcBase::MakeFlushExtraResult() { diff --git a/src/yb/client/async_rpc.h b/src/yb/client/async_rpc.h index f5841e03240..7334c9ba26b 100644 --- a/src/yb/client/async_rpc.h +++ b/src/yb/client/async_rpc.h @@ -147,6 +147,10 @@ class AsyncRpcBase : public AsyncRpc { const Resp& resp() const { return resp_; } Resp& resp() { return resp_; } + bool RefreshMetaCacheWithResponse() override; + + virtual std::string GetRpcName() = 0; + protected: // Returns `true` if caller should continue processing response, `false` otherwise. bool CommonResponseCheck(const Status& status); @@ -174,6 +178,8 @@ class WriteRpc : public AsyncRpcBase { ASSERT_NO_FATALS(CreateTable(kTableName, kNumTablets, &client_table_)); ASSERT_NO_FATALS(CreateTable(kTable2Name, 1, &client_table2_)); + ASSERT_NO_FATALS(CreatePgSqlTable()); } void DoTearDown() override { @@ -398,6 +405,42 @@ class ClientTest: public YBMiniClusterTestBase { ASSERT_OK(table->Create(table_name, num_tablets, schema_, client_.get())); } + void CreatePgSqlTable() { + std::unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(client_->CreateNamespace( + kPgsqlNamespaceName, YQL_DATABASE_PGSQL, "" /* creator */, "" /* ns_id */, + "" /* src_ns_id */, boost::none /* next_pg_oid */, nullptr /* txn */, false)); + std::string kNamespaceId; + { + auto namespaces = ASSERT_RESULT(client_->ListNamespaces()); + for (const auto& ns : namespaces) { + if (ns.id.name() == kPgsqlNamespaceName) { + kNamespaceId = ns.id.id(); + break; + } + } + } + auto pgsql_table_name = + YBTableName(YQL_DATABASE_PGSQL, kNamespaceId, kPgsqlNamespaceName, kPgsqlTableName); + + YBSchemaBuilder schema_builder; + schema_builder.AddColumn("key")->PrimaryKey()->Type(DataType::STRING)->NotNull(); + schema_builder.AddColumn("value")->Type(DataType::INT64)->NotNull(); + schema_builder.SetSchemaName(kPgsqlSchemaName); + EXPECT_OK(client_->CreateNamespaceIfNotExists( + kPgsqlNamespaceName, YQLDatabase::YQL_DATABASE_PGSQL, "" /* creator_role_name */, + kNamespaceId)); + YBSchema schema; + EXPECT_OK(schema_builder.Build(&schema)); + EXPECT_OK(table_creator->table_name(pgsql_table_name) + .table_id(kPgsqlTableId) + .schema(&schema) + .table_type(YBTableType::PGSQL_TABLE_TYPE) + .set_range_partition_columns({"key"}) + .num_tablets(1) + .Create()); + } + // Kills a tablet server. // Boolean flags control whether to restart the tserver, and if so, whether to wait for it to // finish bootstrapping. @@ -2724,6 +2767,144 @@ TEST_F(ClientTest, RefreshPartitions) { LOG(INFO) << "num_lookups_done: " << num_lookups_done; } +Result GetRemoteTablet( + const TabletId& tablet_id, bool use_cache, YBClient* client) { + std::promise> tablet_lookup_promise; + auto future = tablet_lookup_promise.get_future(); + client->LookupTabletById( + tablet_id, /* table =*/ nullptr, master::IncludeInactive::kTrue, + master::IncludeDeleted::kFalse, CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(1000), + [&tablet_lookup_promise](const Result& result) { + tablet_lookup_promise.set_value(result); + }, + client::UseCache(use_cache)); + return VERIFY_RESULT(future.get()); +} + +// Without the ability to refresh the metacache using piggybacked TabletConsensusInfo from +// an RPC sent to the wrong leader, the logic for the tablet invoker to find the next valid tablet +// server to send a write request to is as follows: +// 1. Select the leader, provided: +// a. One exists, and +// b. It hasn't failed, and +// c. It isn't currently marked as a follower. +// 2. If there's no good leader select another replica, provided: +// a. It hasn't failed, and +// b. It hasn't rejected our write due to being a follower. +// 3. If we're out of appropriate replicas, force a lookup to the master +// to fetch new consensus configuration information. +// The tablet invoker always tries the replicas in the order that they are stored inside the +// replicas_ field of RemoteTablet. With the current logic, what should happen is that whenever +// the tablet_invoker sees that a tablet server it selected for a leader_only request was not the +// leader, it will refresh its metacache using the piggybacked TabletConsensusInfo from that +// tablet server, thus it will have the latest information about who the leader really is, and +// thus should always succeed with exactly 2 RPCs. +TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) { + shared_ptr pgsql_table; + EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table)); + std::shared_ptr session = CreateSession(client_.get()); + rpc::Sidecars sidecars; + auto create_insert_pgsql_row = [&](const std::string& key) -> YBPgsqlWriteOpPtr { + auto pgsql_write_op = client::YBPgsqlWriteOp::NewInsert(pgsql_table, &sidecars); + PgsqlWriteRequestPB* psql_write_request = pgsql_write_op->mutable_request(); + psql_write_request->add_range_column_values()->mutable_value()->set_string_value(key); + PgsqlColumnValuePB* pgsql_column = psql_write_request->add_column_values(); + pgsql_column->set_column_id(pgsql_table->schema().ColumnId(1)); + pgsql_column->mutable_expr()->mutable_value()->set_int64_value(3); + return pgsql_write_op; + }; + + // Create a Write Op to write a row to pgsql_table with key pgsql_key1 + auto write_op = create_insert_pgsql_row("pgsql_key1"); + session->Apply(write_op); + FlushSessionOrDie(session); + + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(client_->GetTabletsFromTableId(kPgsqlTableId, 0, &tablets)); + const auto& tablet = tablets.Get(0); + + // The metacache is populated after the row is written, so we can directly + // get the remote tablet from the metacache of our current client. + auto remote_tablet = ASSERT_RESULT(GetRemoteTablet(tablet.tablet_id(), true, client_.get())); + + // Step 1: Find the leader's uuid from the list of replicas that came back. + int leader_index = -1; + auto replicas = remote_tablet->replicas_; + std::vector tserver_uuids; + std::string leader_server_uuid; + for (unsigned int i = 0; i < replicas.size(); i++) { + tserver_uuids.push_back(replicas[i]->ts->permanent_uuid()); + if (replicas[i]->role == PeerRole::LEADER) { + leader_server_uuid = replicas[i]->ts->permanent_uuid(); + leader_index = i; + } + } + ASSERT_NE(leader_index, -1); + + // Step 2: Change the leadership of the tablet replicas using leader step down, so that without + // the ability of refreshing metacache using piggybacked TabletConsensusInfo that was sent back + // from a wrong leader, the tablet invoker will always try at 3 times to correctly finish the + // write request. + int target_index = -1; + switch (leader_index) { + case 0: + case 1: + // If the current leader is the first or second replica, make the third replica the new + // leader. With the previous logic, if the previous leader is the first replica, Tablet + // Invoker will try the previous leader, fail, second replica, fail and try the third replica + // and succeed. If the previous leader is the second replica, the invoker will try the second + // replica, fail, then go back to the first replica, fail, then since the second replica is + // marked as follower, it will skip and try the third and succeed. After the ability of + // partially refreshing the meta-cache using piggybacked TabletConsensusInfo is added, the + // invoker should go directly to the third replica after the first failure. + target_index = 2; + break; + case 2: + // If the previous leader is the third replica, make the second replica the new leader. + // Previously, invoker will try previous leader, then try the first replica, then try the + // second replica. The current code will allow it to try the second replica directly after the + // NOT_THE_LEADER error returned from the request to the third replica. + target_index = 1; + break; + default: + break; + } + ASSERT_NE(target_index, -1); + + const MonoDelta timeout = MonoDelta::FromSeconds(10); + const auto proxy_cache_ = std::make_unique(client_->messenger()); + const master::MasterClusterProxy master_proxy( + proxy_cache_.get(), cluster_->mini_master()->bound_rpc_addr()); + const auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(master_proxy, proxy_cache_.get())); + auto target_details = ts_map.at(tserver_uuids[target_index]).get(); + auto leader_ts = ts_map.at(leader_server_uuid).get(); + ASSERT_OK( + (itest::WaitForAllPeersToCatchup(tablet.tablet_id(), TServerDetailsVector(ts_map), timeout))); + ASSERT_OK(itest::LeaderStepDown( + leader_ts, tablet.tablet_id(), target_details, timeout, false, nullptr)); + ASSERT_OK(itest::WaitUntilLeader(target_details, tablet.tablet_id(), timeout)); + + // Step 3: Send another write RPC, this time the metacache will have stale leader, thus + // it will get a NOT_THE_LEADER error and refresh itself with the attached config, and + // succeed on the second try. + write_op = create_insert_pgsql_row("pgsql_key2"); + auto* sync_point_instance = yb::SyncPoint::GetInstance(); + Synchronizer sync; + int attempt_num = 0; + sync_point_instance->SetCallBack( + "TabletInvoker::Done", + [sync_point_instance, callback = sync.AsStdStatusCallback(), &attempt_num](void* arg) { + attempt_num = *reinterpret_cast(arg); + sync_point_instance->DisableProcessing(); + callback(Status::OK()); + }); + sync_point_instance->EnableProcessing(); + session->Apply(write_op); + FlushSessionOrDie(session); + ASSERT_OK(sync.Wait()); + ASSERT_EQ(attempt_num, 2); +} + class ColocationClientTest: public ClientTest { public: void SetUp() override { diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 2b32938487b..e7bef338622 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -228,6 +228,7 @@ using yb::master::WaitForYsqlBackendsCatalogVersionRequestPB; using yb::master::WaitForYsqlBackendsCatalogVersionResponsePB; using yb::rpc::Messenger; using yb::tserver::AllowSplitTablet; +using yb::tserver::TabletConsensusInfoPB; using namespace yb::size_literals; // NOLINT. @@ -2947,5 +2948,15 @@ void YBClient::ClearAllMetaCachesOnServer() { data_->meta_cache_->ClearAll(); } +bool YBClient::RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& newly_received_info) { + auto status = data_->meta_cache_->RefreshTabletInfoWithConsensusInfo(newly_received_info); + if(!status.ok()) { + VLOG(1) << "Partially refreshing meta-cache for tablet failed because " << status; + return false; + } + return true; +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 28dc32ddd36..2e689f2b978 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -99,6 +99,7 @@ class GetAutoFlagsConfigResponsePB; namespace tserver { class LocalTabletServer; +class TabletConsensusInfoPB; class TabletServerServiceProxy; } @@ -1029,6 +1030,12 @@ class YBClient { void ClearAllMetaCachesOnServer(); + // Uses the TabletConsensusInfo piggybacked from a response to + // refresh a RemoteTablet in metacache. Returns true if the + // RemoteTablet was indeed refreshed, false otherwise. + bool RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& newly_received_info); + private: class Data; @@ -1056,6 +1063,7 @@ class YBClient { FRIEND_TEST(ClientTest, TestGetTabletServerBlacklist); FRIEND_TEST(ClientTest, TestMasterDown); FRIEND_TEST(ClientTest, TestMasterLookupPermits); + FRIEND_TEST(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader); FRIEND_TEST(ClientTest, TestReplicatedTabletWritesAndAltersWithLeaderElection); FRIEND_TEST(ClientTest, TestScanFaultTolerance); FRIEND_TEST(ClientTest, TestScanTimeout); diff --git a/src/yb/client/meta_cache.cc b/src/yb/client/meta_cache.cc index 55b9d14070c..e8487de4b32 100644 --- a/src/yb/client/meta_cache.cc +++ b/src/yb/client/meta_cache.cc @@ -53,6 +53,7 @@ #include "yb/client/table.h" #include "yb/client/yb_table_name.h" +#include "yb/common/common_consensus_util.h" #include "yb/common/wire_protocol.h" #include "yb/gutil/map-util.h" @@ -165,6 +166,11 @@ RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb) Update(pb); } +RemoteTabletServer::RemoteTabletServer(const consensus::RaftPeerPB& raft_peer) + : uuid_(raft_peer.permanent_uuid()) { + UpdateFromRaftPeer(raft_peer); +} + RemoteTabletServer::RemoteTabletServer(const string& uuid, const shared_ptr& proxy, const LocalTabletServer* local_tserver) @@ -223,6 +229,18 @@ void RemoteTabletServer::Update(const master::TSInfoPB& pb) { cloud_info_pb_ = pb.cloud_info(); } +void RemoteTabletServer::UpdateFromRaftPeer(const consensus::RaftPeerPB& raft_peer) { + if (raft_peer.permanent_uuid() != uuid_) { + VLOG(1) << "RemoteTabletServer " << raft_peer.permanent_uuid() + << " cannot be updated because the raft_peer has a wrong permanent_uuid"; + return; + } + std::lock_guard lock(mutex_); + private_rpc_hostports_ = raft_peer.last_known_private_addr(); + public_rpc_hostports_ = raft_peer.last_known_broadcast_addr(); + cloud_info_pb_ = raft_peer.cloud_info(); +} + bool RemoteTabletServer::IsLocal() const { return local_tserver_ != nullptr; } @@ -330,6 +348,33 @@ RemoteTablet::~RemoteTablet() { } } +Status RemoteTablet::RefreshFromRaftConfig( + const TabletServerMap& tservers, const consensus::RaftConfigPB& raft_config, + const consensus::ConsensusStatePB& consensus_state) { + std::lock_guard lock(mutex_); + std::vector> new_replicas; + std::string leader_uuid = ""; + for (const auto& peer : raft_config.peers()) { + auto tserver = FindPtrOrNull(tservers, peer.permanent_uuid()); + SCHECK(tserver, NotFound, Format("TServer with ID $0 not found.", peer.permanent_uuid())); + auto role = GetConsensusRole(peer.permanent_uuid(), consensus_state); + if (role == PeerRole::LEADER) { + leader_uuid = peer.permanent_uuid(); + } + new_replicas.emplace_back(std::make_shared(tserver.get(), role)); + } + replicas_ = std::move(new_replicas); + raft_config_opid_index_ = consensus_state.config().opid_index(); + VLOG(1) << "Raft config refresh succeeded with opid_index: " << raft_config_opid_index_ + << " for tablet: " << tablet_id_ + << ", replicas are now: " << ReplicasAsStringUnlocked(); + current_leader_uuid_ = leader_uuid; + return Status::OK(); + // Note that we do not update the stale_ variable here: because this is a partial refresh -- we + // are not updating the partition information, in particular -- it is not safe to mark us as + // no longer stale. +} + void RemoteTablet::Refresh( const TabletServerMap& tservers, const google::protobuf::RepeatedPtrField& replicas) { @@ -343,10 +388,14 @@ void RemoteTablet::Refresh( std::sort(old_uuids.begin(), old_uuids.end()); replicas_.clear(); bool has_new_replica = false; + current_leader_uuid_ = ""; for (const TabletLocationsPB_ReplicaPB& r : replicas) { auto it = tservers.find(r.ts_info().permanent_uuid()); CHECK(it != tservers.end()); replicas_.emplace_back(std::make_shared(it->second.get(), r.role())); + if(r.role() == PeerRole::LEADER) { + current_leader_uuid_ = r.ts_info().permanent_uuid(); + } has_new_replica = has_new_replica || !std::binary_search(old_uuids.begin(), old_uuids.end(), r.ts_info().permanent_uuid()); @@ -357,7 +406,7 @@ void RemoteTablet::Refresh( ++lookups_without_new_replicas_; } stale_ = false; - refresh_time_.store(MonoTime::Now(), std::memory_order_release); + full_refresh_time_.store(MonoTime::Now(), std::memory_order_release); } void RemoteTablet::MarkStale() { @@ -605,6 +654,11 @@ void RemoteTablet::MarkTServerAsFollower(const RemoteTabletServer* server) { << server->ToString() << ". Replicas: " << ReplicasAsStringUnlocked(); } +std::string RemoteTablet::current_leader_uuid() const { + SharedLock lock(mutex_); + return current_leader_uuid_; +} + std::string RemoteTablet::ReplicasAsString() const { SharedLock lock(mutex_); return ReplicasAsStringUnlocked(); @@ -719,10 +773,24 @@ void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) { return; } - VLOG_WITH_PREFIX(1) << "Client caching new TabletServer " << permanent_uuid; + VLOG_WITH_PREFIX(1) << "Client caching new TabletServer from Master TSInfo " << permanent_uuid; CHECK(ts_cache_.emplace(permanent_uuid, std::make_shared(pb)).second); } +Status MetaCache::UpdateTabletServerWithRaftPeerUnlocked(const consensus::RaftPeerPB& pb) { + const std::string& permanent_uuid = pb.permanent_uuid(); + auto it = ts_cache_.find(permanent_uuid); + if (it != ts_cache_.end()) { + it->second->UpdateFromRaftPeer(pb); + return Status::OK(); + } + VLOG_WITH_PREFIX(1) << "Client caching new TabletServer from Raft Peer " << permanent_uuid; + SCHECK( + ts_cache_.emplace(permanent_uuid, std::make_shared(pb)).second, + IllegalState, "Failed to emplace a remote tablet server into tablet server cache"); + return Status::OK(); +} + // A (table, partition_key) --> tablet lookup. May be in-flight to a master, or // may be handled locally. // @@ -1138,6 +1206,55 @@ Result MetaCache::ProcessTabletLocation( return remote; } +Status MetaCache::RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& tablet_consensus_info) { + SCHECK( + tablet_consensus_info.has_consensus_state(), IllegalState, + Format( + "Tablet consensus info did not have a consensus state for tablet $0", + tablet_consensus_info.tablet_id())); + auto consensus_state = tablet_consensus_info.consensus_state(); + SCHECK( + consensus_state.config().has_opid_index(), IllegalState, + "TabletConsensusInfo does not have a valid opid_index"); + SCHECK( + consensus_state.has_leader_uuid() && + consensus::IsRaftConfigMember(consensus_state.leader_uuid(), consensus_state.config()), + Incomplete, "Requires a valid leader in TabletConsensusInfo for refresh"); + { + std::lock_guard lock(mutex_); + RemoteTabletPtr remote = FindPtrOrNull(tablets_by_id_, tablet_consensus_info.tablet_id()); + SCHECK( + remote, NotFound, + "Cannot find a matching remote tablet for the TabletConsensusInfo from a tablet " + "server."); + auto tablet_opid = remote->raft_config_opid_index(); + // If the opid_index of the incoming ConsensusInfo is the same as the current remote tablet, we + // can only proceed if the leader_uuid is different from the current one. This is because if the + // tablet we just requested is a hidden tablet, it is possible that it returns a NOT_THE_LEADER + // error, but in the consensus info it returned it is still the leader, so we will end up in a + // loop. + SCHECK( + consensus_state.config().opid_index() >= tablet_opid, Incomplete, + "TabletConsensusInfo contains a staler opid than the remote tablet"); + + SCHECK( + !(tablet_opid == consensus_state.config().opid_index() && + remote->current_leader_uuid() == consensus_state.leader_uuid()), + Incomplete, + "Incoming consensus information contains the same leader and participants as the remote " + "tablet so no need for refresh."); + + VLOG_WITH_PREFIX(1) << "Using Tablet Consensus Info to refresh metacache for tablet " + << tablet_consensus_info.tablet_id(); + consensus::RaftConfigPB raft_config = consensus_state.config(); + for (auto peer : raft_config.peers()) { + RETURN_NOT_OK(UpdateTabletServerWithRaftPeerUnlocked(peer)); + } + return remote->RefreshFromRaftConfig(ts_cache_, raft_config, consensus_state); + } +} + std::unordered_map::iterator MetaCache::InitTableDataUnlocked( const TableId& table_id, const VersionedTablePartitionListPtr& partitions) { VLOG_WITH_PREFIX_AND_FUNC(4) << Format( diff --git a/src/yb/client/meta_cache.h b/src/yb/client/meta_cache.h index 8ddceaa7658..0c9c48183c1 100644 --- a/src/yb/client/meta_cache.h +++ b/src/yb/client/meta_cache.h @@ -81,6 +81,7 @@ class EventStats; namespace client { class ClientTest_TestMasterLookupPermits_Test; +class ClientTest_TestMetacacheRefreshWhenSentToWrongLeader_Test; class YBClient; class YBTable; @@ -94,6 +95,7 @@ using ProcessedTablesMap = std::unordered_map>; using tserver::AllowSplitTablet; +using tserver::TabletConsensusInfoPB; // The information cached about a given tablet server in the cluster. // @@ -106,6 +108,7 @@ class RemoteTabletServer { const std::shared_ptr& proxy, const tserver::LocalTabletServer* local_tserver = nullptr); explicit RemoteTabletServer(const master::TSInfoPB& pb); + explicit RemoteTabletServer(const consensus::RaftPeerPB& raft_peer); ~RemoteTabletServer(); // Initialize the RPC proxy to this tablet server, if it is not already set up. @@ -117,6 +120,9 @@ class RemoteTabletServer { // Requires that 'pb''s UUID matches this server. void Update(const master::TSInfoPB& pb); + // Requires that the raft_peer's UUID matches this RemoteTabletServer + void UpdateFromRaftPeer(const consensus::RaftPeerPB& raft_peer); + // Is this tablet server local? bool IsLocal() const; @@ -233,7 +239,7 @@ struct ReplicasCount { // This class is thread-safe. class RemoteTablet : public RefCountedThreadSafe { public: - static constexpr int64_t kUnknownOpIdIndex = -1; + static constexpr int64_t kUnknownOpIdIndex = -2; RemoteTablet(std::string tablet_id, dockv::Partition partition, @@ -249,6 +255,13 @@ class RemoteTablet : public RefCountedThreadSafe { const TabletServerMap& tservers, const google::protobuf::RepeatedPtrField& replicas); + // Update this tablet's replica locations with raft_config from a Tablet Server. + Status RefreshFromRaftConfig( + const TabletServerMap& tservers, + const consensus::RaftConfigPB& raft_config, + const consensus::ConsensusStatePB& consensus_state + ); + // Mark this tablet as stale, indicating that the cached tablet metadata is // out of date. Staleness is checked by the MetaCache when // LookupTabletByKey() is called to determine whether the fast (non-network) @@ -341,7 +354,7 @@ class RemoteTablet : public RefCountedThreadSafe { const std::string& LogPrefix() const { return log_prefix_; } - MonoTime refresh_time() { return refresh_time_.load(std::memory_order_acquire); } + MonoTime full_refresh_time() { return full_refresh_time_.load(std::memory_order_acquire); } // See TabletLocationsPB::split_depth. uint64 split_depth() const { return split_depth_; } @@ -359,7 +372,11 @@ class RemoteTablet : public RefCountedThreadSafe { void AddReplicasAsJson(JsonWriter* writer) const; + std::string current_leader_uuid() const; + private: + FRIEND_TEST(client::ClientTest, TestMetacacheRefreshWhenSentToWrongLeader); + // Same as ReplicasAsString(), except that the caller must hold mutex_. std::string ReplicasAsStringUnlocked() const; @@ -374,18 +391,26 @@ class RemoteTablet : public RefCountedThreadSafe { mutable rw_spinlock mutex_; bool stale_; bool is_split_ = false; + + // The UUID of the current leader replica. + // Invariant: equal to the UUID of the replica in replicas_ with the role LEADER or "" if there is + // no such replica. + std::string current_leader_uuid_; + // The opid of the latest committed raft config that we fetched from a tablet // server. Defaulted to kUnknownOpIdIndex so when it is first created, it will be // refreshed when we next try a partial update because of stale leadership or raft config. int64_t raft_config_opid_index_; + // Can be updated only when remote tablet is refreshed after a lookup to master or via a + // TabletConsensusInfo piggybacked from a response. std::vector> replicas_; PartitionListVersion last_known_partition_list_version_ = 0; std::atomic replicas_count_{{0, 0}}; - // Last time this object was refreshed. Initialized to MonoTime::Min() so we don't have to be - // checking whether it has been initialized everytime we use this value. - std::atomic refresh_time_{MonoTime::Min()}; + // Last time this object was fully refreshed. Initialized to MonoTime::Min() so we don't have to + // be checking whether it has been initialized everytime we use this value. + std::atomic full_refresh_time_{MonoTime::Min()}; int64_t lookups_without_new_replicas_ = 0; @@ -599,6 +624,11 @@ class MetaCache : public RefCountedThreadSafe { void ClearAll(); + // TabletConsensusInfo is piggybacked from the response of a TServer. + // Returns Status::OK() if and only if the meta-cache was updated. + Status RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& tablet_consensus_info); + private: friend class LookupRpc; friend class LookupByKeyRpc; @@ -606,6 +636,7 @@ class MetaCache : public RefCountedThreadSafe { friend class LookupFullTableRpc; FRIEND_TEST(client::ClientTest, TestMasterLookupPermits); + FRIEND_TEST(client::ClientTest, TestMetacacheRefreshWhenSentToWrongLeader); // Lookup the given tablet by partition_start_key, only consulting local information. // Returns true and sets *remote_tablet if successful. @@ -622,6 +653,8 @@ class MetaCache : public RefCountedThreadSafe { // the latest host/port info for a server. void UpdateTabletServerUnlocked(const master::TSInfoPB& pb) REQUIRES(mutex_); + Status UpdateTabletServerWithRaftPeerUnlocked(const consensus::RaftPeerPB& pb) REQUIRES(mutex_); + // Notify appropriate callbacks that lookup of specified partition group of specified table // was failed because of specified status. void LookupByKeyFailed( diff --git a/src/yb/client/tablet_rpc.cc b/src/yb/client/tablet_rpc.cc index a4bd1df09ca..9c7827a4087 100644 --- a/src/yb/client/tablet_rpc.cc +++ b/src/yb/client/tablet_rpc.cc @@ -32,6 +32,7 @@ #include "yb/util/flags.h" #include "yb/util/logging.h" #include "yb/util/result.h" +#include "yb/util/sync_point.h" #include "yb/util/trace.h" using std::vector; @@ -185,24 +186,24 @@ void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) { if (consistent_prefix_ && !leader_only) { bool refresh_cache = false; if (PREDICT_FALSE(FLAGS_force_lookup_cache_refresh_secs > 0) && - MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() > + MonoTime::Now().GetDeltaSince(tablet_->full_refresh_time()).ToSeconds() > FLAGS_force_lookup_cache_refresh_secs) { refresh_cache = true; VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache " << "force_lookup_cache_refresh_secs: " << FLAGS_force_lookup_cache_refresh_secs - << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() + << ". " << MonoTime::Now().GetDeltaSince(tablet_->full_refresh_time()).ToSeconds() << " seconds since the last update. Replicas in current cache: " << tablet_->ReplicasAsString(); } else if (FLAGS_lookup_cache_refresh_secs > 0 && - MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() > + MonoTime::Now().GetDeltaSince(tablet_->full_refresh_time()).ToSeconds() > FLAGS_lookup_cache_refresh_secs && !tablet_->IsReplicasCountConsistent()) { refresh_cache = true; VLOG(1) << "Updating tablet " << tablet_->tablet_id() << " replicas cache " << "force_lookup_cache_refresh_secs: " << FLAGS_force_lookup_cache_refresh_secs - << ". " << MonoTime::Now().GetDeltaSince(tablet_->refresh_time()).ToSeconds() + << ". " << MonoTime::Now().GetDeltaSince(tablet_->full_refresh_time()).ToSeconds() << " seconds since the last update. Replicas in current cache: " << tablet_->ReplicasAsString(); } @@ -263,7 +264,6 @@ void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) { VLOG(2) << "Tablet " << tablet_id_ << ": Sending " << command_->ToString() << " to replica " << current_ts_->ToString(); - rpc_->SendRpcToTserver(retrier_->attempt_num()); } @@ -284,10 +284,22 @@ Status TabletInvoker::FailToNewReplica(const Status& reason, // tablet server is marked as a follower so that it's not used during a retry for requests that // need to contact the leader only. This has the same effect as marking the replica as failed // for this specific RPC, but without affecting other RPCs. - followers_.emplace(current_ts_, FollowerData { - .status = STATUS(IllegalState, "Not the leader"), - .time = CoarseMonoClock::now() - }); + + // If RefreshMetaCacheWithResponse returns true it means the meta-cache information for this + // tablet is successfully refreshed using the tablet_consensus_info, so we need to clear the + // followers set to let tablet invoker use our latest leader tablet peer that has just been + // refreshed. + + if (rpc_->RefreshMetaCacheWithResponse()) { + bool refresh_succeeded = true; + TEST_SYNC_POINT_CALLBACK("CDCSDKMetaCacheRefreshTest::Refresh", &refresh_succeeded); + followers_.clear(); + } else { + followers_.emplace( + current_ts_, + FollowerData{ + .status = STATUS(IllegalState, "Not the leader"), .time = CoarseMonoClock::now()}); + } } else { VLOG(1) << "Failing " << command_->ToString() << " to a new replica: " << reason << ", old replica: " << yb::ToString(current_ts_); @@ -308,7 +320,6 @@ Status TabletInvoker::FailToNewReplica(const Status& reason, << " as failed. Replicas: " << tablet_->ReplicasAsString(); } } - auto status = retrier_->DelayedRetry(command_, reason); if (!status.ok()) { LOG(WARNING) << "Failed to schedule retry on new replica: " << status; @@ -389,6 +400,7 @@ bool TabletInvoker::Done(Status* status) { if (status->IsIllegalState() || status->IsServiceUnavailable() || status->IsAborted() || status->IsLeaderNotReadyToServe() || status->IsLeaderHasNoLease() || IsTabletConsideredNotFound(rsp_err, *status) || + IsTabletConsideredNonLeader(rsp_err, *status) || (status->IsTimedOut() && CoarseMonoClock::Now() < retrier_->deadline())) { VLOG(4) << "Retryable failure: " << *status << ", response: " << yb::ToString(rsp_err); @@ -423,7 +435,8 @@ bool TabletInvoker::Done(Status* status) { return true; } - if (status->IsIllegalState() || IsTabletConsideredNotFound(rsp_err, *status)) { + if (status->IsIllegalState() || IsTabletConsideredNotFound(rsp_err, *status) || + IsTabletConsideredNonLeader(rsp_err, *status)) { // The whole operation is completed if we can't schedule a retry. return !FailToNewReplica(*status, rsp_err).ok(); } else { @@ -470,6 +483,8 @@ bool TabletInvoker::Done(Status* status) { << "Unable to mark as leader: " << current_ts_->ToString() << " for " << tablet_->ToString(); } + int attempt_num = retrier_->attempt_num(); + TEST_SYNC_POINT_CALLBACK("TabletInvoker::Done", &attempt_num); return true; } @@ -489,6 +504,11 @@ bool TabletInvoker::IsLocalCall() const { return current_ts_ != nullptr && current_ts_->IsLocal(); } +bool TabletInvoker::RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& tablet_consensus_info) { + return client_->RefreshTabletInfoWithConsensusInfo(tablet_consensus_info); +} + std::shared_ptr TabletInvoker::proxy() const { return current_ts_->proxy(); } diff --git a/src/yb/client/tablet_rpc.h b/src/yb/client/tablet_rpc.h index 139240c2ca7..321ed1226ab 100644 --- a/src/yb/client/tablet_rpc.h +++ b/src/yb/client/tablet_rpc.h @@ -40,6 +40,7 @@ namespace yb { namespace tserver { +class TabletConsensusInfoPB; class TabletServerServiceProxy; } @@ -54,6 +55,13 @@ class TabletRpc { // attempt_num starts with 1. virtual void SendRpcToTserver(int attempt_num) = 0; + // Called to partially refresh a tablet's tablet peers using information + // piggybacked from a successful or failed response of a tablet RPC. + // The responses in this case will have a field called tablet_consensus_info, + // which carries the tablet server and replicas' raft config information. + // Returns true if we successfully updated the metacache, otherwise false. + virtual bool RefreshMetaCacheWithResponse() { return false; } + protected: ~TabletRpc() {} }; @@ -94,6 +102,9 @@ class TabletInvoker { bool is_consistent_prefix() const { return consistent_prefix_; } + bool RefreshTabletInfoWithConsensusInfo( + const tserver::TabletConsensusInfoPB& tablet_consensus_info); + private: friend class TabletRpcTest; FRIEND_TEST(TabletRpcTest, TabletInvokerSelectTabletServerRace); @@ -130,6 +141,19 @@ class TabletInvoker { current_ts_ != nullptr) || status.IsShutdownInProgress(); } + bool IsTabletConsideredNonLeader( + const tserver::TabletServerErrorPB* error_code, const Status& status) { + // The error code is undefined for some statuses like Aborted where we don't even send an RPC + // because the service is unavailable and thus don't have a response with an error code; to + // handle that here, we only check the error code for statuses we know have valid error codes + // and may have the error code we are looking for. + if (ErrorCode(error_code) == tserver::TabletServerErrorPB::NOT_THE_LEADER && + current_ts_ != nullptr) { + return status.IsNotFound() || status.IsIllegalState(); + } + return false; + } + YBClient* const client_; rpc::RpcCommand* const command_; diff --git a/src/yb/client/transaction_rpc.cc b/src/yb/client/transaction_rpc.cc index 906cef20322..3dff183d120 100644 --- a/src/yb/client/transaction_rpc.cc +++ b/src/yb/client/transaction_rpc.cc @@ -75,6 +75,10 @@ class TransactionRpcBase : public rpc::Rpc, public internal::TabletRpc { rpc::Rpc::Abort(); } + internal::TabletInvoker& GetInvoker() { + return invoker_; + } + private: void SendRpcToTserver(int attempt_num) override { InvokeAsync(invoker_.proxy().get(), @@ -112,6 +116,18 @@ class TransactionRpc : public TransactionRpcBase { return resp_.has_error() ? &resp_.error() : nullptr; } + bool RefreshMetaCacheWithResponse() override { + if constexpr (tserver::HasTabletConsensusInfo::value) { + if (resp_.has_tablet_consensus_info()) { + return GetInvoker().RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); + } + VLOG(1) << "Partial refresh of tablet for " << Traits::kName + << " RPC failed because the response did not " + "have a tablet_consensus_info"; + } + return false; + } + private: const std::string& tablet_id() const override { return req_.tablet_id(); diff --git a/src/yb/common/CMakeLists.txt b/src/yb/common/CMakeLists.txt index 42450910a30..24fb7aa1f09 100644 --- a/src/yb/common/CMakeLists.txt +++ b/src/yb/common/CMakeLists.txt @@ -77,6 +77,7 @@ ADD_YB_LIBRARY(yb_common_base set(COMMON_SRCS clock.cc colocated_util.cc + common_consensus_util.cc common_flags.cc common_types_util.cc common_util.cc diff --git a/src/yb/common/common_consensus_util.cc b/src/yb/common/common_consensus_util.cc new file mode 100644 index 00000000000..291802caf4d --- /dev/null +++ b/src/yb/common/common_consensus_util.cc @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#include "yb/common/common_consensus_util.h" + +namespace yb::consensus { + +bool IsRaftConfigMember(const std::string& tserver_uuid, const RaftConfigPB& config) { + for (const RaftPeerPB& peer : config.peers()) { + if (peer.permanent_uuid() == tserver_uuid) { + return true; + } + } + return false; +} + +bool IsRaftConfigVoter(const std::string& tserver_uuid, const RaftConfigPB& config) { + for (const RaftPeerPB& peer : config.peers()) { + if (peer.permanent_uuid() == tserver_uuid) { + return peer.member_type() == PeerMemberType::VOTER; + } + } + return false; +} + +PeerRole GetConsensusRole(const std::string& permanent_uuid, const ConsensusStatePB& cstate) { + if (cstate.leader_uuid() == permanent_uuid) { + if (IsRaftConfigVoter(permanent_uuid, cstate.config())) { + return PeerRole::LEADER; + } + return PeerRole::NON_PARTICIPANT; + } + + for (const RaftPeerPB& peer : cstate.config().peers()) { + if (peer.permanent_uuid() == permanent_uuid) { + switch (peer.member_type()) { + case PeerMemberType::VOTER: + return PeerRole::FOLLOWER; + + // PRE_VOTER, PRE_OBSERVER peers are considered LEARNERs. + case PeerMemberType::PRE_VOTER: + case PeerMemberType::PRE_OBSERVER: + return PeerRole::LEARNER; + + case PeerMemberType::OBSERVER: + return PeerRole::READ_REPLICA; + + case PeerMemberType::UNKNOWN_MEMBER_TYPE: + return PeerRole::UNKNOWN_ROLE; + } + } + } + return PeerRole::NON_PARTICIPANT; +} + +} // namespace yb::consensus diff --git a/src/yb/common/common_consensus_util.h b/src/yb/common/common_consensus_util.h new file mode 100644 index 00000000000..ef58c3cd0ee --- /dev/null +++ b/src/yb/common/common_consensus_util.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +#pragma once + +#include + +#include "yb/common/common_types.pb.h" + +#include "yb/consensus/metadata.pb.h" + +namespace yb::consensus { + +bool IsRaftConfigMember(const std::string& tserver_uuid, const RaftConfigPB& config); + +bool IsRaftConfigVoter(const std::string& tserver_uuid, const RaftConfigPB& config); + +// Determines the role that the peer with uuid 'uuid' plays in the Raft group. +// If the peer uuid is not a voter in the configuration, this function will return +// NON_PARTICIPANT, regardless of whether it is listed as leader in cstate. +PeerRole GetConsensusRole(const std::string& permanent_uuid, const ConsensusStatePB& cstate); + +} // namespace yb::consensus diff --git a/src/yb/consensus/consensus_meta.cc b/src/yb/consensus/consensus_meta.cc index 15ecb5371a8..8afd0f25a08 100644 --- a/src/yb/consensus/consensus_meta.cc +++ b/src/yb/consensus/consensus_meta.cc @@ -108,6 +108,9 @@ Status ConsensusMetadata::Load(FsManager* fs_manager, RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(fs_manager->encrypted_env(), VERIFY_RESULT(fs_manager->GetConsensusMetadataPath(tablet_id)), &cmeta->pb_)); + cmeta->committed_consensus_state_cache_.set_config(cmeta->committed_config()); + cmeta->committed_consensus_state_cache_.set_leader_uuid(cmeta->leader_uuid()); + cmeta->committed_consensus_state_cache_.set_current_term(cmeta->current_term()); cmeta->UpdateActiveRole(); // Needs to happen here as we sidestep the accessor APIs. RETURN_NOT_OK(cmeta->UpdateOnDiskSize()); cmeta_out->swap(cmeta); @@ -137,6 +140,7 @@ int64_t ConsensusMetadata::current_term() const { void ConsensusMetadata::set_current_term(int64_t term) { DCHECK_GE(term, kMinimumTerm); pb_.set_current_term(term); + committed_consensus_state_cache_.set_current_term(term); UpdateRoleAndTermCache(); } @@ -199,6 +203,7 @@ void ConsensusMetadata::set_committed_config(const RaftConfigPB& config) { if (!has_pending_config_) { UpdateActiveRole(); } + committed_consensus_state_cache_.set_config(committed_config()); } bool ConsensusMetadata::has_pending_config() const { @@ -250,6 +255,7 @@ const string& ConsensusMetadata::leader_uuid() const { void ConsensusMetadata::set_leader_uuid(const string& uuid) { leader_uuid_ = uuid; UpdateActiveRole(); + committed_consensus_state_cache_.set_leader_uuid(uuid); } void ConsensusMetadata::clear_leader_uuid() { @@ -279,6 +285,20 @@ ConsensusStatePB ConsensusMetadata::ToConsensusStatePB(ConsensusConfigType type) return cstate; } +ConsensusStatePB ConsensusMetadata::GetConsensusStateFromCache() const { + ConsensusStatePB consensus_state_cache; + *consensus_state_cache.mutable_config() = committed_consensus_state_cache_.config(); + consensus_state_cache.set_current_term(committed_consensus_state_cache_.current_term()); + auto outgoing_leader_uuid = committed_consensus_state_cache_.leader_uuid(); + // It's possible, though unlikely, that a new node from a pending configuration + // could be elected leader. Do not indicate a leader in this case. + if (PREDICT_TRUE(IsRaftConfigVoter( + outgoing_leader_uuid, consensus_state_cache.config()))) { + consensus_state_cache.set_leader_uuid(outgoing_leader_uuid); + } + return consensus_state_cache; +} + void ConsensusMetadata::MergeCommittedConsensusStatePB(const ConsensusStatePB& committed_cstate) { if (committed_cstate.current_term() > current_term()) { set_current_term(committed_cstate.current_term()); diff --git a/src/yb/consensus/consensus_meta.h b/src/yb/consensus/consensus_meta.h index b62629d76b4..51360677880 100644 --- a/src/yb/consensus/consensus_meta.h +++ b/src/yb/consensus/consensus_meta.h @@ -45,6 +45,8 @@ #include "yb/gutil/macros.h" +#include "yb/util/locks.h" +#include "yb/util/shared_lock.h" #include "yb/util/status_fwd.h" namespace yb { @@ -59,6 +61,51 @@ struct CloneSourceInfo { namespace consensus { +// This class is used to cache the values of consensus state so we don't have to wait for the +// current consensus state operation to finish before being able to retrieve the values. +// The fields here are updated atomically but independently so may independently be from before or +// after the current running consensus config update. +class ConsensusStateCache { + public: + RaftConfigPB config() const { + SharedLock lock(mutex_); + return config_; + } + + void set_config(const RaftConfigPB& config) { + std::lock_guard lock(mutex_); + config_ = config; + } + + std::string leader_uuid() const { + SharedLock lock(mutex_); + return leader_uuid_; + } + + void set_leader_uuid(std::string uuid) { + std::lock_guard lock(mutex_); + leader_uuid_ = uuid; + } + + int64_t current_term() const { + SharedLock lock(mutex_); + return current_term_; + } + + void set_current_term(int64_t term) { + std::lock_guard lock(mutex_); + current_term_ = term; + } + + private: + mutable rw_spinlock mutex_; + + // These fields are protected by mutex_. + RaftConfigPB config_; + std::string leader_uuid_; + int64_t current_term_; +}; + // Provides methods to read, write, and persist consensus-related metadata. // This partly corresponds to Raft Figure 2's "Persistent state on all servers". // @@ -164,6 +211,11 @@ class ConsensusMetadata { // leader_uuid field of the returned ConsensusStatePB will be cleared. ConsensusStatePB ToConsensusStatePB(ConsensusConfigType type) const; + // Copies the stored committed consensus info cache into a ConsensusStatePB object. + // It is possible for the current leader to not be a member of the committed configuration; + // in such case the leader_uuid of the returned ConsensusStatePB is cleared. This is thread safe. + ConsensusStatePB GetConsensusStateFromCache() const; + // Merge the committed consensus state from the source node during remote // bootstrap. // @@ -222,6 +274,13 @@ class ConsensusMetadata { // configuration change pending. // RaftConfig used by the peers when there is a pending config change operation. RaftConfigPB pending_config_; + + // Committed Consensus Info, stored as a struct for fast read-only access. Contains the + // currently committed consensus state, it is not atomic and needs to be locked by its internal + // lock before being modified. This cache will not take into account of the currently writing + // config change, therefore it could be behind by one committed config change. + ConsensusStateCache committed_consensus_state_cache_; + OpId pending_config_op_id_; // Cached role of the peer_uuid_ within the active configuration. diff --git a/src/yb/consensus/quorum_util.cc b/src/yb/consensus/quorum_util.cc index f793aef0d3e..24df0161e2a 100644 --- a/src/yb/consensus/quorum_util.cc +++ b/src/yb/consensus/quorum_util.cc @@ -52,24 +52,6 @@ using google::protobuf::RepeatedPtrField; using std::string; using strings::Substitute; -bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config) { - for (const RaftPeerPB& peer : config.peers()) { - if (peer.permanent_uuid() == uuid) { - return true; - } - } - return false; -} - -bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config) { - for (const RaftPeerPB& peer : config.peers()) { - if (peer.permanent_uuid() == uuid) { - return peer.member_type() == PeerMemberType::VOTER; - } - } - return false; -} - Status GetRaftConfigMember(const RaftConfigPB& config, const std::string& uuid, RaftPeerPB* peer_pb) { @@ -190,36 +172,6 @@ PeerMemberType GetConsensusMemberType(const std::string& permanent_uuid, return PeerMemberType::UNKNOWN_MEMBER_TYPE; } -PeerRole GetConsensusRole(const std::string& permanent_uuid, const ConsensusStatePB& cstate) { - if (cstate.leader_uuid() == permanent_uuid) { - if (IsRaftConfigVoter(permanent_uuid, cstate.config())) { - return PeerRole::LEADER; - } - return PeerRole::NON_PARTICIPANT; - } - - for (const RaftPeerPB& peer : cstate.config().peers()) { - if (peer.permanent_uuid() == permanent_uuid) { - switch (peer.member_type()) { - case PeerMemberType::VOTER: - return PeerRole::FOLLOWER; - - // PRE_VOTER, PRE_OBSERVER peers are considered LEARNERs. - case PeerMemberType::PRE_VOTER: - case PeerMemberType::PRE_OBSERVER: - return PeerRole::LEARNER; - - case PeerMemberType::OBSERVER: - return PeerRole::READ_REPLICA; - - case PeerMemberType::UNKNOWN_MEMBER_TYPE: - return PeerRole::UNKNOWN_ROLE; - } - } - } - return PeerRole::NON_PARTICIPANT; -} - Status VerifyRaftConfig(const RaftConfigPB& config, RaftConfigState type) { std::set uuids; if (config.peers_size() == 0) { diff --git a/src/yb/consensus/quorum_util.h b/src/yb/consensus/quorum_util.h index e6ad6627a33..9348c2fd5d3 100644 --- a/src/yb/consensus/quorum_util.h +++ b/src/yb/consensus/quorum_util.h @@ -35,6 +35,7 @@ #include #include "yb/common/common_types.pb.h" +#include "yb/common/common_consensus_util.h" #include "yb/consensus/consensus_fwd.h" #include "yb/consensus/metadata.pb.h" @@ -52,9 +53,6 @@ enum RaftConfigState { COMMITTED_QUORUM, }; -bool IsRaftConfigMember(const std::string& uuid, const RaftConfigPB& config); -bool IsRaftConfigVoter(const std::string& uuid, const RaftConfigPB& config); - // Get the specified member of the config. // Returns Status::NotFound if a member with the specified uuid could not be // found in the config. @@ -101,11 +99,6 @@ size_t CountServersInTransition(const RaftConfigPB& config, const std::string& i // Calculates size of a configuration majority based on # of voters. size_t MajoritySize(size_t num_voters); -// Determines the role that the peer with uuid 'uuid' plays in the cluster. -// If the peer uuid is not a voter in the configuration, this function will return -// NON_PARTICIPANT, regardless of whether it is listed as leader in cstate. -PeerRole GetConsensusRole(const std::string& uuid, const ConsensusStatePB& cstate); - // Determines the member type that the peer with uuid 'uuid' plays in the cluster. // If the peer uuid is not a voter in the configuration, this function will return // UNKNOWN_MEMBER_TYPE. diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index 10e5e66feaa..9a7f4c4220a 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -3272,6 +3272,10 @@ ConsensusStatePB RaftConsensus::ConsensusStateUnlocked( return state_->ConsensusStateUnlocked(type); } +ConsensusStatePB RaftConsensus::GetConsensusStateFromCache() const { + return state_->GetConsensusStateFromCache(); +} + RaftConfigPB RaftConsensus::CommittedConfig() const { auto lock = state_->LockForRead(); return state_->GetCommittedConfigUnlocked(); diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index fcfe25dd2ee..48e9674acc9 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -208,6 +208,10 @@ class RaftConsensus : public std::enable_shared_from_this, ConsensusConfigType type, LeaderLeaseStatus* leader_lease_status) const override; + // Returns a copy of ConsensusState from the committed consensus state cache. + // This method is thread safe. + ConsensusStatePB GetConsensusStateFromCache() const; + RaftConfigPB CommittedConfig() const override; RaftConfigPB CommittedConfigUnlocked() const; diff --git a/src/yb/consensus/replica_state.cc b/src/yb/consensus/replica_state.cc index 64b57ec7a39..17823ae9c81 100644 --- a/src/yb/consensus/replica_state.cc +++ b/src/yb/consensus/replica_state.cc @@ -326,6 +326,10 @@ ConsensusStatePB ReplicaState::ConsensusStateUnlocked(ConsensusConfigType type) return cmeta_->ToConsensusStatePB(type); } +ConsensusStatePB ReplicaState::GetConsensusStateFromCache() const { + return cmeta_->GetConsensusStateFromCache(); +} + PeerRole ReplicaState::GetActiveRoleUnlocked() const { DCHECK(IsLocked()); return cmeta_->active_role(); diff --git a/src/yb/consensus/replica_state.h b/src/yb/consensus/replica_state.h index a16a96c767d..be27901e72c 100644 --- a/src/yb/consensus/replica_state.h +++ b/src/yb/consensus/replica_state.h @@ -188,6 +188,10 @@ class ReplicaState { // Return current consensus state summary. ConsensusStatePB ConsensusStateUnlocked(ConsensusConfigType type) const; + // Return a copy of the committed consensus state cache. + // This method is thread safe. + ConsensusStatePB GetConsensusStateFromCache() const; + // Returns the currently active Raft role. PeerRole GetActiveRoleUnlocked() const; diff --git a/src/yb/integration-tests/cdc_service-int-test.cc b/src/yb/integration-tests/cdc_service-int-test.cc index 0b60dba0b5b..eb11c45822b 100644 --- a/src/yb/integration-tests/cdc_service-int-test.cc +++ b/src/yb/integration-tests/cdc_service-int-test.cc @@ -29,9 +29,11 @@ #include "yb/gutil/walltime.h" #include "yb/integration-tests/cdc_test_util.h" +#include "yb/integration-tests/cluster_itest_util.h" #include "yb/integration-tests/mini_cluster.h" #include "yb/integration-tests/yb_mini_cluster_test_base.h" +#include "yb/master/master_cluster.proxy.h" #include "yb/master/master_defaults.h" #include "yb/master/mini_master.h" #include "yb/rpc/messenger.h" @@ -51,6 +53,7 @@ #include "yb/util/monotime.h" #include "yb/util/slice.h" #include "yb/util/status_format.h" +#include "yb/util/sync_point.h" #include "yb/util/test_thread_holder.h" #include "yb/util/tsan_util.h" #include "yb/yql/cql/ql/util/errcodes.h" @@ -1030,6 +1033,69 @@ class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest { } }; +// When GetChanges RPC is sent to non-leader TServer and that TServer is +// serving as a proxy, the proxy should receive a TabletConsensusInfo that +// it can use to refresh its metacache. +TEST_F(CDCServiceTestMultipleServersOneTablet, TestGetChangesRpcTabletConsensusInfo) { + // Find the leader and followers for our tablet. + const MonoDelta timeout = MonoDelta::FromSeconds(10); + const auto proxy_cache_ = std::make_unique(client_->messenger()); + const master::MasterClusterProxy master_proxy( + proxy_cache_.get(), cluster_->mini_master()->bound_rpc_addr()); + const auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(master_proxy, proxy_cache_.get())); + itest::TServerDetails* leader_ts; + ASSERT_OK(itest::FindTabletLeader(ts_map, GetTablet(), timeout, &leader_ts)); + std::vector follower_uuids; + std::vector follower_idx; + for (int i = 0; i < 3; ++i) { + auto tserver = cluster_->mini_tablet_server(i)->server(); + auto uuid = tserver->permanent_uuid(); + if (uuid != leader_ts->uuid()) { + follower_uuids.push_back(uuid); + follower_idx.push_back(i); + } + } + tserver::MiniTabletServer* leader_mini_tserver; + ASSERT_OK(WaitFor([&]() -> Result { + leader_mini_tserver = GetLeaderForTablet(GetTablet()); + return leader_mini_tserver != nullptr; + }, MonoDelta::FromSeconds(30) * kTimeMultiplier, "Wait for tablet to have a leader.")); + + // Pick the first follower as proxy to send a GetChanges request so it will + // request the leader location and cache it in its metacache. + HostPort endpoint; + endpoint = HostPort::FromBoundEndpoint( + cluster_->mini_tablet_server(follower_idx[0])->bound_rpc_addr()); + cdc_proxy_ = std::make_unique(&client_->proxy_cache(), endpoint); + stream_id_ = ASSERT_RESULT(CreateCDCStream(cdc_proxy_, table_.table()->id(), cdc::CDCSDK)); + GetChangesResponsePB change_resp; + ASSERT_NO_FATALS(GetAllChanges(GetTablet(), stream_id_, &change_resp)); + + // Step down the leader onto the second follower. + auto target_details = ts_map.at(follower_uuids[1]).get(); + ASSERT_OK((itest::WaitForAllPeersToCatchup(GetTablet(), TServerDetailsVector(ts_map), timeout))); + ASSERT_OK(itest::LeaderStepDown(leader_ts, GetTablet(), target_details, timeout, false, nullptr)); + ASSERT_OK(itest::WaitUntilLeader(target_details, GetTablet(), timeout)); + + // The first follower will now first request the previous leader, + // find out it is not the leader, and receive a TabletConsensusInfo to refresh its metacache. + auto* sync_point_instance = yb::SyncPoint::GetInstance(); + Synchronizer sync; + bool refresh_succeeded = false; + sync_point_instance->SetCallBack( + "CDCSDKMetaCacheRefreshTest::Refresh", + [sync_point_instance, callback = sync.AsStdStatusCallback(), &refresh_succeeded](void* arg) { + refresh_succeeded = *reinterpret_cast(arg); + sync_point_instance->DisableProcessing(); + callback(Status::OK()); + }); + sync_point_instance->EnableProcessing(); + GetChangesResponsePB change_resp1; + ASSERT_NO_FATALS(GetAllChanges(GetTablet(), stream_id_, &change_resp1)); + ASSERT_OK(sync.Wait()); + ASSERT_TRUE(refresh_succeeded); +} + TEST_F(CDCServiceTestMultipleServersOneTablet, TestMetricsAfterServerFailure) { // Test that the metric value is not time since epoch after a leadership change. docdb::DisableYcqlPackedRow(); diff --git a/src/yb/master/master_tablet_service.cc b/src/yb/master/master_tablet_service.cc index 481cb526a3c..8659dcb6f8e 100644 --- a/src/yb/master/master_tablet_service.cc +++ b/src/yb/master/master_tablet_service.cc @@ -53,7 +53,8 @@ MasterTabletServiceImpl::MasterTabletServiceImpl(MasterTabletServer* server, Mas Result> MasterTabletServiceImpl::GetTabletForRead( const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, - YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) { + YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet, + tserver::ReadResponsePB* resp) { // Ignore looked_up_tablet_peer. SCOPED_LEADER_SHARED_LOCK(l, master_->catalog_manager_impl()); diff --git a/src/yb/master/master_tablet_service.h b/src/yb/master/master_tablet_service.h index 3ba9bd8ec40..cf9330ebb61 100644 --- a/src/yb/master/master_tablet_service.h +++ b/src/yb/master/master_tablet_service.h @@ -53,7 +53,8 @@ class MasterTabletServiceImpl : public tserver::TabletServiceImpl { private: Result> GetTabletForRead( const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, - YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) override; + YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet, + tserver::ReadResponsePB* resp) override; Master *const master_; DISALLOW_COPY_AND_ASSIGN(MasterTabletServiceImpl); diff --git a/src/yb/tablet/tablet_peer.h b/src/yb/tablet/tablet_peer.h index 924628f96d6..34780c22dd8 100644 --- a/src/yb/tablet/tablet_peer.h +++ b/src/yb/tablet/tablet_peer.h @@ -481,6 +481,10 @@ class TabletPeer : public std::enable_shared_from_this, Preparer* DEBUG_GetPreparer(); + std::string Tserver_uuid() { + return local_peer_pb_.permanent_uuid(); + } + protected: friend class RefCountedThreadSafe; friend class TabletPeerTest; diff --git a/src/yb/tserver/heartbeater.cc b/src/yb/tserver/heartbeater.cc index 4912782c446..8e0fde89152 100644 --- a/src/yb/tserver/heartbeater.cc +++ b/src/yb/tserver/heartbeater.cc @@ -652,6 +652,7 @@ void Heartbeater::Thread::RunThread() { // Config the "last heartbeat response" to indicate that we need to register // -- since we've never registered before, we know this to be true. last_hb_response_.set_needs_reregister(true); + // Have the Master request a full tablet report on 2nd HB. last_hb_response_.set_needs_full_tablet_report(false); diff --git a/src/yb/tserver/read_query.cc b/src/yb/tserver/read_query.cc index 16aa2afcd93..d26b4840042 100644 --- a/src/yb/tserver/read_query.cc +++ b/src/yb/tserver/read_query.cc @@ -280,7 +280,7 @@ Status ReadQuery::DoPerform() { // At this point we expect that we don't have pure read serializable transactions, and // always write read intents to detect conflicts with other writes. leader_peer = VERIFY_RESULT(LookupLeaderTablet( - server_.tablet_peer_lookup(), req_->tablet_id(), std::move(peer_tablet))); + server_.tablet_peer_lookup(), req_->tablet_id(), resp_, std::move(peer_tablet))); // Serializable read adds intents, i.e. writes data. // We should check for memory pressure in this case. RETURN_NOT_OK(CheckWriteThrottling(req_->rejection_score(), leader_peer.peer.get())); @@ -288,7 +288,7 @@ Status ReadQuery::DoPerform() { } else { abstract_tablet_ = VERIFY_RESULT(read_tablet_provider_.GetTabletForRead( req_->tablet_id(), std::move(peer_tablet.tablet_peer), - req_->consistency_level(), AllowSplitTablet::kFalse)); + req_->consistency_level(), AllowSplitTablet::kFalse, resp_)); leader_peer.leader_term = OpId::kUnknownTerm; } diff --git a/src/yb/tserver/read_query.h b/src/yb/tserver/read_query.h index 666dfe9c285..06f9ed38aa6 100644 --- a/src/yb/tserver/read_query.h +++ b/src/yb/tserver/read_query.h @@ -38,8 +38,9 @@ namespace tserver { class ReadTabletProvider { public: virtual Result> GetTabletForRead( - const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, - YBConsistencyLevel consistency_level, AllowSplitTablet allow_split_tablet) = 0; + const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, + YBConsistencyLevel consistency_level, AllowSplitTablet allow_split_tablet, + ReadResponsePB* resp) = 0; virtual ~ReadTabletProvider() = default; }; diff --git a/src/yb/tserver/service_util.cc b/src/yb/tserver/service_util.cc index 4c494061e14..334b24a8fdb 100644 --- a/src/yb/tserver/service_util.cc +++ b/src/yb/tserver/service_util.cc @@ -203,25 +203,6 @@ Status LeaderTabletPeer::FillTerm() { return Status::OK(); } -Result LookupLeaderTablet( - TabletPeerLookupIf* tablet_manager, - const std::string& tablet_id, - TabletPeerTablet peer) { - if (peer.tablet_peer) { - LOG_IF(DFATAL, peer.tablet_peer->tablet_id() != tablet_id) - << "Mismatching table ids: peer " << peer.tablet_peer->tablet_id() - << " vs " << tablet_id; - LOG_IF(DFATAL, !peer.tablet) << "Empty tablet pointer for tablet id : " << tablet_id; - } else { - peer = VERIFY_RESULT(LookupTabletPeer(tablet_manager, tablet_id)); - } - LeaderTabletPeer result; - result.FillTabletPeer(std::move(peer)); - - RETURN_NOT_OK(result.FillTerm()); - return result; -} - Status CheckPeerIsReady( const tablet::TabletPeer& tablet_peer, AllowSplitTablet allow_split_tablet) { auto consensus_result = tablet_peer.GetConsensus(); @@ -262,6 +243,11 @@ Status CheckPeerIsLeader(const tablet::TabletPeer& tablet_peer) { return ResultToStatus(LeaderTerm(tablet_peer)); } +bool IsErrorCodeNotTheLeader(const Status& status) { + auto code = TabletServerError::FromStatus(status); + return code && code.value() == TabletServerErrorPB::NOT_THE_LEADER; +} + namespace { template @@ -318,7 +304,7 @@ Result LookupTabletPeer( Result> GetTablet( TabletPeerLookupIf* tablet_manager, const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, YBConsistencyLevel consistency_level, - AllowSplitTablet allow_split_tablet) { + AllowSplitTablet allow_split_tablet, ReadResponsePB* resp) { tablet::TabletPtr tablet_ptr = nullptr; if (tablet_peer) { DCHECK_EQ(tablet_peer->tablet_id(), tablet_id); @@ -338,8 +324,13 @@ Result> GetTablet( LOG(FATAL) << "--TEST_assert_reads_from_follower_rejected_because_of_staleness is true but " "consistency level is invalid: YBConsistencyLevel::STRONG"; } - - RETURN_NOT_OK(CheckPeerIsLeader(*tablet_peer)); + auto status = CheckPeerIsLeader(*tablet_peer); + if (!status.ok()) { + if (IsErrorCodeNotTheLeader(status)) { + FillTabletConsensusInfo(resp, tablet_id, tablet_peer); + } + return status; + } } else { auto s = CheckPeerIsLeader(*tablet_peer.get()); diff --git a/src/yb/tserver/service_util.h b/src/yb/tserver/service_util.h index c4133797752..eb068436cc8 100644 --- a/src/yb/tserver/service_util.h +++ b/src/yb/tserver/service_util.h @@ -19,8 +19,12 @@ #include +#include "yb/cdc/cdc_service.pb.h" + #include "yb/common/wire_protocol.h" +#include "yb/consensus/consensus.h" #include "yb/consensus/consensus_error.h" +#include "yb/consensus/raft_consensus.h" #include "yb/rpc/rpc_context.h" #include "yb/server/clock.h" @@ -157,6 +161,8 @@ Result LookupTabletPeer( TabletPeerLookupIf* tablet_manager, const Slice& tablet_id); +bool IsErrorCodeNotTheLeader(const Status& status); + template Result LookupTabletPeerOrRespond( TabletPeerLookupIf* tablet_manager, @@ -202,10 +208,46 @@ struct LeaderTabletPeer { void FillTabletPeer(TabletPeerTablet source); }; +// Note this method expects that the Resp class must have a field tablet_consensus_info field; +// it is used to piggyback a TabletConsensusInfo for the receiver of this response to refresh +// its meta-cache. +template +void FillTabletConsensusInfo(Resp* resp, const TabletId& tablet_id, tablet::TabletPeerPtr peer) { + if constexpr (HasTabletConsensusInfo::value) { + auto outgoing_tablet_consensus_info = resp->mutable_tablet_consensus_info(); + outgoing_tablet_consensus_info->set_tablet_id(tablet_id); + + if (auto consensus = peer->GetRaftConsensus()) { + *(outgoing_tablet_consensus_info->mutable_consensus_state()) = + consensus.get()->GetConsensusStateFromCache(); + VLOG(1) << "Sending out Consensus state for tablet: " << tablet_id << ", leader TServer is: " + << outgoing_tablet_consensus_info->consensus_state().leader_uuid(); + } + } +} + +template Result LookupLeaderTablet( - TabletPeerLookupIf* tablet_manager, - const std::string& tablet_id, - TabletPeerTablet peer = TabletPeerTablet()); + TabletPeerLookupIf* tablet_manager, const TabletId& tablet_id, Resp* resp, + TabletPeerTablet peer = TabletPeerTablet()) { + if (peer.tablet_peer) { + LOG_IF(DFATAL, peer.tablet_peer->tablet_id() != tablet_id) + << "Mismatching table ids: peer " << peer.tablet_peer->tablet_id() << " vs " << tablet_id; + LOG_IF(DFATAL, !peer.tablet) << "Empty tablet pointer for tablet id : " << tablet_id; + } else { + peer = VERIFY_RESULT(LookupTabletPeer(tablet_manager, tablet_id)); + } + LeaderTabletPeer result; + result.FillTabletPeer(std::move(peer)); + auto status = result.FillTerm(); + if(!status.ok()) { + if(IsErrorCodeNotTheLeader(status)) { + FillTabletConsensusInfo(resp, tablet_id, result.peer); + } + return status; + } + return result; +} // The "peer" argument could be provided by the caller in case the caller has already performed // the LookupTabletPeerOrRespond call, and we only need to fill the leader term. @@ -216,7 +258,7 @@ LeaderTabletPeer LookupLeaderTabletOrRespond( RespClass* resp, rpc::RpcContext* context, TabletPeerTablet peer = TabletPeerTablet()) { - auto result = LookupLeaderTablet(tablet_manager, tablet_id, std::move(peer)); + auto result = LookupLeaderTablet(tablet_manager, tablet_id, resp, std::move(peer)); if (!result.ok()) { SetupErrorAndRespond(resp->mutable_error(), result.status(), context); return LeaderTabletPeer(); @@ -237,7 +279,7 @@ Status CheckPeerIsReady( Result> GetTablet( TabletPeerLookupIf* tablet_manager, const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, YBConsistencyLevel consistency_level, - AllowSplitTablet allow_split_tablet); + AllowSplitTablet allow_split_tablet, ReadResponsePB* resp = nullptr); Status CheckWriteThrottling(double score, tablet::TabletPeer* tablet_peer); diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index c0a7ffff7f6..7f6d6f07e9d 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -496,9 +496,10 @@ class ScanResultChecksummer { Result> TabletServiceImpl::GetTabletForRead( const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, - YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) { + YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet, + tserver::ReadResponsePB* resp) { return GetTablet(server_->tablet_peer_lookup(), tablet_id, std::move(tablet_peer), - consistency_level, allow_split_tablet); + consistency_level, allow_split_tablet, resp); } TabletServiceImpl::TabletServiceImpl(TabletServerIf* server) @@ -2196,7 +2197,8 @@ Status TabletServiceImpl::PerformWrite( VLOG(2) << "Received Write RPC: " << req->DebugString(); UpdateClock(*req, server_->Clock()); - auto tablet = VERIFY_RESULT(LookupLeaderTablet(server_->tablet_peer_lookup(), req->tablet_id())); + auto tablet = + VERIFY_RESULT(LookupLeaderTablet(server_->tablet_peer_lookup(), req->tablet_id(), resp)); RETURN_NOT_OK(CheckWriteThrottling(req->rejection_score(), tablet.peer.get())); if (tablet.tablet->metadata()->hidden()) { diff --git a/src/yb/tserver/tablet_service.h b/src/yb/tserver/tablet_service.h index dc5ef93d748..288464b8171 100644 --- a/src/yb/tserver/tablet_service.h +++ b/src/yb/tserver/tablet_service.h @@ -222,7 +222,8 @@ void ClearUniverseUuid(const ClearUniverseUuidRequestPB* req, Result> GetTabletForRead( const TabletId& tablet_id, tablet::TabletPeerPtr tablet_peer, - YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet) override; + YBConsistencyLevel consistency_level, tserver::AllowSplitTablet allow_split_tablet, + tserver::ReadResponsePB* resp) override; Result DoChecksum(const ChecksumRequestPB* req, CoarseTimePoint deadline); diff --git a/src/yb/tserver/tserver.proto b/src/yb/tserver/tserver.proto index 291564f8be8..fa7bd5f2b1b 100644 --- a/src/yb/tserver/tserver.proto +++ b/src/yb/tserver/tserver.proto @@ -42,11 +42,17 @@ import "yb/common/ql_protocol.proto"; import "yb/common/pgsql_protocol.proto"; import "yb/common/transaction.proto"; import "yb/common/wire_protocol.proto"; +import "yb/consensus/metadata.proto"; import "yb/docdb/docdb.proto"; import "yb/tablet/operations.proto"; import "yb/tablet/tablet.proto"; import "yb/tserver/tserver_types.proto"; +message TabletConsensusInfoPB { + optional bytes tablet_id = 1; + optional consensus.ConsensusStatePB consensus_state = 2; +} + // A batched set of insert/mutate requests. message WriteRequestPB { // TODO(proto3) reserved 2, 3, 9; @@ -134,6 +140,8 @@ message WriteResponsePB { optional ReadHybridTimePB used_read_time = 13; optional fixed64 local_limit_ht = 14; + + optional TabletConsensusInfoPB tablet_consensus_info = 15; } // A list tablets request @@ -254,6 +262,8 @@ message ReadResponsePB { optional ReadHybridTimePB used_read_time = 9; optional fixed64 local_limit_ht = 10; + + optional TabletConsensusInfoPB tablet_consensus_info = 11; } message GetTabletKeyRangesRequestPB { diff --git a/src/yb/tserver/tserver_fwd.h b/src/yb/tserver/tserver_fwd.h index f1402c1cc0a..c2d12d99787 100644 --- a/src/yb/tserver/tserver_fwd.h +++ b/src/yb/tserver/tserver_fwd.h @@ -57,5 +57,13 @@ YB_STRONGLY_TYPED_BOOL(AllowSplitTablet); using TransactionPoolProvider = std::function; +template > +struct HasTabletConsensusInfo : std::false_type {}; + +template +struct HasTabletConsensusInfo< + T, std::void_t().tablet_consensus_info())>> + : std::true_type {}; + } // namespace tserver } // namespace yb diff --git a/src/yb/tserver/tserver_service.proto b/src/yb/tserver/tserver_service.proto index da213b94a79..caf9fcc28a6 100644 --- a/src/yb/tserver/tserver_service.proto +++ b/src/yb/tserver/tserver_service.proto @@ -278,6 +278,8 @@ message GetTransactionStatusResponsePB { // Note: The field might not be present on older versions of YB and hence its existence // should be checked before usage by the reader. repeated AppStatusPB deadlock_reason = 8; + + optional TabletConsensusInfoPB tablet_consensus_info = 9; } message GetOldTransactionsRequestPB {