From 0ac8a65932ce0bbd5f510d592b143d4ce8e33785 Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Thu, 25 Apr 2024 10:53:49 -0700 Subject: [PATCH] [#21540] xCluster: Add Repair APIs for DB Scoped replication Summary: Adding yb-admin commands: `repair_xcluster_outbound_replication_add_table ` `repair_xcluster_outbound_replication_remove_table ` These will alow us to manually add or remove an individual table from the source side Outbound Replication Group. `repair_xcluster_outbound_replication_add_table` requires a stream_id which can be created using `bootstrap_cdc_producer`. `repair_xcluster_outbound_replication_remove_table` will not delete the xcluster stream. It will have to be manually deleted with `delete_cdc_stream`. NOTE: This is only meant for manual use by DevOps in extreme situations. **Upgrade/Rollback safety:** New proto messages and APIs are guarded under `enable_xcluster_api_v2` Fixes #21540 Jira: DB-10425 Test Plan: XClusterOutboundReplicationGroupTest.Repair XClusterOutboundReplicationGroupTest.RepairWithYbAdmin XClusterDBScopedTest.DropTableOnProducerThenConsumer XClusterDBScopedTest.DropAllTables XClusterDBScopedTest.DisableAutoTableProcessing Reviewers: jhe, slingam, xCluster Reviewed By: jhe Subscribers: xCluster, ybase Differential Revision: https://phorge.dev.yugabyte.com/D34239 --- src/yb/client/client-internal.cc | 2 + src/yb/client/xcluster_client.cc | 37 +++++ src/yb/client/xcluster_client.h | 7 + .../xcluster/xcluster_db_scoped-test.cc | 131 ++++++++++++++++-- ...luster_outbound_replication_group-itest.cc | 114 +++++++++++++++ .../xcluster/xcluster_ysql_test_base.cc | 13 +- .../xcluster/xcluster_ysql_test_base.h | 5 +- src/yb/master/master_replication.proto | 25 ++++ src/yb/master/master_replication_service.cc | 2 + .../add_table_to_xcluster_target_task.cc | 8 ++ src/yb/master/xcluster/xcluster_manager.cc | 31 +++++ src/yb/master/xcluster/xcluster_manager.h | 8 ++ .../xcluster_outbound_replication_group.cc | 54 +++++++- .../xcluster_outbound_replication_group.h | 6 + .../xcluster/xcluster_source_manager.cc | 29 ++++ .../master/xcluster/xcluster_source_manager.h | 8 ++ src/yb/tools/yb-admin_cli.cc | 44 ++++++ src/yb/tools/yb-admin_client.cc | 13 ++ src/yb/tools/yb-admin_client.h | 7 + 19 files changed, 524 insertions(+), 20 deletions(-) diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 0b3a64752dd..8612e7cdade 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -322,6 +322,8 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, CreateXClusterReplication); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, IsCreateXClusterReplicationDone); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, XClusterCreateOutboundReplicationGroup); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, XClusterDeleteOutboundReplicationGroup); +YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, RepairOutboundXClusterReplicationGroupAddTable); +YB_CLIENT_SPECIALIZE_SIMPLE_EX(Replication, RepairOutboundXClusterReplicationGroupRemoveTable); #define YB_CLIENT_SPECIALIZE_SIMPLE_EX_EACH(i, data, set) YB_CLIENT_SPECIALIZE_SIMPLE_EX set diff --git a/src/yb/client/xcluster_client.cc b/src/yb/client/xcluster_client.cc index fd0b3092116..4f0a08e126c 100644 --- a/src/yb/client/xcluster_client.cc +++ b/src/yb/client/xcluster_client.cc @@ -148,6 +148,43 @@ Status XClusterClient::GetXClusterStreams( std::move(callback)); } +Status XClusterClient::RepairOutboundXClusterReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id) { + SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty"); + SCHECK(!table_id.empty(), InvalidArgument, "Table id is empty"); + SCHECK(!stream_id.IsNil(), InvalidArgument, "Stream id is empty"); + + master::RepairOutboundXClusterReplicationGroupAddTableRequestPB req; + req.set_replication_group_id(replication_group_id.ToString()); + req.set_table_id(table_id); + req.set_stream_id(stream_id.ToString()); + + auto resp = CALL_SYNC_LEADER_MASTER_RPC(RepairOutboundXClusterReplicationGroupAddTable, req); + + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + +Status XClusterClient::RepairOutboundXClusterReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id) { + SCHECK(!replication_group_id.empty(), InvalidArgument, "Replication group id is empty"); + SCHECK(!table_id.empty(), InvalidArgument, "Table id is empty"); + + master::RepairOutboundXClusterReplicationGroupRemoveTableRequestPB req; + req.set_replication_group_id(replication_group_id.ToString()); + req.set_table_id(table_id); + + auto resp = CALL_SYNC_LEADER_MASTER_RPC(RepairOutboundXClusterReplicationGroupRemoveTable, req); + + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + XClusterRemoteClient::XClusterRemoteClient(const std::string& certs_for_cdc_dir, MonoDelta timeout) : certs_for_cdc_dir_(certs_for_cdc_dir), timeout_(timeout) {} diff --git a/src/yb/client/xcluster_client.h b/src/yb/client/xcluster_client.h index d0b4641814c..f11d1c6c84c 100644 --- a/src/yb/client/xcluster_client.h +++ b/src/yb/client/xcluster_client.h @@ -80,6 +80,13 @@ class XClusterClient { const NamespaceId& namespace_id, const std::vector& table_names, const std::vector& pg_schema_names, GetXClusterStreamsCallback callback); + Status RepairOutboundXClusterReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id); + + Status RepairOutboundXClusterReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id); + private: template Result SyncLeaderMasterRpc( diff --git a/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc b/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc index ac4415d9b0d..0974ca27569 100644 --- a/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_db_scoped-test.cc @@ -20,6 +20,7 @@ DECLARE_bool(enable_xcluster_api_v2); DECLARE_int32(cdc_parent_tablet_deletion_task_retry_secs); DECLARE_string(certs_for_cdc_dir); +DECLARE_bool(disable_xcluster_db_scoped_new_table_processing); using namespace std::chrono_literals; @@ -36,6 +37,24 @@ class XClusterDBScopedTest : public XClusterYsqlTestBase { XClusterYsqlTestBase::SetUp(); ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_xcluster_api_v2) = true; } + + Result GetXClusterStreams( + const NamespaceId& namespace_id, const std::vector& table_names, + const std::vector& pg_schema_names) { + std::promise> promise; + client::XClusterClient remote_client(*producer_client()); + auto outbound_table_info = remote_client.GetXClusterStreams( + CoarseMonoClock::Now() + kTimeout, kReplicationGroupId, namespace_id, table_names, + pg_schema_names, [&promise](Result result) { + promise.set_value(std::move(result)); + }); + return promise.get_future().get(); + } + + Result GetAllXClusterStreams( + const NamespaceId& namespace_id) { + return GetXClusterStreams(namespace_id, /*table_names=*/{}, /*pg_schema_names=*/{}); + } }; TEST_F(XClusterDBScopedTest, TestCreateWithCheckpoint) { @@ -135,22 +154,58 @@ TEST_F(XClusterDBScopedTest, DropTableOnProducerThenConsumer) { // Perform the drop on consumer cluster. This will also delete the replication stream. ASSERT_OK(DropYsqlTable(consumer_cluster_, *consumer_table_)); - ASSERT_OK(WaitFor( - [&]() -> Result { return IsTableDeleted(&producer_cluster_, producer_table_->name()); }, - kTimeout, "Wait for table to move from hidden to deleted.")); + ASSERT_OK(WaitForTableToFullyDelete(producer_cluster_, producer_table_->name(), kTimeout)); auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client())); - std::promise> promise; - client::XClusterClient remote_client(*producer_client()); - auto outbound_table_info = remote_client.GetXClusterStreams( - CoarseMonoClock::Now() + kTimeout, kReplicationGroupId, namespace_id, - {producer_table_->name().table_name()}, {producer_table_->name().pgschema_name()}, - [&promise](Result result) { - promise.set_value(std::move(result)); - }); - auto result = promise.get_future().get(); + + auto result = GetXClusterStreams( + namespace_id, {producer_table_->name().table_name()}, + {producer_table_->name().pgschema_name()}); ASSERT_NOK(result) << result->DebugString(); ASSERT_STR_CONTAINS(result.status().ToString(), "test_table_0 not found in namespace"); + + auto get_streams_resp = ASSERT_RESULT(GetAllXClusterStreams(namespace_id)); + ASSERT_EQ(get_streams_resp.table_infos_size(), 1); + ASSERT_EQ(get_streams_resp.table_infos(0).table_id(), producer_tables_[1]->id()); +} + +// Test dropping all tables and then creating new tables. +TEST_F(XClusterDBScopedTest, DropAllTables) { + // Drop bg task timer to speed up test. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1; + // Setup replication with one table + ASSERT_OK(SetUpClusters()); + + ASSERT_OK(CheckpointReplicationGroup()); + ASSERT_OK(CreateReplicationFromCheckpoint()); + + // Drop the table. + ASSERT_OK(DropYsqlTable(producer_cluster_, *producer_table_)); + ASSERT_OK(DropYsqlTable(consumer_cluster_, *consumer_table_)); + + ASSERT_OK(WaitForTableToFullyDelete(producer_cluster_, producer_table_->name(), kTimeout)); + + auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client())); + auto outbound_streams = ASSERT_RESULT(GetAllXClusterStreams(namespace_id)); + ASSERT_EQ(outbound_streams.table_infos_size(), 0); + + auto resp = ASSERT_RESULT(GetUniverseReplicationInfo(consumer_cluster_, kReplicationGroupId)); + ASSERT_EQ(resp.entry().tables_size(), 0); + + // Add a new table. + auto producer_table2_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/2, /*num_tablets=*/3, &producer_cluster_)); + std::shared_ptr producer_table2; + ASSERT_OK(producer_client()->OpenTable(producer_table2_name, &producer_table2)); + + ASSERT_OK(InsertRowsInProducer(0, 50, producer_table2)); + + auto consumer_table2_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/2, /*num_tablets=*/3, &consumer_cluster_)); + std::shared_ptr consumer_table2; + ASSERT_OK(producer_client()->OpenTable(consumer_table2_name, &consumer_table2)); + + ASSERT_OK(VerifyWrittenRecords(producer_table2, consumer_table2)); } TEST_F(XClusterDBScopedTest, ColocatedDB) { @@ -250,4 +305,56 @@ TEST_F(XClusterDBScopedTest, ColocatedDB) { ASSERT_EQ(resp.entry().tables_size(), 2); } +// When disable_xcluster_db_scoped_new_table_processing is set make sure we do not checkpoint new +// tables or add them to replication. +TEST_F(XClusterDBScopedTest, DisableAutoTableProcessing) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_disable_xcluster_db_scoped_new_table_processing) = true; + + ASSERT_OK(SetUpClusters()); + ASSERT_OK(CheckpointReplicationGroup()); + ASSERT_OK(CreateReplicationFromCheckpoint()); + + // Creating a new table on target first should succeed. + auto consumer_table2_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/1, /*num_tablets=*/3, &consumer_cluster_)); + std::shared_ptr consumer_table2; + ASSERT_OK(consumer_client()->OpenTable(consumer_table2_name, &consumer_table2)); + + // Verify that universe was setup on consumer. + master::GetUniverseReplicationResponsePB resp; + ASSERT_OK(VerifyUniverseReplication(&resp)); + ASSERT_EQ(resp.entry().replication_group_id(), kReplicationGroupId); + ASSERT_EQ(resp.entry().tables_size(), 1); + ASSERT_EQ(resp.entry().tables(0), producer_table_->id()); + + auto producer_table2_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/1, /*num_tablets=*/3, &producer_cluster_)); + std::shared_ptr producer_table2; + ASSERT_OK(producer_client()->OpenTable(producer_table2_name, &producer_table2)); + + auto namespace_id = ASSERT_RESULT(GetNamespaceId(producer_client())); + auto get_streams_resp = ASSERT_RESULT(GetAllXClusterStreams(namespace_id)); + ASSERT_EQ(get_streams_resp.table_infos_size(), 1); + ASSERT_EQ(get_streams_resp.table_infos(0).table_id(), producer_table_->id()); + + ASSERT_OK(InsertRowsInProducer(0, 100, producer_table2)); + ASSERT_NOK(VerifyWrittenRecords(producer_table2, consumer_table2)); + + // Reenable the flag and make sure new table is added to replication. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_disable_xcluster_db_scoped_new_table_processing) = false; + + auto producer_table3_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/2, /*num_tablets=*/3, &producer_cluster_)); + std::shared_ptr producer_table3; + ASSERT_OK(producer_client()->OpenTable(producer_table3_name, &producer_table3)); + ASSERT_OK(InsertRowsInProducer(0, 100, producer_table3)); + + auto consumer_table3_name = ASSERT_RESULT(CreateYsqlTable( + /*idx=*/2, /*num_tablets=*/3, &consumer_cluster_)); + std::shared_ptr consumer_table3; + ASSERT_OK(consumer_client()->OpenTable(consumer_table3_name, &consumer_table3)); + + ASSERT_OK(VerifyWrittenRecords(producer_table3, consumer_table3)); +} + } // namespace yb diff --git a/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc b/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc index 93efd38c753..090b39bc9ab 100644 --- a/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc +++ b/src/yb/integration-tests/xcluster/xcluster_outbound_replication_group-itest.cc @@ -404,5 +404,119 @@ TEST_F(XClusterOutboundReplicationGroupTest, MasterRestartDuringCheckpoint) { // ASSERT_OK(VerifyWalRetentionOfTable(table_id_2)); } +TEST_F(XClusterOutboundReplicationGroupTest, Repair) { + auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); + auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2)); + + ASSERT_OK(XClusterClient().XClusterCreateOutboundReplicationGroup( + kReplicationGroupId, {kNamespaceName})); + + auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 2); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable( + xcluster::ReplicationGroupId("BadId"), table_id_1), + "xClusterOutboundReplicationGroup BadId not found"); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable( + kReplicationGroupId, "BadId"), + "Table BadId not found in xClusterOutboundReplicationGroup"); + + ASSERT_OK(XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable( + kReplicationGroupId, table_id_1)); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable( + kReplicationGroupId, table_id_1), + "not found in xClusterOutboundReplicationGroup"); + + resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 1); + ASSERT_EQ(resp.table_infos(0).table_id(), table_id_2); + const auto table2_stream_id = + ASSERT_RESULT(xrepl::StreamId::FromString(resp.table_infos(0).xrepl_stream_id())); + + ASSERT_NOK_STR_CONTAINS( + GetXClusterStreams(kReplicationGroupId, namespace_id_, {kTableName1}, {kPgSchemaName}), + "not found in xClusterOutboundReplicationGroup"); + + const auto new_stream_ids = + ASSERT_RESULT(BootstrapProducer(producer_cluster(), client_, {table_id_1})); + ASSERT_EQ(new_stream_ids.size(), 1); + const auto& new_stream_id = new_stream_ids.front(); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + xcluster::ReplicationGroupId("BadId"), table_id_1, new_stream_id), + "xClusterOutboundReplicationGroup BadId not found"); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + kReplicationGroupId, "BadId", new_stream_id), + "Table with identifier BadId not found"); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + kReplicationGroupId, table_id_1, xrepl::StreamId::GenerateRandom()), + "not found"); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + kReplicationGroupId, table_id_1, table2_stream_id), + "belongs to a different table"); + + ASSERT_OK(XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + kReplicationGroupId, table_id_1, new_stream_id)); + + ASSERT_NOK_STR_CONTAINS( + XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + kReplicationGroupId, table_id_1, new_stream_id), + "already exists in"); + + resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 2); + ASSERT_EQ(resp.table_infos(0).table_id(), table_id_2); + for (const auto& table_info : resp.table_infos()) { + auto stream_id_str = new_stream_id.ToString(); + if (table_info.table_id() == table_id_2) { + stream_id_str = table2_stream_id.ToString(); + } + ASSERT_EQ(table_info.xrepl_stream_id(), stream_id_str); + } +} + +TEST_F(XClusterOutboundReplicationGroupTest, RepairWithYbAdmin) { + auto table_id_1 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName1)); + auto table_id_2 = ASSERT_RESULT(CreateYsqlTable(kNamespaceName, kTableName2)); + + ASSERT_OK(XClusterClient().XClusterCreateOutboundReplicationGroup( + kReplicationGroupId, {kNamespaceName})); + + auto resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 2); + + ASSERT_OK(CallAdmin( + producer_cluster(), "repair_xcluster_outbound_replication_remove_table", kReplicationGroupId, + table_id_1)); + + resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 1); + ASSERT_EQ(resp.table_infos(0).table_id(), table_id_2); + + const auto new_stream_ids = + ASSERT_RESULT(BootstrapProducer(producer_cluster(), client_, {table_id_1})); + ASSERT_EQ(new_stream_ids.size(), 1); + const auto& new_stream_id = new_stream_ids.front(); + + ASSERT_OK(CallAdmin( + producer_cluster(), "repair_xcluster_outbound_replication_add_table", kReplicationGroupId, + table_id_1, new_stream_id.ToString())); + + resp = ASSERT_RESULT(GetXClusterStreams(kReplicationGroupId, namespace_id_)); + ASSERT_EQ(resp.table_infos_size(), 2); +} + } // namespace master } // namespace yb diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc index b845ae23877..3e1f75b4fbc 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc @@ -383,7 +383,7 @@ Result XClusterYsqlTestBase::GetYsqlTable( strings::Substitute("Unable to find table $0 in namespace $1", table_name, namespace_name)); } -Result XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTableName& table_name) { +Result XClusterYsqlTestBase::IsTableDeleted(Cluster& cluster, const YBTableName& table_name) { master::ListTablesRequestPB req; master::ListTablesResponsePB resp; @@ -393,8 +393,8 @@ Result XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTabl req.set_include_not_running(true); master::MasterDdlProxy master_proxy( - &cluster->client_->proxy_cache(), - VERIFY_RESULT(cluster->mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); + &cluster.client_->proxy_cache(), + VERIFY_RESULT(cluster.mini_cluster_->GetLeaderMiniMaster())->bound_rpc_addr()); rpc::RpcController rpc; rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); @@ -413,6 +413,13 @@ Result XClusterYsqlTestBase::IsTableDeleted(Cluster* cluster, const YBTabl return true; } +Status XClusterYsqlTestBase::WaitForTableToFullyDelete( + Cluster& cluster, const client::YBTableName& table_name, MonoDelta timeout) { + return LoggedWaitFor( + [&]() -> Result { return IsTableDeleted(cluster, producer_table_->name()); }, timeout, + "Wait for table to transition to deleted."); +} + Status XClusterYsqlTestBase::DropYsqlTable( Cluster* cluster, const std::string& namespace_name, const std::string& schema_name, const std::string& table_name, bool is_index) { diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h index 7b5eef54763..55697da763b 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h +++ b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h @@ -76,7 +76,10 @@ class XClusterYsqlTestBase : public XClusterTestBase { bool verify_schema_name = false, bool exclude_system_tables = true); - Result IsTableDeleted(Cluster* cluster, const client::YBTableName& table_name); + Result IsTableDeleted(Cluster& cluster, const client::YBTableName& table_name); + + Status WaitForTableToFullyDelete( + Cluster& cluster, const client::YBTableName& table_name, MonoDelta timeout); Status DropYsqlTable( Cluster* cluster, const std::string& namespace_name, const std::string& schema_name, diff --git a/src/yb/master/master_replication.proto b/src/yb/master/master_replication.proto index c38238735f6..562015b710f 100644 --- a/src/yb/master/master_replication.proto +++ b/src/yb/master/master_replication.proto @@ -636,6 +636,25 @@ message IsCreateXClusterReplicationDoneResponsePB { optional AppStatusPB replication_error = 3; } +message RepairOutboundXClusterReplicationGroupAddTableRequestPB { + required string replication_group_id = 1; + required bytes table_id = 2; + required bytes stream_id = 3; +} + +message RepairOutboundXClusterReplicationGroupAddTableResponsePB { + optional MasterErrorPB error = 1; +} + +message RepairOutboundXClusterReplicationGroupRemoveTableRequestPB { + required string replication_group_id = 1; + required bytes table_id = 2; +} + +message RepairOutboundXClusterReplicationGroupRemoveTableResponsePB { + optional MasterErrorPB error = 1; +} + service MasterReplication { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -726,6 +745,12 @@ service MasterReplication { returns (IsXClusterBootstrapRequiredResponsePB); rpc GetXClusterStreams(GetXClusterStreamsRequestPB) returns (GetXClusterStreamsResponsePB); + rpc RepairOutboundXClusterReplicationGroupAddTable( + RepairOutboundXClusterReplicationGroupAddTableRequestPB) + returns (RepairOutboundXClusterReplicationGroupAddTableResponsePB); + rpc RepairOutboundXClusterReplicationGroupRemoveTable( + RepairOutboundXClusterReplicationGroupRemoveTableRequestPB) + returns (RepairOutboundXClusterReplicationGroupRemoveTableResponsePB); // xCluster APIsV2 rpc CreateXClusterReplication(CreateXClusterReplicationRequestPB) diff --git a/src/yb/master/master_replication_service.cc b/src/yb/master/master_replication_service.cc index 7c8837e8fe4..b1d982649af 100644 --- a/src/yb/master/master_replication_service.cc +++ b/src/yb/master/master_replication_service.cc @@ -72,6 +72,8 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl (GetXClusterStreams) (CreateXClusterReplication) (IsCreateXClusterReplicationDone) + (RepairOutboundXClusterReplicationGroupAddTable) + (RepairOutboundXClusterReplicationGroupRemoveTable) ) }; diff --git a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc index df75211cde3..ce35af7275e 100644 --- a/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc +++ b/src/yb/master/xcluster/add_table_to_xcluster_target_task.cc @@ -200,6 +200,14 @@ AddTableToXClusterTargetTask::GetXClusterSafeTimeWithoutDdlQueue() { return std::nullopt; } + if (!safe_time_res->is_valid()) { + // If this is the first table in the namespace then the safe time will be invalid. Which means + // it is not yet valid to scan any data in the namespace. Since we only care about progression + // of the safe time and dont read any data we can safely use Min HT. + // This case will not occur if we have DDL replication, since we never drop the DDL_QUEUE table. + return HybridTime::kMin; + } + SCHECK( !safe_time_res->is_special(), IllegalState, "Invalid safe time $0 for namespace $1", *safe_time_res, namespace_id); diff --git a/src/yb/master/xcluster/xcluster_manager.cc b/src/yb/master/xcluster/xcluster_manager.cc index a5a45e8e5b9..2fad4bf3a96 100644 --- a/src/yb/master/xcluster/xcluster_manager.cc +++ b/src/yb/master/xcluster/xcluster_manager.cc @@ -32,6 +32,11 @@ DEFINE_RUNTIME_PREVIEW_bool(enable_xcluster_api_v2, false, "Allow the usage of v2 xCluster APIs that support DB Scoped replication groups"); +DEFINE_RUNTIME_bool(disable_xcluster_db_scoped_new_table_processing, false, + "When set disables automatic checkpointing of newly created tables on the source and adding " + "table to inbound replication group on target"); +TAG_FLAG(disable_xcluster_db_scoped_new_table_processing, advanced); + #define LOG_FUNC_AND_RPC \ LOG_WITH_FUNC(INFO) << req->ShortDebugString() << ", from: " << RequestorString(rpc) @@ -365,9 +370,35 @@ Status XClusterManager::IsCreateXClusterReplicationDone( return Status::OK(); } +Status XClusterManager::RepairOutboundXClusterReplicationGroupAddTable( + const RepairOutboundXClusterReplicationGroupAddTableRequestPB* req, + RepairOutboundXClusterReplicationGroupAddTableResponsePB* resp, rpc::RpcContext* rpc, + const LeaderEpoch& epoch) { + LOG_FUNC_AND_RPC; + + return XClusterSourceManager::RepairOutboundReplicationGroupAddTable( + xcluster::ReplicationGroupId(req->replication_group_id()), req->table_id(), + VERIFY_RESULT(xrepl::StreamId::FromString(req->stream_id())), epoch); +} + +Status XClusterManager::RepairOutboundXClusterReplicationGroupRemoveTable( + const RepairOutboundXClusterReplicationGroupRemoveTableRequestPB* req, + RepairOutboundXClusterReplicationGroupRemoveTableResponsePB* resp, rpc::RpcContext* rpc, + const LeaderEpoch& epoch) { + LOG_FUNC_AND_RPC; + + return XClusterSourceManager::RepairOutboundReplicationGroupRemoveTable( + xcluster::ReplicationGroupId(req->replication_group_id()), req->table_id(), epoch); +} + std::vector> XClusterManager::GetPostTabletCreateTasks( const TableInfoPtr& table_info, const LeaderEpoch& epoch) { std::vector> result; + + if (FLAGS_disable_xcluster_db_scoped_new_table_processing) { + return result; + } + { auto tasks = XClusterSourceManager::GetPostTabletCreateTasks(table_info, epoch); MoveCollection(&tasks, &result); diff --git a/src/yb/master/xcluster/xcluster_manager.h b/src/yb/master/xcluster/xcluster_manager.h index 3183d78df2e..46ba55fbdb1 100644 --- a/src/yb/master/xcluster/xcluster_manager.h +++ b/src/yb/master/xcluster/xcluster_manager.h @@ -135,6 +135,14 @@ class XClusterManager : public XClusterManagerIf, const IsCreateXClusterReplicationDoneRequestPB* req, IsCreateXClusterReplicationDoneResponsePB* resp, rpc::RpcContext* rpc, const LeaderEpoch& epoch); + Status RepairOutboundXClusterReplicationGroupAddTable( + const RepairOutboundXClusterReplicationGroupAddTableRequestPB* req, + RepairOutboundXClusterReplicationGroupAddTableResponsePB* resp, rpc::RpcContext* rpc, + const LeaderEpoch& epoch); + Status RepairOutboundXClusterReplicationGroupRemoveTable( + const RepairOutboundXClusterReplicationGroupRemoveTableRequestPB* req, + RepairOutboundXClusterReplicationGroupRemoveTableResponsePB* resp, rpc::RpcContext* rpc, + const LeaderEpoch& epoch); std::vector> GetPostTabletCreateTasks( const TableInfoPtr& table_info, const LeaderEpoch& epoch); diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc index 91a2eb5eea4..e5353247425 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc @@ -549,10 +549,15 @@ XClusterOutboundReplicationGroup::GetNamespaceCheckpointInfo( for (const auto& table_info : table_infos) { const auto& table_id = table_info->id(); - SCHECK( - namespace_info->table_infos().count(table_id), IllegalState, - Format( - "Table $0 exists in namespace $1, but not in $2", table_id, namespace_id, ToString())); + if (namespace_info->table_infos().count(table_id) == 0) { + // We do not have this table! It has been manually removed using the repair APIs. + // If user explicitly requested this table then fail the request. + // Else we are getting all tables in the database so ignore it. + SCHECK( + table_names.empty(), NotFound, Format("Table $0 not found in $1", table_id, ToString())); + continue; + } + auto& namespace_table_info = namespace_info->table_infos().at(table_id); if (!namespace_table_info.has_stream_id() || namespace_table_info.is_checkpointing()) { VLOG_WITH_PREFIX_AND_FUNC(1) << "xCluster stream for Table " << table_id << " in Namespace " @@ -876,6 +881,8 @@ Status XClusterOutboundReplicationGroup::RemoveStreams( bool upsert_needed = false; for (const auto& stream : streams) { + VLOG_WITH_PREFIX(1) << "Removing stream " << stream->ToString(); + for (const auto& table_id : stream->table_id()) { for (auto& [ns_id, ns_info] : *pb.mutable_namespace_infos()) { auto table_info = FindOrNull(ns_info.table_infos(), table_id); @@ -918,4 +925,43 @@ void XClusterOutboundReplicationGroup::StartPostLoadTasks(const LeaderEpoch& epo StartNamespaceCheckpointTasks(namespace_ids, epoch); } +Status XClusterOutboundReplicationGroup::RepairAddTable( + const NamespaceId& namespace_id, const TableId& table_id, const xrepl::StreamId& stream_id, + const LeaderEpoch& epoch) { + std::lock_guard mutex_lock(mutex_); + auto l = VERIFY_RESULT(LockForWrite()); + + auto* ns_info = VERIFY_RESULT(GetNamespaceInfo(namespace_id)); + SCHECK( + !ns_info->mutable_table_infos()->count(table_id), AlreadyPresent, + "Table $0 already exists in $1", table_id, ToString()); + + NamespaceInfoPB::TableInfoPB table_info; + table_info.set_stream_id(stream_id.ToString()); + table_info.set_is_checkpointing(false); + table_info.set_is_part_of_initial_bootstrap(false); + ns_info->mutable_table_infos()->insert({table_id, std::move(table_info)}); + + return Upsert(l, epoch); +} + +Status XClusterOutboundReplicationGroup::RepairRemoveTable( + const TableId& table_id, const LeaderEpoch& epoch) { + std::lock_guard mutex_lock(mutex_); + auto l = VERIFY_RESULT(LockForWrite()); + auto& outbound_group_pb = l.mutable_data()->pb; + + bool table_removed = false; + for (auto& [namespace_id, namespace_info] : *outbound_group_pb.mutable_namespace_infos()) { + if (namespace_info.mutable_table_infos()->erase(table_id)) { + table_removed = true; + break; + } + } + + SCHECK(table_removed, NotFound, "Table $0 not found in $1", table_id, ToString()); + + return Upsert(l, epoch); +} + } // namespace yb::master diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.h b/src/yb/master/xcluster/xcluster_outbound_replication_group.h index 3458444ee8c..6b225d1dae7 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.h +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.h @@ -113,6 +113,12 @@ class XClusterOutboundReplicationGroup void StartPostLoadTasks(const LeaderEpoch& epoch) EXCLUDES(mutex_); + Status RepairAddTable( + const NamespaceId& namespace_id, const TableId& table_id, const xrepl::StreamId& stream_id, + const LeaderEpoch& epoch) EXCLUDES(mutex_); + + Status RepairRemoveTable(const TableId& table_id, const LeaderEpoch& epoch) EXCLUDES(mutex_); + private: friend class XClusterOutboundReplicationGroupMocked; friend class AddTableToXClusterSourceTask; diff --git a/src/yb/master/xcluster/xcluster_source_manager.cc b/src/yb/master/xcluster/xcluster_source_manager.cc index 8d62f3b644e..7ba62cef092 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.cc +++ b/src/yb/master/xcluster/xcluster_source_manager.cc @@ -1077,4 +1077,33 @@ Status XClusterSourceManager::MarkIndexBackfillCompleted( return Status::OK(); } + +Status XClusterSourceManager::RepairOutboundReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id, const LeaderEpoch& epoch) { + auto table_info = VERIFY_RESULT(catalog_manager_.FindTableById(table_id)); + + auto stream_info = VERIFY_RESULT(catalog_manager_.GetXReplStreamInfo(stream_id)); + auto stream_table_ids = stream_info->table_id(); + SCHECK( + stream_info->IsXClusterStream() && stream_table_ids.size() == 1, InvalidArgument, + Format("Stream $0 is not valid for use in xCluster", stream_id)); + SCHECK_EQ( + stream_table_ids.Get(0), table_id, InvalidArgument, + Format("Stream $0 belongs to a different table", stream_id)); + + auto outbound_replication_group = + VERIFY_RESULT(GetOutboundReplicationGroup(replication_group_id)); + return outbound_replication_group->RepairAddTable( + table_info->namespace_id(), table_id, stream_id, epoch); +} + +Status XClusterSourceManager::RepairOutboundReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const LeaderEpoch& epoch) { + auto outbound_replication_group = + VERIFY_RESULT(GetOutboundReplicationGroup(replication_group_id)); + return outbound_replication_group->RepairRemoveTable(table_id, epoch); +} + } // namespace yb::master diff --git a/src/yb/master/xcluster/xcluster_source_manager.h b/src/yb/master/xcluster/xcluster_source_manager.h index 5c465bf00f2..7748f995f3f 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.h +++ b/src/yb/master/xcluster/xcluster_source_manager.h @@ -139,6 +139,14 @@ class XClusterSourceManager { Status MarkIndexBackfillCompleted( const std::unordered_set& index_ids, const LeaderEpoch& epoch); + Status RepairOutboundReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id, const LeaderEpoch& epoch); + + Status RepairOutboundReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const LeaderEpoch& epoch); + private: friend class XClusterOutboundReplicationGroup; diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index 31e6f65f31e..19fa50bddfa 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -2374,6 +2374,48 @@ Status drop_xcluster_replication_action( return Status::OK(); } +const auto repair_xcluster_outbound_replication_add_table_args = + " "; +Status repair_xcluster_outbound_replication_add_table_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 3) { + return ClusterAdminCli::kInvalidArguments; + } + + const auto replication_group_id = xcluster::ReplicationGroupId(args[0]); + const auto& table_id = args[1]; + const auto stream_id = VERIFY_RESULT(xrepl::StreamId::FromString(args[2])); + + RETURN_NOT_OK(client->RepairOutboundXClusterReplicationGroupAddTable( + replication_group_id, table_id, stream_id)); + + std::cout << "Table " << table_id << " successfully added to outbound xCluster Replication group " + << replication_group_id << endl; + + return Status::OK(); +} + +const auto repair_xcluster_outbound_replication_remove_table_args = + " "; +Status repair_xcluster_outbound_replication_remove_table_action( + const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { + if (args.size() != 2) { + return ClusterAdminCli::kInvalidArguments; + } + + auto replication_group_id = xcluster::ReplicationGroupId(args[0]); + const auto& table_id = args[1]; + + RETURN_NOT_OK( + client->RepairOutboundXClusterReplicationGroupRemoveTable(replication_group_id, table_id)); + + std::cout << "Table " << table_id + << " successfully removed from outbound xCluster Replication group " + << replication_group_id << endl; + + return Status::OK(); +} + } // namespace void ClusterAdminCli::RegisterCommandHandlers() { @@ -2494,6 +2536,8 @@ void ClusterAdminCli::RegisterCommandHandlers() { REGISTER_COMMAND(is_xcluster_bootstrap_required); REGISTER_COMMAND(setup_xcluster_replication); REGISTER_COMMAND(drop_xcluster_replication); + REGISTER_COMMAND(repair_xcluster_outbound_replication_add_table); + REGISTER_COMMAND(repair_xcluster_outbound_replication_remove_table); } Result> ResolveTableNames( diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 39c3328d3ad..6443800b4ca 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -4577,6 +4577,19 @@ Status ClusterAdminClient::DeleteXClusterOutboundReplicationGroup( return XClusterClient().XClusterDeleteOutboundReplicationGroup(replication_group_id); } +Status ClusterAdminClient::RepairOutboundXClusterReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id) { + return XClusterClient().RepairOutboundXClusterReplicationGroupAddTable( + replication_group_id, table_id, stream_id); +} + +Status ClusterAdminClient::RepairOutboundXClusterReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id) { + return XClusterClient().RepairOutboundXClusterReplicationGroupRemoveTable( + replication_group_id, table_id); +} + client::XClusterClient ClusterAdminClient::XClusterClient() { return client::XClusterClient(*yb_client_); } diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index ff0a1269645..db2bf52931b 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -474,6 +474,13 @@ class ClusterAdminClient { Status DeleteXClusterOutboundReplicationGroup( const xcluster::ReplicationGroupId& replication_group_id); + Status RepairOutboundXClusterReplicationGroupAddTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id, + const xrepl::StreamId& stream_id); + + Status RepairOutboundXClusterReplicationGroupRemoveTable( + const xcluster::ReplicationGroupId& replication_group_id, const TableId& table_id); + protected: // Fetch the locations of the replicas for a given tablet from the Master. Status GetTabletLocations(const TabletId& tablet_id,