Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add baseline resync feature with read support #387

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 78 additions & 2 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,67 @@ struct repl_key {
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
};

#if 0
struct repl_snapshot {
uint64_t last_log_idx_{0};
uint64_t last_log_term_{0};
};
#endif
using repl_snapshot = nuraft::snapshot;
using repl_snapshot_ptr = nuraft::ptr< nuraft::snapshot >;

// Consumers of the ReplDevListener don't have to know which underlying
// snapshot implementation is used. Consumers can export and save the state
// of the snapshot using serialize() and load the state back using deserialize().
/// @brief Implementation-agnostic handle to a snapshot.
///
/// Carries the log sequence number (lsn) the snapshot corresponds to and lets a
/// concrete implementation export its state to, or load it from, an opaque blob.
class snapshot_context {
public:
    /// @param lsn Log sequence number this snapshot context refers to.
    explicit snapshot_context(int64_t lsn) : lsn_(lsn) {}
    virtual ~snapshot_context() = default;

    /// @brief Replace the state of this context with the one encoded in `snp_ctx`.
    virtual void deserialize(const sisl::io_blob_safe& snp_ctx) = 0;

    /// @brief Export the state of this context as a blob that deserialize() accepts.
    virtual sisl::io_blob_safe serialize() = 0;

    /// @return The lsn currently held by this context.
    /// Marked const so it can be queried through const references/pointers.
    int64_t get_lsn() const { return lsn_; }

protected:
    int64_t lsn_;
};

/// @brief snapshot_context backed by a nuraft::snapshot.
class nuraft_snapshot_context : public snapshot_context {
public:
    /// @brief Build a context from an existing nuraft snapshot.
    /// The snapshot is round-tripped through serialize()/deserialize() so this
    /// context holds its own snapshot object instead of referring to `snp`.
    nuraft_snapshot_context(nuraft::snapshot& snp) : snapshot_context(snp.get_last_log_idx()) {
        auto snp_buf = snp.serialize();
        snapshot_ = nuraft::snapshot::deserialize(*snp_buf);
    }

    /// @brief Load the snapshot (and its lsn) from a blob produced by serialize().
    void deserialize(const sisl::io_blob_safe& snp_ctx) override {
        // Load the context from the io blob to nuraft buffer.
        auto snp_buf = nuraft::buffer::alloc(snp_ctx.size());
        nuraft::buffer_serializer bs(snp_buf);
        bs.put_raw(snp_ctx.cbytes(), snp_ctx.size());
        // put_raw() leaves the serializer cursor at the end of what was just written;
        // rewind it so the snapshot is parsed from the beginning of the buffer.
        bs.pos(0);
        snapshot_ = nuraft::snapshot::deserialize(bs);
        lsn_ = snapshot_->get_last_log_idx();
    }

    /// @brief Export the held snapshot as a blob (a byte copy of nuraft's serialized form).
    sisl::io_blob_safe serialize() override {
        // Dump the context from nuraft buffer to the io blob.
        auto snp_buf = snapshot_->serialize();
        sisl::io_blob_safe blob{s_cast< size_t >(snp_buf->size())};
        std::memcpy(blob.bytes(), snp_buf->data_begin(), snp_buf->size());
        return blob;
    }

    /// @return The nuraft snapshot held by this context.
    nuraft::ptr< nuraft::snapshot > nuraft_snapshot() { return snapshot_; }

private:
    nuraft::ptr< nuraft::snapshot > snapshot_;
};

// One batch of snapshot payload exchanged with the ReplDevListener during baseline resync
// (see read_snapshot_data / write_snapshot_data).
struct snapshot_data {
// Opaque context pointer. RaftStateMachine::read_logical_snp_obj passes nuraft's user_ctx in
// through this field and copies the (possibly updated) value back out after the listener returns.
void* user_ctx{nullptr};
// Position of this batch; on the read path it is populated from nuraft's obj_id.
int64_t offset{0};
// Payload bytes of this batch; on the read path they are copied into the nuraft output buffer.
sisl::io_blob_safe blob;
// NOTE(review): presumably set for the first batch of a transfer - not used by the code visible here; confirm.
bool is_first_obj{false};
// True once all the data has been transferred (per the listener API comments).
bool is_last_obj{false};
};

struct repl_journal_entry;
struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > {
Expand Down Expand Up @@ -198,8 +255,27 @@ class ReplDevListener {
/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;

/// @brief Called when the snapshot is being created by nuraft;
virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0;
/// @brief Called when the snapshot is being created by nuraft
virtual AsyncReplResult<> create_snapshot(shared< snapshot_context > context) = 0;

/// @brief Called when nuraft has finished the baseline resync and, at the end, applies the snapshot.
virtual bool apply_snapshot(shared< snapshot_context > context) = 0;

/// @brief Get the last snapshot saved.
virtual shared< snapshot_context > last_snapshot() = 0;

/// @brief Called on the leader side when the follower wants to do a baseline resync. The leader
/// uses the offset given by the follower to know the current state of the follower.
/// The leader sends the snapshot data to the follower in batches. This callback is called multiple
/// times on the leader until all the data is transferred to the follower. is_last_obj in
/// snapshot_data will be true once all the data has been transferred. After this, the raft on
/// the follower side can do the incremental resync.
virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

/// @brief Called on the follower when the leader sends the data during the baseline resync.
/// is_last_obj in snapshot_data will be true once all the data has been transferred.
/// After this, the raft on the follower side can do the incremental resync.
virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0;

private:
std::weak_ptr< ReplDev > m_repl_dev;
Expand Down
6 changes: 5 additions & 1 deletion src/lib/common/resource_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ void ResourceMgr::trigger_truncate() {

void ResourceMgr::start_timer() {
auto const res_mgr_timer_ms = HS_DYNAMIC_CONFIG(resource_limits.resource_audit_timer_ms);
LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_ms);
if (res_mgr_timer_ms == 0) {
LOGINFO("resource audit timer is 0. Disabling timer");
return;
}

LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_ms);
m_res_audit_timer_hdl = iomanager.schedule_global_timer(
res_mgr_timer_ms * 1000 * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
[this](void*) {
Expand Down
19 changes: 15 additions & 4 deletions src/lib/replication/log_store/home_raft_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sisl/fds/utils.hpp>
#include "common/homestore_assert.hpp"
#include <homestore/homestore.hpp>
#include <iomgr/iomgr_flip.hpp>

using namespace homestore;

Expand Down Expand Up @@ -193,7 +194,7 @@ nuraft::ptr< nuraft::log_entry > HomeRaftLogStore::entry_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
nle = to_nuraft_log_entry(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "entry_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return nle;
Expand All @@ -205,7 +206,7 @@ ulong HomeRaftLogStore::term_at(ulong index) {
auto log_bytes = m_log_store->read_sync(to_store_lsn(index));
term = extract_term(log_bytes);
} catch (const std::exception& e) {
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range", index);
REPL_STORE_LOG(ERROR, "term_at({}) index out_of_range start {} end {}", index, start_index(), last_index());
throw e;
}
return term;
Expand Down Expand Up @@ -277,10 +278,11 @@ void HomeRaftLogStore::apply_pack(ulong index, nuraft::buffer& pack) {

bool HomeRaftLogStore::compact(ulong compact_lsn) {
auto cur_max_lsn = m_log_store->get_contiguous_issued_seq_num(m_last_durable_lsn);
REPL_STORE_LOG(TRACE, "compact called with compact_lsn={} cur_max_lsn={}", compact_lsn, cur_max_lsn);
if (cur_max_lsn < to_store_lsn(compact_lsn)) {
// Release this assert if, for some use case, we should tolerate this case;
// for now, we don't expect this case to happen.
RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);
// RELEASE_ASSERT(false, "compact_lsn={} is beyond the current max_lsn={}", compact_lsn, cur_max_lsn);

// We need to fill the remaining entries with dummy data.
for (auto lsn{cur_max_lsn + 1}; lsn <= to_store_lsn(compact_lsn); ++lsn) {
Expand All @@ -293,7 +295,13 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
// We rely on the resource mgr timer to trigger truncate for all log stores in the system;
// this is friendlier when multiple logstores share the same logdev.

// m_log_store->truncate(to_store_lsn(compact_lsn));
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest a parameterized flip, so you can force raft log truncate on specific logstore. This will trigger on every logstore truncate always.

REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}",
compact_lsn);
m_log_store->truncate(to_store_lsn(compact_lsn));
}
#endif

return true;
}
Expand All @@ -307,4 +315,7 @@ ulong HomeRaftLogStore::last_durable_index() {
m_last_durable_lsn = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn);
return to_repl_lsn(m_last_durable_lsn);
}

/// @brief Overwrite the cached last-durable lsn.
/// @param lsn Replication-side lsn; it is translated with to_store_lsn() before being stored.
void HomeRaftLogStore::set_last_durable_lsn(repl_lsn_t lsn) {
    auto const store_lsn = to_store_lsn(lsn);
    m_last_durable_lsn = store_lsn;
}

} // namespace homestore
2 changes: 2 additions & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ class HomeRaftLogStore : public nuraft::log_store {
*/
ulong last_index() const;

void set_last_durable_lsn(repl_lsn_t lsn);

/**
* Truncates the log store
*
Expand Down
2 changes: 1 addition & 1 deletion src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace homestore {
uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }

if (entry->get_buf_ptr()->size() == 0) { return HomeRaftLogStore::append(entry); }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isn't easier to put || with above condition?

repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
ulong lsn = HomeRaftLogStore::append(entry);
m_sm.link_lsn_to_req(rreq, int64_cast(lsn));
Expand Down
11 changes: 6 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,11 @@ void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb =

void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) {
RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term());
repl_snapshot snapshot{.last_log_idx_ = s.get_last_log_idx(), .last_log_term_ = s.get_last_log_term()};
auto result = m_listener->create_snapshot(snapshot).get();
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto result = m_listener->create_snapshot(snp_ctx).get();
auto null_except = std::shared_ptr< std::exception >();
HS_REL_ASSERT(result.hasError() == false, "Not expecting creating snapshot to return false. ");
m_last_snapshot = nuraft::cs_new< nuraft::snapshot >(s.get_last_log_idx(), s.get_last_log_term(),
s.get_last_config(), s.size(), s.get_type());

auto ret_val{true};
if (when_done) { when_done(ret_val, null_except); }
}
Expand Down Expand Up @@ -519,7 +518,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }

std::vector<::flatbuffers::Offset< RequestEntry > > entries;
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());

shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
Expand Down Expand Up @@ -947,6 +946,8 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
for (auto& entry : entries) {
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }

auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
Expand Down
5 changes: 1 addition & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,6 @@ class RaftReplDev : public ReplDev,

RaftReplDevMetrics m_metrics;

nuraft::ptr< nuraft::snapshot > m_last_snapshot{nullptr};

static std::atomic< uint64_t > s_next_group_ordinal;

public:
Expand Down Expand Up @@ -118,6 +116,7 @@ class RaftReplDev : public ReplDev,
std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); }
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); }

// void truncate_if_needed() override;

Expand Down Expand Up @@ -162,8 +161,6 @@ class RaftReplDev : public ReplDev,
m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load());
}

nuraft::ptr< nuraft::snapshot > get_last_snapshot() { return m_last_snapshot; }

protected:
//////////////// All nuraft::state_mgr overrides ///////////////////////
nuraft::ptr< nuraft::cluster_config > load_config() override;
Expand Down
48 changes: 46 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,10 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
return m_success_ptr;
}

uint64_t RaftStateMachine::last_commit_index() { return uint64_cast(m_rd.get_last_commit_lsn()); }
/// @brief Report the last committed lsn of the repl dev to nuraft.
/// @return The commit lsn as an unsigned index.
uint64_t RaftStateMachine::last_commit_index() {
    // Read the commit lsn exactly once so that the value logged is the value returned,
    // even if the commit lsn advances concurrently.
    auto const commit_idx = uint64_cast(m_rd.get_last_commit_lsn());
    RD_LOG(DEBUG, "Raft channel: last_commit_index {}", commit_idx);
    return commit_idx;
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
rreq->lsn = lsn;
Expand All @@ -252,7 +255,48 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result
m_rd.on_create_snapshot(s, when_done);
}

/// @brief nuraft callback used to fetch one snapshot object; the read is delegated to the listener.
/// @param s            Snapshot being read; wrapped into a nuraft_snapshot_context for the listener.
/// @param user_ctx     In/out opaque context, handed to the listener and updated from its result.
/// @param obj_id       Object id requested by nuraft, forwarded as snapshot_data::offset.
/// @param data_out     Out: newly allocated buffer holding a copy of the blob the listener produced.
/// @param is_last_obj  In/out flag, handed to the listener and updated from its result.
/// @return The listener's return code. When it is negative, none of the out-params are modified.
int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
                                           bool& is_last_obj) {
    auto ctx = std::make_shared< nuraft_snapshot_context >(s);

    // Bundle nuraft's arguments into the listener-facing structure.
    auto chunk = std::make_shared< snapshot_data >();
    chunk->user_ctx = user_ctx;
    chunk->offset = obj_id;
    chunk->is_last_obj = is_last_obj;

    // The listener performs the actual read; we only relay the result.
    int const rc = m_rd.m_listener->read_snapshot_data(ctx, chunk);
    if (rc >= 0) {
        // Propagate what the listener reported back to nuraft.
        user_ctx = chunk->user_ctx;
        is_last_obj = chunk->is_last_obj;

        // The payload is copied (not shared) into a nuraft-owned buffer.
        data_out = nuraft::buffer::alloc(chunk->blob.size());
        nuraft::buffer_serializer writer(data_out);
        writer.put_raw(chunk->blob.cbytes(), chunk->blob.size());
    }
    return rc;
}

/// @brief nuraft callback invoked with one received snapshot object.
/// Currently a placeholder: it only advances `obj_id`; `s`, `data` and the first/last flags are
/// not used yet.
void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
                                            bool is_last_obj) {
    // TODO
    ++obj_id;
}

/// @brief nuraft callback to apply a snapshot.
/// Moves the repl dev's commit lsn and the data journal's last-durable lsn to the snapshot's
/// last log index, then hands the snapshot to the listener.
/// @return Whatever the listener's apply_snapshot() returns.
bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) {
    auto const snp_last_idx = s.get_last_log_idx();
    m_rd.set_last_commit_lsn(snp_last_idx);
    m_rd.m_data_journal->set_last_durable_lsn(snp_last_idx);

    return m_rd.m_listener->apply_snapshot(std::make_shared< nuraft_snapshot_context >(s));
}

std::string RaftStateMachine::rdev_name() const { return m_rd.rdev_name(); }

nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { return m_rd.get_last_snapshot(); }
/// @brief Return the last snapshot known to the listener, as a nuraft snapshot.
/// @return nullptr when the listener has no snapshot or its context is not a nuraft_snapshot_context.
nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() {
    auto listener_ctx = m_rd.m_listener->last_snapshot();
    auto nuraft_ctx = std::dynamic_pointer_cast< nuraft_snapshot_context >(listener_ctx);
    if (nuraft_ctx != nullptr) { return nuraft_ctx->nuraft_snapshot(); }
    return nullptr;
}

} // namespace homestore
7 changes: 5 additions & 2 deletions src/lib/replication/repl_dev/raft_state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,13 @@ class RaftStateMachine : public nuraft::state_machine {
raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
/// Rollback is not implemented: the call only emits a critical log carrying the lsn; the buffer is ignored.
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }

bool apply_snapshot(nuraft::snapshot&) override { return false; }

void create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) override;
nuraft::ptr< nuraft::snapshot > last_snapshot() override;
int read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out,
bool& is_last_obj) override;
void save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj,
bool is_last_obj) override;
bool apply_snapshot(nuraft::snapshot& s) override;

////////// APIs outside of nuraft::state_machine requirements ////////////////////
ReplServiceError propose_to_raft(repl_req_ptr_t rreq);
Expand Down