Skip to content

Commit

Permalink
More demo-node improvements (#3450)
Browse files Browse the repository at this point in the history
This brings the M57 import pipelines back to the overview page, but they
are stopped in the sense of already completed. I also cherry-picked
@jachris' PR #3449 so we can test all changes in one go.

This also removes the pipeline that imports from `eth0`, because that
interface doesn't seem to be used by the application, at least not in
the cloud platform.
  • Loading branch information
tobim committed Aug 9, 2023
2 parents 42148f9 + 481c9fb commit c8a1108
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 47 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ COPY demo-node /demo-node
RUN apt-get update && \
apt install -y \
curl \
jq \
procps \
zstd && \
rm -rf /var/lib/apt/lists/*

RUN /demo-node/load.bash
RUN /demo-node/import.bash
ENTRYPOINT ["/demo-node/entrypoint-noingest.bash"]
ENTRYPOINT ["/demo-node/entrypoint.bash"]

# -- tenzir-ee -------------------------------------------------------------------

Expand Down
1 change: 1 addition & 0 deletions changelog/next/features/3451--replace-schema.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
It is now possible to replace the schema name with `replace #schema="new_name"`.
35 changes: 0 additions & 35 deletions demo-node/entrypoint-noingest.bash

This file was deleted.

37 changes: 30 additions & 7 deletions demo-node/entrypoint.bash
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,46 @@ while ! timeout 100 bash -c "echo > /dev/tcp/127.0.0.1/5160"; do
sleep 1
done

# Read data from eth0. Currently disabled because that interface sees very little
# traffic, giving the impression that the pipeline is defunct.
# nic_pipe="from nic eth0 | decapsulate data | import"
# curl -X POST \
# -H "Content-Type: application/json" \
# -d "{\"name\": \"Example eth0 Import\", \"definition\": \"${nic_pipe}\", \"start_when_created\": false}" \
# http://127.0.0.1:5160/api/v0/pipeline/create

# Continuously import system load data from `vmstat -a -n 1`.
stat_pipe="shell /demo-node/csvstat.sh | read csv | replace #schema=\"vmstat.all\" | unflatten | import"
curl -X POST \
-H "Content-Type: application/json" \
-d "{\"name\": \"System Load\", \"definition\": \"${stat_pipe}\", \"start_when_created\": true}" \
http://127.0.0.1:5160/api/v0/pipeline/create

# Ingest CVEs from https://services.nvd.nist.gov/rest/json/cves/2.0.
# !! Currently disabled because of a scheduling bug.
#cve_pipe="shell /demo-node/live_cve_feed.bash | read json --ndjson | replace #schema=\"nvd.cve\" | import"
#curl -X POST \
# -H "Content-Type: application/json" \
# -d "{\"name\": \"Live CVE Notifications from the NIST API\", \"definition\": \"${cve_pipe}\", \"start_when_created\": true}" \
# http://127.0.0.1:5160/api/v0/pipeline/create

# The shell operator decompresses the data and writes it to `read suricata` on
# the fly.
# !! Not started because the data has already been imported while building the image.
suricata_pipe="shell 'bash -c \\\"curl -s -L https://storage.googleapis.com/tenzir-datasets/M57/suricata.tar.zst | tar -x --zstd --to-stdout\\\"' | read suricata | where #schema != \\\"suricata.stats\\\" | import"
curl -X POST \
-H "Content-Type: application/json" \
-d "{\"name\": \"M57 Suricata Import\", \"definition\": \"${suricata_pipe}\", \"start_when_created\": false}" \
http://127.0.0.1:5160/api/v0/pipeline/create

# The shell operator "streams" the data into a `Zeek/` directory on disk, calls
# `sync` to flush the OS write buffers, and finally sends the concatenation of
# the logs into `read zeek-tsv`.
# !! Not started because the data has already been imported while building the image.
zeek_pipe="shell 'bash -c \\\"curl -s -L https://storage.googleapis.com/tenzir-datasets/M57/zeek.tar.zst | tar -x --zstd; sync; cat Zeek/*.log\\\"' | read zeek-tsv | import"

curl -X POST \
-H "Content-Type: application/json" \
-d "{\"name\": \"M57 Suricata Import\", \"definition\": \"${suricata_pipe}\", \"start_when_created\": true}" \
http://127.0.0.1:5160/api/v0/pipeline/create

curl -X POST \
-H "Content-Type: application/json" \
-d "{\"name\": \"M57 Zeek Import\", \"definition\": \"${zeek_pipe}\", \"start_when_created\": true}" \
-d "{\"name\": \"M57 Zeek Import\", \"definition\": \"${zeek_pipe}\", \"start_when_created\": false}" \
http://127.0.0.1:5160/api/v0/pipeline/create

wait "$NODE_PID"
13 changes: 13 additions & 0 deletions demo-node/live_cve_feed.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

# Emits newly published CVEs from the NIST NVD 2.0 API as a stream of
# NDJSON objects (one CVE per line) on stdout, polling every 10 seconds.
#
# Strategy: seed a cursor ~1000 entries before the current end of the feed
# (so the pipeline has some initial data), then repeatedly fetch from the
# cursor and advance it to the feed's reported `totalResults`. Once caught
# up, each fetch returns only CVEs added since the previous poll.
#
# NOTE(review): if NIST rate-limits and returns a non-JSON body, `jq` fails
# and `set -e` terminates the script — presumably acceptable for a demo
# pipeline, as the operator restarts it; confirm before production use.

set -euo pipefail

# Ask for a single result just to learn the total size of the feed.
OUTPUT=$(curl --retry 100 -s "https://services.nvd.nist.gov/rest/json/cves/2.0?resultsPerPage=1")
LAST=$(($(echo "$OUTPUT" | jq '.totalResults') - 1000))
# Clamp the cursor: a feed with fewer than 1000 entries would otherwise
# yield a negative startIndex, which the NVD API rejects.
if ((LAST < 0)); then
  LAST=0
fi
while true
do
  OUTPUT=$(curl --retry 100 -s "https://services.nvd.nist.gov/rest/json/cves/2.0?startIndex=${LAST}")
  # One compact JSON object per CVE, suitable for `read json --ndjson`.
  echo "$OUTPUT" | jq -c '.vulnerabilities[].cve'
  # Advance the cursor to the end of the feed; the next poll then returns
  # only entries published after this point.
  LAST=$(echo "$OUTPUT" | jq '.totalResults')
  sleep 10
done
41 changes: 38 additions & 3 deletions libtenzir/builtins/operators/put_extend_replace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ class put_extend_operator final
if (slice.rows() == 0)
return {};
const auto& layout = caf::get<record_type>(slice.schema());
auto batch = to_record_batch(slice);
TENZIR_ASSERT(batch);
auto transformations = std::vector<indexed_transformation>{};
auto replace_schema_name = std::optional<std::string>{};
switch (Mode) {
case mode::put: {
// For `put` we drop all fields except for the last one, and then
Expand Down Expand Up @@ -177,6 +176,12 @@ class put_extend_operator final
auto index_to_operand
= std::vector<std::pair<offset, const operand*>>{};
for (const auto& [extractor, operand] : config_.extractor_to_operand) {
if (extractor == "#schema") {
TENZIR_ASSERT_CHEAP(operand);
replace_schema_name
= caf::get<std::string>(caf::get<data>(*operand));
continue;
}
if (not operand) {
ctrl.warn(caf::make_error(
ec::logic_error,
Expand Down Expand Up @@ -213,6 +218,10 @@ class put_extend_operator final
}
// Lastly, apply our transformation.
auto result = transform_columns(slice, transformations);
if (replace_schema_name) {
result = cast(result, type{*replace_schema_name,
caf::get<record_type>(slice.schema())});
}
if (Mode == mode::put) {
auto renamed_schema
= type{"tenzir.put", caf::get<record_type>(result.schema())};
Expand Down Expand Up @@ -269,7 +278,7 @@ class plugin final : public virtual operator_plugin<put_extend_operator<Mode>> {
// clang-format off
const auto p
= required_ws_or_comment
>> ((extractor >> -(optional_ws_or_comment >> '=' >> optional_ws_or_comment >> operand))
>> (((extractor | parsers::str{"#schema"}) >> -(optional_ws_or_comment >> '=' >> optional_ws_or_comment >> operand))
% (optional_ws_or_comment >> ',' >> optional_ws_or_comment))
>> optional_ws_or_comment
>> end_of_pipeline_operator;
Expand All @@ -283,6 +292,32 @@ class plugin final : public virtual operator_plugin<put_extend_operator<Mode>> {
operator_name(Mode), pipeline)),
};
}
for (auto& [ex, op] : config.extractor_to_operand) {
if (ex == "#schema") {
if constexpr (Mode == mode::replace) {
auto* op_ptr = op ? &*op : nullptr;
// FIXME: Chaining `caf::get_if` leads to a segfault.
auto* data_ptr = op_ptr ? caf::get_if<data>(op_ptr) : nullptr;
auto* str_ptr
= data_ptr ? caf::get_if<std::string>(data_ptr) : nullptr;
if (not str_ptr) {
return {
std::string_view{f, l},
caf::make_error(ec::syntax_error,
fmt::format("assignment to `#schema` must be a "
"string literal")),
};
}
} else {
return {
std::string_view{f, l},
caf::make_error(ec::syntax_error,
fmt::format("`{}` does not support `#schema`",
operator_name(Mode))),
};
}
}
}
return {
std::string_view{f, l},
std::make_unique<put_extend_operator<Mode>>(std::move(config)),
Expand Down
14 changes: 13 additions & 1 deletion libtenzir/builtins/operators/shell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,19 @@ class child {
TENZIR_DEBUG("trying to read {} bytes", buffer.size());
auto* data = reinterpret_cast<char*>(buffer.data());
auto size = detail::narrow_cast<std::streamsize>(buffer.size());
stdout_.read(data, size);
// The `peek()` makes sure that we have at least some data in the
// buffer before we call `readsome()`. The second function by itself does
// not fill the underlying input buffer, and would never produce any data
// without the `peek()`.
// FIXME: This is still quite suboptimal, as `peek` could in theory put
// only a single character into the rdbuf, leading us to produce single byte
// chunks.
// This code used to call `stdout_.read` before, but that function is
// blocking and in turn blocks the pipeline from advancing until the buffer
// is filled, which is also not ideal for shell functions that produce
// output at a low rate.
stdout_.peek();
stdout_.readsome(data, size);
auto bytes_read = stdout_.gcount();
TENZIR_DEBUG("read {} bytes", bytes_read);
return detail::narrow_cast<size_t>(bytes_read);
Expand Down

0 comments on commit c8a1108

Please sign in to comment.