Add timeout options to summarize (#4209)
> [!NOTE]
> I did not spend much time thinking about the naming of these two
> options. This is bound to change with TQL2 in the Tenzir v5 release
> anyway, but it's better to have these capabilities now than to wait.

This adds two new options to the `summarize` operator:
- `timeout <duration>` specifies the maximum lifetime of a bucket from
the time it was created.
- `update-timeout <duration>` specifies the maximum lifetime of a bucket
from the time it was last updated.

These options essentially enable streaming aggregations. The `timeout`
option guarantees that events are held back for no longer than the
specified duration. The `update-timeout` option is useful when operating
on a stream of events where events that belong to the same group are
known to arrive at around the same time.
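
As a rough illustration, a pipeline could combine both options as in the
following sketch; the aggregation and the `src_ip` field are placeholders and
not part of this change:

```
summarize connections=count(src_ip) by src_ip timeout 1h update-timeout 10min
```

With these settings, a group's result is emitted no later than one hour after
its first event, and already after ten minutes without new events for that
group.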
dominiklohmann committed May 14, 2024
2 parents f93d1d6 + 85a918e commit e6c73a4
Showing 3 changed files with 105 additions and 8 deletions.
8 changes: 8 additions & 0 deletions changelog/next/features/4209--streaming-summarize.md
@@ -0,0 +1,8 @@
The `summarize` operator gained two new options: `timeout` and `update-timeout`,
which enable streaming aggregations. They specify the maximum time a bucket in
the operator may exist, tracked from the arrival of the first and the last event
in the bucket, respectively. The `timeout` option guarantees that events are
held back for no longer than the specified duration. The `update-timeout` option
finishes an aggregation early when all events that sort into the same bucket
arrive within the specified duration, so results become visible sooner.
81 changes: 76 additions & 5 deletions libtenzir/builtins/operators/summarize.cpp
@@ -128,13 +128,20 @@ struct configuration {
/// Resolution for time-columns in the group-by columns.
std::optional<duration> time_resolution = {};

/// Maximum lifetime of a bucket, counted from its creation and last update,
/// respectively.
std::optional<duration> created_timeout = {};
std::optional<duration> update_timeout = {};

/// Configuration for aggregation columns.
std::vector<aggregation> aggregations = {};

friend auto inspect(auto& f, configuration& x) -> bool {
return f.object(x).fields(f.field("group_by_extractors",
x.group_by_extractors),
f.field("time_resolution", x.time_resolution),
f.field("created_timeout", x.created_timeout),
f.field("update_timeout", x.update_timeout),
f.field("aggregations", x.aggregations));
}
};
@@ -502,6 +509,7 @@ class implementation {
// This lambda is called for consecutive rows that belong to the same group
// and updates its aggregation functions.
auto update_bucket = [&](bucket& bucket, int64_t offset, int64_t length) {
bucket.updated_at = std::chrono::steady_clock::now();
for (auto [aggr, input] :
zip_equal(bucket.aggregations, aggregation_arrays)) {
if (!input) {
@@ -518,9 +526,9 @@ class implementation {
aggr.get_active()->add(*(*input)->Slice(offset, length));
}
};
- // Step 3: Iterate over all rows of the batch, and determine a slidin window
- // of rows beloging to the same batch that is as large as possible, then
- // update the corresponding bucket.
+ // Step 3: Iterate over all rows of the batch, and determine a sliding
+ // window of rows belonging to the same batch that is as large as possible,
+ // then update the corresponding bucket.
auto first_row = int64_t{0};
auto* first_bucket = find_or_create_bucket(first_row);
TENZIR_ASSERT(slice.rows() > 0);
@@ -537,6 +545,42 @@ class implementation {
detail::narrow<int64_t>(slice.rows()) - first_row);
}

auto check_timeouts(const configuration& config)
-> generator<caf::expected<table_slice>> {
if (not config.created_timeout and not config.update_timeout) {
co_return;
}
const auto now = std::chrono::steady_clock::now();
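// Collect buckets that exceeded a timeout into a scratch implementation and
// finish that copy below to emit exactly their aggregation results.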
auto copy = implementation{};
if (config.created_timeout) {
const auto threshold = now - *config.created_timeout;
for (const auto& [key, bucket] : buckets) {
if (bucket->created_at < threshold) {
copy.buckets.try_emplace(key, bucket);
}
}
}
if (config.update_timeout) {
TENZIR_ASSERT(config.update_timeout);
const auto threshold = now - *config.update_timeout;
for (const auto& [key, bucket] : buckets) {
if (bucket->updated_at < threshold) {
copy.buckets.try_emplace(key, bucket);
}
}
}
if (copy.buckets.empty()) {
co_return;
}
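// Remove the expired buckets from the live set before emitting their results.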
for (const auto& [key, _] : copy.buckets) {
const auto num_erased = buckets.erase(key);
TENZIR_ASSERT(num_erased == 1);
}
for (auto&& result : std::move(copy).finish(config)) {
co_yield std::move(result);
}
}

/// Returns the summarization results after the input is done.
auto finish(
const configuration& config) && -> generator<caf::expected<table_slice>> {
@@ -740,6 +784,12 @@ class implementation {
/// schemas where the input column is missing, which means that we don't
/// know which type to use until we get a schema where the column exists.
std::vector<aggregation> aggregations;

/// The time when this bucket was created and last updated, respectively.
std::chrono::steady_clock::time_point created_at
= std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point updated_at
= std::chrono::steady_clock::now();
};

/// We cache the offsets and types of the resolved columns for each schema.
@@ -770,13 +820,20 @@ class summarize_operator final : public crtp_operator<summarize_operator> {
auto impl = implementation{};
for (auto&& slice : input) {
if (slice.rows() == 0) {
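// An empty batch carries no events; take the opportunity to flush buckets
// whose timeout has expired before forwarding the empty batch downstream.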
for (auto&& result : impl.check_timeouts(config_)) {
if (not result) {
diagnostic::error(result.error()).emit(ctrl.diagnostics());
co_return;
}
co_yield std::move(*result);
}
co_yield {};
continue;
}
impl.add(slice, config_, ctrl.diagnostics());
}
for (auto&& result : std::move(impl).finish(config_)) {
- if (!result) {
+ if (not result) {
diagnostic::error(result.error()).emit(ctrl.diagnostics());
co_return;
}
@@ -788,6 +845,13 @@ class summarize_operator final : public crtp_operator<summarize_operator> {
return "summarize";
}

auto input_independent() const -> bool override {
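// With a timeout configured, the operator must make progress even while no
// new input arrives, so that expired buckets can still be flushed.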
if (config_.created_timeout or config_.update_timeout) {
return true;
}
return false;
}

auto optimize(expression const& filter, event_order order) const
-> optimize_result override {
// Note: The `unordered` relies on commutativity of the aggregation functions.
@@ -823,10 +887,15 @@ class plugin final : public virtual operator_plugin<summarize_operator> {
>> extractor_list)
>> -(required_ws_or_comment >> "resolution"
>> required_ws_or_comment >> duration)
>> -(required_ws_or_comment >> "timeout"
>> required_ws_or_comment >> duration)
>> -(required_ws_or_comment >> "update-timeout"
>> required_ws_or_comment >> duration)
>> optional_ws_or_comment >> end_of_pipeline_operator;
std::tuple<std::vector<std::tuple<caf::optional<std::string>, std::string,
std::string>>,
- std::vector<std::string>, std::optional<tenzir::duration>>
+ std::vector<std::string>, std::optional<tenzir::duration>,
+ std::optional<tenzir::duration>, std::optional<tenzir::duration>>
parsed_aggregations{};
if (!p(f, l, parsed_aggregations)) {
return {
Expand Down Expand Up @@ -871,6 +940,8 @@ class plugin final : public virtual operator_plugin<summarize_operator> {
}
config.group_by_extractors = std::move(std::get<1>(parsed_aggregations));
config.time_resolution = std::move(std::get<2>(parsed_aggregations));
config.created_timeout = std::move(std::get<3>(parsed_aggregations));
config.update_timeout = std::move(std::get<4>(parsed_aggregations));
if (config.time_resolution and config.group_by_extractors.empty()) {
return {
std::string_view{f, l},
24 changes: 21 additions & 3 deletions web/docs/operators/summarize.md
@@ -11,7 +11,10 @@ Groups events and applies aggregate functions on each group.
## Synopsis

```
- summarize <[field=]aggregation>... [by <extractor>... [resolution <duration>]]
+ summarize <[field=]aggregation>...
+   [by <extractor>... [resolution <duration>]]
+   [timeout <duration>]
+   [update-timeout <duration>]
```

## Description
@@ -56,13 +59,28 @@ groups. If `by` is omitted, all events are assigned to the same group.
### `resolution <duration>`

The `resolution` option specifies an optional duration value that specifies the
- tolerance when comparing time values in the `group-by` section. For example,
- `01:48` is rounded down to `01:00` when a 1-hour `resolution` is used.
+ tolerance when comparing time values in the `by` section. For example, `01:48`
+ is rounded down to `01:00` when a 1-hour `resolution` is used.

NB: we introduced the `resolution` option as a stop-gap measure to compensate for
the lack of a rounding function. The ability to apply functions in the grouping
expression will replace this option in the future.
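
As a hypothetical sketch (the `ts` field and the aggregation are placeholders,
not part of this change), grouping by a timestamp with a one-hour resolution
puts all events from the same hour into the same group:

```
summarize events=count(ts) by ts resolution 1h
```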

### `timeout <duration>`

The `timeout` option specifies how long an aggregation may take, measured per
group in the `by` section from the time the group is created, or, if no groups
exist yet, from the time the first event arrived at the operator.

If the same grouping values occur again after the timeout, a new group with an
independent aggregation is created.
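
For example, the following hypothetical pipeline (the aggregation and field
names are placeholders, not part of this change) emits each group's result at
most one hour after the group was created:

```
summarize distinct(dest_port) by src_ip timeout 1h
```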

### `update-timeout <duration>`

The `update-timeout` option works just like the `timeout` option, but instead
of measuring from the first event of a group, the timeout resets whenever a new
event is added to the group.
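
Analogously, a hypothetical pipeline using `update-timeout` emits a group's
result once no event for that group has arrived for the given duration, here
ten minutes:

```
summarize distinct(dest_port) by src_ip update-timeout 10min
```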

## Examples

Group the input by `src_ip` and aggregate all unique `dest_port` values into a