Skip to content

Commit

Permalink
[#21054] docdb: Add ABORTED state to clone state manager
Browse files Browse the repository at this point in the history
Summary:
This diff adds the `ABORTED` state and an `abort_message` field to the `SysCloneStateInfoPB` object. A clone is moved to the `ABORTED` state if it fails some step of the cloning processes, or if it was incomplete (not `ABORTED` or `RESTORED`) upon a master failover. There is currently no timeout behavior for clones stuck in tablet creation or the restoring state.

Aborted clones do not currently clean up their namespace; this is left to the user. Aborted clones do not block clones to a different target namespace.

**Upgrade/Rollback safety:**
Only touches a non-production protobuf, guarded by `FLAGS_enable_db_clone`.

Jira: DB-10021

Test Plan:
```
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortInStartTabletsCloning
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortIfFailToSchedulePgCloneSchema
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortInPgSchemaClone
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortInStartTabletsCloningPg
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortInCreatingState
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortInRestoringState
./yb_build.sh --cxx-test clone_state_manager-test --gtest_filter CloneStateManagerTest.AbortIncompleteCloneOnLoad
```

Reviewers: mhaddad

Reviewed By: mhaddad

Subscribers: ybase, bogdan

Differential Revision: https://phorge.dev.yugabyte.com/D32999
  • Loading branch information
SrivastavaAnubhav committed May 6, 2024
1 parent 517e474 commit 76fe86d
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 60 deletions.
4 changes: 4 additions & 0 deletions src/yb/master/catalog_entity_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ message SysCloneStatePB {
CREATING = 2;
RESTORING = 3;
RESTORED = 4;
ABORTED = 5;
}

optional State aggregate_state = 1;
Expand All @@ -606,6 +607,9 @@ message SysCloneStatePB {
// Used for debugging.
optional string target_namespace_name = 4;
optional fixed64 restore_time = 5;

// A reason for why the clone was aborted.
optional string abort_message = 6;
}

message SchemaVersionMappingPB {
Expand Down
7 changes: 6 additions & 1 deletion src/yb/master/clone/clone_state_entity.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
namespace yb::master {

struct PersistentCloneStateInfo :
public Persistent<SysCloneStatePB, SysRowEntryType::CLONE_STATE> {};
public Persistent<SysCloneStatePB, SysRowEntryType::CLONE_STATE> {
bool IsDone() const {
return pb.aggregate_state() == SysCloneStatePB::RESTORED ||
pb.aggregate_state() == SysCloneStatePB::ABORTED;
}
};

class CloneStateInfo : public RefCountedThreadSafe<CloneStateInfo>,
public MetadataCowWrapper<PersistentCloneStateInfo> {
Expand Down
213 changes: 208 additions & 5 deletions src/yb/master/clone/clone_state_manager-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
#include "yb/master/master_backup.pb.h"
#include "yb/master/master_ddl.pb.h"
#include "yb/master/master_fwd.h"

#include "yb/master/master_types.pb.h"
#include "yb/master/ts_descriptor.h"

#include "yb/util/monotime.h"
#include "yb/util/oid_generator.h"
#include "yb/util/pb_util.h"
#include "yb/util/physical_time.h"
#include "yb/util/status_format.h"
Expand All @@ -47,6 +49,8 @@ DECLARE_bool(enable_db_clone);
namespace yb {
namespace master {

using namespace std::literals;

using ::testing::_;
using ::testing::AnyNumber;
using ::testing::AtLeast;
Expand Down Expand Up @@ -151,15 +155,17 @@ class CloneStateManagerTest : public YBTest {
source_ns_ = make_scoped_refptr<NamespaceInfo>(kSourceNamespaceId, nullptr /* tasks_tracker */);
{
auto lock = source_ns_->LockForWrite();
lock.mutable_data()->pb.set_database_type(kDatabaseType);
lock.mutable_data()->pb.set_database_type(GetDatabaseType());
lock.mutable_data()->pb.set_name(kSourceNamespaceName);
lock.Commit();
}
source_ns_identifier_.set_name(kSourceNamespaceName);
source_ns_identifier_.set_database_type(GetDatabaseType());

target_ns_ = make_scoped_refptr<NamespaceInfo>(kTargetNamespaceId, nullptr /* tasks_tracker */);
{
auto lock = target_ns_->LockForWrite();
lock.mutable_data()->pb.set_database_type(kDatabaseType);
lock.mutable_data()->pb.set_database_type(GetDatabaseType());
lock.mutable_data()->pb.set_name(kTargetNamespaceName);
lock.Commit();
}
Expand Down Expand Up @@ -192,6 +198,17 @@ class CloneStateManagerTest : public YBTest {
}
}

virtual YQLDatabase GetDatabaseType() { return YQL_DATABASE_CQL; }

ListSnapshotSchedulesResponsePB DefaultListSnapshotSchedules() {
ListSnapshotSchedulesResponsePB resp;
auto* schedule = resp.add_schedules();
schedule->set_id(kSnapshotScheduleId.data(), kSnapshotScheduleId.size());
auto table = schedule->mutable_options()->mutable_filter()->mutable_tables()->add_tables();
*table->mutable_namespace_()->mutable_id() = kSourceNamespaceId;
return resp;
}

// Creates a clone state in the CLONE_SCHEMA_STARTED state from a provided
// ExternalTableSnapshotDataMap (instead of using import / export snapshot).
// NB: This does not call EXPECT_CALL(Upsert) because some tests expect this to fail.
Expand Down Expand Up @@ -266,6 +283,24 @@ class CloneStateManagerTest : public YBTest {
return clone_state_manager_->ScheduleCloneOps(clone_state, epoch, not_snapshotted_tablets);
}

Result<std::pair<NamespaceId, uint32_t>> CloneNamespace(
const NamespaceIdentifierPB& source_namespace_identifier,
const HybridTime& restore_time,
const std::string& target_namespace_name,
CoarseTimePoint deadline,
const LeaderEpoch& epoch) {
return clone_state_manager_->CloneNamespace(
source_namespace_identifier, restore_time, target_namespace_name, deadline, epoch);
}

AsyncClonePgSchema::ClonePgSchemaCallbackType MakeDoneClonePgSchemaCallback(
CloneStateInfoPtr clone_state, const SnapshotScheduleId& snapshot_schedule_id,
const std::string& target_namespace_name,
CoarseTimePoint deadline, const LeaderEpoch& epoch) {
return clone_state_manager_->MakeDoneClonePgSchemaCallback(
clone_state, snapshot_schedule_id, target_namespace_name, deadline, epoch);
}

MockExternalFunctions& MockFuncs() {
return static_cast<MockExternalFunctions&>(*clone_state_manager_->external_funcs_);
}
Expand All @@ -285,17 +320,22 @@ class CloneStateManagerTest : public YBTest {
const TableId kTargetTableId = "target_table_id";
const int kNumTablets = 2;
const HybridTime kRestoreTime = HybridTime(12345);
const YQLDatabase kDatabaseType = YQLDatabase::YQL_DATABASE_CQL;
const LeaderEpoch kEpoch = LeaderEpoch(123 /* term */);

NamespaceInfoPtr source_ns_;
NamespaceIdentifierPB source_ns_identifier_;
NamespaceInfoPtr target_ns_;
TableInfoPtr source_table_;
TableInfoPtr target_table_;
std::vector<TabletInfoPtr> source_tablets_;
std::vector<TabletInfoPtr> target_tablets_;
};

class CloneStateManagerPgTest : public CloneStateManagerTest {
protected:
virtual YQLDatabase GetDatabaseType() override { return YQL_DATABASE_PGSQL; }
};

TEST_F(CloneStateManagerTest, CreateCloneState) {
EXPECT_CALL(MockFuncs(), Upsert(_));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));
Expand Down Expand Up @@ -330,7 +370,7 @@ TEST_F(CloneStateManagerTest, CreateSecondCloneState) {
l.mutable_data()->pb.set_aggregate_state(state);
l.Commit();

if (current_clone_state->LockForRead()->pb.aggregate_state() == SysCloneStatePB::RESTORED) {
if (current_clone_state->LockForRead()->IsDone()) {
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(CreateCloneState(i + 1, DefaultTableSnapshotData()));
} else {
Expand Down Expand Up @@ -477,8 +517,113 @@ TEST_F(CloneStateManagerTest, HandleRestoringStateRestored) {
ASSERT_EQ(clone_state->LockForRead()->pb.aggregate_state(), SysCloneStatePB::RESTORED);
}

void AssertCloneIsAborted(const CloneStateInfoPtr& clone_state) {
auto lock = clone_state->LockForRead();
ASSERT_EQ(lock->pb.aggregate_state(), SysCloneStatePB::ABORTED);
ASSERT_FALSE(lock->pb.abort_message().empty());
}

TEST_F(CloneStateManagerTest, AbortInStartTabletsCloning) {
EXPECT_CALL(MockFuncs(), FindNamespace).WillOnce(Return(source_ns_));
EXPECT_CALL(MockFuncs(), ListSnapshotSchedules)
.WillOnce(DoAll(SetArgPointee<0>(DefaultListSnapshotSchedules()), Return(Status::OK())));
EXPECT_CALL(MockFuncs(), Upsert(_)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), GenerateSnapshotInfoFromSchedule).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail GenerateSnapshotInfoFromSchedule for test")));

auto [source_namespace_id, seq_no] = ASSERT_RESULT(CloneNamespace(
source_ns_identifier_, kRestoreTime, kTargetNamespaceName,
CoarseMonoClock::Now() + 10s /* deadline */, kEpoch));

AssertCloneIsAborted(ASSERT_RESULT(GetCloneStateFromSourceNamespace(kSourceNamespaceId)));
}

TEST_F_EX(CloneStateManagerTest, AbortIfFailToSchedulePgCloneSchema, CloneStateManagerPgTest) {
EXPECT_CALL(MockFuncs(), FindNamespace).WillOnce(Return(source_ns_));
EXPECT_CALL(MockFuncs(), ListSnapshotSchedules)
.WillOnce(DoAll(SetArgPointee<0>(DefaultListSnapshotSchedules()), Return(Status::OK())));
TSDescriptorPtr dummy_ts_desc = std::make_shared<TSDescriptor>("ts0" /* perm_id*/);
EXPECT_CALL(MockFuncs(), PickTserver).WillOnce(Return(dummy_ts_desc));
EXPECT_CALL(MockFuncs(), Upsert(_)).WillRepeatedly(Return(Status::OK()));
EXPECT_CALL(MockFuncs(), ScheduleClonePgSchemaTask).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail ScheduleClonePgSchemaTask for test")));

auto [source_namespace_id, seq_no] = ASSERT_RESULT(CloneNamespace(
source_ns_identifier_, kRestoreTime, kTargetNamespaceName,
CoarseMonoClock::Now() + 10s /* deadline */, kEpoch));

AssertCloneIsAborted(ASSERT_RESULT(GetCloneStateFromSourceNamespace(kSourceNamespaceId)));
}

TEST_F_EX(CloneStateManagerTest, AbortInPgSchemaClone, CloneStateManagerPgTest) {
EXPECT_CALL(MockFuncs(), Upsert(_));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));
auto callback = MakeDoneClonePgSchemaCallback(
clone_state, kSnapshotScheduleId, kTargetNamespaceName,
CoarseMonoClock::Now() + 10s /* deadline */, kEpoch);

// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(callback(STATUS_FORMAT(IllegalState, "Fail pg schema clone for test")));

AssertCloneIsAborted(clone_state);
}

TEST_F_EX(CloneStateManagerTest, AbortInStartTabletsCloningPg, CloneStateManagerPgTest) {
EXPECT_CALL(MockFuncs(), Upsert(_));
auto clone_state = ASSERT_RESULT(CreateCloneState(kSeqNo, DefaultTableSnapshotData()));
auto callback = MakeDoneClonePgSchemaCallback(
clone_state, kSnapshotScheduleId, kTargetNamespaceName,
CoarseMonoClock::Now() + 10s /* deadline */, kEpoch);

// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), GenerateSnapshotInfoFromSchedule).WillOnce(Return(
STATUS_FORMAT(IllegalState, "Fail GenerateSnapshotInfoFromSchedule for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(callback(Status::OK() /* pg_schema_cloning_status */));

AssertCloneIsAborted(clone_state);
}

TEST_F(CloneStateManagerTest, AbortInCreatingState) {
auto clone_state = ASSERT_RESULT(CreateCloneStateAndStartCloning());

// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), GetTabletInfo(_))
.WillOnce(Return(STATUS_FORMAT(IllegalState, "Fail GetTabletInfo for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(clone_state_manager_->Run());

AssertCloneIsAborted(clone_state);
}

TEST_F(CloneStateManagerTest, AbortInRestoringState) {
auto clone_state = ASSERT_RESULT(CreateCloneStateAndStartCloning());
{
auto lock = clone_state->LockForWrite();
lock.mutable_data()->pb.set_aggregate_state(SysCloneStatePB::RESTORING);
lock.Commit();
}

// We expect an upsert when aborting the clone.
EXPECT_CALL(MockFuncs(), ListRestorations(_, _))
.WillOnce(Return(STATUS_FORMAT(IllegalState, "Fail ListRestorations for test")));
EXPECT_CALL(MockFuncs(), Upsert(_));
ASSERT_OK(clone_state_manager_->Run());

AssertCloneIsAborted(clone_state);
}

TEST_F(CloneStateManagerTest, Load) {
auto clone_state = ASSERT_RESULT(CreateCloneStateAndStartCloning());
{
auto lock = clone_state->LockForWrite();
lock.mutable_data()->pb.set_aggregate_state(SysCloneStatePB::RESTORED);
lock.mutable_data()->pb.set_target_namespace_name(kTargetNamespaceName);
lock.mutable_data()->pb.set_restore_time(kRestoreTime.ToUint64());
lock.mutable_data()->pb.set_clone_request_seq_no(kSeqNo);
lock.Commit();
}

// When the loader runs, we will save the function that is normally passed to
// sys_catalog Load in 'inserter'.
Expand Down Expand Up @@ -513,6 +658,11 @@ TEST_F(CloneStateManagerTest, LoadUsesLatestSeqNo) {
}

auto clone_state2 = ASSERT_RESULT(CreateSecondCloneState());
{
auto lock = clone_state2->LockForWrite();
lock.mutable_data()->pb.set_aggregate_state(SysCloneStatePB::RESTORED);
lock.Commit();
}

// Should load clone_state2 since it has a higher seq_no.
{
Expand Down Expand Up @@ -549,5 +699,58 @@ TEST_F(CloneStateManagerTest, LoadUsesLatestSeqNo) {
}
}

TEST_F(CloneStateManagerTest, AbortIncompleteCloneOnLoad) {
// For the ABORTED state, set a sample abort message to verify that it is preserved on load.
const auto kSampleAbortMessage = "Test abort message";

// Check that each non-terminal state is aborted on load.
for (int i = SysCloneStatePB::State_MIN; i <= SysCloneStatePB::State_MAX; ++i) {
// Create a clone state in state i.
auto clone_state = make_scoped_refptr<CloneStateInfo>(GenerateObjectId());
auto state = SysCloneStatePB_State(i);
{
auto lock = clone_state->LockForWrite();
lock.mutable_data()->pb.set_aggregate_state(state);
lock.mutable_data()->pb.set_source_namespace_id(kSourceNamespaceId);
if (state == SysCloneStatePB::ABORTED) {
lock.mutable_data()->pb.set_abort_message(kSampleAbortMessage);
}
lock.Commit();
}
auto orig_lock = clone_state->LockForRead();

// When the loader runs, we will save the function that is normally passed to
// sys_catalog Load in 'inserter'.
std::function<Status(const std::string&, const SysCloneStatePB&)> inserter;
EXPECT_CALL(MockFuncs(), Load)
.WillOnce(DoAll(SaveArg<1>(&inserter), Return(Status::OK())));
ASSERT_OK(clone_state_manager_->ClearAndRunLoaders());

// Run the inserter to load the clone state.
if (!orig_lock->IsDone()) {
EXPECT_CALL(MockFuncs(), Upsert(_));
}
ASSERT_OK(inserter(clone_state->id(), orig_lock->pb));
auto loaded_clone_state = ASSERT_RESULT(GetCloneStateFromSourceNamespace(kSourceNamespaceId));
auto loaded_lock = loaded_clone_state->LockForRead();

// The state should change to ABORTED if and only if the clone is in a non-terminal state.
switch (orig_lock->pb.aggregate_state()) {
case SysCloneStatePB_State_ABORTED:
ASSERT_EQ(loaded_lock->pb.aggregate_state(), SysCloneStatePB::ABORTED);
ASSERT_EQ(loaded_lock->pb.abort_message(), kSampleAbortMessage);
break;
case SysCloneStatePB_State_RESTORED:
ASSERT_EQ(loaded_lock->pb.aggregate_state(), SysCloneStatePB::RESTORED);
ASSERT_FALSE(loaded_lock->pb.has_abort_message());
break;
default:
ASSERT_EQ(loaded_lock->pb.aggregate_state(), SysCloneStatePB::ABORTED);
ASSERT_STR_CONTAINS(
loaded_lock->pb.abort_message(), "aborted by master failover");
}
}
}

} // namespace master
} // namespace yb

0 comments on commit 76fe86d

Please sign in to comment.