Skip to content

Commit

Permalink
feat(inputs): WIP allow v2 inputs to manipulate and report back their…
Browse files Browse the repository at this point in the history
… state to the agent
  • Loading branch information
pkoutsovasilis committed Apr 25, 2024
1 parent 3ba900b commit 149a0c5
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 21 deletions.
30 changes: 19 additions & 11 deletions filebeat/input/v2/compat/compat.go
Expand Up @@ -31,6 +31,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/state"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
Expand All @@ -50,13 +51,14 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
stateHandler state.StateHandler
}

// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
Expand Down Expand Up @@ -109,6 +111,10 @@ func (f *factory) Create(
}, nil
}

func (r *runner) SetStateHandler(handler state.StateHandler) {
r.stateHandler = handler
}

func (r *runner) String() string { return r.input.Name() }

func (r *runner) Start() {
Expand All @@ -121,10 +127,11 @@ func (r *runner) Start() {
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StateHandler: r.stateHandler,
},
r.connector,
)
Expand All @@ -140,6 +147,7 @@ func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
r.stateHandler = nil
}

func configID(config *conf.C) (string, error) {
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/v2/input.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/state"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -93,6 +94,8 @@ type Context struct {

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler

StateHandler state.StateHandler
}

// TestContext provides the Input Test function with common environmental
Expand Down
7 changes: 7 additions & 0 deletions libbeat/cfgfile/list.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/common/state"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {

r.logger.Debugf("Starting runner: %s", runner)
r.runners[hash] = runner
if config.UnitStateHandler != nil {
if runnerWithState, ok := runner.(state.WithStateHandler); ok {
runnerWithState.SetStateHandler(config.UnitStateHandler)
}
}

runner.Start()
moduleStarts.Add(1)
if config.DiagCallback != nil {
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/reload/reload.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/common/state"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -47,6 +48,8 @@ type ConfigWithMeta struct {

// InputUnitID is the unit's ID that generated this ConfigWithMeta
InputUnitID string

UnitStateHandler state.StateHandler
}

// ReloadableList provides a method to reload the configuration of a list of entities
Expand Down
13 changes: 13 additions & 0 deletions libbeat/common/state/state.go
@@ -0,0 +1,13 @@
package state

Check failure on line 1 in libbeat/common/state/state.go

View workflow job for this annotation

GitHub Actions / check

is missing the license header

import "github.com/elastic/elastic-agent-client/v7/pkg/client"

// StateHandler TBD
type StateHandler interface {
UpdateState(state client.UnitState, msg string) error
}

// WithStateHandler TBD
type WithStateHandler interface {
SetStateHandler(handler StateHandler)
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
unitState "github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -121,6 +122,9 @@ func sanitizeFileName(name string) string {
func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error {
cfg := src.cfg
log := env.Logger.With("input_url", cfg.Resource.URL)
unitHandler := env.StateHandler

_ = unitHandler.UpdateState(unitState.UnitStateConfiguring, "Configuring CEL")

metrics := newInputMetrics(env.ID)
defer metrics.Close()
Expand Down Expand Up @@ -202,6 +206,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
// In addition to this and the functions and globals available
// from mito/lib, a global, useragent, is available to use
// in requests.
_ = unitHandler.UpdateState(unitState.UnitStateHealthy, "Started CEL")
err = periodically(ctx, cfg.Interval, func() error {
log.Info("process repeated request")
var (
Expand Down
94 changes: 94 additions & 0 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Expand Up @@ -760,6 +760,100 @@ func generateLogFile(t *testing.T, fullPath string) {
}()
}

func TestCELInput(t *testing.T) {

units := []*proto.UnitExpected{
{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_INFO,
Config: &proto.UnitExpectedConfig{
Id: "default",
Type: "elasticsearch",
Name: "elasticsearch",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"type": "elasticsearch",
"hosts": []interface{}{"https://127.0.0.1:9200"},
"username": "elastic",
"password": "changeme",
"protocol": "http",
"enabled": true,
"ssl.verification_mode": "none",
}),
},
},
{
Id: "input-unit-1",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 0,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "cel-cel-1e8b33de-d54a-45cd-90da-23ed71c482e5",
Type: "cel",
Name: "cel-1",
Source: integration.RequireNewStruct(t, map[string]interface{}{
"use_output": "default",
"revision": 1,
}),
DataStream: &proto.DataStream{
Namespace: "default",
},
Meta: &proto.Meta{
Package: &proto.Package{
Name: "cel",
Version: "1.9.0",
},
},
Streams: []*proto.Stream{
{
Id: "cel-cel.cel-1e8b33de-d54a-45cd-90da-23ed71c482e2",
DataStream: &proto.DataStream{
Dataset: "cel.cel",
},
Source: integration.RequireNewStruct(t, map[string]interface{}{
"interval": "1m",
"program": "bytes(get(state.url).Body).as(body, {\n \"events\": [body.decode_json()]\n })",
"redact.delete": false,
"regexp": nil,
"resource.url": "https://api.ipify.org/?format=json",
"publisher_pipeline.disable_host": true,
}),
},
},
},
},
}

server := &mock.StubServerV2{
// The Beat will call the check-in function multiple times:
// - At least once at startup
// - At every state change (starting, configuring, healthy, etc)
// for every Unit.
//
// So we wait until the state matches the desired state
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
fmt.Printf("%v\n", observed)
return &proto.CheckinExpected{
Units: units,
}
},
ActionImpl: func(response *proto.ActionResponse) error {
return nil
},
}

require.NoError(t, server.Start())

t.Log(server.Port)

time.Sleep(10 * time.Hour)

t.Cleanup(server.Stop)
}

// getEventsFromFileOutput reads all events from all the files on dir. If n > 0,
// then it reads up to n events. It considers all files are ndjson, and it skips
// any directory within dir.
Expand Down
57 changes: 47 additions & 10 deletions x-pack/libbeat/management/managerV2.go
Expand Up @@ -50,6 +50,42 @@ func (handler diagnosticHandler) Register(name string, description string, filen
}
}

type unitState struct {
unit *client.Unit
managedByInput bool
}

func (u *unitState) updateState(state client.UnitState, msg string, payload map[string]interface{}) error {
if u == nil || u.unit == nil || u.managedByInput {
return nil
}

return u.unit.UpdateState(state, msg, payload)
}

func (u *unitState) UpdateState(state client.UnitState, msg string) error {
if u == nil || u.unit == nil {
return nil
}

u.managedByInput = true
// TODO: here need to fabricate the protocol with the appropriate streamID

return u.unit.UpdateState(state, msg, nil)
}

func (u *unitState) clientUnit() *client.Unit {
return u.unit
}

func (u *unitState) ID() string {
if u == nil || u.unit == nil {
return ""
}

return u.unit.ID()
}

// unitKey is used to identify a unique unit in a map
// the `ID` of a unit in itself is not unique without its type, only `Type` + `ID` is unique
type unitKey struct {
Expand All @@ -70,7 +106,7 @@ type BeatV2Manager struct {

// track individual units given to us by the V2 API
mx sync.Mutex
units map[unitKey]*client.Unit
units map[unitKey]*unitState
actions []client.Action
forceReload bool

Expand Down Expand Up @@ -202,7 +238,7 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen
config: config,
logger: log.Named("V2-manager"),
registry: registry,
units: make(map[unitKey]*client.Unit),
units: make(map[unitKey]*unitState),
status: lbmanagement.Running,
message: "Healthy",
stopChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -312,8 +348,8 @@ func (cm *BeatV2Manager) RegisterAction(action client.Action) {
for _, unit := range cm.units {
// actions are only registered on input units (not a requirement by Agent but
// don't see a need in beats to support actions on an output at the moment)
if unit.Type() == client.UnitTypeInput {
unit.RegisterAction(action)
if clientUnit := unit.clientUnit(); clientUnit != nil && clientUnit.Type() == client.UnitTypeInput {
clientUnit.RegisterAction(action)
}
}
}
Expand Down Expand Up @@ -341,8 +377,8 @@ func (cm *BeatV2Manager) UnregisterAction(action client.Action) {
for _, unit := range cm.units {
// actions are only registered on input units (not a requirement by Agent but
// don't see a need in beats to support actions on an output at the moment)
if unit.Type() == client.UnitTypeInput {
unit.UnregisterAction(action)
if clientUnit := unit.clientUnit(); clientUnit != nil && clientUnit.Type() == client.UnitTypeInput {
clientUnit.UnregisterAction(action)
}
}
}
Expand All @@ -369,13 +405,13 @@ func (cm *BeatV2Manager) updateStatuses() {
payload := cm.payload

for _, unit := range cm.units {
expected := unit.Expected()
expected := unit.clientUnit().Expected()
if expected.State == client.UnitStateStopped {
// unit is expected to be stopping (don't adjust the state as the state is now managed by the
// `reload` method and will be marked stopped in that code path)
continue
}
err := unit.UpdateState(status, message, payload)
err := unit.updateState(status, message, payload)
if err != nil {
cm.logger.Errorf("Failed to update unit %s status: %s", unit.ID(), err)
}
Expand All @@ -389,7 +425,7 @@ func (cm *BeatV2Manager) updateStatuses() {
func (cm *BeatV2Manager) addUnit(unit *client.Unit) {
cm.mx.Lock()
defer cm.mx.Unlock()
cm.units[unitKey{unit.Type(), unit.ID()}] = unit
cm.units[unitKey{unit.Type(), unit.ID()}] = &unitState{unit: unit}

// update specific unit to starting
_ = unit.UpdateState(client.UnitStateStarting, "Starting", nil)
Expand Down Expand Up @@ -511,7 +547,7 @@ func (cm *BeatV2Manager) unitListen() {
cm.mx.Lock()
units := make(map[unitKey]*client.Unit, len(cm.units))
for k, u := range cm.units {
units[k] = u
units[k] = u.clientUnit()
}
cm.mx.Unlock()
cm.reload(units)
Expand Down Expand Up @@ -771,6 +807,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
for _, in := range inputCfg {
in.DiagCallback = diagnosticHandler{client: unit, log: cm.logger.Named("diagnostic-manager")}
in.InputUnitID = unit.ID()
in.UnitStateHandler = &unitState{unit: unit}
}
inputCfgs[unit.ID()] = expected.Config
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
Expand Down

0 comments on commit 149a0c5

Please sign in to comment.