Skip to content

Commit

Permalink
[8.6](backport #34346) Add checks to ensure reloading of units if the…
Browse files Browse the repository at this point in the history
… configuration actually changed. (#34348)

* Add checks to ensure reloading of units if the configuration actually changed. (#34346)

* Add checks to ensure reloading of units if the configuration actually changed.

* Add changelog entry.

(cherry picked from commit 5b1f828)

* Run make check and commit the changes. (#34349)

The format step was skipped in
#34346

Co-authored-by: Blake Rouse <blake.rouse@elastic.co>
Co-authored-by: Craig MacKenzie <craig.mackenzie@elastic.co>
  • Loading branch information
3 people committed Jan 23, 2023
1 parent aaa3266 commit 14f2f8d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix namespacing on self-monitoring {pull}32336[32336]
- Fix race condition when stopping runners {pull}32433[32433]
- Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491]
- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346]

*Auditbeat*

Expand Down
86 changes: 59 additions & 27 deletions x-pack/libbeat/management/managerV2.go
Expand Up @@ -13,18 +13,20 @@ import (
"syscall"
"time"

"github.com/gofrs/uuid"
"github.com/joeshaw/multierror"
"go.uber.org/zap/zapcore"

"github.com/gofrs/uuid"
gproto "google.golang.org/protobuf/proto"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/version"
)

// unitKey is used to identify a unique unit in a map
Expand Down Expand Up @@ -66,13 +68,17 @@ type BeatV2Manager struct {

isRunning bool

// is set on first instance of a config reload,
// allowing us to restart the beat if stopOnOutputReload is set
outputIsConfigured bool
// set with the last applied output config
// allows tracking if the configuration actually changed and if the
// beat needs to restart if stopOnOutputReload is set
lastOutputCfg *proto.UnitExpectedConfig

// set with the last applied input configs
lastInputCfgs map[string]*proto.UnitExpectedConfig

// used for the debug callback to report as-running config
lastOutputCfg *reload.ConfigWithMeta
lastInputCfg []*reload.ConfigWithMeta
lastBeatOutputCfg *reload.ConfigWithMeta
lastBeatInputCfgs []*reload.ConfigWithMeta
}

// ================================
Expand Down Expand Up @@ -496,22 +502,31 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
return fmt.Errorf("failed to reload output: %w", err)
}
cm.lastOutputCfg = nil
return nil
}

if cm.stopOnOutputReload && cm.outputIsConfigured {
cm.logger.Info("beat is restarting because output changed")
_ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
cm.Stop()
cm.lastBeatOutputCfg = nil
return nil
}

_, _, rawConfig := unit.Expected()
if rawConfig == nil {
// should not happen; hard stop
return fmt.Errorf("output unit has no config")
}

if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, rawConfig) {
// configuration for the output did not change; do nothing
cm.logger.Debug("Skipped reloading output; configuration didn't change")
return nil
}

cm.logger.Debugf("Got output unit config '%s'", rawConfig.GetId())

if cm.stopOnOutputReload && cm.lastOutputCfg != nil {
cm.logger.Info("beat is restarting because output changed")
_ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
cm.Stop()
return nil
}

reloadConfig, err := groupByOutputs(rawConfig)
if err != nil {
return fmt.Errorf("failed to generate config for output: %w", err)
Expand All @@ -521,9 +536,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
if err != nil {
return fmt.Errorf("failed to reload output: %w", err)
}
cm.lastOutputCfg = reloadConfig
// set to true, we'll reload the output if we need to re-configure
cm.outputIsConfigured = true
cm.lastOutputCfg = rawConfig
cm.lastBeatOutputCfg = reloadConfig
return nil
}

Expand All @@ -533,22 +547,40 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
return fmt.Errorf("failed to find beat reloadable type 'input'")
}

var inputCfgs []*reload.ConfigWithMeta
inputCfgs := make(map[string]*proto.UnitExpectedConfig, len(inputUnits))
inputBeatCfgs := make([]*reload.ConfigWithMeta, 0, len(inputUnits))
agentInfo := cm.client.AgentInfo()
for _, unit := range inputUnits {
_, _, rawConfig := unit.Expected()
if rawConfig == nil {
// should not happen; hard stop
return fmt.Errorf("input unit %s has no config", unit.ID())
}

var prevCfg *proto.UnitExpectedConfig
if cm.lastInputCfgs != nil {
prevCfg, _ = cm.lastInputCfgs[unit.ID()]
}
if prevCfg != nil && gproto.Equal(prevCfg, rawConfig) {
// configuration for the input did not change; do nothing
cm.logger.Debugf("Skipped reloading input unit %s; configuration didn't change", unit.ID())
continue
}

inputCfg, err := generateBeatConfig(rawConfig, agentInfo)
if err != nil {
return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err)
}
inputCfgs = append(inputCfgs, inputCfg...)
inputCfgs[unit.ID()] = rawConfig
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
}

err := obj.Reload(inputCfgs)
err := obj.Reload(inputBeatCfgs)
if err != nil {
return fmt.Errorf("failed to reloading inputs: %w", err)
}
cm.lastInputCfg = inputCfgs
cm.lastInputCfgs = inputCfgs
cm.lastBeatInputCfgs = inputBeatCfgs
return nil
}

Expand All @@ -557,7 +589,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
func (cm *BeatV2Manager) handleDebugYaml() []byte {
// generate input
inputList := []map[string]interface{}{}
for _, module := range cm.lastInputCfg {
for _, module := range cm.lastBeatInputCfgs {
var inputMap map[string]interface{}
err := module.Config.Unpack(&inputMap)
if err != nil {
Expand All @@ -569,8 +601,8 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte {

// generate output
outputCfg := map[string]interface{}{}
if cm.lastOutputCfg != nil {
err := cm.lastOutputCfg.Config.Unpack(&outputCfg)
if cm.lastBeatOutputCfg != nil {
err := cm.lastBeatOutputCfg.Config.Unpack(&outputCfg)
if err != nil {
cm.logger.Errorf("error unpacking output config for debug callback: %s", err)
return nil
Expand Down

0 comments on commit 14f2f8d

Please sign in to comment.