From 1fa2ee4339a030f499ce499ff2211a1acbc6250a Mon Sep 17 00:00:00 2001
From: Sean Song
Date: Tue, 23 Apr 2024 16:20:40 -0700
Subject: [PATCH] [#20234] Metacache improvement: Refresh Meta-cache when
 successful RPC responds

Summary:
Background: As detailed in https://phorge.dev.yugabyte.com/D33533, the YBClient
currently fails to update its meta-cache when there are changes in a raft
group's configuration. This lapse can lead to inefficiencies such as persistent
follower reads not recognizing the addition of closer followers. Consequently,
even if there are suitable nearer followers, the system continues to rely on
more distant followers as long as the RPCs are successful.

Solution: This update enhances the Read, Write, GetChanges, and
GetTransactionStatus RPCs by appending the client's current
raft_config_opid_index to each request. Upon receiving a request, if that
raft_config_opid_index is outdated compared to the committed raft config opid
index on the TabletPeer handling the request, the Tablet Server includes
updated raft consensus state information in its response. This keeps the
meta-cache current, improving the system's ability to recognize and use the
optimal server configuration for processing requests.

This adjustment is part of a series of updates (alongside D33197 and D33598)
designed to keep the meta-cache sufficiently current, thereby preventing the
inefficiencies previously caused by outdated cache information.

A flag, enable_metacache_partial_refresh, is added to turn the feature on; it
is off by default right now.

Upgrade/Rollback safety:
The additional fields in the request and response protobufs are temporary and
are not stored on disk, maintaining compatibility and safety during potential
system upgrades or rollbacks.

Jira: DB-9194

Test Plan:
Jenkins: urgent

Full Coverage Testing:
Added a test flag, FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc,
which will be turned on in debug mode. This flag prompts the meta-cache's
GetRaftConfigOpidIndex method to always return an OpId index of -2. So when the
tablet server is about to send back a successful response, it will find that
the request's piggybacked OpId index is stale and will piggyback a
TabletConsensusInfo onto the response. When we receive the response in the
aforementioned RPCs and this flag is turned on, a DCHECK verifies that if the
RPC response can carry a TabletConsensusInfo and the response was successful,
then the TabletConsensusInfo is indeed present in the response. This
essentially lets us leverage all the existing tests in the code base that
exercise these RPCs to DCHECK our code path.

Unit testing:
Added metacache_refresh-itest.cc, which contains the following tests:

TestMetacacheRefreshFromFollowerRead:
1. Sets up an external mini-cluster.
2. Fills in the meta-cache by issuing a write op.
3. Changes the raft configuration of the tablet group by blacklisting a node
   and adding a node.
4. Verifies, using a sync point, that the next ConsistentPrefix read
   successfully refreshes the meta-cache.

TestMetacacheNoRefreshFromWrite:
1. Turns off FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc.
2. Fills in the meta-cache by issuing a write op.
3. Issues another write op and observes that no refresh happened.
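For orientation, the server-side check at the heart of this change can be
condensed roughly as follows. This is a simplified sketch of the
FillTabletConsensusInfoIfRequestOpIdStale helper added in service_util.h; the
HasTabletConsensusInfo compile-time guard and the VLOG are omitted, and local
variable names are illustrative only.

    // Sketch: piggyback the cached consensus state onto a successful response
    // when the client's raft config opid index is stale.
    template <class Req, class Resp>
    void FillTabletConsensusInfoIfRequestOpIdStale(
        tablet::TabletPeerPtr peer, const Req* req, Resp* resp) {
      if (auto consensus = peer->GetRaftConsensus()) {
        // The consensus state is served from a cache, so this check is cheap.
        auto cstate = consensus.get()->GetConsensusStateFromCache();
        // The client piggybacks the raft config opid index it currently has;
        // if it is older than the committed config, send the new state back.
        if (cstate.has_config() &&
            req->raft_config_opid_index() < cstate.config().opid_index()) {
          auto* info = resp->mutable_tablet_consensus_info();
          info->set_tablet_id(peer->tablet_id());
          *info->mutable_consensus_state() = cstate;
        }
      }
    }

On the client side, TabletInvoker::Execute attaches the opid index via
SetRequestRaftConfigOpidIndex before sending, and TabletInvoker::Done calls
RefreshMetaCacheWithResponse to apply any piggybacked TabletConsensusInfo.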
Reviewers: mlillibridge, xCluster, hsunder Reviewed By: mlillibridge Subscribers: bogdan, ybase, ycdcxcluster Differential Revision: https://phorge.dev.yugabyte.com/D34272 --- src/yb/cdc/cdc_service.cc | 4 +- src/yb/cdc/cdc_service.proto | 2 + src/yb/cdc/xcluster_rpc.cc | 10 + src/yb/client/async_rpc.cc | 6 + src/yb/client/async_rpc.h | 2 + src/yb/client/client-test.cc | 6 + src/yb/client/client.cc | 4 + src/yb/client/client.h | 2 + src/yb/client/meta_cache.cc | 13 + src/yb/client/meta_cache.h | 2 + src/yb/client/tablet_rpc.cc | 34 ++- src/yb/client/tablet_rpc.h | 21 +- src/yb/client/transaction_rpc.cc | 7 + src/yb/integration-tests/CMakeLists.txt | 1 + .../integration-tests/cdc_service-int-test.cc | 3 + .../metacache_refresh-itest.cc | 263 ++++++++++++++++++ src/yb/tserver/read_query.cc | 39 ++- src/yb/tserver/service_util.cc | 17 +- src/yb/tserver/service_util.h | 21 ++ src/yb/tserver/tablet_service.cc | 5 + src/yb/tserver/tserver.proto | 4 + src/yb/tserver/tserver_fwd.h | 7 + src/yb/tserver/tserver_service.proto | 1 + 23 files changed, 452 insertions(+), 22 deletions(-) create mode 100644 src/yb/integration-tests/metacache_refresh-itest.cc diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 1b70aea7a70..b75b2b90a7d 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -1582,7 +1582,8 @@ void CDCServiceImpl::GetChanges( resp->mutable_error(), CDCErrorPB::LEADER_NOT_READY, context); - + // Fill in tablet consensus info now we know that the tablet exists and is leader. + tserver::FillTabletConsensusInfoIfRequestOpIdStale(tablet_peer, req, resp); auto stream_meta_ptr = RPC_VERIFY_RESULT( GetStream(stream_id, RefreshStreamMapOption::kIfInitiatedState), resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); @@ -1615,7 +1616,6 @@ void CDCServiceImpl::GetChanges( streaming_checkpoint_pb.set_key(""); streaming_checkpoint_pb.set_write_id(0); resp->mutable_cdc_sdk_checkpoint()->CopyFrom(streaming_checkpoint_pb); - context.RespondSuccess(); return; } diff --git a/src/yb/cdc/cdc_service.proto b/src/yb/cdc/cdc_service.proto index 5c92186e8f9..8b7ed91ddae 100644 --- a/src/yb/cdc/cdc_service.proto +++ b/src/yb/cdc/cdc_service.proto @@ -328,6 +328,8 @@ message GetChangesRequestPB { optional uint32 auto_flags_config_version = 13; optional CDCSDKRequestSource cdcsdk_request_source = 14 [default = DEBEZIUM]; + + optional int64 raft_config_opid_index = 15; } message KeyValuePairPB { diff --git a/src/yb/cdc/xcluster_rpc.cc b/src/yb/cdc/xcluster_rpc.cc index 767a02b379c..443b26a5287 100644 --- a/src/yb/cdc/xcluster_rpc.cc +++ b/src/yb/cdc/xcluster_rpc.cc @@ -178,6 +178,7 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc { } bool RefreshMetaCacheWithResponse() override { + DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_)); if (!resp_.has_tablet_consensus_info()) { VLOG(1) << "Partial refresh of tablet for XClusterWrite RPC failed because the response did " "not have a tablet_consensus_info"; @@ -187,6 +188,10 @@ class XClusterWriteRpc : public rpc::Rpc, public client::internal::TabletRpc { return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); } + void SetRequestRaftConfigOpidIndex(int64_t opid_index) override { + req_.set_raft_config_opid_index(opid_index); + } + private: void SendRpcToTserver(int attempt_num) override { InvokeAsync( @@ -323,6 +328,7 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc { } bool RefreshMetaCacheWithResponse() override { + 
DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_)); if (!resp_.has_tablet_consensus_info()) { VLOG(1) << "Partial refresh of tablet for GetChanges RPC failed because the response did not " "have a tablet_consensus_info"; @@ -332,6 +338,10 @@ class GetChangesRpc : public rpc::Rpc, public client::internal::TabletRpc { return invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); } + void SetRequestRaftConfigOpidIndex(int64_t opid_index) override { + req_.set_raft_config_opid_index(opid_index); + } + private: const std::string &tablet_id() const { return req_.tablet_id(); } diff --git a/src/yb/client/async_rpc.cc b/src/yb/client/async_rpc.cc index d1ff0798c87..2f3a312f603 100644 --- a/src/yb/client/async_rpc.cc +++ b/src/yb/client/async_rpc.cc @@ -488,6 +488,7 @@ void AsyncRpcBase::ProcessResponseFromTserver(const Status& status) { template bool AsyncRpcBase::RefreshMetaCacheWithResponse() { + DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_)); if (!resp_.has_tablet_consensus_info()) { VLOG(1) << "Partial refresh of tablet for " << GetRpcName() << " RPC failed because the response did not have a tablet_consensus_info"; @@ -497,6 +498,11 @@ bool AsyncRpcBase::RefreshMetaCacheWithResponse() { return tablet_invoker_.RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); } +template +void AsyncRpcBase::SetRequestRaftConfigOpidIndex(int64_t opid_index) { + req_.set_raft_config_opid_index(opid_index); +} + template FlushExtraResult AsyncRpcBase::MakeFlushExtraResult() { return {GetPropagatedHybridTime(resp_), diff --git a/src/yb/client/async_rpc.h b/src/yb/client/async_rpc.h index 7334c9ba26b..809a92b7cc9 100644 --- a/src/yb/client/async_rpc.h +++ b/src/yb/client/async_rpc.h @@ -149,6 +149,8 @@ class AsyncRpcBase : public AsyncRpc { bool RefreshMetaCacheWithResponse() override; + void SetRequestRaftConfigOpidIndex(int64_t opid_index) override; + virtual std::string GetRpcName() = 0; protected: diff --git a/src/yb/client/client-test.cc b/src/yb/client/client-test.cc index c48e4abdb53..abf767ff6e6 100644 --- a/src/yb/client/client-test.cc +++ b/src/yb/client/client-test.cc @@ -140,6 +140,8 @@ DECLARE_int32(rocksdb_level0_file_num_compaction_trigger); METRIC_DECLARE_counter(rpcs_queue_overflow); +DECLARE_bool(enable_metacache_partial_refresh); + using namespace std::literals; // NOLINT using namespace std::placeholders; @@ -2800,6 +2802,8 @@ Result GetRemoteTablet( // tablet server, thus it will have the latest information about who the leader really is, and // thus should always succeed with exactly 2 RPCs. TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = + true; shared_ptr pgsql_table; EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table)); std::shared_ptr session = CreateSession(client_.get()); @@ -2878,6 +2882,8 @@ TEST_F(ClientTest, TestMetacacheRefreshWhenSentToWrongLeader) { const auto ts_map = ASSERT_RESULT(itest::CreateTabletServerMap(master_proxy, proxy_cache_.get())); auto target_details = ts_map.at(tserver_uuids[target_index]).get(); auto leader_ts = ts_map.at(leader_server_uuid).get(); + // Step down to target replica and wait until the leader is elected on the target. + // Wait for all peers to get ready. 
ASSERT_OK( (itest::WaitForAllPeersToCatchup(tablet.tablet_id(), TServerDetailsVector(ts_map), timeout))); ASSERT_OK(itest::LeaderStepDown( diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index e7bef338622..fb99bbd8339 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -2958,5 +2958,9 @@ bool YBClient::RefreshTabletInfoWithConsensusInfo( return true; } +int64_t YBClient::GetRaftConfigOpidIndex(const TabletId& tablet_id) { + return data_->meta_cache_->GetRaftConfigOpidIndex(tablet_id); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 2e689f2b978..27fed46bf23 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -1036,6 +1036,8 @@ class YBClient { bool RefreshTabletInfoWithConsensusInfo( const tserver::TabletConsensusInfoPB& newly_received_info); + int64_t GetRaftConfigOpidIndex(const TabletId& tablet_id); + private: class Data; diff --git a/src/yb/client/meta_cache.cc b/src/yb/client/meta_cache.cc index e8487de4b32..7f5fc68d9a2 100644 --- a/src/yb/client/meta_cache.cc +++ b/src/yb/client/meta_cache.cc @@ -125,6 +125,7 @@ METRIC_DEFINE_event_stats( DECLARE_string(placement_cloud); DECLARE_string(placement_region); +DECLARE_bool(TEST_always_return_consensus_info_for_succeeded_rpc); namespace yb { @@ -1255,6 +1256,18 @@ Status MetaCache::RefreshTabletInfoWithConsensusInfo( } } +int64_t MetaCache::GetRaftConfigOpidIndex(const TabletId& tablet_id) { + SharedLock lock(mutex_); + if (GetAtomicFlag(&FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc)) { + return RemoteTablet::kUnknownOpIdIndex; + } + auto remote_tablet = FindPtrOrNull(tablets_by_id_, tablet_id); + if (remote_tablet == nullptr) { + return RemoteTablet::kUnknownOpIdIndex; + } + return remote_tablet->raft_config_opid_index(); +} + std::unordered_map::iterator MetaCache::InitTableDataUnlocked( const TableId& table_id, const VersionedTablePartitionListPtr& partitions) { VLOG_WITH_PREFIX_AND_FUNC(4) << Format( diff --git a/src/yb/client/meta_cache.h b/src/yb/client/meta_cache.h index 0c9c48183c1..d0f4f3cf556 100644 --- a/src/yb/client/meta_cache.h +++ b/src/yb/client/meta_cache.h @@ -629,6 +629,8 @@ class MetaCache : public RefCountedThreadSafe { Status RefreshTabletInfoWithConsensusInfo( const tserver::TabletConsensusInfoPB& tablet_consensus_info); + int64_t GetRaftConfigOpidIndex(const TabletId& tablet_id); + private: friend class LookupRpc; friend class LookupByKeyRpc; diff --git a/src/yb/client/tablet_rpc.cc b/src/yb/client/tablet_rpc.cc index 9c7827a4087..bb7951a1620 100644 --- a/src/yb/client/tablet_rpc.cc +++ b/src/yb/client/tablet_rpc.cc @@ -29,6 +29,7 @@ #include "yb/tserver/tserver_error.h" #include "yb/tserver/tserver_service.proxy.h" +#include "yb/util/debug-util.h" #include "yb/util/flags.h" #include "yb/util/logging.h" #include "yb/util/result.h" @@ -55,6 +56,16 @@ DEFINE_UNKNOWN_int32(lookup_cache_refresh_secs, 60, "When non-zero, specifies ho DEFINE_test_flag(int32, assert_failed_replicas_less_than, 0, "If greater than 0, this process will crash if the number of failed replicas for " "a RemoteTabletServer is greater than the specified number."); +DEFINE_test_flag( + bool, always_return_consensus_info_for_succeeded_rpc, false, + "If set to true, we will always pass a stale raft_config_opid_index to the request when it is " + "possible for the request. 
This is turned on in debug mode to test that our metacache " + "will always be refreshed when a successful Write/Read/TransactionStatus/GetChanges RPC " + "responds."); +DEFINE_RUNTIME_bool( + enable_metacache_partial_refresh, false, + "If set, we will attempt to refresh the tablet metadata cache with a TabletConsensusInfoPB in " + "the tablet invoker."); using namespace std::placeholders; @@ -264,11 +275,14 @@ void TabletInvoker::Execute(const std::string& tablet_id, bool leader_only) { VLOG(2) << "Tablet " << tablet_id_ << ": Sending " << command_->ToString() << " to replica " << current_ts_->ToString(); + int64_t opid_index = client_->GetRaftConfigOpidIndex(tablet_id_); + rpc_->SetRequestRaftConfigOpidIndex(opid_index); rpc_->SendRpcToTserver(retrier_->attempt_num()); } Status TabletInvoker::FailToNewReplica(const Status& reason, - const tserver::TabletServerErrorPB* error_code) { + const tserver::TabletServerErrorPB* error_code, + bool consensus_info_refresh_succeeded) { TRACE_TO(trace_, "FailToNewReplica($0)", reason.ToString()); if (ErrorCode(error_code) == tserver::TabletServerErrorPB::STALE_FOLLOWER) { VLOG(1) << "Stale follower for " << command_->ToString() << " just retry"; @@ -289,10 +303,9 @@ Status TabletInvoker::FailToNewReplica(const Status& reason, // tablet is successfully refreshed using the tablet_consensus_info, so we need to clear the // followers set to let tablet invoker use our latest leader tablet peer that has just been // refreshed. - - if (rpc_->RefreshMetaCacheWithResponse()) { - bool refresh_succeeded = true; - TEST_SYNC_POINT_CALLBACK("CDCSDKMetaCacheRefreshTest::Refresh", &refresh_succeeded); + if (consensus_info_refresh_succeeded) { + TEST_SYNC_POINT_CALLBACK( + "CDCSDKMetaCacheRefreshTest::Refresh", &consensus_info_refresh_succeeded); followers_.clear(); } else { followers_.emplace( @@ -333,7 +346,14 @@ bool TabletInvoker::Done(Status* status) { bool assign_new_leader = assign_new_leader_; assign_new_leader_ = false; - + bool consensus_info_refresh_succeeded = false; + if (GetAtomicFlag(&FLAGS_enable_metacache_partial_refresh)) { + consensus_info_refresh_succeeded = rpc_->RefreshMetaCacheWithResponse(); + if (status->ok()) { + TEST_SYNC_POINT_CALLBACK( + "TabletInvoker::RefreshFinishedWithOkRPCResponse", &consensus_info_refresh_succeeded); + } + } if (status->IsAborted() || retrier_->finished()) { if (status->ok()) { *status = retrier_->controller().status(); @@ -438,7 +458,7 @@ bool TabletInvoker::Done(Status* status) { if (status->IsIllegalState() || IsTabletConsideredNotFound(rsp_err, *status) || IsTabletConsideredNonLeader(rsp_err, *status)) { // The whole operation is completed if we can't schedule a retry. - return !FailToNewReplica(*status, rsp_err).ok(); + return !FailToNewReplica(*status, rsp_err, consensus_info_refresh_succeeded).ok(); } else { tserver::TabletServerDelay delay(*status); auto retry_status = delay.value().Initialized() diff --git a/src/yb/client/tablet_rpc.h b/src/yb/client/tablet_rpc.h index 321ed1226ab..3196bad25c6 100644 --- a/src/yb/client/tablet_rpc.h +++ b/src/yb/client/tablet_rpc.h @@ -34,9 +34,12 @@ #include "yb/tserver/tserver_fwd.h" #include "yb/tserver/tserver_types.pb.h" +#include "yb/util/atomic.h" #include "yb/util/status_fwd.h" #include "yb/util/net/net_fwd.h" +DECLARE_bool(TEST_always_return_consensus_info_for_succeeded_rpc); + namespace yb { namespace tserver { @@ -62,12 +65,25 @@ class TabletRpc { // Returns true if we successfully updated the metacache, otherwise false. 
virtual bool RefreshMetaCacheWithResponse() { return false; } + virtual void SetRequestRaftConfigOpidIndex(int64_t opid_index) {} + protected: ~TabletRpc() {} }; tserver::TabletServerErrorPB_Code ErrorCode(const tserver::TabletServerErrorPB* error); +template +inline bool CheckIfConsensusInfoUnexpectedlyMissing(const Req& request, const Resp& response) { + if (tserver::HasTabletConsensusInfo::value) { + if (GetAtomicFlag(&FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc) && + !response.has_error()) { + return response.has_tablet_consensus_info(); + } + } + return true; +} + class TabletInvoker { public: // If table is specified, TabletInvoker can detect that table partitions are stale in case tablet @@ -122,7 +138,8 @@ class TabletInvoker { // Marks all replicas on current_ts_ as failed and retries the write on a // new replica. Status FailToNewReplica(const Status& reason, - const tserver::TabletServerErrorPB* error_code = nullptr); + const tserver::TabletServerErrorPB* error_code = nullptr, + bool consensus_info_refresh_succeeded = false); // Called when we finish a lookup (to find the new consensus leader). Retries // the rpc after a short delay. @@ -150,7 +167,7 @@ class TabletInvoker { if (ErrorCode(error_code) == tserver::TabletServerErrorPB::NOT_THE_LEADER && current_ts_ != nullptr) { return status.IsNotFound() || status.IsIllegalState(); - } +} return false; } diff --git a/src/yb/client/transaction_rpc.cc b/src/yb/client/transaction_rpc.cc index 3dff183d120..0cfbe9a3365 100644 --- a/src/yb/client/transaction_rpc.cc +++ b/src/yb/client/transaction_rpc.cc @@ -118,6 +118,7 @@ class TransactionRpc : public TransactionRpcBase { bool RefreshMetaCacheWithResponse() override { if constexpr (tserver::HasTabletConsensusInfo::value) { + DCHECK(client::internal::CheckIfConsensusInfoUnexpectedlyMissing(req_, resp_)); if (resp_.has_tablet_consensus_info()) { return GetInvoker().RefreshTabletInfoWithConsensusInfo(resp_.tablet_consensus_info()); } @@ -128,6 +129,12 @@ class TransactionRpc : public TransactionRpcBase { return false; } + void SetRequestRaftConfigOpidIndex(int64_t opid_index) override { + if constexpr (tserver::HasRaftConfigOpidIndex::value) { + req_.set_raft_config_opid_index(opid_index); + } + } + private: const std::string& tablet_id() const override { return req_.tablet_id(); diff --git a/src/yb/integration-tests/CMakeLists.txt b/src/yb/integration-tests/CMakeLists.txt index 97ebc70bbc4..78cf24e1366 100644 --- a/src/yb/integration-tests/CMakeLists.txt +++ b/src/yb/integration-tests/CMakeLists.txt @@ -166,6 +166,7 @@ ADD_YB_TEST(tserver_path_handlers-itest) ADD_YB_TEST(master_failover-itest) ADD_YB_TEST(master_config-itest) ADD_YB_TEST(master_tasks-test) +ADD_YB_TEST(metacache_refresh-itest) ADD_YB_TEST(network_failure-test) ADD_YB_TEST(system_table_fault_tolerance) ADD_YB_TEST(raft_consensus-itest) diff --git a/src/yb/integration-tests/cdc_service-int-test.cc b/src/yb/integration-tests/cdc_service-int-test.cc index eb11c45822b..df76b19c5e0 100644 --- a/src/yb/integration-tests/cdc_service-int-test.cc +++ b/src/yb/integration-tests/cdc_service-int-test.cc @@ -102,6 +102,7 @@ DECLARE_string(vmodule); METRIC_DECLARE_entity(cdc); METRIC_DECLARE_gauge_int64(last_read_opid_index); +DECLARE_bool(enable_metacache_partial_refresh); namespace yb { namespace log { @@ -1037,6 +1038,8 @@ class CDCServiceTestMultipleServersOneTablet : public CDCServiceTest { // serving as a proxy, the proxy should receive a TabletConsensusInfo that // it can use to refresh its metacache. 
TEST_F(CDCServiceTestMultipleServersOneTablet, TestGetChangesRpcTabletConsensusInfo) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = + true; // Find the leader and followers for our tablet. const MonoDelta timeout = MonoDelta::FromSeconds(10); const auto proxy_cache_ = std::make_unique(client_->messenger()); diff --git a/src/yb/integration-tests/metacache_refresh-itest.cc b/src/yb/integration-tests/metacache_refresh-itest.cc new file mode 100644 index 00000000000..90e82dd2510 --- /dev/null +++ b/src/yb/integration-tests/metacache_refresh-itest.cc @@ -0,0 +1,263 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// The following only applies to changes made to this file as part of YugaByte development. +// +// Portions Copyright (c) YugaByte, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. 
+// + +#include +#include +#include +#include + +#include +#include + +#include "yb/client/client_fwd.h" +#include "yb/client/client-test-util.h" +#include "yb/client/meta_cache.h" +#include "yb/client/session.h" +#include "yb/client/table.h" +#include "yb/client/table_creator.h" +#include "yb/client/table_info.h" +#include "yb/client/yb_op.h" + +#include "yb/common/common.pb.h" +#include "yb/common/transaction.h" +#include "yb/common/wire_protocol-test-util.h" + +#include "yb/integration-tests/external_mini_cluster-itest-base.h" +#include "yb/integration-tests/external_mini_cluster.h" +#include "yb/integration-tests/mini_cluster.h" +#include "yb/integration-tests/yb_mini_cluster_test_base.h" + +#include "yb/master/master_client.pb.h" +#include "yb/master/master_ddl.pb.h" +#include "yb/master/master_defaults.h" +#include "yb/master/master_util.h" +#include "yb/master/master_admin.proxy.h" + +#include "yb/util/async_util.h" +#include "yb/rpc/sidecars.h" +#include "yb/util/sync_point.h" +#include "yb/tserver/tserver_service.pb.h" + +#include "yb/yql/pgwrapper/libpq_utils.h" +#include "yb/yql/pgwrapper/pg_wrapper.h" + + +using strings::Substitute; +using yb::client::YBTableName; +using yb::client::YBTableType; +// DECLARE_bool(TEST_always_return_consensus_info_for_succeeded_rpc); +DECLARE_bool(enable_metacache_partial_refresh); + +namespace yb { + +class MetacacheRefreshITest : public MiniClusterTestWithClient { + public: + const std::string kPgsqlNamespaceName = "test_namespace"; + const std::string kPgsqlTableName = "test_table"; + const std::string kPgsqlSchemaName = "test_schema"; + const std::string kPgsqlTableId = "test_table_id"; + const std::string kPgsqlKeyspaceName = "test_keyspace"; + const std::string kPgsqlKeyspaceID = "test_keyspace_id"; + + Result ConnectToDB( + const std::string& dbname, bool simple_query_protocol = false) { + return pgwrapper::PGConnBuilder({.host = cluster_->pgsql_hostport(0).host(), + .port = cluster_->pgsql_hostport(0).port(), + .dbname = dbname}) + .Connect(simple_query_protocol); + } + + void SetUp() { + YBMiniClusterTestBase::SetUp(); + opts_.num_tablet_servers = 3; + opts_.num_masters = 1; + opts_.enable_ysql = true; + cluster_.reset(new ExternalMiniCluster(opts_)); + ASSERT_OK(cluster_->Start()); + + ASSERT_OK(MiniClusterTestWithClient::CreateClient()); + + std::vector hosts; + for (size_t i = 0; i < cluster_->num_tablet_servers(); ++i) { + hosts.push_back(cluster_->tablet_server(i)->bind_host()); + } + CreatePgSqlTable(); + } + + void CreatePgSqlTable() { + std::unique_ptr table_creator(client_->NewTableCreator()); + ASSERT_OK(client_->CreateNamespace( + kPgsqlNamespaceName, YQL_DATABASE_PGSQL, "" /* creator */, "" /* ns_id */, + "" /* src_ns_id */, boost::none /* next_pg_oid */, nullptr /* txn */, false)); + std::string kNamespaceId; + { + auto namespaces = ASSERT_RESULT(client_->ListNamespaces()); + for (const auto& ns : namespaces) { + if (ns.id.name() == kPgsqlNamespaceName) { + kNamespaceId = ns.id.id(); + break; + } + } + } + auto pgsql_table_name = + YBTableName(YQL_DATABASE_PGSQL, kNamespaceId, kPgsqlNamespaceName, kPgsqlTableName); + + client::YBSchemaBuilder schema_builder; + schema_builder.AddColumn("key")->PrimaryKey()->Type(DataType::STRING)->NotNull(); + schema_builder.AddColumn("value")->Type(DataType::INT64)->NotNull(); + schema_builder.SetSchemaName(kPgsqlSchemaName); + EXPECT_OK(client_->CreateNamespaceIfNotExists( + kPgsqlNamespaceName, YQLDatabase::YQL_DATABASE_PGSQL, "" /* creator_role_name */, + kNamespaceId)); + 
client::YBSchema schema; + EXPECT_OK(schema_builder.Build(&schema)); + EXPECT_OK(table_creator->table_name(pgsql_table_name) + .table_id(kPgsqlTableId) + .schema(&schema) + .table_type(YBTableType::PGSQL_TABLE_TYPE) + .set_range_partition_columns({"key"}) + .num_tablets(1) + .Create()); + } + + Result GetRemoteTablet( + const TabletId& tablet_id, bool use_cache, client::YBClient* client) { + std::promise> tablet_lookup_promise; + auto future = tablet_lookup_promise.get_future(); + client->LookupTabletById( + tablet_id, /* table =*/nullptr, master::IncludeInactive::kTrue, + master::IncludeDeleted::kFalse, CoarseMonoClock::Now() + MonoDelta::FromMilliseconds(1000), + [&tablet_lookup_promise](const Result& result) { + tablet_lookup_promise.set_value(result); + }, + client::UseCache(use_cache)); + return VERIFY_RESULT(future.get()); + } + + + void ChangeClusterConfig(size_t idx = 0) { + // add TServer to blacklist + ASSERT_OK(cluster_->AddTServerToBlacklist(cluster_->master(), cluster_->tablet_server(idx))); + // Add a node to the cluster + ASSERT_OK(cluster_->AddTabletServer()); + ASSERT_OK(cluster_->WaitForTabletServerCount(3, 10s * kTimeMultiplier)); + } + + client::YBPgsqlWriteOpPtr CreateNewWriteOp( + rpc::Sidecars& sidecars, std::shared_ptr pgsql_table, + const std::string& key) { + auto pgsql_write_op = client::YBPgsqlWriteOp::NewInsert(pgsql_table, &sidecars); + PgsqlWriteRequestPB* psql_write_request = pgsql_write_op->mutable_request(); + psql_write_request->add_range_column_values()->mutable_value()->set_string_value(key); + PgsqlColumnValuePB* pgsql_column = psql_write_request->add_column_values(); + pgsql_column->set_column_id(pgsql_table->schema().ColumnId(1)); + pgsql_column->mutable_expr()->mutable_value()->set_int64_value(3); + return pgsql_write_op; + } + + ExternalMiniClusterOptions opts_; +}; + +// Tests that the metacache refresh works when a follower read is issued. +// Can either go to a leader or a follower. 
+// TEST_F(MetacacheRefreshITest, TestMetacacheRefreshFromFollowerRead) { +// // ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc) = +// // false; +// ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = +// true; +// std::shared_ptr pgsql_table; +// EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table)); +// std::shared_ptr session = client_->NewSession(10s * kTimeMultiplier); +// rpc::Sidecars sidecars; +// auto write_op = CreateNewWriteOp(sidecars, pgsql_table, "pgsql_key1"); +// session->Apply(write_op); +// FlushSessionOrDie(session); + +// google::protobuf::RepeatedPtrField tablets; +// ASSERT_OK(client_->GetTabletsFromTableId(kPgsqlTableId, 0, &tablets)); +// const auto& tablet = tablets.Get(0); +// auto tablet_id = tablet.tablet_id(); +// ASSERT_FALSE(tablet_id.empty()); +// ChangeClusterConfig(); + +// auto remote_tablet = ASSERT_RESULT(GetRemoteTablet(tablet.tablet_id(), true, client_.get())); +// auto pgsql_read_op = client::YBPgsqlReadOp::NewSelect(pgsql_table, &sidecars); +// pgsql_read_op->set_yb_consistency_level(YBConsistencyLevel::CONSISTENT_PREFIX); +// session->Apply(pgsql_read_op); +// auto* sync_point_instance = yb::SyncPoint::GetInstance(); +// Synchronizer sync; +// bool refresh_succeeded = false; +// sync_point_instance->SetCallBack( +// "TabletInvoker::RefreshFinishedWithOkRPCResponse", +// [sync_point_instance, callback = sync.AsStdStatusCallback(), &refresh_succeeded](void* arg) +// { +// refresh_succeeded = *reinterpret_cast(arg); +// sync_point_instance->DisableProcessing(); +// callback(Status::OK()); +// }); +// sync_point_instance->EnableProcessing(); +// FlushSessionOrDie(session); +// ASSERT_OK(sync.Wait()); +// ASSERT_TRUE(refresh_succeeded); +// } + +TEST_F(MetacacheRefreshITest, TestMetacacheNoRefreshFromWrite) { + // ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_always_return_consensus_info_for_succeeded_rpc) = + // false; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_metacache_partial_refresh) = + true; + std::shared_ptr pgsql_table; + EXPECT_OK(client_->OpenTable(kPgsqlTableId, &pgsql_table)); + std::shared_ptr session = client_->NewSession(10s * kTimeMultiplier); + rpc::Sidecars sidecars; + auto write_op = CreateNewWriteOp(sidecars, pgsql_table, "pgsql_key1"); + session->Apply(write_op); + FlushSessionOrDie(session); + Synchronizer sync; + bool refresh_succeeded = false; + auto* sync_point_instance = yb::SyncPoint::GetInstance(); + sync_point_instance->SetCallBack( + "TabletInvoker::RefreshFinishedWithOkRPCResponse", + [sync_point_instance, callback = sync.AsStdStatusCallback(), &refresh_succeeded](void* arg) { + refresh_succeeded = *reinterpret_cast(arg); + sync_point_instance->DisableProcessing(); + callback(Status::OK()); + }); + sync_point_instance->EnableProcessing(); + write_op = CreateNewWriteOp(sidecars, pgsql_table, "pgsql_key2"); + session->Apply(write_op); + FlushSessionOrDie(session); + ASSERT_OK(sync.Wait()); + ASSERT_FALSE(refresh_succeeded); +} + +} // namespace yb diff --git a/src/yb/tserver/read_query.cc b/src/yb/tserver/read_query.cc index d26b4840042..14a851b5103 100644 --- a/src/yb/tserver/read_query.cc +++ b/src/yb/tserver/read_query.cc @@ -17,6 +17,7 @@ #include "yb/common/transaction.h" #include "yb/gutil/bind.h" +#include "yb/master/sys_catalog_constants.h" #include "yb/rpc/sidecars.h" @@ -107,8 +108,12 @@ class ReadQuery : public std::enable_shared_from_this, public rpc::Th ReadQuery( TabletServerIf* server, ReadTabletProvider* read_tablet_provider, const ReadRequestPB* req, 
ReadResponsePB* resp, rpc::RpcContext context) - : server_(*server), read_tablet_provider_(*read_tablet_provider), req_(req), resp_(resp), - context_(std::move(context)) {} + : server_(*server), + read_tablet_provider_(*read_tablet_provider), + req_(req), + resp_(resp), + context_(std::move(context)), + tablet_consensus_info_(nullptr) {} void Perform() { RespondIfFailed(DoPerform()); @@ -186,6 +191,7 @@ class ReadQuery : public std::enable_shared_from_this, public rpc::Th bool reading_from_non_leader_ = false; RequestScope request_scope_; std::shared_ptr retained_self_; + std::shared_ptr tablet_consensus_info_; }; bool ReadQuery::transactional() const { @@ -233,14 +239,11 @@ Status ReadQuery::DoPerform() { ADOPT_TRACE(context_.trace()); TRACE("Start Read"); TRACE_EVENT1("tserver", "TabletServiceImpl::Read", "tablet_id", req_->tablet_id()); - VLOG(2) << "Received Read RPC: " << req_->DebugString(); TabletPeerTablet peer_tablet; const auto isolation_level = VERIFY_RESULT(GetIsolationLevel(*req_, &server_, &peer_tablet)); if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) { if (PREDICT_FALSE(FLAGS_TEST_transactional_read_delay_ms > 0)) { - LOG(INFO) << "Delaying transactional read for " - << FLAGS_TEST_transactional_read_delay_ms << " ms."; SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_transactional_read_delay_ms)); } @@ -304,6 +307,24 @@ Status ReadQuery::DoPerform() { tablet_peer = ResultToValue( server_.tablet_peer_lookup()->GetServingTablet(req_->tablet_id()), {}); } + + if (tablet_peer) { + tablet_consensus_info_ = GetTabletConsensusInfoFromTabletPeer(tablet_peer); + } else { + if (abstract_tablet_ && abstract_tablet_->system()) { + // Get tablet consensus info from system catalog tablet if we found a system tablet, + // but no tablet_peer is found. Because the tablet_peer_lookup will return NOT_FOUND + // for a virtual system tablet that is not the SysCatalogTablet. However, since the + // virtual tablets have the same consensus state, we can get their tablet_consensus_info + // via the SysCatalogTablet. 
+ auto sys_catalog_tablet_peer = + server_.tablet_peer_lookup()->GetServingTablet(Slice(master::kSysCatalogTabletId)); + if (sys_catalog_tablet_peer) { + tablet_consensus_info_ = + GetTabletConsensusInfoFromTabletPeer(sys_catalog_tablet_peer.get()); + } + } + } reading_from_non_leader_ = tablet_peer && !CheckPeerIsLeader(*tablet_peer).ok(); if (PREDICT_FALSE(FLAGS_TEST_assert_reads_served_by_follower)) { CHECK_NE(req_->consistency_level(), YBConsistencyLevel::STRONG) @@ -555,9 +576,13 @@ Status ReadQuery::Complete() { LOG(INFO) << txn_id << " READ DONE: " << key << " = " << result; } #endif + if (tablet_consensus_info_ && req_->has_raft_config_opid_index() && + req_->raft_config_opid_index() < + tablet_consensus_info_.get()->consensus_state().config().opid_index()) { + resp_->mutable_tablet_consensus_info()->CopyFrom(*tablet_consensus_info_.get()); + } - MakeRpcOperationCompletionCallback( - std::move(context_), resp_, server_.Clock())(Status::OK()); + MakeRpcOperationCompletionCallback(std::move(context_), resp_, server_.Clock())(Status::OK()); TRACE("Done Read"); return Status::OK(); diff --git a/src/yb/tserver/service_util.cc b/src/yb/tserver/service_util.cc index 334b24a8fdb..1f37cc835ce 100644 --- a/src/yb/tserver/service_util.cc +++ b/src/yb/tserver/service_util.cc @@ -248,6 +248,19 @@ bool IsErrorCodeNotTheLeader(const Status& status) { return code && code.value() == TabletServerErrorPB::NOT_THE_LEADER; } +std::shared_ptr GetTabletConsensusInfoFromTabletPeer( + const tablet::TabletPeerPtr& peer) { + if (auto consensus = peer->GetRaftConsensus()) { + std::shared_ptr tablet_consensus_info = + std::make_shared(); + tablet_consensus_info->set_tablet_id(peer->tablet_id()); + *(tablet_consensus_info->mutable_consensus_state()) = + consensus.get()->GetConsensusStateFromCache(); + return tablet_consensus_info; + } + return nullptr; +} + namespace { template @@ -315,7 +328,6 @@ Result> GetTablet( tablet_peer = std::move(tablet_peer_result.tablet_peer); tablet_ptr = std::move(tablet_peer_result.tablet); } - RETURN_NOT_OK(CheckPeerIsReady(*tablet_peer, allow_split_tablet)); // Check for leader only in strong consistency level. @@ -333,7 +345,6 @@ Result> GetTablet( } } else { auto s = CheckPeerIsLeader(*tablet_peer.get()); - // Peer is not the leader, so check that the time since it last heard from the leader is less // than FLAGS_max_stale_read_bound_time_ms. if (PREDICT_FALSE(!s.ok())) { @@ -372,14 +383,12 @@ Result> GetTablet( } } } - auto tablet = tablet_peer->shared_tablet(); if (PREDICT_FALSE(!tablet)) { return STATUS_EC_FORMAT( IllegalState, TabletServerError(TabletServerErrorPB::TABLET_NOT_RUNNING), "Tablet $0 is not running", tablet_id); } - return tablet; } diff --git a/src/yb/tserver/service_util.h b/src/yb/tserver/service_util.h index eb068436cc8..2665fc081aa 100644 --- a/src/yb/tserver/service_util.h +++ b/src/yb/tserver/service_util.h @@ -73,6 +73,9 @@ void SetupError(LWTabletServerErrorPB* error, const Status& s); Result LeaderTerm(const tablet::TabletPeer& tablet_peer); +std::shared_ptr GetTabletConsensusInfoFromTabletPeer( + const tablet::TabletPeerPtr& peer); + // Template helpers. 
template @@ -249,6 +252,24 @@ Result LookupLeaderTablet( return result; } +template +void FillTabletConsensusInfoIfRequestOpIdStale( + tablet::TabletPeerPtr peer, const Req* req, Resp* resp) { + if constexpr (HasTabletConsensusInfo::value) { + auto outgoing_tablet_consensus_info = resp->mutable_tablet_consensus_info(); + if (auto consensus = peer->GetRaftConsensus()) { + auto cstate = consensus.get()->GetConsensusStateFromCache(); + if (cstate.has_config() && req->raft_config_opid_index() < cstate.config().opid_index()) { + outgoing_tablet_consensus_info->set_tablet_id(peer->tablet_id()); + *(outgoing_tablet_consensus_info->mutable_consensus_state()) = cstate; + VLOG(1) << "Sending out Consensus state for tablet: " << peer->tablet_id() + << ", leader TServer is: " + << outgoing_tablet_consensus_info->consensus_state().leader_uuid(); + } + } + } +} + // The "peer" argument could be provided by the caller in case the caller has already performed // the LookupTabletPeerOrRespond call, and we only need to fill the leader term. template diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index 7f6d6f07e9d..9f38229c803 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -363,6 +363,7 @@ void PerformAtLeader( if (*context) { resp->set_propagated_hybrid_time(server->Clock()->Now().ToUint64()); if (status.ok()) { + FillTabletConsensusInfoIfRequestOpIdStale(tablet_peer.peer, req, resp); context->RespondSuccess(); } else { SetupErrorAndRespond(resp->mutable_error(), status, context); @@ -439,6 +440,7 @@ class WriteQueryCompletionCallback { response_->set_trace_buffer(trace_->DumpToString(true)); } response_->set_propagated_hybrid_time(clock_->Now().ToUint64()); + FillTabletConsensusInfoIfRequestOpIdStale(tablet_peer_, query_->client_request(), response_); context_->RespondSuccess(); VLOG(1) << __PRETTY_FUNCTION__ << " RespondedSuccess"; } @@ -2248,6 +2250,7 @@ Status TabletServiceImpl::PerformWrite( if (!has_operations && tablet.tablet->table_type() != TableType::REDIS_TABLE_TYPE) { // An empty request. This is fine, can just exit early with ok status instead of working hard. // This doesn't need to go to Raft log. 
+ FillTabletConsensusInfoIfRequestOpIdStale(tablet.peer, req, resp); MakeRpcOperationCompletionCallback(std::move(*context), resp, server_->Clock())(Status::OK()); return Status::OK(); } @@ -2279,6 +2282,7 @@ Status TabletServiceImpl::PerformWrite( return Status::OK(); } + FillTabletConsensusInfoIfRequestOpIdStale(tablet.peer, req, resp); query->set_callback(WriteQueryCompletionCallback( tablet.peer, context_ptr, resp, query.get(), server_->Clock(), req->include_trace(), req->has_leader_term())); @@ -2331,6 +2335,7 @@ void TabletServiceImpl::Read(const ReadRequestPB* req, #endif // NDEBUG if (FLAGS_TEST_tserver_noop_read_write) { + LOG(INFO) << "Returning early due to FLAGS_TEST_tserver_noop_read_write"; context.RespondSuccess(); return; } diff --git a/src/yb/tserver/tserver.proto b/src/yb/tserver/tserver.proto index fa7bd5f2b1b..530dc35f1cf 100644 --- a/src/yb/tserver/tserver.proto +++ b/src/yb/tserver/tserver.proto @@ -97,6 +97,8 @@ message WriteRequestPB { optional uint64 start_time_micros = 22; optional AshMetadataPB ash_metadata = 23; + + optional int64 raft_config_opid_index = 24; } message WriteResponsePB { @@ -238,6 +240,8 @@ message ReadRequestPB { optional uint64 start_time_micros = 16; optional AshMetadataPB ash_metadata = 17; + + optional int64 raft_config_opid_index = 18; } message ReadResponsePB { diff --git a/src/yb/tserver/tserver_fwd.h b/src/yb/tserver/tserver_fwd.h index c2d12d99787..d3f96be0759 100644 --- a/src/yb/tserver/tserver_fwd.h +++ b/src/yb/tserver/tserver_fwd.h @@ -65,5 +65,12 @@ struct HasTabletConsensusInfo< T, std::void_t().tablet_consensus_info())>> : std::true_type {}; +template > +struct HasRaftConfigOpidIndex : std::false_type {}; + +template +struct HasRaftConfigOpidIndex< + T, std::void_t().raft_config_opid_index())>> + : std::true_type {}; } // namespace tserver } // namespace yb diff --git a/src/yb/tserver/tserver_service.proto b/src/yb/tserver/tserver_service.proto index caf9fcc28a6..524f350b73d 100644 --- a/src/yb/tserver/tserver_service.proto +++ b/src/yb/tserver/tserver_service.proto @@ -250,6 +250,7 @@ message GetTransactionStatusRequestPB { optional fixed64 propagated_hybrid_time = 3; // Reserved for xcluster deprecated field external_hybrid_time. reserved 4; + optional int64 raft_config_opid_index = 5; } message GetTransactionStatusResponsePB {