Skip to content

Commit

Permalink
[#21775] docdb: Avoid segfault on catalog reload / collector call races.
Browse files Browse the repository at this point in the history
Summary:
The existing `GetClusterConfig()` function can segfault if it is called while the catalog manager is reloading the system catalog because the `cluster_config_` field is reset during catalog loading. However it does already return a `Status` object, there is just no codepath that returns a not-OK status. This diff refactors the function to return a `Result` instead, and returns a not-OK status if the `cluster_config_` field is null. Since all clients are already dealing with the returned `Status` object in the old signature, this was a minimal change that makes calls safe, regardless of whether the caller holds the leader lease or not.
Jira: DB-10650

Test Plan:
Existing tests to be sure the refactor is safe.

I didn't add a test that the race is fixed as the race is difficult to expose deterministically.

Reviewers: asrivastava, mhaddad, xCluster, hsunder

Reviewed By: asrivastava, hsunder

Subscribers: ybase, esheng, slingam

Differential Revision: https://phorge.dev.yugabyte.com/D34609
  • Loading branch information
druzac committed May 3, 2024
1 parent 76fa1f5 commit 347fc74
Show file tree
Hide file tree
Showing 16 changed files with 127 additions and 122 deletions.
4 changes: 2 additions & 2 deletions src/yb/integration-tests/create-table-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ TEST_F(CreateTableStressTest, TestHeartbeatDeadline) {
ASSERT_OK(WaitForRunningTabletCount(cluster_->mini_master(), table_name,
FLAGS_num_test_tablets, &resp));

master::SysClusterConfigEntryPB config;
ASSERT_OK(cluster_->mini_master()->catalog_manager().GetClusterConfig(&config));
master::SysClusterConfigEntryPB config =
ASSERT_RESULT(cluster_->mini_master()->catalog_manager().GetClusterConfig());
auto universe_uuid = config.universe_uuid();

// Grab TS#1 and Generate a Full Report for it.
Expand Down
36 changes: 18 additions & 18 deletions src/yb/integration-tests/master_heartbeat-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,10 @@ TEST_F(MasterHeartbeatITest, IgnoreEarlierHeartbeatFromSameTSProcess) {
// of order.
master::MasterHeartbeatProxy master_proxy(
proxy_cache_.get(), mini_cluster_->mini_master()->bound_rpc_addr());
GetMasterClusterConfigResponsePB config_resp;
ASSERT_OK(catalog_mgr.GetClusterConfig(&config_resp));
auto cluster_config = ASSERT_RESULT(catalog_mgr.GetClusterConfig());
master::TSHeartbeatRequestPB req;
*req.mutable_common() = MakeTSToMasterCommonPB(ts.get(), ts->latest_seqno());
req.set_universe_uuid(config_resp.cluster_config().universe_uuid());
req.set_universe_uuid(cluster_config.universe_uuid());
const auto original_latest_report_sequence_number = ts->latest_report_sequence_number();
*req.mutable_tablet_report() = MakeTabletReportPBWithNewLeader(
ts.get(), tablet.get(), true, original_latest_report_sequence_number + 2);
Expand All @@ -247,7 +246,7 @@ TEST_F(MasterHeartbeatITest, IgnoreEarlierHeartbeatFromSameTSProcess) {
{
// Now we send another heartbeat, but in this one the new TS was bootstrapping the tablet.
master::TSHeartbeatRequestPB second_req;
second_req.set_universe_uuid(config_resp.cluster_config().universe_uuid());
second_req.set_universe_uuid(cluster_config.universe_uuid());
*second_req.mutable_common() = req.common();
second_req.set_num_live_tablets(0);
auto* bootstrapping_report = second_req.mutable_tablet_report();
Expand Down Expand Up @@ -304,13 +303,12 @@ TEST_F(MasterHeartbeatITest, ProcessHeartbeatAfterTSRestart) {
auto ts = *extra_ts_it;
master::MasterHeartbeatProxy master_proxy(
proxy_cache_.get(), mini_cluster_->mini_master()->bound_rpc_addr());
GetMasterClusterConfigResponsePB config_resp;
ASSERT_OK(catalog_mgr.GetClusterConfig(&config_resp));
auto cluster_config = ASSERT_RESULT(catalog_mgr.GetClusterConfig());
master::TSHeartbeatRequestPB req;
ASSERT_GT(ts->latest_report_sequence_number(), 0);
// Use a later sequence number to simulate the tserver restarting.
*req.mutable_common() = MakeTSToMasterCommonPB(ts.get(), ts->latest_seqno() + 10);
req.set_universe_uuid(config_resp.cluster_config().universe_uuid());
req.set_universe_uuid(cluster_config.universe_uuid());
*req.mutable_registration() = ts->GetTSInformationPB()->registration();
*req.mutable_tablet_report() = MakeTabletReportPBWithNewLeader(
ts.get(), tablet.get(), /* incremental */ false, /* report_sequence_number */ 0);
Expand Down Expand Up @@ -340,9 +338,9 @@ class MasterHeartbeatITestWithUpgrade : public YBTableTestBase {
proxy_cache_ = std::make_unique<rpc::ProxyCache>(client_->messenger());
}

Status GetClusterConfig(GetMasterClusterConfigResponsePB *config_resp) {
Result<master::SysClusterConfigEntryPB> GetClusterConfig() {
const auto* master = VERIFY_RESULT(mini_cluster_->GetLeaderMiniMaster());
return master->catalog_manager().GetClusterConfig(config_resp);
return master->catalog_manager().GetClusterConfig();
}

Status ClearUniverseUuid() {
Expand All @@ -357,9 +355,8 @@ class MasterHeartbeatITestWithUpgrade : public YBTableTestBase {
};

TEST_F(MasterHeartbeatITestWithUpgrade, ClearUniverseUuidToRecoverUniverse) {
GetMasterClusterConfigResponsePB resp;
ASSERT_OK(GetClusterConfig(&resp));
auto cluster_config_version = resp.cluster_config().version();
auto cluster_config = ASSERT_RESULT(GetClusterConfig());
auto cluster_config_version = cluster_config.version();
LOG(INFO) << "Cluster Config version : " << cluster_config_version;

// Attempt to clear universe uuid. Should fail when it is not set.
Expand All @@ -370,26 +367,29 @@ TEST_F(MasterHeartbeatITestWithUpgrade, ClearUniverseUuidToRecoverUniverse) {

// Wait for ClusterConfig version to increase.
ASSERT_OK(LoggedWaitFor([&]() {
if (!GetClusterConfig(&resp).ok()) {
auto config_result = GetClusterConfig();
if (!config_result.ok()) {
return false;
}
auto& config = *config_result;

if (!resp.cluster_config().has_universe_uuid()) {
if (!config.has_universe_uuid()) {
return false;
}
cluster_config = std::move(config);

return true;
}, 60s, "Waiting for new universe uuid to be generated"));

ASSERT_GE(resp.cluster_config().version(), cluster_config_version);
LOG(INFO) << "Updated cluster config version:" << resp.cluster_config().version();
LOG(INFO) << "Universe UUID:" << resp.cluster_config().universe_uuid();
ASSERT_GE(cluster_config.version(), cluster_config_version);
LOG(INFO) << "Updated cluster config version:" << cluster_config.version();
LOG(INFO) << "Universe UUID:" << cluster_config.universe_uuid();

// Wait for propagation of universe_uuid.
ASSERT_OK(LoggedWaitFor([&]() {
for (auto& ts : mini_cluster_->mini_tablet_servers()) {
auto uuid_str = ts->server()->fs_manager()->GetUniverseUuidFromTserverInstanceMetadata();
if (!uuid_str.ok() || uuid_str.get() != resp.cluster_config().universe_uuid()) {
if (!uuid_str.ok() || uuid_str.get() != cluster_config.universe_uuid()) {
return false;
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/yb/integration-tests/mini_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,9 @@ namespace {
Status ChangeClusterConfig(
master::CatalogManagerIf* catalog_manager,
std::function<void(SysClusterConfigEntryPB*)> config_changer) {
GetMasterClusterConfigResponsePB config_resp;
RETURN_NOT_OK(catalog_manager->GetClusterConfig(&config_resp));

ChangeMasterClusterConfigRequestPB change_req;
*change_req.mutable_cluster_config() = std::move(*config_resp.mutable_cluster_config());
*change_req.mutable_cluster_config() =
VERIFY_RESULT(catalog_manager->GetClusterConfig());
SysClusterConfigEntryPB* config = change_req.mutable_cluster_config();

config_changer(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,9 +419,8 @@ class XClusterTabletSplitITest : public CdcTabletSplitITest {
}

auto GetConsumerMap() {
master::SysClusterConfigEntryPB cluster_info;
auto& cm = EXPECT_RESULT(cluster_->GetLeaderMiniMaster())->catalog_manager();
EXPECT_OK(cm.GetClusterConfig(&cluster_info));
auto cluster_info = EXPECT_RESULT(cm.GetClusterConfig());
auto producer_map = cluster_info.mutable_consumer_registry()->mutable_producer_map();
auto it = producer_map->find(kProducerClusterId);
EXPECT_NE(it, producer_map->end());
Expand Down
34 changes: 25 additions & 9 deletions src/yb/integration-tests/xcluster/xcluster-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,36 @@ class XClusterTestNoParam : public XClusterYcqlTestBase {
// Verify that universe was setup on consumer.
master::GetUniverseReplicationResponsePB resp;
RETURN_NOT_OK(VerifyUniverseReplication(&resp));
CHECK_EQ(resp.entry().replication_group_id(), kReplicationGroupId);
CHECK_EQ(resp.entry().tables_size(), producer_tables_.size());
SCHECK_EQ(
resp.entry().replication_group_id(), kReplicationGroupId, InternalError,
Format(
"Unexpected replication group id: actual $0, expected $1",
resp.entry().replication_group_id(), kReplicationGroupId));
SCHECK_EQ(
resp.entry().tables_size(), producer_tables_.size(), InternalError,
Format(
"Unexpected tables size: actual $0, expected $1", resp.entry().tables_size(),
producer_tables_.size()));
for (uint32_t i = 0; i < producer_tables_.size(); i++) {
CHECK_EQ(resp.entry().tables(i), producer_tables_[i]->id());
SCHECK_EQ(
resp.entry().tables(i), producer_tables_[i]->id(), InternalError,
Format(
"Unexpected table entry: actual $0, expected $1", resp.entry().tables(i),
producer_tables_[i]->id()));
}

// Verify that CDC streams were created on producer for all tables.
for (size_t i = 0; i < producer_tables_.size(); i++) {
master::ListCDCStreamsResponsePB stream_resp;
RETURN_NOT_OK(GetCDCStreamForTable(producer_tables_[i]->id(), &stream_resp));
CHECK_EQ(stream_resp.streams_size(), 1);
CHECK_EQ(stream_resp.streams(0).table_id().Get(0), producer_tables_[i]->id());
SCHECK_EQ(
stream_resp.streams_size(), 1, InternalError,
Format("Unexpected streams size: actual $0, expected $1", stream_resp.streams_size(), 1));
SCHECK_EQ(
stream_resp.streams(0).table_id().Get(0), producer_tables_[i]->id(), InternalError,
Format(
"Unexpected table_id: actual $0, expected $1",
stream_resp.streams(0).table_id().Get(0), producer_tables_[i]->id()));
}

for (size_t i = 0; i < producer_tables_.size(); i++) {
Expand Down Expand Up @@ -688,9 +706,8 @@ TEST_P(XClusterTest, SetupUniverseReplicationErrorChecking) {

master::SetupUniverseReplicationRequestPB setup_universe_req;
master::SetupUniverseReplicationResponsePB setup_universe_resp;
master::SysClusterConfigEntryPB cluster_info;
auto& cm = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager();
CHECK_OK(cm.GetClusterConfig(&cluster_info));
auto cluster_info = ASSERT_RESULT(cm.GetClusterConfig());
setup_universe_req.set_replication_group_id(cluster_info.cluster_uuid());

string master_addr = producer_cluster()->GetMasterAddresses();
Expand Down Expand Up @@ -814,9 +831,8 @@ TEST_P(XClusterTest, SetupNamespaceReplicationWithBootstrapRequestFailures) {
master::SetupNamespaceReplicationWithBootstrapRequestPB req;
master::SetupNamespaceReplicationWithBootstrapResponsePB resp;

master::SysClusterConfigEntryPB cluster_info;
auto& cm = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager();
CHECK_OK(cm.GetClusterConfig(&cluster_info));
auto cluster_info = ASSERT_RESULT(cm.GetClusterConfig());
req.set_replication_id(cluster_info.cluster_uuid());
req.mutable_producer_namespace()->CopyFrom(producer_namespace);
string master_addr = producer_cluster()->GetMasterAddresses();
Expand Down
24 changes: 10 additions & 14 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11797,27 +11797,24 @@ Status CatalogManager::GoIntoShellMode() {
return Status::OK();
}

Status CatalogManager::GetClusterConfig(GetMasterClusterConfigResponsePB* resp) {
return GetClusterConfig(resp->mutable_cluster_config());
}

Status CatalogManager::GetClusterConfig(SysClusterConfigEntryPB* config) {
Result<SysClusterConfigEntryPB> CatalogManager::GetClusterConfig() {
auto cluster_config = ClusterConfig();
DCHECK(cluster_config) << "Missing cluster config for master!";
auto l = cluster_config->LockForRead();
*config = l->pb;
return Status::OK();
SCHECK_NOTNULL(cluster_config);
return cluster_config->LockForRead()->pb;
}

Result<int32_t> CatalogManager::GetClusterConfigVersion() {
auto cluster_config = ClusterConfig();
if (!cluster_config) {
return STATUS(IllegalState, "Cluster config is not initialized");
}
SCHECK_NOTNULL(cluster_config);
auto l = cluster_config->LockForRead();
return l->pb.version();
}

Status CatalogManager::GetClusterConfig(GetMasterClusterConfigResponsePB* resp) {
*resp->mutable_cluster_config() = VERIFY_RESULT(GetClusterConfig());
return Status::OK();
}

Status CatalogManager::SetClusterConfig(
const ChangeMasterClusterConfigRequestPB* req, ChangeMasterClusterConfigResponsePB* resp) {
SysClusterConfigEntryPB config(req->cluster_config());
Expand Down Expand Up @@ -12470,8 +12467,7 @@ Status CatalogManager::SysCatalogRespectLeaderAffinity() {
}

Status CatalogManager::GetAllAffinitizedZones(vector<AffinitizedZonesSet>* affinitized_zones) {
SysClusterConfigEntryPB config;
RETURN_NOT_OK(GetClusterConfig(&config));
SysClusterConfigEntryPB config = VERIFY_RESULT(GetClusterConfig());
auto& replication_info = config.replication_info();

CatalogManagerUtil::GetAllAffinitizedZones(replication_info, affinitized_zones);
Expand Down
8 changes: 5 additions & 3 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -959,11 +959,13 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
// respective version number), modify the values it wants of said config and issuing a write
// afterwards, without changing the version number. In case the version number does not match
// on the server, the change will fail and the client will have to retry the get, as someone
// must havGetTableInfoe updated the config in the meantime.
Status GetClusterConfig(GetMasterClusterConfigResponsePB* resp) override;
Status GetClusterConfig(SysClusterConfigEntryPB* config) override;
// must have updated the config in the meantime.
Result<SysClusterConfigEntryPB> GetClusterConfig() override;
Result<int32_t> GetClusterConfigVersion();

// Helper for the GetMasterClusterConfig RPC.
Status GetClusterConfig(GetMasterClusterConfigResponsePB* resp);

Status SetClusterConfig(
const ChangeMasterClusterConfigRequestPB* req,
ChangeMasterClusterConfigResponsePB* resp) override;
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/catalog_manager_ext.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3417,8 +3417,7 @@ bool ShouldResendRegistry(

Status CatalogManager::FillHeartbeatResponse(const TSHeartbeatRequestPB* req,
TSHeartbeatResponsePB* resp) {
SysClusterConfigEntryPB cluster_config;
RETURN_NOT_OK(GetClusterConfig(&cluster_config));
SysClusterConfigEntryPB cluster_config = VERIFY_RESULT(GetClusterConfig());
RETURN_NOT_OK(FillHeartbeatResponseEncryption(cluster_config, req, resp));
RETURN_NOT_OK(snapshot_coordinator_.FillHeartbeatResponse(resp));
return FillHeartbeatResponseCDC(cluster_config, req, resp);
Expand Down
4 changes: 1 addition & 3 deletions src/yb/master/catalog_manager_if.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,7 @@ class CatalogManagerIf {
virtual Status GetYsqlDBCatalogVersion(
uint32_t db_oid, uint64_t* catalog_version, uint64_t* last_breaking_version) = 0;

virtual Status GetClusterConfig(GetMasterClusterConfigResponsePB* resp) = 0;
virtual Status GetClusterConfig(SysClusterConfigEntryPB* config) = 0;

virtual Result<SysClusterConfigEntryPB> GetClusterConfig() = 0;

virtual Status SetClusterConfig(
const ChangeMasterClusterConfigRequestPB* req, ChangeMasterClusterConfigResponsePB* resp) = 0;
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/cluster_balance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,7 @@ void ClusterLoadBalancer::RunLoadBalancerWithOptions(Options* options) {

void ClusterLoadBalancer::RunLoadBalancer(const LeaderEpoch& epoch) {
epoch_ = epoch;
SysClusterConfigEntryPB config;
CHECK_OK(catalog_manager_->GetClusterConfig(&config));
SysClusterConfigEntryPB config = CHECK_RESULT(catalog_manager_->GetClusterConfig());

std::unique_ptr<Options> options_unique_ptr =
std::make_unique<Options>();
Expand Down

0 comments on commit 347fc74

Please sign in to comment.