Skip to content

Commit

Permalink
feat(management): implement support for reporting input health includ…
Browse files Browse the repository at this point in the history
…ing multiple streams
  • Loading branch information
pkoutsovasilis committed May 9, 2024
1 parent 3ba900b commit c3ef49b
Show file tree
Hide file tree
Showing 14 changed files with 1,064 additions and 131 deletions.
30 changes: 19 additions & 11 deletions filebeat/input/v2/compat/compat.go
Expand Up @@ -31,6 +31,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
Expand All @@ -50,13 +51,14 @@ type factory struct {
// On stop the runner triggers the shutdown signal and waits until the input
// has returned.
type runner struct {
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
id string
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
statusReporter status.StatusReporter
}

// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is
Expand Down Expand Up @@ -109,6 +111,10 @@ func (f *factory) Create(
}, nil
}

func (r *runner) SetStatusReporter(reported status.StatusReporter) {
r.statusReporter = reported
}

func (r *runner) String() string { return r.input.Name() }

func (r *runner) Start() {
Expand All @@ -121,10 +127,11 @@ func (r *runner) Start() {
log.Infof("Input '%s' starting", name)
err := r.input.Run(
v2.Context{
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
ID: r.id,
Agent: *r.agent,
Logger: log,
Cancelation: r.sig,
StatusReporter: r.statusReporter,
},
r.connector,
)
Expand All @@ -140,6 +147,7 @@ func (r *runner) Stop() {
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%s' stopped (runner)", r.input.Name())
r.statusReporter = nil
}

func configID(config *conf.C) (string, error) {
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/v2/input.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

Expand Down Expand Up @@ -93,6 +94,8 @@ type Context struct {

// Cancelation is used by Beats to signal the input to shutdown.
Cancelation Canceler

StatusReporter status.StatusReporter
}

// TestContext provides the Input Test function with common environmental
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -154,7 +154,7 @@ require (
golang.org/x/crypto v0.21.0
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616
golang.org/x/mod v0.14.0
golang.org/x/net v0.21.0
golang.org/x/net v0.23.0
golang.org/x/oauth2 v0.10.0
golang.org/x/sync v0.5.0
golang.org/x/sys v0.18.0
Expand Down Expand Up @@ -407,7 +407,6 @@ require (

replace (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption => github.com/elastic/azure-sdk-for-go/sdk/resourcemanager/consumption/armconsumption v1.1.0-elastic

github.com/Microsoft/go-winio => github.com/bi-zone/go-winio v0.4.15
github.com/Microsoft/hcsshim => github.com/Microsoft/hcsshim v0.9.8
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3
Expand All @@ -417,6 +416,7 @@ replace (
github.com/docker/go-plugins-helpers => github.com/elastic/go-plugins-helpers v0.0.0-20200207104224-bdf17607b79f
github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6
github.com/elastic/elastic-agent-client/v7 => github.com/pkoutsovasilis/elastic-agent-client/v7 v7.10.0
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c
github.com/g8rswimmer/go-sfdc => github.com/elastic/go-sfdc v0.0.0-20201201191151-3190c381b3e1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Expand Up @@ -677,8 +677,6 @@ github.com/elastic/ebpfevents v0.6.0 h1:BrL3m7JFK7U6h2jkbk3xAWWs//IZnugCHEDds5u2
github.com/elastic/ebpfevents v0.6.0/go.mod h1:ESG9gw7N+n5yCCMgdg1IIJENKWSmX7+X0Fi9GUs9nvU=
github.com/elastic/elastic-agent-autodiscover v0.6.8 h1:BSXz+QwjZAEt08G+T3GDGl14Bh9a6zD8luNCvZut/b8=
github.com/elastic/elastic-agent-autodiscover v0.6.8/go.mod h1:hFeFqneS2r4jD0/QzGkrNk0YVdN0JGh7lCWdsH7zcI4=
github.com/elastic/elastic-agent-client/v7 v7.8.1 h1:J9wZc/0mUvSEok0X5iR5+n60Jgb+AWooKddb3XgPWqM=
github.com/elastic/elastic-agent-client/v7 v7.8.1/go.mod h1:axl1nkdqc84YRFkeJGD9jExKNPUrOrzf3DFo2m653nY=
github.com/elastic/elastic-agent-libs v0.7.5 h1:4UMqB3BREvhwecYTs/L23oQp1hs/XUkcunPlmTZn5yg=
github.com/elastic/elastic-agent-libs v0.7.5/go.mod h1:pGMj5myawdqu+xE+WKvM5FQzKQ/MonikkWOzoFTJxaU=
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U=
Expand Down Expand Up @@ -1656,6 +1654,8 @@ github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6J
github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5/go.mod h1:eCbImbZ95eXtAUIbLAuAVnBnwf83mjf6QIVH8SHYwqQ=
github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE=
github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU=
github.com/pkoutsovasilis/elastic-agent-client/v7 v7.10.0 h1:J8CZpQL/8wy9xfauqAdpMsZGiHOVDCH8xSo1YwpGOYA=
github.com/pkoutsovasilis/elastic-agent-client/v7 v7.10.0/go.mod h1:/AeiwX9zxG99eUNrLhpApTpwmE71Qwuh4ozObn7a0ss=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
Expand Down Expand Up @@ -2219,8 +2219,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190130055435-99b60b757ec1/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
7 changes: 7 additions & 0 deletions libbeat/cfgfile/list.go
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -153,6 +154,12 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error {

r.logger.Debugf("Starting runner: %s", runner)
r.runners[hash] = runner
if config.StatusReporter != nil {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(config.StatusReporter)
}
}

runner.Start()
moduleStarts.Add(1)
if config.DiagCallback != nil {
Expand Down
3 changes: 3 additions & 0 deletions libbeat/common/reload/reload.go
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand All @@ -47,6 +48,8 @@ type ConfigWithMeta struct {

// InputUnitID is the unit's ID that generated this ConfigWithMeta
InputUnitID string

StatusReporter status.StatusReporter
}

// ReloadableList provides a method to reload the configuration of a list of entities
Expand Down
40 changes: 5 additions & 35 deletions libbeat/management/management.go
Expand Up @@ -21,49 +21,19 @@ import (
"sync"

"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// Status describes the current status of the beat.
type Status int

//go:generate stringer -type=Status
const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing application is starting.
Starting
// Configuring is status describing application is configuring.
Configuring
// Running is status describing application is running.
Running
// Degraded is status describing application is degraded.
Degraded
// Failed is status describing application is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing application is stopping.
Stopping
// Stopped is status describing application is stopped.
Stopped
)

// DebugK used as key for all things central management
var DebugK = "centralmgmt"

// StatusReporter provides a method to update current status of the beat.
type StatusReporter interface {
// UpdateStatus called when the status of the beat has changed.
UpdateStatus(status Status, msg string)
}

// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
StatusReporter
status.StatusReporter

// Enabled returns true if manager is enabled.
Enabled() bool
Expand Down Expand Up @@ -133,7 +103,7 @@ func NewManager(cfg *config.C, registry *reload.Registry) (Manager, error) {
}
return &fallbackManager{
logger: logp.NewLogger("mgmt"),
status: Unknown,
status: status.Unknown,
msg: "",
}, nil
}
Expand All @@ -152,13 +122,13 @@ func SetManagerFactory(factory ManagerFactory) {
type fallbackManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
status status.Status
msg string
stopFunc func()
stopOnce sync.Once
}

func (n *fallbackManager) UpdateStatus(status Status, msg string) {
func (n *fallbackManager) UpdateStatus(status status.Status, msg string) {
n.lock.Lock()
defer n.lock.Unlock()
if n.status != status || n.msg != msg {
Expand Down
38 changes: 38 additions & 0 deletions libbeat/management/status/status.go
@@ -0,0 +1,38 @@
package status

// Status describes the current status of the beat.
type Status int

//go:generate stringer -type=Status
const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing application is starting.
Starting
// Configuring is status describing application is configuring.
Configuring
// Running is status describing application is running.
Running
// Degraded is status describing application is degraded.
Degraded
// Failed is status describing application is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing application is stopping.
Stopping
// Stopped is status describing application is stopped.
Stopped
)

// StatusReporter provides a method to update current status of a unit.
type StatusReporter interface {
// UpdateStatus updates the status of the unit.
UpdateStatus(status Status, msg string)
}

// WithStatusReporter provides a method to set a status reporter
type WithStatusReporter interface {
// SetStatusReporter sets the status reporter
SetStatusReporter(reporter StatusReporter)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c3ef49b

Please sign in to comment.