Skip to content

Commit

Permalink
Merge branch 'master' into conan
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxichen committed Apr 19, 2024
2 parents 523ec40 + 6ae31e1 commit e4b4612
Show file tree
Hide file tree
Showing 31 changed files with 507 additions and 70 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.2.5"
version = "6.3.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
64 changes: 64 additions & 0 deletions src/include/homestore/logstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* to set this to true on cases where there are multiple log stores, so that once all in-memory truncation is
* completed, a device truncation can be triggered for all the logstores. The device truncation is more
* expensive and grouping them together yields better results.
*
* Note: this flag currently is not used, meaning all truncate is in memory only;
* @return number of records to truncate
*/
void truncate(logstore_seq_num_t upto_seq_num, bool in_memory_truncate_only = true);
Expand Down Expand Up @@ -274,18 +276,80 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {

nlohmann::json get_status(int verbosity) const;

/**
* Retrieves the truncation information before device truncation.
*
* @return A constant reference to the truncation_info object representing the truncation information.
*/
const truncation_info& pre_device_truncation();

/**
* \brief post device truncation processing.
*
* This function is used to update safe truncation boundary to the specified `trunc_upto_key`.
*
* \param trunc_upto_key The key indicating the log entry up to which truncation has been performed.
*/
void post_device_truncation(const logdev_key& trunc_upto_key);

/**
* Handles the completion of a write operation in the log store.
*
* @param req The logstore_req object representing the completed write operation.
* @param ld_key The logdev_key associated with the completed write operation.
*/
void on_write_completion(logstore_req* req, const logdev_key& ld_key);

/**
* \brief Handles the completion of a read operation in the log store.
*
* This function is called when a read operation in the log store has completed.
* It takes a pointer to a logstore_req object and a logdev_key object as parameters.
*
* \param req The pointer to the logstore_req object representing the read request.
* \param ld_key The logdev_key object representing the key used for the read operation.
*/
void on_read_completion(logstore_req* req, const logdev_key& ld_key);

/**
* @brief Handles the event when a log is found.
*
* This function is called when a log is found in the log store. It takes the sequence number of the log,
* the log device key, the flush log device key, and the log buffer as parameters.
*
* During LogDev::do_load during recovery boot, whenever a log is found, the associated logstore's on_log_found
* method is called.
*
* @param seq_num The sequence number of the log.
* @param ld_key The log device key.
* @param flush_ld_key The flush log device key.
* @param buf The log buffer.
*/
void on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld_key, const logdev_key& flush_ld_key,
log_buffer buf);
/**
* @brief Handles the completion of a batch flush operation to update internal state.
*
* This function is called when a batch flush operation is completed.
* It takes a `logdev_key` parameter that represents the key of the flushed batch.
*
* This function is also called during log store recovery;
*
* @param flush_batch_ld_key The key of the flushed batch.
*/
void on_batch_completion(const logdev_key& flush_batch_ld_key);

private:
/**
* Truncates the log store up to the specified sequence number.
*
* @param upto_seq_num The sequence number up to which the log store should be truncated.
*/
void do_truncate(logstore_seq_num_t upto_seq_num);

int search_max_le(logstore_seq_num_t input_sn);

private:
logstore_id_t m_store_id;
std::shared_ptr< LogDev > m_logdev;
sisl::StreamTracker< logstore_record > m_records;
Expand Down
6 changes: 6 additions & 0 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,12 @@ class LogStoreService {
uint32_t used_size() const;
uint32_t total_size() const;
iomgr::io_fiber_t flush_thread() { return m_flush_fiber; }

/**
* This is used when the actual LogDev truncate is triggered;
*
* @return The IO fiber associated with the truncate thread.
*/
iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

private:
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ using remote_blkid_list_t = folly::small_vector< RemoteBlkId, 4 >;
using replica_id_t = uuid_t;
using group_id_t = uuid_t;

using store_lsn_t = int64_t;
using repl_lsn_t = int64_t;

struct peer_info {
// Peer ID.
replica_id_t id_;
Expand Down
9 changes: 9 additions & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <sisl/fds/utils.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <homestore/replication/repl_decls.h>
#include <libnuraft/snapshot.hxx>

namespace nuraft {
template < typename T >
Expand Down Expand Up @@ -50,6 +51,11 @@ struct repl_key {
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
};

struct repl_snapshot {
uint64_t last_log_idx_{0};
uint64_t last_log_term_{0};
};

struct repl_journal_entry;
struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter > {
friend class SoloReplDev;
Expand Down Expand Up @@ -192,6 +198,9 @@ class ReplDevListener {
/// @brief Called when the replica set is being stopped
virtual void on_replica_stop() = 0;

/// @brief Called when the snapshot is being created by nuraft;
virtual AsyncReplResult<> create_snapshot(repl_snapshot& s) = 0;

private:
std::weak_ptr< ReplDev > m_repl_dev;
};
Expand Down
1 change: 0 additions & 1 deletion src/include/homestore/replication_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ VENUM(repl_impl_type, uint8_t,
solo // For single node - no replication
);


class ReplApplication;

class ReplicationService {
Expand Down
2 changes: 1 addition & 1 deletion src/lib/checkpoint/cp_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CPManager::CPManager() :
nullptr);

resource_mgr().register_dirty_buf_exceed_cb(
[this]([[maybe_unused]] int64_t dirty_buf_count) { this->trigger_cp_flush(false /* false */); });
[this]([[maybe_unused]] int64_t dirty_buf_count, bool critical) { this->trigger_cp_flush(false /* force */); });

start_cp_thread();
}
Expand Down
26 changes: 21 additions & 5 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,21 @@ table ResourceLimits {
/* precentage of memory used during recovery */
memory_in_recovery_precent: uint32 = 40;

/* journal size used percentage */
journal_size_percent: uint32 = 50;
/* journal size used percentage high watermark -- trigger cp */
journal_vdev_size_percent: uint32 = 50;

/* journal size used percentage critical watermark -- trigger truncation */
journal_vdev_size_percent_critical: uint32 = 90;

/* [not used] journal descriptor size (NuObject: Per PG) Threshold in MB -- ready for truncation */
journal_descriptor_size_threshold_mb: uint32 = 2048(hotswap);

/* num entries that raft logstore wants to reserve -- its truncate should not across this */
/* 0 means HomeStore doesn't reserve anything and let nuraft controlls the truncation */
raft_logstore_reserve_threshold: uint32 = 0 (hotswap);

/* resource audit timer in ms */
resource_audit_timer_ms: uint32 = 120000;

/* We crash if volume is 95 percent filled and no disk space left */
vol_threshhold_used_size_p: uint32 = 95;
Expand Down Expand Up @@ -199,14 +212,17 @@ table Consensus {
heartbeat_period_ms: uint32 = 250;

// Re-election timeout low and high mark
elect_to_low_ms: uint32 = 900;
elect_to_high_ms: uint32 = 1400;
elect_to_low_ms: uint32 = 800;
elect_to_high_ms: uint32 = 1700;

// When a new member is being synced, the batch size of number of logs to be shipped
log_sync_batch_size: int32 = 100;

// Log distance with which snapshot/compact needs to happen. 0 means snapshot is disabled
snapshot_freq_distance: int32 = 0;
snapshot_freq_distance: uint32 = 2000;

// Num reserved log items while triggering compact from raft server, only consumed by nuraft server;
num_reserved_log_items: uint32 = 20000;

// Max append batch size
max_append_batch_size: int32 = 64;
Expand Down
89 changes: 79 additions & 10 deletions src/lib/common/resource_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,75 @@
*
*********************************************************************************/
#include <homestore/homestore.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/replication_service.hpp>
#include <iomgr/iomgr_flip.hpp>
#include "resource_mgr.hpp"
#include "homestore_assert.hpp"
#include "replication/repl_dev/raft_repl_dev.h"

namespace homestore {
ResourceMgr& resource_mgr() { return hs()->resource_mgr(); }

void ResourceMgr::set_total_cap(uint64_t total_cap) { m_total_cap = total_cap; }
void ResourceMgr::start(uint64_t total_cap) {
m_total_cap = total_cap;
start_timer();
}

void ResourceMgr::stop() {
LOGINFO("Cancel resource manager timer.");
iomanager.cancel_timer(m_res_audit_timer_hdl);
m_res_audit_timer_hdl = iomgr::null_timer_handle;
}

//
// 1. Conceptually in rare case(not poosible for NuObject, possibly true for NuBlox2.0) truncate itself can't garunteen
// the space is freed up upto satisfy resource manager. e.g. multiple log stores on this same descriptor and one
// logstore lagging really behind and not able to truncate much space. Doing multiple truncation won't help in this
// case.
// 2. And any write on any other descriptor will trigger a high_watermark_check, and if it were to trigger critial
// alert on this vdev, truncation will be made immediately on all descriptors;
// 3. If still no space can be freed, there is nothing we can't here to back pressure to above layer by rejecting log
// writes on this descriptor;
//
void ResourceMgr::trigger_truncate() {
if (hs()->has_repl_data_service()) {
// first make sure all repl dev's underlying raft log store make corresponding reservation during
// truncate -- set the safe truncate boundary for each raft log store;
hs()->repl_service().iterate_repl_devs([](cshared< ReplDev >& rd) {
// lock is already taken by repl service layer;
std::dynamic_pointer_cast< RaftReplDev >(rd)->truncate(
HS_DYNAMIC_CONFIG(resource_limits.raft_logstore_reserve_threshold));
});

// next do device truncate which go through all logdevs and truncate them;
hs()->logstore_service().device_truncate();
}

// TODO: add device_truncate callback to audit how much space was freed per each LogDev and add related
// metrics;
}

void ResourceMgr::start_timer() {
auto const res_mgr_timer_ms = HS_DYNAMIC_CONFIG(resource_limits.resource_audit_timer_ms);
LOGINFO("resource audit timer is set to {} usec", res_mgr_timer_ms);

m_res_audit_timer_hdl = iomanager.schedule_global_timer(
res_mgr_timer_ms * 1000 * 1000, true /* recurring */, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
[this](void*) {
// all resource timely audit routine should arrive here;
this->trigger_truncate();
},
true /* wait_to_schedule */);
}

/* monitor dirty buffer count */
void ResourceMgr::inc_dirty_buf_size(const uint32_t size) {
HS_REL_ASSERT_GT(size, 0);
const auto dirty_buf_cnt = m_hs_dirty_buf_cnt.fetch_add(size, std::memory_order_relaxed);
COUNTER_INCREMENT(m_metrics, dirty_buf_cnt, size);
if (m_dirty_buf_exceed_cb && ((dirty_buf_cnt + size) > get_dirty_buf_limit())) {
m_dirty_buf_exceed_cb(dirty_buf_cnt + size);
m_dirty_buf_exceed_cb(dirty_buf_cnt + size, false /* critical */);
}
}

Expand Down Expand Up @@ -106,22 +160,37 @@ uint64_t ResourceMgr::get_cache_size() const {
return ((HS_STATIC_CONFIG(input.io_mem_size()) * HS_DYNAMIC_CONFIG(resource_limits.cache_size_percent)) / 100);
}

/* monitor journal size */
bool ResourceMgr::check_journal_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_exceed_cb) {
bool ResourceMgr::check_journal_descriptor_size(const uint64_t used_size) const {
return (used_size >= get_journal_descriptor_size_limit());
}

/* monitor journal vdev size */
bool ResourceMgr::check_journal_vdev_size(const uint64_t used_size, const uint64_t total_size) {
if (m_journal_vdev_exceed_cb) {
const uint32_t used_pct = (100 * used_size / total_size);
if (used_pct >= HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent)) {
m_journal_exceed_cb(used_size);
if (used_pct >= get_journal_vdev_size_limit()) {
m_journal_vdev_exceed_cb(used_size, used_pct >= get_journal_vdev_size_critical_limit() /* is_critical */);
HS_LOG_EVERY_N(WARN, base, 50, "high watermark hit, used percentage: {}, high watermark percentage: {}",
used_pct, HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent));
used_pct, get_journal_vdev_size_limit());
return true;
}
}
return false;
}
void ResourceMgr::register_journal_exceed_cb(exceed_limit_cb_t cb) { m_journal_exceed_cb = std::move(cb); }

uint32_t ResourceMgr::get_journal_size_limit() const { return HS_DYNAMIC_CONFIG(resource_limits.journal_size_percent); }
void ResourceMgr::register_journal_vdev_exceed_cb(exceed_limit_cb_t cb) { m_journal_vdev_exceed_cb = std::move(cb); }

uint32_t ResourceMgr::get_journal_descriptor_size_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_descriptor_size_threshold_mb) * 1024 * 1024;
}

uint32_t ResourceMgr::get_journal_vdev_size_critical_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent_critical);
}

uint32_t ResourceMgr::get_journal_vdev_size_limit() const {
return HS_DYNAMIC_CONFIG(resource_limits.journal_vdev_size_percent);
}

/* monitor chunk size */
void ResourceMgr::check_chunk_free_size_and_trigger_cp(uint64_t free_size, uint64_t alloc_size) {}
Expand Down

0 comments on commit e4b4612

Please sign in to comment.