[BACKPORT 2.14.10][#14473] docdb: Accept registrations from tservers on the same node with greater sequence numbers.

Summary:
Original commits:
  - 59f0520 / D25761
  - 16d2151 / D25886
This diff modifies the master's tserver registration logic. The master decides whether to accept a registration from a node it has seen before, and unfortunately that decision depends on whether it has ever seen the registering uuid. The previous logic:
  # If the uuid is new, accept the registration as long as its seqno is greater than the seqno of the tserver previously registered on the same host/port (the seqno is the unix epoch taken at tserver process start).
  # If the uuid is known but was previously superseded by a later registration of a different uuid on the same host/port, the master fail-loops, endlessly asking the node to re-register (sketched below).
  # Otherwise, if the uuid is known, accept the re-registration.
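
The fail loop in the second case can be reconstructed from the ts_manager.cc and master_heartbeat_service.cc hunks below; roughly:

```
// Pre-fix fail loop for a previously superseded uuid B (reconstructed from
// the diff below; the step numbering is illustrative):
//   1. B heartbeats. LookupTS finds B's descriptor, but IsRemoved() is true,
//      so the lookup returns NotFound and the master replies needs_reregister.
//   2. B re-registers. The old RegisterTS found B's uuid in servers_by_id_ and
//      took the re-registration path, which never cleared the removed flag.
//   3. B's next heartbeat hits step 1 again, forever.
// The fix clears the flag on re-registration (SetRemoved(false)) and applies
// the same seqno-based acceptance rule whether or not the uuid is known.
```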

This diff streamlines the logic by removing the condition on the uuid. That is, regardless of whether the uuid has been seen before:
Accept the registration if and only if its seqno is greater than the seqno of the tserver previously registered on the same host/port.
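
A minimal sketch of the unified rule, assuming the post-diff helpers in ts_manager.cc (HasSameHostPort over ServerRegistrationPB, SetRemoved, GetTSInformationPB); the function name ShouldAcceptRegistration and its free-function form are hypothetical:

```
// Sketch only, not the literal TSManager code: accept an incoming
// registration iff no other live descriptor on the same host/port carries an
// equal or higher seqno. Entries it supersedes are marked removed.
bool ShouldAcceptRegistration(
    const std::map<std::string, TSDescriptorPtr>& servers_by_id,
    const NodeInstancePB& instance, const TSRegistrationPB& registration) {
  for (const auto& [uuid, desc] : servers_by_id) {
    if (uuid == instance.permanent_uuid()) {
      continue;  // Same uuid: handled by the ordinary re-registration path.
    }
    const auto ts_info = desc->GetTSInformationPB();
    if (!HasSameHostPort(ts_info->registration().common(), registration.common())) {
      continue;  // Different node, so no conflict.
    }
    if (ts_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) {
      return false;  // This node already registered with a newer or equal seqno.
    }
    desc->SetRemoved();  // Superseded by the incoming, newer registration.
  }
  return true;
}
```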

The existing logic causes problems when operators try to fix broken quorums. When a majority of tservers have failed due to wiped disks, the respawned tserver processes cannot rejoin the quorum because:

  #  they have different uuids and thus are considered new tservers
  #  they cannot be added to the quorum because such an action requires an effective leader and thus a quorum

One trick to bypass this is to respawn new tservers on the wiped nodes and override their UUIDs to the original, pre-wipe UUIDs. These processes will be recognized as existing quorum members and can be manually bootstrapped. However, the master leader refuses to register such tservers because it has already seen these UUIDs and marked them as removed. Concretely, for a tablet group with tservers with UUIDs A, B, and C on nodes N1, N2, and N3:

  # B and C fail with wiped disks
  # automation respawns tserver processes on nodes N2 and N3 with uuids B' and C'
  # B' and C' cannot join the tablet group because there is no tablet leader
  # operator wipes the disk of N2 again and respawns a tserver process on N2 with the original uuid B
  # master rejects B's attempt to register, because when B' registered, the master marked B as removed (see the timeline below)
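
With the new rule, step 5 succeeds, since acceptance now depends only on the seqno. An illustrative timeline for node N2 (the seqno values are hypothetical; real seqnos are unix epochs):

```
// Registration timeline for node N2 (hypothetical seqno values):
//   B  registers with seqno 1000  -> accepted (first registration on N2).
//   B' registers with seqno 2000  -> accepted; B marked removed (2000 > 1000).
//   B  re-registers with seqno 3000:
//     old logic: fail-loops forever (B's removed flag is never cleared).
//     new logic: accepted, since 3000 > 2000; B' is marked removed.
```
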
Jira: DB-3872

Test Plan:
```
ybd --cxx-test master-test --gtest_filter 'MasterTest.TestReRegisterRemovedUUID'
ybd --cxx-test master_heartbeat-itest --gtest_filter 'MasterHeartbeatITestWithExternal.ReRegisterRemovedPeers'
```

Reviewers: bogdan, stiwary, rahuldesirazu, asrivastava

Reviewed By: asrivastava

Subscribers: aaruj, asrivastava, ybase, slingam, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D26021
druzac committed Jun 7, 2023
1 parent b0d8182 commit bc706ba
Showing 4 changed files with 223 additions and 48 deletions.
92 changes: 92 additions & 0 deletions src/yb/integration-tests/master_heartbeat-itest.cc
@@ -22,6 +22,7 @@
#include "yb/client/table_handle.h"

#include "yb/integration-tests/cluster_itest_util.h"
#include "yb/integration-tests/external_mini_cluster.h"
#include "yb/integration-tests/yb_table_test_base.h"

#include "yb/master/catalog_manager_if.h"
@@ -119,6 +120,97 @@ TEST_F(MasterHeartbeatITest, IgnorePeerNotInConfig) {
}, FLAGS_heartbeat_interval_ms * 5ms, "Wait for proper replica locations."));
}

class MasterHeartbeatITestWithExternal : public MasterHeartbeatITest {
public:
bool use_external_mini_cluster() { return true; }

Status RestartAndWipeWithFlags(
std::vector<ExternalTabletServer*> tservers,
std::vector<std::vector<std::pair<std::string, std::string>>> extra_flags = {}) {
for (const auto& tserver : tservers) {
tserver->Shutdown();
}
for (const auto& tserver : tservers) {
for (const auto& data_dir : tserver->GetDataDirs()) {
RETURN_NOT_OK(Env::Default()->DeleteRecursively(data_dir));
}
}
for (size_t i = 0; i < tservers.size(); ++i) {
auto extra_flags_for_tserver = extra_flags.size() > i
? extra_flags[i]
: std::vector<std::pair<std::string, std::string>>();
RETURN_NOT_OK(tservers[i]->Restart(
ExternalMiniClusterOptions::kDefaultStartCqlProxy, extra_flags_for_tserver));
}
return Status::OK();
}

Status WaitForRegisteredTserverSet(
const std::set<std::string>& uuids, MonoDelta timeout, const std::string& message) {
master::MasterClusterProxy master_proxy(
proxy_cache_.get(), external_mini_cluster()->master()->bound_rpc_addr());
return WaitFor(
[&]() -> Result<bool> {
master::ListTabletServersResponsePB resp;
master::ListTabletServersRequestPB req;
rpc::RpcController rpc;
RETURN_NOT_OK(master_proxy.ListTabletServers(req, &resp, &rpc));
std::set<std::string> current_uuids;
for (const auto& server : resp.servers()) {
current_uuids.insert(server.instance_id().permanent_uuid());
}
return current_uuids == uuids;
},
timeout, message);
}
};

TEST_F(MasterHeartbeatITestWithExternal, ReRegisterRemovedPeers) {
auto cluster = external_mini_cluster();
ASSERT_EQ(cluster->tserver_daemons().size(), 3);
LOG(INFO) << "Create a user table.";
CreateTable();
constexpr int kNumRows = 1000;
for (int i = 0; i < kNumRows; ++i) {
PutKeyValue(Format("k$0", i), Format("v$0", i));
}
std::map<std::string, ExternalTabletServer*> wiped_tservers;
wiped_tservers[cluster->tablet_server(1)->uuid()] = cluster->tablet_server(1);
wiped_tservers[cluster->tablet_server(2)->uuid()] = cluster->tablet_server(2);
LOG(INFO) << "Wipe a majority of the quorum to simulate majority disk failures.";
ASSERT_OK(RestartAndWipeWithFlags({cluster->tablet_server(1), cluster->tablet_server(2)}));

std::set<std::string> original_uuids;
std::set<std::string> new_uuids;
original_uuids.insert(cluster->tablet_server(0)->uuid());
new_uuids.insert(cluster->tablet_server(0)->uuid());
for (const auto& [original_uuid, wiped_tserver] : wiped_tservers) {
ASSERT_NE(original_uuid, wiped_tserver->uuid())
<< "Original tserver uuid should not be equal to the restarted tserver uuid";
original_uuids.insert(original_uuid);
new_uuids.insert(wiped_tserver->uuid());
}
ASSERT_EQ(original_uuids.size(), 3);
ASSERT_EQ(new_uuids.size(), 3);
ASSERT_OK(
WaitForRegisteredTserverSet(new_uuids, 60s, "Waiting for master to register new uuids"));
const std::string override_flag_name = "instance_uuid_override";
std::vector<std::vector<std::pair<std::string, std::string>>> extra_flags;
std::vector<ExternalTabletServer*> just_tservers;
for (const auto& [original_uuid, wiped_tserver] : wiped_tservers) {
extra_flags.push_back({{override_flag_name, original_uuid}});
just_tservers.push_back(wiped_tserver);
}
ASSERT_OK(RestartAndWipeWithFlags(just_tservers, extra_flags));
for (const auto& [original_uuid, wiped_tserver] : wiped_tservers) {
ASSERT_EQ(original_uuid, wiped_tserver->uuid())
<< "After overriding uuid, new tserver uuid should be equal to original tserver uuid";
}

ASSERT_OK(WaitForRegisteredTserverSet(
original_uuids, 60s, "Wait for master to register original uuids"));
}

} // namespace integration_tests

} // namespace yb
72 changes: 72 additions & 0 deletions src/yb/master/master-test.cc
@@ -98,8 +98,21 @@ namespace master {
using strings::Substitute;

class MasterTest : public MasterTestBase {
public:
Result<TSHeartbeatResponsePB> SendHeartbeat(
TSToMasterCommonPB common, TSRegistrationPB registration);
};

Result<TSHeartbeatResponsePB> MasterTest::SendHeartbeat(
TSToMasterCommonPB common, TSRegistrationPB registration) {
TSHeartbeatRequestPB req;
TSHeartbeatResponsePB resp;
req.mutable_common()->Swap(&common);
req.mutable_registration()->Swap(&registration);
RETURN_NOT_OK(proxy_heartbeat_->TSHeartbeat(req, &resp, ResetAndGetController()));
return resp;
}

TEST_F(MasterTest, TestPingServer) {
// Ping the server.
server::PingRequestPB req;
@@ -115,6 +128,14 @@ static void MakeHostPortPB(const std::string& host, uint32_t port, HostPortPB* p
pb->set_port(port);
}

CloudInfoPB MakeCloudInfoPB(std::string cloud, std::string region, std::string zone) {
CloudInfoPB result;
*result.mutable_placement_cloud() = std::move(cloud);
*result.mutable_placement_region() = std::move(region);
*result.mutable_placement_zone() = std::move(zone);
return result;
}

// Test that shutting down a MiniMaster without starting it does not
// SEGV.
TEST_F(MasterTest, TestShutdownWithoutStart) {
@@ -392,6 +413,57 @@ TEST_F(MasterTest, TestRegisterAndHeartbeat) {
}
}

TEST_F(MasterTest, TestReRegisterRemovedUUID) {
// When a tserver's disk is wiped and the process restarted, the tserver comes back with a
// different uuid. If a quorum is broken by a majority of tservers failing in this way, one
// strategy to repair the quorum is to reset the wiped tservers with their original uuids.
// However this requires the master to re-register a tserver with a uuid it has seen before and
// rejected due to seeing a later sequence number from the same node. This test verifies the
// master process can handle re-registering tservers with uuids it has previously removed.
const std::string first_uuid = "uuid1";
const std::string second_uuid = "uuid2";
vector<shared_ptr<TSDescriptor>> descs;
int seqno = 1;
mini_master_->master()->ts_manager()->GetAllDescriptors(&descs);
EXPECT_EQ(descs.size(), 0);
TSToMasterCommonPB original_common;
TSRegistrationPB registration;
original_common.mutable_ts_instance()->set_permanent_uuid(first_uuid);
original_common.mutable_ts_instance()->set_instance_seqno(seqno++);
MakeHostPortPB("localhost", 1000, registration.mutable_common()->add_private_rpc_addresses());
MakeHostPortPB("localhost", 1000, registration.mutable_common()->add_broadcast_addresses());
MakeHostPortPB("localhost", 2000, registration.mutable_common()->add_http_addresses());
*registration.mutable_common()->mutable_cloud_info() = MakeCloudInfoPB("cloud", "region", "zone");
auto resp = ASSERT_RESULT(SendHeartbeat(original_common, registration));
EXPECT_FALSE(resp.needs_reregister());

mini_master_->master()->ts_manager()->GetAllDescriptors(&descs);
ASSERT_EQ(descs.size(), 1);
auto original_desc = descs[0];

auto new_common = original_common;
new_common.mutable_ts_instance()->set_permanent_uuid(second_uuid);
new_common.mutable_ts_instance()->set_instance_seqno(seqno++);
resp = ASSERT_RESULT(SendHeartbeat(new_common, registration));
EXPECT_FALSE(resp.needs_reregister());
mini_master_->master()->ts_manager()->GetAllDescriptors(&descs);
// This function filters out descriptors of removed tservers so we still expect just 1 descriptor.
ASSERT_EQ(descs.size(), 1);
auto new_desc = descs[0];
EXPECT_EQ(new_desc->permanent_uuid(), second_uuid);
EXPECT_TRUE(original_desc->IsRemoved());

auto updated_original_common = original_common;
updated_original_common.mutable_ts_instance()->set_instance_seqno(seqno++);
resp = ASSERT_RESULT(SendHeartbeat(updated_original_common, registration));
EXPECT_FALSE(resp.needs_reregister());

mini_master_->master()->ts_manager()->GetAllDescriptors(&descs);
ASSERT_EQ(descs.size(), 1);
EXPECT_EQ(descs[0]->permanent_uuid(), first_uuid);
EXPECT_TRUE(new_desc->IsRemoved());
}

TEST_F(MasterTest, TestListTablesWithoutMasterCrash) {
FLAGS_TEST_simulate_slow_table_create_secs = 10;

6 changes: 3 additions & 3 deletions src/yb/master/master_heartbeat_service.cc
@@ -117,9 +117,9 @@ class MasterHeartbeatServiceImpl : public MasterServiceBase, public MasterHeartb
s = server_->ts_manager()->LookupTS(req->common().ts_instance(), &ts_desc);
if (s.IsNotFound()) {
LOG(INFO) << "Got heartbeat from unknown tablet server { "
<< req->common().ts_instance().ShortDebugString()
<< " } as " << rpc.requestor_string()
<< "; Asking this server to re-register.";
<< req->common().ts_instance().ShortDebugString() << " } as "
<< rpc.requestor_string()
<< "; Asking this server to re-register. Status from ts lookup: " << s;
resp->set_needs_reregister(true);
resp->set_needs_full_tablet_report(true);
rpc.RespondSuccess();
101 changes: 56 additions & 45 deletions src/yb/master/ts_manager.cc
@@ -60,12 +60,18 @@ Status TSManager::LookupTS(const NodeInstancePB& instance,
const TSDescriptorPtr* found_ptr =
FindOrNull(servers_by_id_, instance.permanent_uuid());
if (!found_ptr || (*found_ptr)->IsRemoved()) {
return STATUS(NotFound, "unknown tablet server ID", instance.ShortDebugString());
return STATUS_FORMAT(
NotFound,
"unknown tablet server ID, server is in map: $0, server is removed: $1, instance data: $2",
found_ptr != nullptr, found_ptr ? (*found_ptr)->IsRemoved() : false,
instance.ShortDebugString());
}
const TSDescriptorPtr& found = *found_ptr;

if (instance.instance_seqno() != found->latest_seqno()) {
return STATUS(NotFound, "mismatched instance sequence number", instance.ShortDebugString());
return STATUS_FORMAT(
NotFound, "mismatched instance sequence number $0, instance $1", found->latest_seqno(),
instance.ShortDebugString());
}

*ts_desc = found;
@@ -83,70 +89,77 @@ bool TSManager::LookupTSByUUID(const string& uuid,
return true;
}

bool HasSameHostPort(const google::protobuf::RepeatedPtrField<HostPortPB>& old_addresses,
const google::protobuf::RepeatedPtrField<HostPortPB>& new_addresses) {
for (const auto& old_address : old_addresses) {
for (const auto& new_address : new_addresses) {
if (old_address.host() == new_address.host() && old_address.port() == new_address.port())
bool HasSameHostPort(const HostPortPB& lhs, const HostPortPB& rhs) {
return lhs.host() == rhs.host() && lhs.port() == rhs.port();
}

bool HasSameHostPort(const google::protobuf::RepeatedPtrField<HostPortPB>& lhs,
const google::protobuf::RepeatedPtrField<HostPortPB>& rhs) {
for (const auto& lhs_hp : lhs) {
for (const auto& rhs_hp : rhs) {
if (HasSameHostPort(lhs_hp, rhs_hp)) {
return true;
}
}
}

return false;
}

Status TSManager::RegisterTS(const NodeInstancePB& instance,
const TSRegistrationPB& registration,
CloudInfoPB local_cloud_info,
rpc::ProxyCache* proxy_cache,
RegisteredThroughHeartbeat registered_through_heartbeat) {
TSCountCallback callback_to_call;
bool HasSameHostPort(const ServerRegistrationPB& lhs, const ServerRegistrationPB& rhs) {
return HasSameHostPort(lhs.private_rpc_addresses(), rhs.private_rpc_addresses()) ||
HasSameHostPort(lhs.broadcast_addresses(), rhs.broadcast_addresses());
}

Status TSManager::RegisterTS(
const NodeInstancePB& instance,
const TSRegistrationPB& registration,
CloudInfoPB local_cloud_info,
rpc::ProxyCache* proxy_cache,
RegisteredThroughHeartbeat registered_through_heartbeat) {
TSCountCallback callback_to_call;
{
std::lock_guard<decltype(lock_)> l(lock_);
const string& uuid = instance.permanent_uuid();

auto it = servers_by_id_.find(uuid);
if (it == servers_by_id_.end()) {
// Check if a server with the same host and port already exists.
for (const auto& map_entry : servers_by_id_) {
const auto ts_info = map_entry.second->GetTSInformationPB();

if (HasSameHostPort(ts_info->registration().common().private_rpc_addresses(),
registration.common().private_rpc_addresses()) ||
HasSameHostPort(ts_info->registration().common().broadcast_addresses(),
registration.common().broadcast_addresses())) {
if (ts_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) {
// Skip adding the node since we already have a node with the same rpc address and
// a higher sequence number.
LOG(WARNING) << "Skipping registration for TS " << instance.ShortDebugString()
<< " since an entry with same host/port but a higher sequence number exists "
<< ts_info->ShortDebugString();
return Status::OK();
} else {
LOG(WARNING) << "Removing entry: " << ts_info->ShortDebugString()
<< " since we received registration for a tserver with a higher sequence number: "
<< instance.ShortDebugString();
// Mark the old node to be removed, since we have a newer sequence number.
map_entry.second->SetRemoved();
}
// Check if a server with the same host and port already exists.
for (const auto& map_entry : servers_by_id_) {
const auto ts_info = map_entry.second->GetTSInformationPB();
if (ts_info->tserver_instance().permanent_uuid() == instance.permanent_uuid()) {
continue;
}
if (HasSameHostPort(ts_info->registration().common(), registration.common())) {
if (ts_info->tserver_instance().instance_seqno() >= instance.instance_seqno()) {
// Skip adding the node since we already have a node with the same rpc address and
// a higher sequence number.
LOG(WARNING) << "Skipping registration for TS " << instance.ShortDebugString()
<< " since an entry with same host/port but a higher sequence number exists "
<< ts_info->ShortDebugString();
return Status::OK();
} else {
LOG(WARNING)
<< "Removing entry: " << ts_info->ShortDebugString()
<< " since we received registration for a tserver with a higher sequence number: "
<< instance.ShortDebugString();
// Mark the old node to be removed, since we have a newer sequence number.
map_entry.second->SetRemoved();
}
}
}

auto it = servers_by_id_.find(uuid);
if (it == servers_by_id_.end()) {
auto new_desc = VERIFY_RESULT(TSDescriptor::RegisterNew(
instance, registration, std::move(local_cloud_info), proxy_cache,
registered_through_heartbeat));
InsertOrDie(&servers_by_id_, uuid, std::move(new_desc));
LOG(INFO) << "Registered new tablet server { " << instance.ShortDebugString()
<< " } with Master, full list: " << yb::ToString(servers_by_id_);

} else {
RETURN_NOT_OK(it->second->Register(
instance, registration, std::move(local_cloud_info), proxy_cache));
RETURN_NOT_OK(
it->second->Register(instance, registration, std::move(local_cloud_info), proxy_cache));
it->second->SetRemoved(false);
LOG(INFO) << "Re-registered known tablet server { " << instance.ShortDebugString()
<< " }: " << registration.ShortDebugString();
}

if (!ts_count_callback_.empty()) {
auto new_count = GetCountUnlocked();
if (new_count >= ts_count_callback_min_count_) {
@@ -155,11 +168,9 @@ Status TSManager::RegisterTS(const NodeInstancePB& instance,
}
}
}

if (!callback_to_call.empty()) {
callback_to_call();
}

return Status::OK();
}

