[#21904] DocDB: Fetch remote TServer proxy from the local TServer as opposed to fetching from Meta Cache

Summary:
`GetLockStatus` is an RPC endpoint in pg_client_service that fetches information about the transactional locks currently held in the cluster. The RPC is issued by pggate and serviced by the pg_client_service running in the local tserver.

In the existing implementation, as part of this RPC, the local tserver fetches the list of "live" tserver UUIDs. This list is populated/refreshed on the tserver <-> master heartbeat path. For each fetched UUID, the local tserver then attempts to fetch a handle to the remote tserver from its local meta cache. However, the meta cache only knows a remote tserver's connection info if the local tserver has processed some `GetTabletLocation(s)` response carrying that info (i.e., the remote tserver hosts a replica of a tablet the local tserver looked up). This discrepancy produces a crash when the local tserver attempts to fetch a handle for a remote tserver that is missing from its meta cache. **Note that this bug only exists in the `pg_locks`/`CancelTransaction` codepaths.**
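
For context, here is a condensed sketch of the old lookup path (drawn from the code this revision removes in client.cc and pg_client_service.cc below):
```
// Old path (sketch): resolve each live tserver's handle via the meta cache.
std::vector<master::TSInformationPB> live_tservers;
RETURN_NOT_OK(tablet_server_.GetLiveTServers(&live_tservers));

std::vector<RemoteTabletServerPtr> remote_tservers;
remote_tservers.reserve(live_tservers.size());
for (const auto& live_ts : live_tservers) {
  const auto& permanent_uuid = live_ts.tserver_instance().permanent_uuid();
  // YBClient::GetRemoteTabletServer consulted meta_cache_, which may never
  // have seen this tserver; the removed implementation then called InitProxy()
  // on the returned pointer, so a cache miss meant a null dereference -- the
  // crash described above.
  remote_tservers.push_back(
      VERIFY_RESULT(client().GetRemoteTabletServer(permanent_uuid)));
}
```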

This revision fetches the remote tserver handle from the local tserver instead. We already have all the info required to build a proxy/handle to the remote tserver (this data is fetched as part of master <-> tserver heartbeats). While updating the set of `live_tservers_` (after every successful master-tserver heartbeat exchange), we also create `RemoteTabletServer` ptrs and initialize the proxy handles if the `RemoteTabletServer` doesn't already exist. All calls to `GetRemoteTabletServers` now pick the shared ptrs from `remote_tserver_`.
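
As a rough illustration of the new bookkeeping (not the exact code in this commit; the surrounding member and method names are assumptions), the heartbeat-side update could look like the sketch below, built on the `RemoteTabletServer(const master::TSInformationPB&)` constructor and `Update()` overload added to meta_cache.cc in this diff:
```
// Sketch only: keep remote tserver handles in sync with live_tservers_ whenever
// a master heartbeat refreshes the live-tserver list. UpdateLiveTServers,
// remote_tservers_ and mutex_ are illustrative names, not the actual members.
void UpdateLiveTServers(const std::vector<master::TSInformationPB>& live_tservers) {
  std::lock_guard lock(mutex_);
  for (const auto& ts_info : live_tservers) {
    const auto& uuid = ts_info.tserver_instance().permanent_uuid();
    auto& entry = remote_tservers_[uuid];
    if (!entry) {
      // First time this tserver shows up: build the handle from heartbeat data.
      entry = std::make_shared<client::internal::RemoteTabletServer>(ts_info);
    } else {
      // Refresh host/ports and cloud info in case the registration changed.
      entry->Update(ts_info);
    }
  }
}
```
`GetRemoteTabletServers` then simply returns the shared ptrs from this map, and the pg_client_service.cc hunks below call `InitProxy()` on each handle just before issuing the RPC.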

The new approach is probably better since it keeps `pg_locks`/`CancelTransaction` calls off the meta cache path entirely, adding no latency there.

The diff also reverts the meta cache changes introduced in commit 75ad3df. That commit changed the `unique_ptr<RemoteTabletServer>` stored at the meta cache layer to a shared ptr, and those shared ptrs were then used in the `pg_locks`/`CancelTransaction` codepaths. Now that we no longer need to fetch `RemoteTabletServer` ptrs from the meta cache, the meta cache layer goes back to unique ptrs.
Jira: DB-10802

Test Plan:
Run the following test:
```
./yb_build.sh --cxx-test='TEST_F(PgGetLockStatusTest, TestPgLocksOutputAfterNodeOperations) {'
```

To manually reproduce the issue, run the following yugabyted commands:
```
# Destroy any existing cluster
./bin/yugabyted destroy --base_dir ~/yb_01
./bin/yugabyted destroy --base_dir ~/yb_02
...

# Create a new cluster
./bin/yugabyted start --advertise_address=127.0.0.1 --base_dir=~/yb_01
# Add a new node to the cluster
./bin/yugabyted start --advertise_address=127.0.0.2 --join 127.0.0.1 --base_dir=~/yb_02

./bin/ysqlsh -h 127.0.0.1 -c "SELECT * FROM pg_locks;"
```

Reviewers: mlillibridge, hsunder, rsami, kramanathan

Reviewed By: hsunder, kramanathan

Subscribers: bkolagani, smishra, ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D34564
basavaraj29 committed May 10, 2024
1 parent e6b5f35 commit cfab020
Showing 12 changed files with 209 additions and 78 deletions.
7 changes: 0 additions & 7 deletions src/yb/client/client.cc
@@ -2315,13 +2315,6 @@ std::pair<RetryableRequestId, RetryableRequestId> YBClient::NextRequestIdAndMinR
return std::make_pair(id, *requests.running_requests.begin());
}

Result<std::shared_ptr<internal::RemoteTabletServer>> YBClient::GetRemoteTabletServer(
const std::string& permanent_uuid) {
auto tserver = data_->meta_cache_->GetRemoteTabletServer(permanent_uuid);
RETURN_NOT_OK(tserver->InitProxy(this));
return tserver;
}

void YBClient::AddMetaCacheInfo(JsonWriter* writer) {
data_->meta_cache_->AddAllTabletInfo(writer);
}
5 changes: 0 additions & 5 deletions src/yb/client/client.h
@@ -1003,11 +1003,6 @@ class YBClient {

std::pair<RetryableRequestId, RetryableRequestId> NextRequestIdAndMinRunningRequestId();

// Get a RemoteTabletServer pointer from this client's meta_cache, if there is one present. Return
// null if none is found.
Result<std::shared_ptr<internal::RemoteTabletServer>> GetRemoteTabletServer(
const std::string& permanent_uuid);

void AddMetaCacheInfo(JsonWriter* writer);

void RequestsFinished(const RetryableRequestIdRange& request_id_range);
1 change: 1 addition & 0 deletions src/yb/client/client_fwd.h
@@ -144,6 +144,7 @@ class RemoteTablet;
typedef scoped_refptr<RemoteTablet> RemoteTabletPtr;

class RemoteTabletServer;
using RemoteTabletServerPtr = std::shared_ptr<RemoteTabletServer>;

class Batcher;
using BatcherPtr = std::shared_ptr<Batcher>;
38 changes: 23 additions & 15 deletions src/yb/client/meta_cache.cc
@@ -167,6 +167,11 @@ RemoteTabletServer::RemoteTabletServer(const master::TSInfoPB& pb)
Update(pb);
}

RemoteTabletServer::RemoteTabletServer(const master::TSInformationPB& pb)
: uuid_(pb.tserver_instance().permanent_uuid()) {
Update(pb);
}

RemoteTabletServer::RemoteTabletServer(const consensus::RaftPeerPB& raft_peer)
: uuid_(raft_peer.permanent_uuid()) {
UpdateFromRaftPeer(raft_peer);
@@ -230,6 +235,19 @@ void RemoteTabletServer::Update(const master::TSInfoPB& pb) {
cloud_info_pb_ = pb.cloud_info();
}

void RemoteTabletServer::Update(const master::TSInformationPB& pb) {
if (pb.tserver_instance().permanent_uuid() != uuid_) {
LOG(WARNING) << "RemoteTabletServer " << uuid_ << " cannot be updated "
<< "because the TSInformationPB has a wrong permanent_uuid "
<< pb.tserver_instance().permanent_uuid();
return;
}
std::lock_guard lock(mutex_);
private_rpc_hostports_ = pb.registration().common().private_rpc_addresses();
public_rpc_hostports_ = pb.registration().common().broadcast_addresses();
cloud_info_pb_ = pb.registration().common().cloud_info();
}

void RemoteTabletServer::UpdateFromRaftPeer(const consensus::RaftPeerPB& raft_peer) {
if (raft_peer.permanent_uuid() != uuid_) {
VLOG(1) << "RemoteTabletServer " << raft_peer.permanent_uuid()
@@ -356,13 +374,13 @@ Status RemoteTablet::RefreshFromRaftConfig(
std::vector<std::shared_ptr<RemoteReplica>> new_replicas;
std::string leader_uuid = "";
for (const auto& peer : raft_config.peers()) {
auto tserver = FindPtrOrNull(tservers, peer.permanent_uuid());
auto tserver = FindOrNull(tservers, peer.permanent_uuid());
SCHECK(tserver, NotFound, Format("TServer with ID $0 not found.", peer.permanent_uuid()));
auto role = GetConsensusRole(peer.permanent_uuid(), consensus_state);
if (role == PeerRole::LEADER) {
leader_uuid = peer.permanent_uuid();
}
new_replicas.emplace_back(std::make_shared<RemoteReplica>(tserver.get(), role));
new_replicas.emplace_back(std::make_shared<RemoteReplica>((*tserver).get(), role));
}
replicas_ = std::move(new_replicas);
raft_config_opid_index_ = consensus_state.config().opid_index();
@@ -759,7 +777,7 @@ void MetaCache::SetLocalTabletServer(const string& permanent_uuid,
const shared_ptr<TabletServerServiceProxy>& proxy,
const LocalTabletServer* local_tserver) {
const auto entry = ts_cache_.emplace(permanent_uuid,
std::make_shared<RemoteTabletServer>(permanent_uuid,
std::make_unique<RemoteTabletServer>(permanent_uuid,
proxy,
local_tserver));
CHECK(entry.second);
@@ -775,7 +793,7 @@ void MetaCache::UpdateTabletServerUnlocked(const master::TSInfoPB& pb) {
}

VLOG_WITH_PREFIX(1) << "Client caching new TabletServer from Master TSInfo " << permanent_uuid;
CHECK(ts_cache_.emplace(permanent_uuid, std::make_shared<RemoteTabletServer>(pb)).second);
CHECK(ts_cache_.emplace(permanent_uuid, std::make_unique<RemoteTabletServer>(pb)).second);
}

Status MetaCache::UpdateTabletServerWithRaftPeerUnlocked(const consensus::RaftPeerPB& pb) {
@@ -787,7 +805,7 @@ Status MetaCache::UpdateTabletServerWithRaftPeerUnlocked(const consensus::RaftPe
}
VLOG_WITH_PREFIX(1) << "Client caching new TabletServer from Raft Peer " << permanent_uuid;
SCHECK(
ts_cache_.emplace(permanent_uuid, std::make_shared<RemoteTabletServer>(pb)).second,
ts_cache_.emplace(permanent_uuid, std::make_unique<RemoteTabletServer>(pb)).second,
IllegalState, "Failed to emplace a remote tablet server into tablet server cache");
return Status::OK();
}
@@ -1352,16 +1370,6 @@ void MetaCache::InvalidateTableCache(const YBTable& table) {
}
}

std::shared_ptr<RemoteTabletServer> MetaCache::GetRemoteTabletServer(
const std::string& permanent_uuid) {
SharedLock lock(mutex_);
auto it = ts_cache_.find(permanent_uuid);
if (it != ts_cache_.end()) {
return it->second;
}
return nullptr;
}

void MetaCache::AddAllTabletInfo(JsonWriter* writer) {
SharedLock lock(mutex_);
writer->StartObject();
9 changes: 6 additions & 3 deletions src/yb/client/meta_cache.h
@@ -56,6 +56,7 @@
#include "yb/master/master_client.fwd.h"
#include "yb/master/master_fwd.h"

#include "yb/master/master_heartbeat.pb.h"
#include "yb/rpc/rpc_fwd.h"
#include "yb/rpc/rpc.h"

@@ -108,6 +109,7 @@ class RemoteTabletServer {
const std::shared_ptr<tserver::TabletServerServiceProxy>& proxy,
const tserver::LocalTabletServer* local_tserver = nullptr);
explicit RemoteTabletServer(const master::TSInfoPB& pb);
explicit RemoteTabletServer(const master::TSInformationPB& pb);
explicit RemoteTabletServer(const consensus::RaftPeerPB& raft_peer);
~RemoteTabletServer();

@@ -120,6 +122,9 @@
// Requires that 'pb''s UUID matches this server.
void Update(const master::TSInfoPB& pb);

// Update connection information from the passed TSInformationPB.
void Update(const master::TSInformationPB& pb);

// Requires that the raft_peer's UUID matches this RemoteTabletServer
void UpdateFromRaftPeer(const consensus::RaftPeerPB& raft_peer);

@@ -195,7 +200,7 @@ struct RemoteReplica {
std::string ToString() const;
};

typedef std::unordered_map<std::string, std::shared_ptr<RemoteTabletServer>> TabletServerMap;
typedef std::unordered_map<std::string, std::unique_ptr<RemoteTabletServer>> TabletServerMap;

YB_STRONGLY_TYPED_BOOL(UpdateLocalTsState);
YB_STRONGLY_TYPED_BOOL(IncludeFailedReplicas);
@@ -616,8 +621,6 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {

void InvalidateTableCache(const YBTable& table);

std::shared_ptr<RemoteTabletServer> GetRemoteTabletServer(const std::string& permanent_uuid);

void AddAllTabletInfo(JsonWriter* writer);

const std::string& LogPrefix() const { return log_prefix_; }
12 changes: 12 additions & 0 deletions src/yb/master/master_tserver.cc
@@ -170,6 +170,18 @@ Status MasterTabletServer::GetLiveTServers(
return Status::OK();
}

Result<std::vector<client::internal::RemoteTabletServerPtr>>
MasterTabletServer::GetRemoteTabletServers() const {
return STATUS_FORMAT(NotSupported,
Format("GetRemoteTabletServers not implemented for master_tserver"));
}

Result<std::vector<client::internal::RemoteTabletServerPtr>>
MasterTabletServer::GetRemoteTabletServers(const std::unordered_set<std::string>&) const {
return STATUS_FORMAT(NotSupported,
Format("GetRemoteTabletServers not implemented for master_tserver"));
}

const std::shared_ptr<MemTracker>& MasterTabletServer::mem_tracker() const {
return master_->mem_tracker();
}
6 changes: 6 additions & 0 deletions src/yb/master/master_tserver.h
@@ -71,6 +71,12 @@ class MasterTabletServer : public tserver::TabletServerIf,
Status GetLiveTServers(
std::vector<master::TSInformationPB> *live_tservers) const override;

virtual Result<std::vector<client::internal::RemoteTabletServerPtr>>
GetRemoteTabletServers() const override;

virtual Result<std::vector<client::internal::RemoteTabletServerPtr>>
GetRemoteTabletServers(const std::unordered_set<std::string>& ts_uuids) const override;

const std::shared_ptr<MemTracker>& mem_tracker() const override;

void SetPublisher(rpc::Publisher service) override;
55 changes: 19 additions & 36 deletions src/yb/tserver/pg_client_service.cc
@@ -705,28 +705,21 @@ class PgClientServiceImpl::Impl {
return is_within_retry ? combined_status : ReplaceSplitTabletsAndGetLocations(req, true);
}

std::set<std::string> tserver_uuids;
std::unordered_set<std::string> tserver_uuids;
for (const auto& tablet_location_pb : resp.tablet_locations()) {
for (const auto& replica : tablet_location_pb.replicas()) {
if (replica.role() == PeerRole::LEADER) {
tserver_uuids.insert(replica.ts_info().permanent_uuid());
}
}
}

std::vector<RemoteTabletServerPtr> remote_tservers;
remote_tservers.reserve(tserver_uuids.size());
for(const auto& ts_uuid : tserver_uuids) {
remote_tservers.push_back(VERIFY_RESULT(client().GetRemoteTabletServer(ts_uuid)));
}
return remote_tservers;
return tablet_server_.GetRemoteTabletServers(tserver_uuids);
}

Status GetLockStatus(
const PgGetLockStatusRequestPB& req, PgGetLockStatusResponsePB* resp,
rpc::RpcContext* context) {
std::vector<master::TSInformationPB> live_tservers;
RETURN_NOT_OK(tablet_server_.GetLiveTServers(&live_tservers));
auto remote_tservers = VERIFY_RESULT(tablet_server_.GetRemoteTabletServers());
GetLockStatusRequestPB lock_status_req;
lock_status_req.set_max_txn_locks_per_tablet(req.max_txn_locks_per_tablet());
if (!req.transaction_id().empty()) {
@@ -739,12 +732,6 @@ class PgClientServiceImpl::Impl {
// TODO(pglocks): Once we call GetTransactionStatus for involved tablets, ensure we populate
// aborted_subtxn_set in the GetLockStatusRequests that we send to involved tablets as well.
lock_status_req.add_transaction_ids(req.transaction_id());
std::vector<RemoteTabletServerPtr> remote_tservers;
remote_tservers.reserve(live_tservers.size());
for (const auto& live_ts : live_tservers) {
const auto& permanent_uuid = live_ts.tserver_instance().permanent_uuid();
remote_tservers.push_back(VERIFY_RESULT(client().GetRemoteTabletServer(permanent_uuid)));
}
return DoGetLockStatus(&lock_status_req, resp, context, remote_tservers);
}
const auto& min_txn_age_ms = req.min_txn_age_ms();
@@ -763,12 +750,11 @@ class PgClientServiceImpl::Impl {

std::vector<std::future<Result<OldTxnsRespInfo>>> res_futures;
std::unordered_set<TabletId> status_tablet_ids;
for (const auto& live_ts : live_tservers) {
const auto& permanent_uuid = live_ts.tserver_instance().permanent_uuid();
auto remote_tserver = VERIFY_RESULT(client().GetRemoteTabletServer(permanent_uuid));
for (const auto& remote_tserver : remote_tservers) {
auto txn_status_tablets = VERIFY_RESULT(
client().GetTransactionStatusTablets(remote_tserver->cloud_info_pb()));

RETURN_NOT_OK(remote_tserver->InitProxy(&client()));
auto proxy = remote_tserver->proxy();
for (const auto& tablet : txn_status_tablets.global_tablets) {
res_futures.push_back(
@@ -802,8 +788,9 @@

std::visit([&](auto&& old_txns_resp) {
if (old_txns_resp->has_error()) {
// Ignore leadership errors as we broadcast the request to all tservers.
if (old_txns_resp->error().code() == TabletServerErrorPB::NOT_THE_LEADER) {
// Ignore leadership and NOT_FOUND errors as we broadcast the request to all tservers.
if (old_txns_resp->error().code() == TabletServerErrorPB::NOT_THE_LEADER ||
old_txns_resp->error().code() == TabletServerErrorPB::TABLET_NOT_FOUND) {
it = res_futures.erase(it);
return;
}
@@ -879,8 +866,9 @@ class PgClientServiceImpl::Impl {
if (include_single_shard_waiters) {
lock_status_req.set_max_single_shard_waiter_start_time_us(max_single_shard_waiter_start_time);
}
auto remote_tservers = VERIFY_RESULT(ReplaceSplitTabletsAndGetLocations(&lock_status_req));
return DoGetLockStatus(&lock_status_req, resp, context, remote_tservers);
auto remote_tservers_with_locks = VERIFY_RESULT(
ReplaceSplitTabletsAndGetLocations(&lock_status_req));
return DoGetLockStatus(&lock_status_req, resp, context, remote_tservers_with_locks);
}

// Merges the src PgGetLockStatusResponsePB into dest, while preserving existing entries in dest.
@@ -915,6 +903,7 @@ class PgClientServiceImpl::Impl {
std::vector<std::shared_ptr<GetLockStatusResponsePB>> node_responses;
node_responses.reserve(remote_tservers.size());
for (const auto& remote_tserver : remote_tservers) {
RETURN_NOT_OK(remote_tserver->InitProxy(&client()));
auto proxy = remote_tserver->proxy();
auto status_promise = std::make_shared<std::promise<Status>>();
status_futures.push_back(status_promise->get_future());
@@ -1559,24 +1548,17 @@ class PgClientServiceImpl::Impl {
TryAgain, Format("Leader not found for tablet $0", status_tablet_id)));
}
const auto& permanent_uuid = remote_tablet->LeaderTServer()->permanent_uuid();
callback(client().GetRemoteTabletServer(permanent_uuid));
auto remote_ts_or_status = tablet_server_.GetRemoteTabletServers({permanent_uuid});
if (!remote_ts_or_status.ok()) {
return callback(remote_ts_or_status.status());
}
callback((*remote_ts_or_status)[0]);
},
// Force a client cache refresh so as to not hit NOT_LEADER error.
client::UseCache::kFalse);
});
}

Result<std::vector<RemoteTabletServerPtr>> GetAllLiveTservers() {
std::vector<RemoteTabletServerPtr> remote_tservers;
std::vector<master::TSInformationPB> live_tservers;
RETURN_NOT_OK(tablet_server_.GetLiveTServers(&live_tservers));
for (const auto& live_ts : live_tservers) {
const auto& permanent_uuid = live_ts.tserver_instance().permanent_uuid();
remote_tservers.push_back(VERIFY_RESULT(client().GetRemoteTabletServer(permanent_uuid)));
}
return remote_tservers;
}

Status GetActiveTransactionList(
const PgGetActiveTransactionListRequestPB& req, PgGetActiveTransactionListResponsePB* resp,
rpc::RpcContext* context) {
@@ -1610,7 +1592,7 @@

std::vector<RemoteTabletServerPtr> remote_tservers;
if (req.status_tablet_id().empty()) {
remote_tservers = VERIFY_RESULT(GetAllLiveTservers());
remote_tservers = VERIFY_RESULT(tablet_server_.GetRemoteTabletServers());
} else {
const auto& remote_ts = VERIFY_RESULT(GetTServerHostingStatusTablet(
req.status_tablet_id(), context->GetClientDeadline()).get());
@@ -1621,6 +1603,7 @@
std::vector<std::future<Status>> status_future;
std::vector<tserver::CancelTransactionResponsePB> node_resp(remote_tservers.size());
for (size_t i = 0 ; i < remote_tservers.size() ; i++) {
RETURN_NOT_OK(remote_tservers[i]->InitProxy(&client()));
const auto& proxy = remote_tservers[i]->proxy();
auto controller = std::make_shared<rpc::RpcController>();
status_future.push_back(
