Skip to content

Commit

Permalink
[BACKPORT 2.16][#20428] CDCSDK: Fix addition of new tables to stream …
Browse files Browse the repository at this point in the history
…metadata after drop table

Summary:
Original commit: 201fda8 / D31577

Backport Description:
Had minor merge conflicts in test files

**Problem**:
After a table is dropped, state of cdcsdk stream changes from ACTIVE to DELETING_METADATA and remains in this state forever. For dynamic table addition, stream is required to be in the ACTIVE state. Due to this requirement, new tables created after drop table do not get added to stream metadata & cdc_state table

**Fix**:
In the dynamic table addition codepath, in addition to ACTIVE streams, also consider streams in DELETING_METADATA state.

Primary diff for Dynamic table addition : [[ https://phorge.dev.yugabyte.com/D19909 |https://phorge.dev.yugabyte.com/D19909 ]]

New test:
Added UTs to verify addition of newly created tables after drop table in normal functioning as well as master restart.
Added a test flag  `cdcsdk_skip_processing_dynamic_table_addition` for testing master restart case. This flag will skip the finding & processing of newly added tables by background thread. Refer the primary diff for dynamic table addition for more details on master restart.
Jira: DB-9428

Test Plan: Jenkins: test regex: .*CDCSDK*

Reviewers: asrinivasan, skumar, stiwary

Reviewed By: stiwary

Subscribers: ycdcxcluster, bogdan

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D31651
  • Loading branch information
siddharth2411 committed Jan 15, 2024
1 parent 22a6cb9 commit d2093e2
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 19 deletions.
183 changes: 183 additions & 0 deletions ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ DECLARE_bool(ysql_enable_packed_row);
DECLARE_uint64(ysql_packed_row_size_limit);
DECLARE_string(vmodule);
DECLARE_bool(enable_tablet_split_of_cdcsdk_streamed_tables);
DECLARE_bool(TEST_cdcsdk_skip_processing_dynamic_table_addition);

namespace yb {

Expand Down Expand Up @@ -1654,6 +1655,27 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
return get_resp;
}

void VerifyTablesInStreamMetadata(
const std::string& stream_id, const std::unordered_set<std::string>& expected_table_ids,
const std::string& timeout_msg) {
ASSERT_OK(WaitFor(
[&]() -> Result<bool> {
auto get_resp = GetDBStreamInfo(stream_id);
if (get_resp.ok() && !get_resp->has_error()) {
const uint64_t table_info_size = get_resp->table_info_size();
if (table_info_size == expected_table_ids.size()) {
std::unordered_set<std::string> table_ids;
for (auto entry : get_resp->table_info()) {
table_ids.insert(entry.table_id());
}
if (expected_table_ids == table_ids) return true;
}
}
return false;
},
MonoDelta::FromSeconds(60), timeout_msg));
}

Status ChangeLeaderOfTablet(size_t new_leader_index, const TabletId tablet_id) {
CHECK(!FLAGS_enable_load_balancing);

Expand Down Expand Up @@ -4213,6 +4235,167 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeCDCStreamDelet
ASSERT_EQ(DeleteCDCStream(stream_id), false);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTable)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(3, 3, false));

const vector<string> table_list_suffix = {"_1", "_2", "_3", "_4"};
const int kNumTables = 4;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}
auto stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT));
SleepFor(MonoDelta::FromSeconds(2));
DropTable(&test_cluster_, "test_table_1");

// Drop table will trigger the background thread to start the stream metadata cleanup, here
// wait for the metadata cleanup to finish by the background thread.
std::unordered_set<std::string> expected_table_ids_after_drop = {
table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup.");

// create a new table and verify that it gets added to stream metadata.
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));

std::unordered_set<std::string> expected_table_ids_after_create_table =
expected_table_ids_after_drop;
expected_table_ids_after_create_table.insert(table[idx].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_create_table,
"Waiting for GetDBStreamInfo after table creation.");

// verify tablets of the new table are added to cdc_state table.
std::unordered_set<std::string> expected_tablet_ids;
for (idx = 1; idx < 4; idx++) {
expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id());
}

std::unordered_set<TabletId> tablets_found;
client::TableHandle table_handle;
client::YBTableName cdc_state_table(
YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
ASSERT_OK(table_handle.Open(cdc_state_table, test_client()));

for (const auto& row : client::TableRange(table_handle)) {
const auto& row_stream_id = row.column(master::kCdcStreamIdIdx).string_value();
const auto& tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
const auto& checkpoint = row.column(master::kCdcCheckpointIdx).string_value();
LOG(INFO) << "Read cdc_state table row for tablet_id: " << tablet_id
<< " and stream_id: " << stream_id << ", with checkpoint: " << checkpoint;
if (row_stream_id == stream_id) {
tablets_found.insert(tablet_id);
}
}
LOG(INFO) << "tablets found: " << AsString(tablets_found)
<< ", expected tablets: " << AsString(expected_tablet_ids);
ASSERT_EQ(expected_tablet_ids, tablets_found);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestAddTableAfterDropTableAndMasterRestart)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(1, 1, false));
const vector<string> table_list_suffix = {"_1", "_2", "_3", "_4"};
const int kNumTables = 4;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}
auto stream_id = ASSERT_RESULT(CreateDBStream(EXPLICIT));
SleepFor(MonoDelta::FromSeconds(2));
DropTable(&test_cluster_, "test_table_1");

// Drop table will trigger the background thread to start the stream metadata cleanup, here
// wait for the metadata cleanup to finish by the background thread.
std::unordered_set<std::string> expected_table_ids_after_drop = {
table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop, "Waiting for stream metadata cleanup.");

// After metadata cleanup, skip processing any newly created table by bg thread.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = true;

// create a new table and verify that it does not get added to stream metadata.
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true, table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));

SleepFor(MonoDelta::FromSeconds(2 * kTimeMultiplier));
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_drop,
"Waiting for GetDBStreamInfo after table creation.");

// Restart leader master to repopulate namespace_to_cdcsdk_unprocessed_table_map_ in-memory map.
auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster());
ASSERT_OK(leader_master->Restart());
LOG(INFO) << "Master Restarted";
SleepFor(MonoDelta::FromSeconds(5));

// Enable processing of tables that are not part of cdc stream.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) = false;

// verify newly created table has been added to stream metadata.
std::unordered_set<std::string> expected_table_ids_after_create_table =
expected_table_ids_after_drop;
expected_table_ids_after_create_table.insert(table[idx].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids_after_create_table,
"Waiting for GetDBStreamInfo after master restart.");

// verify tablets of the new table are added to cdc_state table.
std::unordered_set<std::string> expected_tablet_ids;
for (idx = 1; idx < 4; idx++) {
expected_tablet_ids.insert(tablets[idx].Get(0).tablet_id());
}

std::unordered_set<TabletId> tablets_found;
client::TableHandle table_handle;
client::YBTableName cdc_state_table(
YQL_DATABASE_CQL, master::kSystemNamespaceName, master::kCdcStateTableName);
ASSERT_OK(table_handle.Open(cdc_state_table, test_client()));

for (const auto& row : client::TableRange(table_handle)) {
const auto& row_stream_id = row.column(master::kCdcStreamIdIdx).string_value();
const auto& tablet_id = row.column(master::kCdcTabletIdIdx).string_value();
const auto& checkpoint = row.column(master::kCdcCheckpointIdx).string_value();
LOG(INFO) << "Read cdc_state table row for tablet_id: " << tablet_id
<< " and stream_id: " << stream_id << ", with checkpoint: " << checkpoint;
if (row_stream_id == stream_id) {
tablets_found.insert(tablet_id);
}
}
LOG(INFO) << "tablets found: " << AsString(tablets_found)
<< ", expected tablets: " << AsString(expected_tablet_ids);
ASSERT_EQ(expected_tablet_ids, tablets_found);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestDropTableBeforeXClusterStreamDelete)) {
// Setup cluster.
ASSERT_OK(SetUpWithParams(1, 1, false));
Expand Down
6 changes: 4 additions & 2 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {

// For CDCSDK Streams, we scan all the tables in the namespace, and compare it with all the
// tables associated with the stream.
if (l->pb.state() == SysCDCStreamEntryPB::ACTIVE &&
if ((l->pb.state() == SysCDCStreamEntryPB::ACTIVE ||
l->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) &&
ns->state() == SysNamespaceEntryPB::RUNNING) {
catalog_manager_->FindAllTablesMissingInCDCSDKStream(stream, &l);
}
Expand Down Expand Up @@ -3937,7 +3938,8 @@ Status CatalogManager::FindCDCSDKStreamsForAddedTables(
}

auto ltm = stream_info->LockForRead();
if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE) {
if (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE ||
ltm->pb.state() == SysCDCStreamEntryPB::DELETING_METADATA) {
const auto cdcsdk_unprocessed_tables = stream_info->cdcsdk_unprocessed_tables;
for (const auto& table_id : cdcsdk_unprocessed_tables) {
auto table = FindPtrOrNull(*table_ids_map_, table_id);
Expand Down
42 changes: 25 additions & 17 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_start, false,
DEFINE_test_flag(bool, pause_catalog_manager_bg_loop_end, false,
"Pause the bg tasks thread at the end of the loop.");

DEFINE_test_flag(bool, cdcsdk_skip_processing_dynamic_table_addition, false,
"Skip finding unprocessed tables for cdcsdk streams");

DECLARE_bool(enable_ysql);

namespace yb {
Expand Down Expand Up @@ -232,23 +235,28 @@ void CatalogManagerBgTasks::Run() {
}

{
// Find if there have been any new tables added to any namespace with an active cdcsdk
// stream.
TableStreamIdsMap table_unprocessed_streams_map;
// In case of master leader restart of leadership changes, we will scan all streams for
// unprocessed tables, but from the second iteration onwards we will only consider the
// 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map.
Status s =
catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map);

if (s.ok() && !table_unprocessed_streams_map.empty()) {
s = catalog_manager_->AddTabletEntriesToCDCSDKStreamsForNewTables(
table_unprocessed_streams_map);
}
if (!s.ok()) {
YB_LOG_EVERY_N(WARNING, 10)
<< "Encountered failure while trying to add unprocessed tables to cdc_state table: "
<< s.ToString();
if (!FLAGS_TEST_cdcsdk_skip_processing_dynamic_table_addition) {
// Find if there have been any new tables added to any namespace with an active cdcsdk
// stream.
TableStreamIdsMap table_unprocessed_streams_map;
// In case of master leader restart of leadership changes, we will scan all streams for
// unprocessed tables, but from the second iteration onwards we will only consider the
// 'cdcsdk_unprocessed_tables' field of CDCStreamInfo object stored in the cdc_state_map.
Status s =
catalog_manager_->FindCDCSDKStreamsForAddedTables(&table_unprocessed_streams_map);

if (s.ok() && !table_unprocessed_streams_map.empty()) {
s = catalog_manager_->AddTabletEntriesToCDCSDKStreamsForNewTables(
table_unprocessed_streams_map);
}
if (!s.ok()) {
YB_LOG_EVERY_N(WARNING, 10)
<< "Encountered failure while trying to add unprocessed tables to cdc_state table: "
<< s.ToString();
}
} else {
LOG(INFO) << "Skipping processing of dynamic table addition due to "
"cdcsdk_skip_processing_dynamic_table_addition being true";
}
}

Expand Down

0 comments on commit d2093e2

Please sign in to comment.