Commit
Merge pull request #156 from Derecho-Project/config_restart
According to a performance test using 'bandwidth_test', performance matches what it was before this pull request. Since nobody has given further comments on it, I am confirming this pull request.
songweijia committed May 1, 2020
2 parents d818c30 + b5d2a25 commit 7f68235
Showing 15 changed files with 580 additions and 340 deletions.
32 changes: 26 additions & 6 deletions include/derecho/conf/conf.hpp
@@ -17,11 +17,12 @@ namespace derecho {
/** The single configuration file for derecho **/
class Conf {
private:
// Configuration Table:
// config name --> default value
//String constants for config options
#define CONF_DERECHO_LEADER_IP "DERECHO/leader_ip"
#define CONF_DERECHO_LEADER_GMS_PORT "DERECHO/leader_gms_port"
#define CONF_DERECHO_LEADER_EXTERNAL_PORT "DERECHO/leader_external_port"
#define CONF_DERECHO_RESTART_LEADERS "DERECHO/restart_leaders"
#define CONF_DERECHO_RESTART_LEADER_PORTS "DERECHO/restart_leader_ports"
#define CONF_DERECHO_LOCAL_ID "DERECHO/local_id"
#define CONF_DERECHO_LOCAL_IP "DERECHO/local_ip"
#define CONF_DERECHO_GMS_PORT "DERECHO/gms_port"
@@ -31,6 +32,8 @@ class Conf {
#define CONF_DERECHO_EXTERNAL_PORT "DERECHO/external_port"
#define CONF_DERECHO_HEARTBEAT_MS "DERECHO/heartbeat_ms"
#define CONF_DERECHO_SST_POLL_CQ_TIMEOUT_MS "DERECHO/sst_poll_cq_timeout_ms"
#define CONF_DERECHO_RESTART_TIMEOUT_MS "DERECHO/restart_timeout_ms"
#define CONF_DERECHO_ENABLE_BACKUP_RESTART_LEADERS "DERECHO/enable_backup_restart_leaders"
#define CONF_DERECHO_DISABLE_PARTITIONING_SAFETY "DERECHO/disable_partitioning_safety"

#define CONF_DERECHO_MAX_P2P_REQUEST_PAYLOAD_SIZE "DERECHO/max_p2p_request_payload_size"
@@ -55,12 +58,15 @@ class Conf {
#define CONF_PERS_MAX_DATA_SIZE "PERS/max_data_size"
#define CONF_LOGGER_DEFAULT_LOG_NAME "LOGGER/default_log_name"
#define CONF_LOGGER_DEFAULT_LOG_LEVEL "LOGGER/default_log_level"

// Configuration Table:
// config name --> default value
std::map<const std::string, std::string> config = {
// [DERECHO]
{CONF_DERECHO_LEADER_IP, "127.0.0.1"},
{CONF_DERECHO_LEADER_GMS_PORT, "23580"},
{CONF_DERECHO_LEADER_EXTERNAL_PORT, "32645"},
{CONF_DERECHO_RESTART_LEADERS, "127.0.0.1"},
{CONF_DERECHO_RESTART_LEADER_PORTS, "23580"},
{CONF_DERECHO_LOCAL_ID, "0"},
{CONF_DERECHO_LOCAL_IP, "127.0.0.1"},
{CONF_DERECHO_GMS_PORT, "23580"},
@@ -70,10 +76,12 @@ class Conf {
{CONF_DERECHO_EXTERNAL_PORT, "32645"},
{CONF_SUBGROUP_DEFAULT_RDMC_SEND_ALGORITHM, "binomial_send"},
{CONF_DERECHO_SST_POLL_CQ_TIMEOUT_MS, "2000"},
{CONF_DERECHO_RESTART_TIMEOUT_MS, "2000"},
{CONF_DERECHO_DISABLE_PARTITIONING_SAFETY, "true"},
{CONF_DERECHO_MAX_P2P_REQUEST_PAYLOAD_SIZE, "10240"},
{CONF_DERECHO_MAX_P2P_REPLY_PAYLOAD_SIZE, "10240"},
{CONF_DERECHO_P2P_WINDOW_SIZE, "16"},
{CONF_DERECHO_ENABLE_BACKUP_RESTART_LEADERS, "false"},
{CONF_DERECHO_MAX_P2P_REQUEST_PAYLOAD_SIZE, "10240"},
{CONF_DERECHO_MAX_P2P_REPLY_PAYLOAD_SIZE, "10240"},
{CONF_DERECHO_P2P_WINDOW_SIZE, "16"},
// [SUBGROUP/<subgroupname>]
{CONF_SUBGROUP_DEFAULT_MAX_PAYLOAD_SIZE, "10240"},
{CONF_SUBGROUP_DEFAULT_MAX_REPLY_PAYLOAD_SIZE, "10240"},
@@ -207,5 +215,17 @@ const float getConfFloat(const std::string& key);
const double getConfDouble(const std::string& key);
const bool getConfBoolean(const std::string& key);
const bool hasCustomizedConfKey(const std::string& key);

/**
* Splits a string into a vector of strings using a delimiting string. This is
* helpful for parsing "list-like" config options, which are comma-delimited
* sequences of strings or numbers (so the default delimiter is ",").
* @param str The string to split
* @param delimiter The string to use as the delimiter for splitting
* @return A vector of substrings of the input string, partitioned on the
* delimiter. The delimiter is not included in any substring.
*/
std::vector<std::string> split_string(const std::string& str, const std::string& delimiter = ",");

} // namespace derecho
#endif // CONF_HPP
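
The options added above — DERECHO/restart_leaders, DERECHO/restart_leader_ports, and DERECHO/enable_backup_restart_leaders — are new in this commit, and the first two are comma-separated lists that split_string() is intended to parse. The following is a minimal sketch, not part of this commit, of how those values could be read through the conf API declared in this header; it assumes the header is installed as <derecho/conf/conf.hpp> and that a Derecho configuration (e.g. the default derecho.cfg) is present with both list-valued keys set to lists of equal length.

    #include <derecho/conf/conf.hpp>

    #include <cstddef>
    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    int main() {
        // e.g. "192.168.1.1,192.168.1.2" -> {"192.168.1.1", "192.168.1.2"}
        std::vector<std::string> leader_ips
                = derecho::split_string(derecho::getConfString(CONF_DERECHO_RESTART_LEADERS));
        // e.g. "23580,23580" -> {"23580", "23580"}
        std::vector<std::string> port_strings
                = derecho::split_string(derecho::getConfString(CONF_DERECHO_RESTART_LEADER_PORTS));
        std::vector<uint16_t> leader_ports;
        for(const std::string& port_str : port_strings) {
            leader_ports.push_back(static_cast<uint16_t>(std::stoi(port_str)));
        }
        // List the configured restart leaders in descending priority order.
        for(std::size_t i = 0; i < leader_ips.size(); ++i) {
            std::cout << "restart leader " << i << ": " << leader_ips[i]
                      << ":" << leader_ports.at(i) << std::endl;
        }
        std::cout << "backup restart leaders enabled: " << std::boolalpha
                  << derecho::getConfBoolean(CONF_DERECHO_ENABLE_BACKUP_RESTART_LEADERS) << std::endl;
        return 0;
    }
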
110 changes: 56 additions & 54 deletions include/derecho/core/detail/group_impl.hpp
@@ -114,49 +114,35 @@ void Group<ReplicatedTypes...>::set_external_caller_pointer(std::type_index type
...);
}

/* There is only one constructor */
template <typename... ReplicatedTypes>
Group<ReplicatedTypes...>::Group(const CallbackSet& callbacks,
const SubgroupInfo& subgroup_info,
IDeserializationContext* deserialization_context,
std::vector<view_upcall_t> _view_upcalls,
Factory<ReplicatedTypes>... factories)
: my_id(getConfUInt32(CONF_DERECHO_LOCAL_ID)),
is_starting_leader((getConfString(CONF_DERECHO_LOCAL_IP) == getConfString(CONF_DERECHO_LEADER_IP))
&& (getConfUInt16(CONF_DERECHO_GMS_PORT) == getConfUInt16(CONF_DERECHO_LEADER_GMS_PORT))),
leader_connection([&]() -> std::optional<tcp::socket> {
if(!is_starting_leader) {
return tcp::socket{getConfString(CONF_DERECHO_LEADER_IP), getConfUInt16(CONF_DERECHO_LEADER_GMS_PORT)};
}
return std::nullopt;
}()),
user_deserialization_context(deserialization_context),
persistence_manager(objects_by_subgroup_id, callbacks.local_persistence_callback),
//Initially empty, all connections are added in the new view callback
tcp_sockets(std::make_shared<tcp::tcp_connections>(my_id, std::map<node_id_t, std::pair<ip_addr_t, uint16_t>>{{my_id, {getConfString(CONF_DERECHO_LOCAL_IP), getConfUInt16(CONF_DERECHO_RPC_PORT)}}})),
view_manager([&]() {
if(is_starting_leader) {
return ViewManager(subgroup_info,
{std::type_index(typeid(ReplicatedTypes))...},
std::disjunction_v<has_persistent_fields<ReplicatedTypes>...>,
tcp_sockets, objects_by_subgroup_id,
persistence_manager.get_callbacks(),
_view_upcalls);
} else {
return ViewManager(leader_connection.value(),
subgroup_info,
{std::type_index(typeid(ReplicatedTypes))...},
std::disjunction_v<has_persistent_fields<ReplicatedTypes>...>,
tcp_sockets, objects_by_subgroup_id,
persistence_manager.get_callbacks(),
_view_upcalls);
}
}()),
view_manager(subgroup_info,
{std::type_index(typeid(ReplicatedTypes))...},
std::disjunction_v<has_persistent_fields<ReplicatedTypes>...>,
tcp_sockets, objects_by_subgroup_id,
persistence_manager.get_callbacks(),
_view_upcalls),
rpc_manager(view_manager, deserialization_context),
factories(make_kind_map(factories...)) {
bool in_total_restart = view_manager.first_init();
//State transfer must complete before an initial view can commit, and must retry if the view is aborted
bool initial_view_confirmed = false;
bool restart_leader_failed = false;
while(!initial_view_confirmed) {
if(restart_leader_failed) {
//Retry connecting to the restart leader
dbg_default_warn("Restart leader failed during 2PC! Trying again...");
in_total_restart = view_manager.restart_to_initial_view();
}
//This might be the shard leaders from the previous view,
//or the nodes with the longest logs in their shard if we're doing total restart,
//or empty if this is the first View of a new group
@@ -165,25 +151,34 @@ Group<ReplicatedTypes...>::Group(const CallbackSet& callbacks,
//this node needs to receive object state from
std::set<std::pair<subgroup_id_t, node_id_t>> subgroups_and_leaders_to_receive
= construct_objects<ReplicatedTypes...>(view_manager.get_current_or_restart_view().get(),
old_shard_leaders);
//These functions are no-ops if we're not doing total restart
view_manager.truncate_logs();
view_manager.send_logs();
old_shard_leaders, in_total_restart);
if(in_total_restart) {
view_manager.truncate_logs();
view_manager.send_logs();
}
receive_objects(subgroups_and_leaders_to_receive);
if(is_starting_leader) {
bool leader_has_quorum = true;
initial_view_confirmed = view_manager.leader_prepare_initial_view(leader_has_quorum);
if(!leader_has_quorum) {
//If quorum was lost due to failures during the prepare message,
//stop here and wait for more nodes to rejoin before going back to state-transfer
view_manager.await_rejoining_nodes(my_id);
if(view_manager.is_starting_leader()) {
if(in_total_restart) {
bool leader_has_quorum = true;
view_manager.leader_prepare_initial_view(initial_view_confirmed, leader_has_quorum);
if(!leader_has_quorum) {
//If quorum was lost due to failures during the prepare message,
//stop here and wait for more nodes to rejoin before going back to state-transfer
view_manager.await_rejoining_nodes(my_id);
}
} else {
initial_view_confirmed = true;
}
} else {
//This will wait for a new view to be sent if the view was aborted
initial_view_confirmed = view_manager.check_view_committed(leader_connection.value());
//It must be called even in the non-restart case, since the initial view could still be aborted
view_manager.check_view_committed(initial_view_confirmed, restart_leader_failed);
if(restart_leader_failed && !(in_total_restart && getConfBoolean(CONF_DERECHO_ENABLE_BACKUP_RESTART_LEADERS))) {
throw derecho_exception("Leader crashed before it could send the initial View! Try joining again at the new leader.");
}
}
}
if(is_starting_leader) {
if(view_manager.is_starting_leader()) {
//In restart mode, once a prepare is successful, send a commit
//(this function does nothing if we're not doing total restart)
view_manager.leader_commit_initial_view();
@@ -203,7 +198,7 @@ Group<ReplicatedTypes...>::Group(const CallbackSet& callbacks,
persistence_manager.start();
}

//nope there's two now
/* A simpler constructor that uses "default" options for callbacks, upcalls, and deserialization context */
template <typename... ReplicatedTypes>
Group<ReplicatedTypes...>::Group(const SubgroupInfo& subgroup_info, Factory<ReplicatedTypes>... factories)
: Group({}, subgroup_info, nullptr, {}, factories...) {}
@@ -222,7 +217,8 @@ template <typename... ReplicatedTypes>
template <typename FirstType, typename... RestTypes>
std::set<std::pair<subgroup_id_t, node_id_t>> Group<ReplicatedTypes...>::construct_objects(
const View& curr_view,
const vector_int64_2d& old_shard_leaders) {
const vector_int64_2d& old_shard_leaders,
bool in_restart) {
std::set<std::pair<subgroup_id_t, uint32_t>> subgroups_to_receive;
if(!curr_view.is_adequately_provisioned) {
return subgroups_to_receive;
@@ -250,13 +246,15 @@ std::set<std::pair<subgroup_id_t, node_id_t>> Group<ReplicatedTypes...>::constru
objects_by_subgroup_id.erase(subgroup_id);
replicated_objects.template get<FirstType>().erase(old_object);
}
//Determine if there is existing state for this shard on another node
bool has_previous_leader = old_shard_leaders.size() > subgroup_id
&& old_shard_leaders[subgroup_id].size() > shard_num
&& old_shard_leaders[subgroup_id][shard_num] > -1
&& old_shard_leaders[subgroup_id][shard_num] != my_id;
//If we don't have a Replicated<T> for this (type, subgroup index), we just became a member of the shard
if(replicated_objects.template get<FirstType>().count(subgroup_index) == 0) {
//Determine if there is existing state for this shard that will need to be received
bool has_previous_leader = old_shard_leaders.size() > subgroup_id
&& old_shard_leaders[subgroup_id].size() > shard_num
&& old_shard_leaders[subgroup_id][shard_num] > -1
&& old_shard_leaders[subgroup_id][shard_num] != my_id;
dbg_default_debug("Constructing a Replicated Object for type {}, subgroup {}, shard {}",
typeid(FirstType).name(), subgroup_id, shard_num);
if(has_previous_leader) {
subgroups_to_receive.emplace(subgroup_id, old_shard_leaders[subgroup_id][shard_num]);
}
@@ -276,8 +274,13 @@ std::set<std::pair<subgroup_id_t, node_id_t>> Group<ReplicatedTypes...>::constru
// Store a reference to the Replicated<T> just constructed
objects_by_subgroup_id.emplace(subgroup_id,
replicated_objects.template get<FirstType>().at(subgroup_index));
break; // This node can be in at most one shard, so stop here
} else if(in_restart && has_previous_leader) {
//In restart mode, construct_objects may be called multiple times if the initial view
//is aborted, so we need to receive state for this shard even if we already constructed
//the Replicated<T> for it
subgroups_to_receive.emplace(subgroup_id, old_shard_leaders[subgroup_id][shard_num]);
}
break; // This node can be in at most one shard, so stop here
}
}
if(!in_subgroup) {
@@ -295,7 +298,7 @@ std::set<std::pair<subgroup_id_t, node_id_t>> Group<ReplicatedTypes...>::constru
my_id, subgroup_id, rpc_manager));
}
}
return functional_insert(subgroups_to_receive, construct_objects<RestTypes...>(curr_view, old_shard_leaders));
return functional_insert(subgroups_to_receive, construct_objects<RestTypes...>(curr_view, old_shard_leaders, in_restart));
}

template <typename... ReplicatedTypes>
@@ -315,7 +318,7 @@ void Group<ReplicatedTypes...>::set_up_components() {
view_manager.register_initialize_objects_upcall([this](node_id_t my_id, const View& view,
const vector_int64_2d& old_shard_leaders) {
std::set<std::pair<subgroup_id_t, node_id_t>> subgroups_and_leaders
= construct_objects<ReplicatedTypes...>(view, old_shard_leaders);
= construct_objects<ReplicatedTypes...>(view, old_shard_leaders, false);
receive_objects(subgroups_and_leaders);
});
}
Expand Down Expand Up @@ -349,8 +352,7 @@ ShardIterator<SubgroupType> Group<ReplicatedTypes...>::get_shard_iterator(uint32
try {
auto& EC = external_callers.template get<SubgroupType>().at(subgroup_index);
SharedLockedReference<View> curr_view = view_manager.get_current_view();
auto subgroup_id = curr_view.get().subgroup_ids_by_type_id.at(index_of_type<SubgroupType, ReplicatedTypes...>)
.at(subgroup_index);
auto subgroup_id = curr_view.get().subgroup_ids_by_type_id.at(index_of_type<SubgroupType, ReplicatedTypes...>).at(subgroup_index);
const auto& shard_subviews = curr_view.get().subgroup_shard_views.at(subgroup_id);
std::vector<node_id_t> shard_reps(shard_subviews.size());
for(uint i = 0; i < shard_subviews.size(); ++i) {
@@ -380,10 +382,10 @@ void Group<ReplicatedTypes...>::receive_objects(const std::set<std::pair<subgrou
subgroup_and_leader.first, subgroup_and_leader.second);
std::size_t buffer_size;
bool success = leader_socket.get().read(buffer_size);
assert_always(success);
if(!success) throw derecho_exception("Fatal error: Subgroup leader failed during state transfer.");
char* buffer = new char[buffer_size];
success = leader_socket.get().read(buffer, buffer_size);
assert_always(success);
if(!success) throw derecho_exception("Fatal error: Subgroup leader failed during state transfer.");
subgroup_object.receive_object(buffer);
delete[] buffer;
}
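
Because the diff above interleaves removed and added lines, the new control flow in the Group constructor can be hard to follow. Below is a simplified, self-contained model — not Derecho code — of that loop: state transfer and the restart leader's two-phase prepare are repeated until an initial view commits, and a restart leader failure only triggers a retry when total restart is in progress and backup restart leaders are enabled. Every function here is an assumed stand-in for the corresponding ViewManager call shown in the diff.

    #include <iostream>
    #include <stdexcept>

    // Stand-ins for config values and ViewManager behavior (assumptions, not the real API).
    bool is_starting_leader = false;
    bool enable_backup_restart_leaders = true;

    bool first_init() { return true; }               // models ViewManager::first_init()
    bool restart_to_initial_view() { return true; }  // models ViewManager::restart_to_initial_view()
    void do_state_transfer() {}                      // models truncate_logs(), send_logs(), receive_objects()
    void leader_prepare_initial_view(bool& confirmed, bool& has_quorum) { confirmed = true; has_quorum = true; }
    void check_view_committed(bool& confirmed, bool& leader_failed) { confirmed = true; leader_failed = false; }
    void await_rejoining_nodes() {}                  // models await_rejoining_nodes(my_id)

    int main() {
        bool in_total_restart = first_init();
        bool initial_view_confirmed = false;
        bool restart_leader_failed = false;
        while(!initial_view_confirmed) {
            if(restart_leader_failed) {
                // The previous restart leader died during 2PC; fail over to the next one.
                in_total_restart = restart_to_initial_view();
            }
            do_state_transfer();
            if(is_starting_leader) {
                if(in_total_restart) {
                    bool leader_has_quorum = true;
                    leader_prepare_initial_view(initial_view_confirmed, leader_has_quorum);
                    if(!leader_has_quorum) {
                        // Quorum was lost during the prepare; wait for nodes to rejoin, then retry.
                        await_rejoining_nodes();
                    }
                } else {
                    initial_view_confirmed = true;
                }
            } else {
                check_view_committed(initial_view_confirmed, restart_leader_failed);
                if(restart_leader_failed && !(in_total_restart && enable_backup_restart_leaders)) {
                    throw std::runtime_error("Leader crashed before it could send the initial View");
                }
            }
        }
        std::cout << "initial view committed" << std::endl;
    }

With enable_backup_restart_leaders left at its default of false, a non-leader that observes a restart leader failure gives up immediately, which corresponds to the derecho_exception thrown in the real constructor above.
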
30 changes: 25 additions & 5 deletions include/derecho/core/detail/restart_state.hpp
@@ -55,14 +55,35 @@ inline std::string ragged_trim_filename(subgroup_id_t subgroup_num, uint32_t sha
using ragged_trim_map_t = std::map<subgroup_id_t, std::map<uint32_t, std::unique_ptr<RaggedTrim>>>;

struct RestartState {
/** List of logged ragged trim states recovered from the last known View,
/**
* List of logged ragged trim states recovered from the last known View,
* either read locally from this node's logs or received from the restart
* leader. */
* leader.
*/
ragged_trim_map_t logged_ragged_trim;
/** Map from (subgroup ID, shard num) to ID of the "restart leader" for that
/**
* Map from (subgroup ID, shard num) to ID of the "restart leader" for that
* shard, which is the node with the longest persistent log for that shard's
* replicated state. */
* replicated state.
*/
std::vector<std::vector<int64_t>> restart_shard_leaders;
/**
* List of IP addresses of potential restart leaders (of the overall process)
* in descending priority order
*/
std::vector<std::string> restart_leader_ips;
/**
* List of GMS ports of potential restart leaders in descending priority order
*/
std::vector<uint16_t> restart_leader_ports;
/**
* The number of restart leaders that have failed; this is the current index
* into restart_leader_ips.
*/
uint32_t num_leader_failures;
/**
* Reads the logs stored at this node and initializes logged_ragged_trim
*/
void load_ragged_trim(const View& curr_view);
/**
* Computes the persistent version corresponding to a ragged trim proposal,
@@ -136,7 +157,6 @@ class RestartLeaderState {
std::unique_ptr<View> update_curr_and_next_restart_view();

public:
static const int RESTART_LEADER_TIMEOUT = 2000;
RestartLeaderState(std::unique_ptr<View> _curr_view, RestartState& restart_state,
const SubgroupInfo& subgroup_info,
const node_id_t my_id);
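
The three fields added to RestartState above describe a failover list: restart_leader_ips and restart_leader_ports are parallel lists in descending priority order, and num_leader_failures is the index of the leader currently being tried. The sketch below, which is not Derecho code, illustrates that bookkeeping; try_connect() is an assumed stand-in for the TCP connection attempt the real restart logic performs, and the addresses are placeholder values.

    #include <cstdint>
    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <vector>

    struct RestartLeaderList {
        std::vector<std::string> restart_leader_ips;  // descending priority order
        std::vector<uint16_t> restart_leader_ports;   // parallel to the IP list
        uint32_t num_leader_failures;                 // index of the leader currently being tried
    };

    // Stand-in for an actual connection attempt (assumption, not Derecho API).
    bool try_connect(const std::string& ip, uint16_t port) {
        std::cout << "trying restart leader " << ip << ":" << port << std::endl;
        return ip != "192.168.1.1";  // pretend the highest-priority leader is down
    }

    int main() {
        RestartLeaderList leaders{{"192.168.1.1", "192.168.1.2"}, {23580, 23580}, 0};
        while(leaders.num_leader_failures < leaders.restart_leader_ips.size()) {
            const uint32_t i = leaders.num_leader_failures;
            if(try_connect(leaders.restart_leader_ips[i], leaders.restart_leader_ports[i])) {
                std::cout << "connected to restart leader " << i << std::endl;
                return 0;
            }
            // That leader has failed; move on to the next one in priority order.
            ++leaders.num_leader_failures;
        }
        throw std::runtime_error("All configured restart leaders have failed");
    }
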
