Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial (WIP) #5052

Closed
wants to merge 16 commits into from
11 changes: 11 additions & 0 deletions CHANGELOG-6.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ Versioning](http://semver.org/spec/v2.0.0.html).

## Unreleased

### 2024-03-07

### Added
- Addition of a new watcher configuration to monitor the user updates
- Added the exit mechanism to disconnect agent when user is disabled
- Added a struct in store which will get passed down for userConfigs

### Changed
- The session config to watch over the user updated and the wizzard bus


### Changed
- Upgraded CI Go version to 1.21.3
- Upgraded jwt version to 4.4.3
Expand Down
32 changes: 31 additions & 1 deletion backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/apid/actions"
"github.com/sensu/sensu-go/backend/apid/middlewares"
"github.com/sensu/sensu-go/backend/apid/routers"
Expand Down Expand Up @@ -105,6 +105,7 @@ type Agentd struct {
serveWaitTime time.Duration
ready func()
backendEntity *corev2.Entity
userWatcher <-chan *store.WatchEventUserConfig
}

// Config configures an Agentd.
Expand All @@ -121,6 +122,7 @@ type Config struct {
EtcdClientTLSConfig *tls.Config
Watcher <-chan store.WatchEventEntityConfig
BackendEntity *corev2.Entity
UserWatcher <-chan *store.WatchEventUserConfig
}

// Option is a functional option.
Expand Down Expand Up @@ -149,6 +151,7 @@ func New(c Config, opts ...Option) (*Agentd, error) {
etcdClientTLSConfig: c.EtcdClientTLSConfig,
serveWaitTime: c.ServeWaitTime,
backendEntity: c.BackendEntity,
userWatcher: c.UserWatcher,
}

// prepare server TLS config
Expand Down Expand Up @@ -275,6 +278,7 @@ func (a *Agentd) Start() error {
func (a *Agentd) runWatcher() {
defer func() {
logger.Warn("shutting down entity config watcher")
logger.Warn("shutting down user config watcher")
}()
for {
select {
Expand All @@ -287,10 +291,18 @@ func (a *Agentd) runWatcher() {
if err := a.handleEvent(event); err != nil {
logger.WithError(err).Error("error handling entity config watch event")
}
case userEvent, ok := <-a.userWatcher:
if !ok {
return
}
if err := a.handleUserEvent(userEvent); err != nil {
logger.WithError(err).Error("error handling user config watch event")
}
}
}
}

// adding the config updates to the etcd bus for watcher to consume
func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
if event.Entity == nil {
return errors.New("nil entity received from entity config watcher")
Expand All @@ -308,6 +320,24 @@ func (a *Agentd) handleEvent(event store.WatchEventEntityConfig) error {
return nil
}

// adding the UserConfig updates to the etcd bus for the watcher to consume
func (a *Agentd) handleUserEvent(event *store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}

topic := messaging.UserConfigTopic(event.User.Username)
if err := a.bus.Publish(topic, event); err != nil {
logger.WithField("topic", topic).WithError(err).
Error("unable to publish a user config update to the bus")
return err
}

logger.WithField("topic", topic).
Debug("successfully published an user config update to the bus")
return nil
}

// Stop Agentd.
func (a *Agentd) Stop() error {
a.cancel()
Expand Down
9 changes: 4 additions & 5 deletions backend/agentd/agentd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/backend/apid/middlewares"
Expand All @@ -22,6 +17,10 @@ import (
"github.com/sensu/sensu-go/transport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)

func TestAgentdMiddlewares(t *testing.T) {
Expand Down
111 changes: 104 additions & 7 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/sensu/sensu-go/agent"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/metrics"
"github.com/sensu/sensu-go/backend/ringv2"
Expand Down Expand Up @@ -95,6 +95,8 @@ type Session struct {
marshal agent.MarshalFunc
unmarshal agent.UnmarshalFunc
entityConfig *entityConfig
userConfig *userConfig
user string
mu sync.Mutex
subscriptionsMap map[string]subscription
}
Expand All @@ -111,12 +113,23 @@ type entityConfig struct {
updatesChannel chan interface{}
}

// userConfig is used by a session to subscribe to entity config updates
type userConfig struct {
subscription chan messaging.Subscription
updatesChannel chan interface{}
}

// Receiver returns the channel for incoming entity updates from the entity
// watcher
func (e *entityConfig) Receiver() chan<- interface{} {
return e.updatesChannel
}

// Receiver returns the channel for incoming entity updates from the entity watcher
func (u *userConfig) Receiver() chan<- interface{} {
return u.updatesChannel
}

func newSessionHandler(s *Session) *handler.MessageHandler {
handler := handler.NewMessageHandler()
handler.AddHandler(transport.MessageTypeKeepalive, s.handleKeepalive)
Expand Down Expand Up @@ -149,6 +162,7 @@ type SessionConfig struct {
// with the session has been buried. Necessary when running parallel keepalived
// workers.
BurialReceiver *BurialReceiver
userConfig *userConfig
}

type BurialReceiver struct {
Expand Down Expand Up @@ -193,10 +207,15 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
ringPool: cfg.RingPool,
unmarshal: cfg.Unmarshal,
marshal: cfg.Marshal,
user: cfg.User,
entityConfig: &entityConfig{
subscriptions: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
userConfig: &userConfig{
subscription: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
}

// Optionally subscribe to burial notifications
Expand All @@ -211,6 +230,13 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
}()
}

if len(cfg.User) > 0 {
_, err := s.bus.Subscribe(messaging.UserConfigTopic(cfg.User), cfg.AgentName, s.userConfig)
if err != nil {
return nil, err
}
}

if err := s.bus.Publish(messaging.TopicKeepalive, makeEntitySwitchBurialEvent(cfg)); err != nil {
return nil, err
}
Expand Down Expand Up @@ -321,6 +347,44 @@ func (s *Session) sender() {
for {
var msg *transport.Message
select {
//---- user -----//

case u := <-s.userConfig.updatesChannel:
watchEvent, ok := u.(*store.WatchEventUserConfig)
if !ok {
logger.Errorf("session received unexoected user struct : %T", u)
continue
}
// Handle the delete/disable event
switch watchEvent.Action {
case store.WatchUpdate:
if watchEvent.Disabled {
logger.Warn("The user associated with the agent is now disabled")
return
}
logger.Println("The update operation has been performed on user")
default:
panic("unhandled default case")
}

if watchEvent.User == nil {
logger.Error("session received nil user in watch event")
}
//
lagger := logger.WithFields(logrus.Fields{
"action": watchEvent.Action.String(),
"user": watchEvent.User.Username,
"namespace": watchEvent.User.GetMetadata().GetNamespace(),
})
lagger.Debug("user update received")

bytes, err := s.marshal(watchEvent.User)
if err != nil {
lagger.WithError(err).Error("session failed to serialize user config")
}
msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// ---- entity ----//
case e := <-s.entityConfig.updatesChannel:
watchEvent, ok := e.(*store.WatchEventEntityConfig)
if !ok {
Expand Down Expand Up @@ -448,7 +512,9 @@ func (s *Session) sender() {
// 2. Start receiver
// 3. Start goroutine that waits for context cancellation, and shuts down service.
func (s *Session) Start() (err error) {
defer close(s.userConfig.subscription)
defer close(s.entityConfig.subscriptions)

sessionCounter.WithLabelValues(s.cfg.Namespace).Inc()
s.wg = &sync.WaitGroup{}
s.wg.Add(2)
Expand All @@ -471,22 +537,45 @@ func (s *Session) Start() (err error) {
"agent": s.cfg.AgentName,
"namespace": s.cfg.Namespace,
})

// Subscribe the agent to its entity_config topic
topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")
// Get a unique name for the agent, which will be used as the consumer of the
// bus, in order to avoid problems with an agent reconnecting before its
// session is ended
agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName)

// Subscribe the agent to its user_config topic

userTopic := messaging.UserConfigTopic(s.cfg.User)
logger.WithField("topic", userTopic).Debug("subscribing to topic")
userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig)
if usrErr != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.userConfig.subscription <- userSubscription

// Send back this user config to the agent so it uses that rather than it's local config
watchEvent := &store.WatchEventUserConfig{
Action: store.WatchUpdate,
User: &corev2.User{},
}
usrErr = s.bus.Publish(messaging.UserConfigTopic(s.cfg.AgentName), watchEvent)
if usrErr != nil {
lager.WithError(err).Error("error publishing user config")
return err
}

// Subscribe the agent to its entity_config topic

topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")

// Determine if the entity already exists
subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig)
if err != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.entityConfig.subscriptions <- subscription

// Determine if the entity already exists
req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName())
wrapper, err := s.storev2.Get(req)
if err != nil {
Expand Down Expand Up @@ -577,6 +666,7 @@ func (s *Session) stop() {
logger.WithError(err).Error("error closing session")
}
}()
defer close(s.userConfig.updatesChannel)
defer close(s.entityConfig.updatesChannel)
defer close(s.checkChannel)

Expand All @@ -602,6 +692,13 @@ func (s *Session) stop() {
}
}

// Remove the user config subscription
for sub := range s.userConfig.subscription {
if err := sub.Cancel(); err != nil {
logger.WithError(err).Error("unable to unsubscribe from message bus")
}
}

// Unsubscribe the session from every configured check subscriptions
s.unsubscribe(s.cfg.Subscriptions)
}
Expand Down