Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
mavam committed Apr 21, 2024
1 parent daba24a commit ed03587
Showing 1 changed file with 63 additions and 3 deletions.
66 changes: 63 additions & 3 deletions libtenzir/builtins/contexts/lookup_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <tenzir/concept/parseable/tenzir/expression.hpp>
#include <tenzir/concept/parseable/to.hpp>
#include <tenzir/data.hpp>
#include <tenzir/detail/patricia.hpp>
#include <tenzir/detail/range_map.hpp>
#include <tenzir/expression.hpp>
#include <tenzir/fbs/data.hpp>
Expand Down Expand Up @@ -211,7 +212,23 @@ namespace tenzir::plugins::lookup_table {

namespace {

struct subnet_keymaker {
template <class U>
struct rebind {
using other = subnet_keymaker;
};

auto operator()(const view<ip>& addr) const -> detail::sk::patricia_key {
return {as_bytes(addr), 128};
}

auto operator()(const view<subnet>& sn) const -> detail::sk::patricia_key {
return {as_bytes(sn.network()), sn.length()};
}
};

using map_type = tsl::robin_map<key_data, data>;
using subnet_map = detail::sk::patricia_map<subnet, data, subnet_keymaker>;

class ctx final : public virtual context {
public:
Expand All @@ -229,10 +246,29 @@ class ctx final : public virtual context {
auto apply(series array, bool replace) const
-> caf::expected<std::vector<series>> override {
auto builder = series_builder{};
auto subnet_lookup = [&](const auto& value) -> std::optional<view<data>> {
auto match = detail::overload{
[&](const auto&) {
return subnet_entries.end();
},
[&](view<ip> addr) {
return subnet_entries.prefix_match(materialize(addr));
},
[&](view<subnet> sn) {
return subnet_entries.prefix_match(materialize(sn));
},
};
if (auto it = caf::visit(match, value); it != subnet_entries.end()) {
return make_view(it->second);
}
return std::nullopt;
};
for (const auto& value : array.values()) {
if (auto it = context_entries.find(materialize(value));
it != context_entries.end()) {
builder.data(it->second);
} else if (auto x = subnet_lookup(value)) {
builder.data(*x);
} else if (replace and not caf::holds_alternative<caf::none_t>(value)) {
builder.data(value);
} else {
Expand Down Expand Up @@ -276,11 +312,25 @@ class ctx final : public virtual context {

/// Inspects the context.
auto show() const -> record override {
return record{{"num_entries", context_entries.size()}};
// TODO: there's not size() function for the patricia trie, so we need to
// track the number of elements externally and integrate it into this
// output.
return record{
{"num_entries", context_entries.size()},
};
}

auto dump() -> generator<table_slice> override {
auto entry_builder = series_builder{};
for (const auto& [key, value] : subnet_entries) {
auto row = entry_builder.record();
row.field("key", data{key});
row.field("value", value);
if (entry_builder.length() >= context::dump_batch_size_limit) {
co_yield entry_builder.finish_assert_one_slice(
fmt::format("tenzir.{}.info", context_type()));
}
}
for (const auto& [key, value] : context_entries) {
auto row = entry_builder.record();
row.field("key", key.to_original_data());
Expand Down Expand Up @@ -340,8 +390,14 @@ class ctx final : public virtual context {
while (key_it != key_values.end()) {
TENZIR_ASSERT(context_it != context_values.end());
auto materialized_key = materialize(*key_it);
context_entries.insert_or_assign(materialized_key,
materialize(*context_it));
// Subnets never make it into the regular map of entries.
if (caf::holds_alternative<subnet_type>(key_type)) {
const auto& key = caf::get<subnet>(materialized_key);
subnet_entries[key] = materialize(*context_it);
} else {
context_entries.insert_or_assign(materialized_key,
materialize(*context_it));
}
key_values_list.emplace_back(materialized_key);
++key_it;
++context_it;
Expand Down Expand Up @@ -395,9 +451,11 @@ class ctx final : public virtual context {

auto reset(context::parameter_map) -> caf::expected<record> override {
context_entries.clear();
subnet_entries.clear();
return show();
}

// TODO: persist patricia trie.
auto save() const -> caf::expected<save_result> override {
// We save the context by formatting into a record of this format:
// [{key: key, value: value}, ...]
Expand Down Expand Up @@ -431,8 +489,10 @@ class ctx final : public virtual context {

private:
map_type context_entries;
subnet_map subnet_entries;
};

// TODO: do we need a v2 loader now with the subnet optimization?
struct v1_loader : public context_loader {
auto version() const -> int {
return 1;
Expand Down

0 comments on commit ed03587

Please sign in to comment.