Skip to content

Commit

Permalink
Merge branch 'main' into topic/minor-fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
tobim committed Apr 5, 2024
2 parents e14d244 + f980665 commit e6eb45b
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 63 deletions.
3 changes: 0 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ RUN mkdir -p \
/var/lib/tenzir \
/var/log/tenzir

EXPOSE 5158/tcp

WORKDIR /var/lib/tenzir
VOLUME ["/var/lib/tenzir"]

Expand Down Expand Up @@ -158,7 +156,6 @@ RUN apt-get update && \

USER tenzir:tenzir

EXPOSE 5158/tcp
WORKDIR /var/lib/tenzir
VOLUME ["/var/cache/tenzir", "/var/lib/tenzir"]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Nodes now shut down with a non-zero exit code when pipelines configured as part
of the `tenzir.yaml` file fail to start, making such configuration errors easier
to spot.
2 changes: 2 additions & 0 deletions changelog/next/bug-fixes/4099--wrong-expose.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Tenzir Docker images no longer expose 5158/tcp by default, as this prevented
running multiple containers in the same network or in host mode.
2 changes: 2 additions & 0 deletions changelog/next/features/4095--batch-timeout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The `batch` operator gained a new `--timeout <duration>` option that controls
the maixmum latency for withholding events for batching.
2 changes: 1 addition & 1 deletion contrib/tenzir-plugins
3 changes: 0 additions & 3 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@
"TENZIR_LOG_FILE=/var/log/tenzir/server.log"
"TENZIR_ENDPOINT=0.0.0.0"
];
ExposedPorts = {
"5158/tcp" = {};
};
WorkingDir = "${tenzir-dir}";
Volumes = {
"${tenzir-dir}" = {};
Expand Down
29 changes: 25 additions & 4 deletions libtenzir/builtins/operators/batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,24 @@ class batch_operator final : public crtp_operator<batch_operator> {
public:
batch_operator() = default;

batch_operator(uint64_t limit) : limit_{limit} {
batch_operator(uint64_t limit, duration timeout)
: limit_{limit}, timeout_{timeout} {
// nop
}

auto operator()(generator<table_slice> input) const
-> generator<table_slice> {
auto buffer = std::vector<table_slice>{};
auto num_buffered = uint64_t{0};
auto last_yield = std::chrono::steady_clock::now();
for (auto&& slice : input) {
const auto now = std::chrono::steady_clock::now();
if (now - last_yield > timeout_ and num_buffered > 0) {
TENZIR_ASSERT(num_buffered < limit_);
last_yield = now;
co_yield concatenate(std::exchange(buffer, {}));
num_buffered = 0;
}
if (slice.rows() == 0) {
co_yield {};
continue;
Expand All @@ -43,6 +52,7 @@ class batch_operator final : public crtp_operator<batch_operator> {
auto [lhs, rhs] = split(buffer, limit_);
auto result = concatenate(std::move(lhs));
num_buffered -= result.rows();
last_yield = now;
co_yield std::move(result);
buffer = std::move(rhs);
}
Expand All @@ -53,6 +63,7 @@ class batch_operator final : public crtp_operator<batch_operator> {
auto [lhs, rhs] = split(buffer, limit_);
auto result = concatenate(std::move(lhs));
num_buffered -= result.rows();
last_yield = now;
co_yield std::move(result);
buffer = std::move(rhs);
}
Expand All @@ -76,11 +87,12 @@ class batch_operator final : public crtp_operator<batch_operator> {
friend auto inspect(auto& f, batch_operator& x) -> bool {
return f.object(x)
.pretty_name("batch_operator")
.fields(f.field("limit", x.limit_));
.fields(f.field("limit", x.limit_), f.field("timeout", x.timeout_));
}

private:
uint64_t limit_ = defaults::import::table_slice_size;
duration timeout_ = {};
};

class plugin final : public virtual operator_plugin<batch_operator> {
Expand All @@ -93,15 +105,24 @@ class plugin final : public virtual operator_plugin<batch_operator> {
auto parser = argument_parser{"batch", "https://docs.tenzir.com/next/"
"operators/transformations/batch"};
auto limit = std::optional<located<uint64_t>>{};
auto timeout = std::optional<located<duration>>{};
parser.add(limit, "<limit>");
parser.add("-t,--timeout", timeout, "<limit>");
parser.parse(p);
if (limit and limit->inner == 0) {
diagnostic::error("batch size must not be 0")
.primary(limit->source)
.throw_();
}
return limit ? std::make_unique<batch_operator>(limit->inner)
: std::make_unique<batch_operator>();
if (timeout and timeout->inner <= duration::zero()) {
diagnostic::error("timeout must be a positive duration")
.primary(timeout->source)
.throw_();
}

return std::make_unique<batch_operator>(
limit ? limit->inner : defaults::import::table_slice_size,
timeout ? timeout->inner : duration::max());
}
};

Expand Down
19 changes: 13 additions & 6 deletions libtenzir/include/tenzir/shutdown.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,30 @@ namespace tenzir {
/// shutdown sequence.
/// @param self The actor to terminate.
/// @param xs Actors that need to shutdown before *self* quits.
/// @param reason The error message to terminate with; defaults to
/// caf::exit_reason::user_shutdown.
/// @relates terminate
template <class Policy>
void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs);
void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs,
caf::error reason = caf::exit_reason::user_shutdown);

template <class Policy, class... Ts>
void shutdown(caf::typed_event_based_actor<Ts...>* self,
std::vector<caf::actor> xs) {
std::vector<caf::actor> xs,
caf::error reason = caf::exit_reason::user_shutdown) {
auto handle = caf::actor_cast<caf::event_based_actor*>(self);
shutdown<Policy>(handle, std::move(xs));
shutdown<Policy>(handle, std::move(xs), std::move(reason));
}

template <class Policy>
void shutdown(caf::scoped_actor& self, std::vector<caf::actor> xs);
void shutdown(caf::scoped_actor& self, std::vector<caf::actor> xs,
caf::error reason = caf::exit_reason::user_shutdown);

template <class Policy, class Actor>
void shutdown(Actor&& self, caf::actor x) {
shutdown<Policy>(std::forward<Actor>(self), std::vector{std::move(x)});
void shutdown(Actor&& self, caf::actor x,
caf::error reason = caf::exit_reason::user_shutdown) {
shutdown<Policy>(std::forward<Actor>(self), std::vector{std::move(x)},
std::move(reason));
}

} // namespace tenzir
6 changes: 4 additions & 2 deletions libtenzir/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,9 +522,11 @@ node(node_actor::stateful_pointer<node_state> self, std::string /*name*/,
auto core_shutdown_sequence
= [=, core_shutdown_handles = std::move(core_shutdown_handles),
filesystem_handle = std::move(filesystem_handle)]() mutable {
for (const auto& comp : core_shutdown_handles)
for (const auto& comp : core_shutdown_handles) {
self->demonitor(comp);
shutdown<policy::sequential>(self, std::move(core_shutdown_handles));
}
shutdown<policy::sequential>(self, std::move(core_shutdown_handles),
msg.reason);
// We deliberately do not send an exit message to the filesystem
// actor, as that would mean that actors not tracked by the component
// registry which hold a strong handle to the filesystem actor cannot
Expand Down
23 changes: 15 additions & 8 deletions libtenzir/src/shutdown.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
namespace tenzir {

template <class Policy>
void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs) {
void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs,
caf::error reason) {
// Ignore duplicate EXIT messages except for hard kills.
self->set_exit_handler([=](const caf::exit_msg& msg) {
if (msg.reason == caf::exit_reason::kill) {
Expand All @@ -36,9 +37,9 @@ void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs) {
// Terminate actors as requested.
terminate<Policy>(self, std::move(xs))
.then(
[=](atom::done) {
[=, reason = std::move(reason)](atom::done) mutable {
TENZIR_DEBUG("{} terminates after shutting down all dependents", *self);
self->quit(caf::exit_reason::user_shutdown);
self->quit(std::move(reason));
},
[=](const caf::error& err) {
die(
Expand All @@ -47,17 +48,21 @@ void shutdown(caf::event_based_actor* self, std::vector<caf::actor> xs) {
}

template void
shutdown<policy::sequential>(caf::event_based_actor*, std::vector<caf::actor>);
shutdown<policy::sequential>(caf::event_based_actor*, std::vector<caf::actor>,
caf::error reason);

template void
shutdown<policy::parallel>(caf::event_based_actor*, std::vector<caf::actor>);
shutdown<policy::parallel>(caf::event_based_actor*, std::vector<caf::actor>,
caf::error reason);

template <class Policy>
void shutdown(caf::scoped_actor& self, std::vector<caf::actor> xs) {
void shutdown(caf::scoped_actor& self, std::vector<caf::actor> xs,
caf::error reason) {
terminate<Policy>(self, std::move(xs))
.receive(
[&](atom::done) {
TENZIR_DEBUG("{} terminates after shutting down all dependents", *self);
self->send_exit(self, std::move(reason));
},
[&](const caf::error& err) {
die(
Expand All @@ -66,9 +71,11 @@ void shutdown(caf::scoped_actor& self, std::vector<caf::actor> xs) {
}

template void
shutdown<policy::sequential>(caf::scoped_actor&, std::vector<caf::actor>);
shutdown<policy::sequential>(caf::scoped_actor&, std::vector<caf::actor>,
caf::error reason);

template void
shutdown<policy::parallel>(caf::scoped_actor&, std::vector<caf::actor>);
shutdown<policy::parallel>(caf::scoped_actor&, std::vector<caf::actor>,
caf::error reason);

} // namespace tenzir
2 changes: 1 addition & 1 deletion nix/tenzir/plugins/source.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "tenzir-plugins",
"url": "git@github.com:tenzir/tenzir-plugins",
"ref": "main",
"rev": "ba4ad114a0626c53fc3786292d8f11f04d210f1b",
"rev": "8c59533820b830cffad8c16c0a43a768f135e1a1",
"submodules": true,
"shallow": true
}
7 changes: 6 additions & 1 deletion web/docs/operators/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@ underlying pipeline execution engine. Use with caution!
## Synopsis

```
batch [<limit>]
batch [--timeout <duration>] [<limit>]
```

## Description

The `batch` operator takes its input and rewrites it into batches of up to the
desired size.

### `--timeout <duration>`

Specifies a maximum latency for events passing through the batch operator. When
unspecified, an infinite duration is used.

### `<limit>`

An unsigned integer denoting how many events to put into one batch at most.
Expand Down
18 changes: 9 additions & 9 deletions web/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions web/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ license = "BSD-3-Clause"

[tool.poetry.dependencies]
python = "^3.10"
nbconvert = "^7.16.2"
ipykernel = "^6.29.3"
nbconvert = "^7.16.3"
ipykernel = "^6.29.4"
bash_kernel = "^0.9.3"
jupyter = "^1.0.0"

Expand Down

0 comments on commit e6eb45b

Please sign in to comment.