Shard placement table persistence #18283

Merged 17 commits on May 20, 2024
7 changes: 7 additions & 0 deletions src/v/bytes/include/bytes/bytes.h
@@ -104,6 +104,13 @@ inline iobuf bytes_to_iobuf(const bytes& in) {
return out;
}

inline iobuf bytes_to_iobuf(bytes_view in) {
iobuf out;
// NOLINTNEXTLINE
out.append(reinterpret_cast<const char*>(in.data()), in.size());
return out;
}

// NOLINTNEXTLINE(cert-dcl58-cpp): hash<> specialization
namespace std {
template<>
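The new overload above mirrors the existing bytes_to_iobuf(const bytes&) helper but takes a non-owning bytes_view, so callers can build an iobuf from a borrowed byte range without first materializing a bytes copy. A minimal stand-in sketch of the same copy-into-owning-buffer pattern in plain C++ (byte_view and byte_buffer are hypothetical aliases used for illustration, not Redpanda types):

#include <cstddef>
#include <span>
#include <vector>

using byte_view = std::span<const std::byte>; // stand-in for bytes_view
using byte_buffer = std::vector<char>;        // stand-in for iobuf

inline byte_buffer view_to_buffer(byte_view in) {
    byte_buffer out;
    // The cast bridges the view's std::byte elements and the buffer's
    // char-based append interface, mirroring the iobuf overload above.
    out.insert(
      out.end(),
      reinterpret_cast<const char*>(in.data()),
      reinterpret_cast<const char*>(in.data()) + in.size());
    return out;
}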
9 changes: 7 additions & 2 deletions src/v/cluster/cluster_utils.cc
@@ -237,12 +237,16 @@ std::optional<shard_placement_target> placement_target_on_node(
// expected shard is determined by the resulting assignment
// (including cancellation effects).
return shard_placement_target{
log_revision, resulting_shard_on_node.value()};
replicas_view.assignment.group,
log_revision,
resulting_shard_on_node.value()};
} else {
// partition is moved away from this node, but we keep the original
// replica until update is finished.
return shard_placement_target{
log_revision, orig_shard_on_node.value()};
replicas_view.assignment.group,
log_revision,
orig_shard_on_node.value()};
}
} else if (replicas_view.update) {
// if partition appears on the node as a result of the update, create
@@ -252,6 +256,7 @@
replicas_view.update->get_target_replicas(), node);
if (updated_shard_on_node) {
return shard_placement_target{
replicas_view.assignment.group,
replicas_view.update->get_update_revision(),
updated_shard_on_node.value()};
}
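Every branch of placement_target_on_node now passes replicas_view.assignment.group as the first constructor argument, so shard_placement_target evidently carries the Raft group id alongside the log revision and destination shard. A hedged sketch of such an aggregate (an assumption for illustration only, not the actual Redpanda definition):

#include <cstdint>

// Hypothetical stand-in: the real type would use raft::group_id,
// model::revision_id and ss::shard_id rather than raw integers.
struct shard_placement_target_sketch {
    int64_t group;        // which Raft group the placement refers to
    int64_t log_revision; // revision of the log the replica should have
    uint32_t shard;       // destination core on this node
};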
8 changes: 6 additions & 2 deletions src/v/cluster/controller.cc
@@ -187,7 +187,10 @@ ss::future<> controller::wire_up() {
std::ref(_partition_allocator),
std::ref(_node_status_table));
})
.then([this] { return _shard_placement.start(); })
.then([this] {
return _shard_placement.start(ss::sharded_parameter(
[this] { return std::ref(_storage.local().kvs()); }));
})
.then([this] { _probe.start(); });
}

@@ -438,8 +441,9 @@ ss::future<> controller::start(
})
.then([this] {
return _shard_balancer.start_single(
std::ref(_tp_state),
std::ref(_shard_placement),
std::ref(_feature_table),
std::ref(_tp_state),
std::ref(_backend));
})
.then(
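In wire_up(), shard_placement_table is now started with a per-shard reference to the local kvstore: ss::sharded_parameter evaluates the lambda once per shard, so each instance receives its own shard's kvstore rather than a single shared reference. A small sketch of this Seastar pattern, using hypothetical stand-in services (local_store, consumer) rather than the real Redpanda ones:

#include <seastar/core/future.hh>
#include <seastar/core/sharded.hh>
#include <functional>

struct local_store {
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
    int value = 0;
};

struct consumer {
    explicit consumer(local_store& s)
      : _store(s) {}
    seastar::future<> stop() { return seastar::make_ready_future<>(); }
    local_store& _store;
};

// Assumes `storage` has already been started on all shards.
seastar::future<> start_consumers(
  seastar::sharded<local_store>& storage,
  seastar::sharded<consumer>& consumers) {
    // The lambda runs on every shard, so each consumer is constructed
    // with a reference to that shard's local_store instance.
    return consumers.start(seastar::sharded_parameter(
      [&storage] { return std::ref(storage.local()); }));
}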
92 changes: 81 additions & 11 deletions src/v/cluster/shard_balancer.cc
@@ -14,15 +14,18 @@
#include "cluster/cluster_utils.h"
#include "cluster/logger.h"
#include "config/node_config.h"
#include "ssx/async_algorithm.h"

namespace cluster {

shard_balancer::shard_balancer(
ss::sharded<topic_table>& topics,
ss::sharded<shard_placement_table>& spt,
ss::sharded<features::feature_table>& features,
ss::sharded<topic_table>& topics,
ss::sharded<controller_backend>& cb)
: _topics(topics)
, _shard_placement(spt)
: _shard_placement(spt.local())
, _features(features.local())
, _topics(topics)
, _controller_backend(cb)
, _self(*config::node().node_id()) {}

@@ -32,19 +35,81 @@ ss::future<> shard_balancer::start() {
"method can only be invoked on shard {}",
shard_id);

// We expect topic_table to remain unchanged throughout the method
// invocation because it is supposed to be called after local controller
// replay is finished but before we start getting new controller updates
// from the leader.
auto tt_version = _topics.local().topics_map_revision();

co_await _shard_placement.invoke_on_all([this](shard_placement_table& spt) {
return spt.initialize(_topics.local(), _self);
});
if (_shard_placement.is_persistence_enabled()) {
// 1. collect the set of node-local ntps from topic_table

chunked_hash_map<raft::group_id, model::ntp> local_group2ntp;
chunked_hash_map<model::ntp, model::revision_id> local_ntp2log_revision;
const auto& topics = _topics.local();
ssx::async_counter counter;
for (const auto& [ns_tp, md_item] : topics.all_topics_metadata()) {
vassert(
tt_version == topics.topics_map_revision(),
"topic_table unexpectedly changed");

co_await ssx::async_for_each_counter(
counter,
md_item.get_assignments().begin(),
md_item.get_assignments().end(),
[&](const partition_assignment& p_as) {
vassert(
tt_version == topics.topics_map_revision(),
"topic_table unexpectedly changed");

model::ntp ntp{ns_tp.ns, ns_tp.tp, p_as.id};
auto replicas_view = topics.get_replicas_view(
ntp, md_item, p_as);
auto log_rev = log_revision_on_node(replicas_view, _self);
if (log_rev) {
local_group2ntp.emplace(
replicas_view.assignment.group, ntp);
local_ntp2log_revision.emplace(ntp, *log_rev);
}
});
}

// 2. restore shard_placement_table from the kvstore

co_await _shard_placement.initialize_from_kvstore(local_group2ntp);

// 3. assign non-assigned ntps that have to be assigned

co_await ssx::async_for_each_counter(
counter,
local_ntp2log_revision.begin(),
local_ntp2log_revision.end(),
[&](const std::pair<const model::ntp&, model::revision_id> kv) {
const auto& [ntp, log_revision] = kv;
auto existing_target = _shard_placement.get_target(ntp);
if (
!existing_target
|| existing_target->log_revision != log_revision) {
_to_assign.insert(ntp);
}
});
co_await do_assign_ntps();
} else {
co_await _shard_placement.initialize_from_topic_table(_topics, _self);

if (_features.is_active(
features::feature::shard_placement_persistence)) {
co_await _shard_placement.enable_persistence();
}
}

// we shouldn't be receiving any controller updates at this point, so no
// risk of missing a notification between initializing shard_placement_table
// and subscribing.
vassert(
tt_version == _topics.local().topics_map_revision(),
"topic_table unexpectedly changed");

// we shouldn't be receiving any controller updates at this point, so no
// risk of missing a notification between initializing shard_placement_table
// and subscribing.
_topic_table_notify_handle = _topics.local().register_delta_notification(
[this](topic_table::delta_range_t deltas_range) {
for (const auto& delta : deltas_range) {
@@ -88,6 +153,12 @@ ss::future<> shard_balancer::assign_fiber() {
co_return;
}

if (
_features.is_active(features::feature::shard_placement_persistence)
&& !_shard_placement.is_persistence_enabled()) {
co_await _shard_placement.enable_persistence();
}

co_await do_assign_ntps();
}
}
@@ -118,8 +189,7 @@ ss::future<> shard_balancer::assign_ntp(const model::ntp& ntp) {
target);

try {
co_await _shard_placement.local().set_target(
ntp, target, shard_callback);
co_await _shard_placement.set_target(ntp, target, shard_callback);
} catch (...) {
auto ex = std::current_exception();
if (!ssx::is_shutdown_exception(ex)) {
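When placement persistence is already enabled, the start() hunk above no longer rebuilds placements from topic_table alone: it collects the node-local groups and log revisions, restores the persisted table from the kvstore, and then queues for assignment any ntp whose restored target is missing or refers to a different log revision. A simplified sketch of that last reconciliation step, with std::string and long as stand-ins for the ntp and revision types:

#include <map>
#include <set>
#include <string>

struct restored_target {
    long log_revision;
};

// Returns the ntps that still need a shard assignment after restoring
// the placement table from the kvstore.
std::set<std::string> ntps_to_assign(
  const std::map<std::string, long>& local_ntp2log_revision,
  const std::map<std::string, restored_target>& restored) {
    std::set<std::string> to_assign;
    for (const auto& [ntp, log_revision] : local_ntp2log_revision) {
        auto it = restored.find(ntp);
        if (it == restored.end() || it->second.log_revision != log_revision) {
            to_assign.insert(ntp);
        }
    }
    return to_assign;
}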
6 changes: 4 additions & 2 deletions src/v/cluster/shard_balancer.h
@@ -31,8 +31,9 @@ class shard_balancer {
static constexpr ss::shard_id shard_id = 0;

shard_balancer(
ss::sharded<topic_table>&,
ss::sharded<shard_placement_table>&,
ss::sharded<features::feature_table>&,
ss::sharded<topic_table>&,
ss::sharded<controller_backend>&);

ss::future<> start();
@@ -46,8 +47,9 @@
ss::future<> assign_ntp(const model::ntp&);

private:
shard_placement_table& _shard_placement;
features::feature_table& _features;
ss::sharded<topic_table>& _topics;
ss::sharded<shard_placement_table>& _shard_placement;
ss::sharded<controller_backend>& _controller_backend;
model::node_id _self;

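The header mirrors the constructor change in shard_balancer.cc: the service still accepts ss::sharded<> references, which is convenient for start_single, but since it only ever runs on shard 0 it now stores plain references to the shard-local shard_placement_table and feature_table. A brief sketch of that pattern with hypothetical stand-in types:

#include <seastar/core/sharded.hh>

struct placement_table {};
struct feature_table {};

class balancer_sketch {
public:
    balancer_sketch(
      seastar::sharded<placement_table>& spt,
      seastar::sharded<feature_table>& features)
      // Safe because this service runs on a single shard and the
      // referenced services outlive it.
      : _shard_placement(spt.local())
      , _features(features.local()) {}

private:
    placement_table& _shard_placement;
    feature_table& _features;
};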