Skip to content

Commit

Permalink
[#21540] xCluster: Add Repair APIs for DB Scoped replication
Browse files Browse the repository at this point in the history
Summary:
Adding yb-admin commands:
`repair_xcluster_outbound_replication_add_table <replication_group_id> <table_id> <stream_id>`
`repair_xcluster_outbound_replication_remove_table <replication_group_id> <table_id>`

These will allow us to manually add or remove an individual table from the source-side Outbound Replication Group.
`repair_xcluster_outbound_replication_add_table` requires a stream_id which can be created using `bootstrap_cdc_producer`.
`repair_xcluster_outbound_replication_remove_table` will not delete the xcluster stream. It will have to be manually deleted with `delete_cdc_stream`.
NOTE: This is only meant for manual use by DevOps in extreme situations.

**Upgrade/Rollback safety:**
New proto messages and APIs are guarded under `enable_xcluster_api_v2`

Fixes #21540
Jira: DB-10425

Test Plan:
XClusterOutboundReplicationGroupTest.Repair
XClusterOutboundReplicationGroupTest.RepairWithYbAdmin
XClusterDBScopedTest.DropTableOnProducerThenConsumer
XClusterDBScopedTest.DropAllTables
XClusterDBScopedTest.DisableAutoTableProcessing

Reviewers: jhe, slingam, xCluster

Reviewed By: jhe

Subscribers: xCluster, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34239
  • Loading branch information
hari90 committed Apr 25, 2024
1 parent e0eea85 commit 0ac8a65
Show file tree
Hide file tree
Showing 19 changed files with 524 additions and 20 deletions.
2 changes: 2 additions & 0 deletions src/yb/client/client-internal.cc
Expand Up @@ -322,6 +322,8 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, CreateXClusterReplication);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, IsCreateXClusterReplicationDone);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, XClusterCreateOutboundReplicationGroup);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, XClusterDeleteOutboundReplicationGroup);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, RepairOutboundXClusterReplicationGroupAddTable);
YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, RepairOutboundXClusterReplicationGroupRemoveTable);

#define YB_CLIENT_SPECIALIZE_SIMPLE_EX_EACH(i, data, set) YB_CLIENT_SPECIALIZE_SIMPLE_EX set

Expand Down
37 changes: 37 additions & 0 deletions src/yb/client/xcluster_client.cc
Expand Up @@ -148,6 +148,43 @@ Status XClusterClient::GetXClusterStreams(
std::move(callback));
}

// Repair API: asks the leader master to add `table_id`, backed by the already existing stream
// `stream_id`, to the outbound replication group `replication_group_id`.
//
// Returns InvalidArgument if the group id or table id is empty or the stream id is nil. Otherwise
// returns the error carried in the master's response, or OK when the response has no error.
Status XClusterClient::RepairOutboundXClusterReplicationGroupAddTable(
    const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id,
    const xrepl::StreamId& stream_id) {
  // Reject unusable arguments locally before issuing the RPC.
  SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty");
  SCHECK(!table_id.empty(), InvalidArgument, "Table id is empty");
  SCHECK(!stream_id.IsNil(), InvalidArgument, "Stream id is empty");

  master::RepairOutboundXClusterReplicationGroupAddTableRequestPB request;
  request.set_stream_id(stream_id.ToString());
  request.set_table_id(table_id);
  request.set_replication_group_id(replication_group_id.ToString());

  auto response =
      CALL_SYNC_LEADER_MASTER_RPC(RepairOutboundXClusterReplicationGroupAddTable, request);

  // Surface the application-level error reported by the master, if any.
  if (!response.has_error()) {
    return Status::OK();
  }
  return StatusFromPB(response.error().status());
}

// Repair API: asks the leader master to remove `table_id` from the outbound replication group
// `replication_group_id`.
//
// Returns InvalidArgument if either id is empty. Otherwise returns the error carried in the
// master's response, or OK when the response has no error.
Status XClusterClient::RepairOutboundXClusterReplicationGroupRemoveTable(
    const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id) {
  // Reject unusable arguments locally before issuing the RPC.
  SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty");
  SCHECK(!table_id.empty(), InvalidArgument, "Table id is empty");

  master::RepairOutboundXClusterReplicationGroupRemoveTableRequestPB request;
  request.set_table_id(table_id);
  request.set_replication_group_id(replication_group_id.ToString());

  auto response =
      CALL_SYNC_LEADER_MASTER_RPC(RepairOutboundXClusterReplicationGroupRemoveTable, request);

  // Surface the application-level error reported by the master, if any.
  if (!response.has_error()) {
    return Status::OK();
  }
  return StatusFromPB(response.error().status());
}

// Stores the certificate directory and the timeout in the corresponding members; no other work is
// done at construction time.
XClusterRemoteClient::XClusterRemoteClient(const std::string& certs_for_cdc_dir, MonoDelta timeout)
: certs_for_cdc_dir_(certs_for_cdc_dir), timeout_(timeout) {}

Expand Down
7 changes: 7 additions & 0 deletions src/yb/client/xcluster_client.h
Expand Up @@ -80,6 +80,13 @@ class XClusterClient {
const NamespaceId& namespace_id, const std::vector<TableName>& table_names,
const std::vector<PgSchemaName>& pg_schema_names, GetXClusterStreamsCallback callback);

// Repair API: adds `table_id`, using the existing stream `stream_id`, to the outbound replication
// group `replication_group_id` via a synchronous RPC to the leader master.
// Returns InvalidArgument if the group id or table id is empty or the stream id is nil, or the
// error reported in the master's response.
Status RepairOutboundXClusterReplicationGroupAddTable(
const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id,
const xrepl::StreamId& stream_id);

// Repair API: removes `table_id` from the outbound replication group `replication_group_id` via a
// synchronous RPC to the leader master.
// Returns InvalidArgument if either id is empty, or the error reported in the master's response.
Status RepairOutboundXClusterReplicationGroupRemoveTable(
const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id);

private:
template <typename ResponsePB, typename RequestPB, typename Method>
Result<ResponsePB> SyncLeaderMasterRpc(
Expand Down
131 changes: 119 additions & 12 deletions src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc
Expand Up @@ -20,6 +20,7 @@
DECLARE_bool(enable_xcluster_api_v2);
DECLARE_int32(cdc_parent_tablet_deletion_task_retry_secs);
DECLARE_string(certs_for_cdc_dir);
DECLARE_bool(disable_xcluster_db_scoped_new_table_processing);

using namespace std::chrono_literals;

Expand All @@ -36,6 +37,24 @@ class XClusterDBScopedTest : public XClusterYsqlTestBase {
XClusterYsqlTestBase::SetUp();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_xcluster_api_v2) = true;
}

// Synchronously fetches the outbound stream info of kReplicationGroupId on the producer cluster
// for the given namespace, optionally restricted to `table_names` / `pg_schema_names`.
//
// Returns the response delivered to the asynchronous callback, or the error status if the request
// could not be issued at all.
Result<master::GetXClusterStreamsResponsePB> GetXClusterStreams(
    const NamespaceId& namespace_id, const std::vector<TableName>& table_names,
    const std::vector<PgSchemaName>& pg_schema_names) {
  std::promise<Result<master::GetXClusterStreamsResponsePB>> promise;
  client::XClusterClient remote_client(*producer_client());
  const auto submit_status = remote_client.GetXClusterStreams(
      CoarseMonoClock::Now() + kTimeout, kReplicationGroupId, namespace_id, table_names,
      pg_schema_names, [&promise](Result<master::GetXClusterStreamsResponsePB> result) {
        promise.set_value(std::move(result));
      });
  // Do not wait on the future if the call was rejected up front: the promise would never be
  // fulfilled and the test would hang instead of failing.
  // NOTE(review): assumes the callback is not invoked when a non-OK status is returned - confirm.
  if (!submit_status.ok()) {
    return submit_status;
  }
  return promise.get_future().get();
}

// Convenience wrapper around GetXClusterStreams that passes empty table-name and schema-name
// lists, i.e. applies no per-table filter to the request.
Result<master::GetXClusterStreamsResponsePB> GetAllXClusterStreams(
    const NamespaceId& namespace_id) {
  const std::vector<TableName> no_table_names;
  const std::vector<PgSchemaName> no_pg_schema_names;
  return GetXClusterStreams(namespace_id, no_table_names, no_pg_schema_names);
}
};

TEST_F(XClusterDBScopedTest, TestCreateWithCheckpoint) {
Expand Down Expand Up @@ -135,22 +154,58 @@ TEST_F(XClusterDBScopedTest, DropTableOnProducerThenConsumer) {
// Perform the drop on consumer cluster. This will also delete the replication stream.
ASSERT_OK(DropYsqlTable(consumer_cluster_, *consumer_table_));

ASSERT_OK(WaitFor(
[&]() -> Result<bool> { return IsTableDeleted(&producer_cluster_, producer_table_->name()); },
kTimeout, "Wait for table to move from hidden to deleted."));
ASSERT_OK(WaitForTableToFullyDelete(producer_cluster_, producer_table_->name(), kTimeout));

auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client()));
std::promise<Result<master::GetXClusterStreamsResponsePB>> promise;
client::XClusterClient remote_client(*producer_client());
auto outbound_table_info = remote_client.GetXClusterStreams(
CoarseMonoClock::Now() + kTimeout, kReplicationGroupId, namespace_id,
{producer_table_->name().table_name()}, {producer_table_->name().pgschema_name()},
[&promise](Result<master::GetXClusterStreamsResponsePB> result) {
promise.set_value(std::move(result));
});
auto result = promise.get_future().get();

auto result = GetXClusterStreams(
namespace_id, {producer_table_->name().table_name()},
{producer_table_->name().pgschema_name()});
ASSERT_NOK(result) << result->DebugString();
ASSERT_STR_CONTAINS(result.status().ToString(), "test_table_0 not found in namespace");

auto get_streams_resp = ASSERT_RESULT(GetAllXClusterStreams(namespace_id));
ASSERT_EQ(get_streams_resp.table_infos_size(), 1);
ASSERT_EQ(get_streams_resp.table_infos(0).table_id(), producer_tables_[1]->id());
}

// Test dropping all tables and then creating new tables.
// After the only replicated table is dropped on both clusters, the outbound group and the
// consumer-side replication entry must be empty. A table created afterwards on both sides must
// still get replicated.
TEST_F(XClusterDBScopedTest, DropAllTables) {
  // Drop bg task timer to speed up test.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1;
  // Setup replication with one table.
  ASSERT_OK(SetUpClusters());

  ASSERT_OK(CheckpointReplicationGroup());
  ASSERT_OK(CreateReplicationFromCheckpoint());

  // Drop the table.
  ASSERT_OK(DropYsqlTable(producer_cluster_, *producer_table_));
  ASSERT_OK(DropYsqlTable(consumer_cluster_, *consumer_table_));

  ASSERT_OK(WaitForTableToFullyDelete(producer_cluster_, producer_table_->name(), kTimeout));

  // Neither side should track any table once the drop has completed.
  auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client()));
  auto outbound_streams = ASSERT_RESULT(GetAllXClusterStreams(namespace_id));
  ASSERT_EQ(outbound_streams.table_infos_size(), 0);

  auto resp = ASSERT_RESULT(GetUniverseReplicationInfo(consumer_cluster_, kReplicationGroupId));
  ASSERT_EQ(resp.entry().tables_size(), 0);

  // Add a new table.
  auto producer_table2_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/2, /*num_tablets=*/3, &producer_cluster_));
  std::shared_ptr<client::YBTable> producer_table2;
  ASSERT_OK(producer_client()->OpenTable(producer_table2_name, &producer_table2));

  ASSERT_OK(InsertRowsInProducer(0, 50, producer_table2));

  auto consumer_table2_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/2, /*num_tablets=*/3, &consumer_cluster_));
  std::shared_ptr<client::YBTable> consumer_table2;
  // The consumer table must be opened through the consumer client. Opening it through the producer
  // client resolves the name against the producer cluster, so the verification below would not
  // actually look at replicated data.
  ASSERT_OK(consumer_client()->OpenTable(consumer_table2_name, &consumer_table2));

  ASSERT_OK(VerifyWrittenRecords(producer_table2, consumer_table2));
}

TEST_F(XClusterDBScopedTest, ColocatedDB) {
Expand Down Expand Up @@ -250,4 +305,56 @@ TEST_F(XClusterDBScopedTest, ColocatedDB) {
ASSERT_EQ(resp.entry().tables_size(), 2);
}

// While disable_xcluster_db_scoped_new_table_processing is set, newly created tables must be
// neither checkpointed nor added to replication. Once the flag is cleared again, tables created
// afterwards are expected to be replicated.
TEST_F(XClusterDBScopedTest, DisableAutoTableProcessing) {
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_disable_xcluster_db_scoped_new_table_processing) = true;

  ASSERT_OK(SetUpClusters());
  ASSERT_OK(CheckpointReplicationGroup());
  ASSERT_OK(CreateReplicationFromCheckpoint());

  // The target must accept a new table even though the source does not have it yet.
  auto target_table2_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/1, /*num_tablets=*/3, &consumer_cluster_));
  std::shared_ptr<client::YBTable> target_table2;
  ASSERT_OK(consumer_client()->OpenTable(target_table2_name, &target_table2));

  // The consumer-side replication group should still contain only the original table.
  master::GetUniverseReplicationResponsePB universe_resp;
  ASSERT_OK(VerifyUniverseReplication(&universe_resp));
  ASSERT_EQ(universe_resp.entry().replication_group_id(), kReplicationGroupId);
  ASSERT_EQ(universe_resp.entry().tables_size(), 1);
  ASSERT_EQ(universe_resp.entry().tables(0), producer_table_->id());

  auto source_table2_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/1, /*num_tablets=*/3, &producer_cluster_));
  std::shared_ptr<client::YBTable> source_table2;
  ASSERT_OK(producer_client()->OpenTable(source_table2_name, &source_table2));

  // The outbound group should not have picked up the newly created source table.
  auto source_namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client()));
  auto outbound_streams = ASSERT_RESULT(GetAllXClusterStreams(source_namespace_id));
  ASSERT_EQ(outbound_streams.table_infos_size(), 1);
  ASSERT_EQ(outbound_streams.table_infos(0).table_id(), producer_table_->id());

  // Rows written to the unreplicated table must not show up on the target.
  ASSERT_OK(InsertRowsInProducer(0, 100, source_table2));
  ASSERT_NOK(VerifyWrittenRecords(source_table2, target_table2));

  // Turn automatic processing back on; tables created from here on must be replicated.
  ANNOTATE_UNPROTECTED_WRITE(FLAGS_disable_xcluster_db_scoped_new_table_processing) = false;

  auto source_table3_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/2, /*num_tablets=*/3, &producer_cluster_));
  std::shared_ptr<client::YBTable> source_table3;
  ASSERT_OK(producer_client()->OpenTable(source_table3_name, &source_table3));
  ASSERT_OK(InsertRowsInProducer(0, 100, source_table3));

  auto target_table3_name = ASSERT_RESULT(CreateYsqlTable(
      /*idx=*/2, /*num_tablets=*/3, &consumer_cluster_));
  std::shared_ptr<client::YBTable> target_table3;
  ASSERT_OK(consumer_client()->OpenTable(target_table3_name, &target_table3));

  ASSERT_OK(VerifyWrittenRecords(source_table3, target_table3));
}

} // namespace yb
Expand Up @@ -404,5 +404,119 @@ TEST_F(XClusterOutboundReplicationGroupTest, MasterRestartDuringCheckpoint) {
// ASSERT_OK(VerifyWalRetentionOfTable(table_id_2));
}

// Exercises the repair APIs through XClusterClient: a table is removed from the outbound
// replication group and then added back with a freshly bootstrapped stream. The error paths for
// unknown groups, unknown tables, unknown streams, streams owned by another table and duplicate
// additions are checked along the way.
TEST_F(XClusterOutboundReplicationGroupTest, Repair) {
  auto first_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
  auto second_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2));

  ASSERT_OK(XClusterClient().XClusterCreateOutboundReplicationGroup(
      kReplicationGroupId, {kNamespaceName}));

  auto streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 2);

  // --- Remove: failure cases first, then the real removal. ---
  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable(
          xcluster::ReplicationGroupId("BadId"), first_table_id),
      "xClusterOutboundReplicationGroup BadId not found");

  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable(
          kReplicationGroupId, "BadId"),
      "Table BadId not found in xClusterOutboundReplicationGroup");

  ASSERT_OK(XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable(
      kReplicationGroupId, first_table_id));

  // Removing the same table a second time must fail.
  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable(
          kReplicationGroupId, first_table_id),
      "not found in xClusterOutboundReplicationGroup");

  // Only the second table is left in the group; remember its stream for later checks.
  streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 1);
  ASSERT_EQ(streams_resp.table_infos(0).table_id(), second_table_id);
  const auto second_table_stream_id =
      ASSERT_RESULT(xrepl::StreamId::FromString(streams_resp.table_infos(0).xrepl_stream_id()));

  ASSERT_NOK_STR_CONTAINS(
      GetXClusterStreams(kReplicationGroupId, namespace_id_, {kTableName1}, {kPgSchemaName}),
      "not found in xClusterOutboundReplicationGroup");

  // --- Add: create a stream for the removed table, then run the failure cases. ---
  const auto bootstrapped_stream_ids =
      ASSERT_RESULT(BootstrapProducer(producer_cluster(), client_, {first_table_id}));
  ASSERT_EQ(bootstrapped_stream_ids.size(), 1);
  const auto& first_table_stream_id = bootstrapped_stream_ids.front();

  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
          xcluster::ReplicationGroupId("BadId"), first_table_id, first_table_stream_id),
      "xClusterOutboundReplicationGroup BadId not found");

  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
          kReplicationGroupId, "BadId", first_table_stream_id),
      "Table with identifier BadId not found");

  // A stream id that was never created.
  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
          kReplicationGroupId, first_table_id, xrepl::StreamId::GenerateRandom()),
      "not found");

  // A stream id that exists but was created for the other table.
  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
          kReplicationGroupId, first_table_id, second_table_stream_id),
      "belongs to a different table");

  ASSERT_OK(XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
      kReplicationGroupId, first_table_id, first_table_stream_id));

  // Adding the same table a second time must fail.
  ASSERT_NOK_STR_CONTAINS(
      XClusterClient().RepairOutboundXClusterReplicationGroupAddTable(
          kReplicationGroupId, first_table_id, first_table_stream_id),
      "already exists in");

  // Both tables are back in the group, each mapped to its own stream.
  streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 2);
  ASSERT_EQ(streams_resp.table_infos(0).table_id(), second_table_id);
  for (const auto& table_info : streams_resp.table_infos()) {
    const auto expected_stream_id = table_info.table_id() == second_table_id
                                        ? second_table_stream_id.ToString()
                                        : first_table_stream_id.ToString();
    ASSERT_EQ(table_info.xrepl_stream_id(), expected_stream_id);
  }
}

// Same remove / add-back round trip as the Repair test, but driven through the yb-admin commands
// repair_xcluster_outbound_replication_remove_table and
// repair_xcluster_outbound_replication_add_table.
TEST_F(XClusterOutboundReplicationGroupTest, RepairWithYbAdmin) {
  auto first_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1));
  auto second_table_id = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2));

  ASSERT_OK(XClusterClient().XClusterCreateOutboundReplicationGroup(
      kReplicationGroupId, {kNamespaceName}));

  auto streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 2);

  // Remove the first table; only the second one should remain in the group.
  ASSERT_OK(CallAdmin(
      producer_cluster(), "repair_xcluster_outbound_replication_remove_table", kReplicationGroupId,
      first_table_id));

  streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 1);
  ASSERT_EQ(streams_resp.table_infos(0).table_id(), second_table_id);

  // Create a stream for the removed table and add the table back with it.
  const auto bootstrapped_stream_ids =
      ASSERT_RESULT(BootstrapProducer(producer_cluster(), client_, {first_table_id}));
  ASSERT_EQ(bootstrapped_stream_ids.size(), 1);
  const auto& first_table_stream_id = bootstrapped_stream_ids.front();

  ASSERT_OK(CallAdmin(
      producer_cluster(), "repair_xcluster_outbound_replication_add_table", kReplicationGroupId,
      first_table_id, first_table_stream_id.ToString()));

  streams_resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_));
  ASSERT_EQ(streams_resp.table_infos_size(), 2);
}

} // namespace master
} // namespace yb
13 changes: 10 additions & 3 deletions src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc
Expand Up @@ -383,7 +383,7 @@ Result<YBTableName> XClusterYsqlTestBase::GetYsqlTable(
strings::Substitute("Unable to find table $0 in namespace $1", table_name, namespace_name));
}

Result<bool> XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTableName& table_name) {
Result<bool> XClusterYsqlTestBase::IsTableDeleted(Cluster& cluster, const YBTableName& table_name) {
master::ListTablesRequestPB req;
master::ListTablesResponsePB resp;

Expand All @@ -393,8 +393,8 @@ Result<bool> XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTabl
req.set_include_not_running(true);

master::MasterDdlProxy master_proxy(
&cluster->client_->proxy_cache(),
VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr());
&cluster.client_->proxy_cache(),
VERIFY_RESULT(cluster.mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr());

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
Expand All @@ -413,6 +413,13 @@ Result<bool> XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTabl
return true;
}

// Waits, for at most `timeout`, until IsTableDeleted reports `table_name` as deleted on `cluster`.
// Returns the status produced by LoggedWaitFor.
Status XClusterYsqlTestBase::WaitForTableToFullyDelete(
    Cluster& cluster, const client::YBTableName& table_name, MonoDelta timeout) {
  // Poll the table the caller asked about. The previous implementation ignored `table_name` and
  // always checked producer_table_, which is wrong for any other table.
  return LoggedWaitFor(
      [this, &cluster, &table_name]() -> Result<bool> {
        return IsTableDeleted(cluster, table_name);
      },
      timeout, "Wait for table to transition to deleted.");
}

Status XClusterYsqlTestBase::DropYsqlTable(
Cluster* cluster, const std::string& namespace_name, const std::string& schema_name,
const std::string& table_name, bool is_index) {
Expand Down
5 changes: 4 additions & 1 deletion src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h
Expand Up @@ -76,7 +76,10 @@ class XClusterYsqlTestBase : public XClusterTestBase {
bool verify_schema_name = false,
bool exclude_system_tables = true);

Result<bool> IsTableDeleted(Cluster* cluster, const client::YBTableName& table_name);
Result<bool> IsTableDeleted(Cluster& cluster, const client::YBTableName& table_name);

// Waits via LoggedWaitFor, bounded by `timeout`, until IsTableDeleted reports the table as deleted
// on `cluster`.
Status WaitForTableToFullyDelete(
Cluster& cluster, const client::YBTableName& table_name, MonoDelta timeout);

Status DropYsqlTable(
Cluster* cluster, const std::string& namespace_name, const std::string& schema_name,
Expand Down

0 comments on commit 0ac8a65

Please sign in to comment.