Skip to content

Commit

Permalink
Redis plug-in node discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremycampbell-okta committed Apr 22, 2024
1 parent 2de8026 commit a8bbaf7
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 16 deletions.
146 changes: 130 additions & 16 deletions plugins/inputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type RedisCommand struct {
}

type Redis struct {
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
NodeDiscovery bool `toml:"node_discovery"`

tls.ClientConfig

Expand All @@ -57,6 +58,12 @@ type RedisClient struct {
tags map[string]string
}

type redisClusterNode struct {
nodeId string
host string
port string
}

// RedisFieldTypes defines the types expected for each of the fields redis reports on
type RedisFieldTypes struct {
ActiveDefragHits int64 `json:"active_defrag_hits"`
Expand Down Expand Up @@ -222,7 +229,7 @@ func (r *Redis) Init() error {
}

func (r *Redis) connect() error {
if r.connected {
if r.connected && !r.NodeDiscovery {
return nil
}

Expand Down Expand Up @@ -281,18 +288,61 @@ func (r *Redis) connect() error {
},
)

tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
if r.NodeDiscovery {
nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)})
if err != nil {
return err
}
if nodes == nil {
return fmt.Errorf("unable to discover nodes from %s", address)
}
for _, node := range nodes {

nodeAddress := ""
if node.host == "" {
nodeAddress = address
} else {
nodeAddress = fmt.Sprintf("%s:%s", node.host, node.port)
}

discoveredClient := redis.NewClient(
&redis.Options{
Addr: nodeAddress,
Username: username,
Password: password,
Network: u.Scheme,
PoolSize: 1,
TLSConfig: tlsConfig,
},
)

tags := map[string]string{
"server": u.Hostname(),
"port": node.port,
"nodeId": node.nodeId,
// "replication_role": node.replicationRole,
}

r.clients = append(r.clients, &RedisClient{
client: discoveredClient,
tags: tags,
})
}

} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}
tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}

r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
}
}

r.connected = true
Expand All @@ -302,7 +352,7 @@ func (r *Redis) connect() error {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.connected {
if !r.connected || r.NodeDiscovery {
err := r.connect()
if err != nil {
return err
Expand Down Expand Up @@ -344,6 +394,26 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err
return nil
}

func discoverNodes(client Client) ([]redisClusterNode, error) {
val, err := client.Do("string", "cluster", "nodes")

if err != nil {
if strings.Contains(err.Error(), "unexpected type=") {
return nil, fmt.Errorf("could not get command result: %w", err)
}

return nil, err
}

str, ok := val.(string)
if ok {
return parseClusterNodes(str)
} else {
return nil, fmt.Errorf("could not discover nodes: %w", err)
}

}

func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
info, err := client.Info().Result()
if err != nil {
Expand All @@ -354,6 +424,50 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
return gatherInfoOutput(rdr, acc, client.BaseTags())
}

// Parse the list of nodes from a `cluster nodes` command
// This response looks like:
//
// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095
//
// Per Redis docs:
// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/)
//
// In the above listing the different fields are in order:
// node id, address:port, flags, last ping sent, last pong received,
// configuration epoch, link state, slots.
func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) {

lines := strings.Split(nodesResponse, "\n")
var nodes []redisClusterNode

for _, line := range lines {
fields := strings.Fields(line)

if len(fields) >= 8 {
endpointParts := strings.FieldsFunc(fields[1], func(r rune) bool {
return strings.ContainsRune(":@", r)
})
if string(fields[1][0]) == ":" {
endpointParts = append([]string{""}, endpointParts...)
}

nodes = append(nodes, redisClusterNode{
nodeId: fields[0],
host: endpointParts[0],
port: endpointParts[1],
})
} else if len(fields) == 0 {
continue
} else {
return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line)
}
}

return nodes, nil
}

// gatherInfoOutput gathers
func gatherInfoOutput(
rdr io.Reader,
Expand Down
81 changes: 81 additions & 0 deletions plugins/inputs/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"bufio"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -62,6 +63,86 @@ func TestRedisConnectIntegration(t *testing.T) {
require.NoError(t, err)
}

func TestRedisConnectWithNodeDiscoveryIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

servicePort := "6379"
container := testutil.Container{
Image: "redis:alpine",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
Cmd: []string{
"redis-server",
"--port", servicePort,
"--appendonly", "yes",
"--cluster-enabled", "yes",
"--cluster-config-file", "nodes.conf",
"--cluster-node-timeout", "5000",
},
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()

addr := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])

r := &Redis{
Log: testutil.Logger{},
Servers: []string{addr},
NodeDiscovery: true,
}

var acc testutil.Accumulator

err = acc.GatherError(r.Gather)
require.NoError(t, err)
}

func TestParseClusterNodes(t *testing.T) {

var tests = map[string]struct {
clusterNodesResponse string
expectedNodes []redisClusterNode
}{
"SingleContainerCluster": {
"5998443a50112d5a7fa619c0b044451df052974e :6379@16379 myself,master - 0 0 0 connected\n",
[]redisClusterNode{
{nodeId: "5998443a50112d5a7fa619c0b044451df052974e", host: "", port: "6379"},
},
},
"ClusterNodesResponseA": {
`d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095`,
[]redisClusterNode{
{nodeId: "d1861060fe6a534d42d8a19aeb36600e18785e04", host: "127.0.0.1", port: "6379"},
{nodeId: "3886e65cc906bfd9b1f7e7bde468726a052d1dae", host: "127.0.0.1", port: "6380"},
{nodeId: "d289c575dcbc4bdd2931585fd4339089e461a27d", host: "127.0.0.1", port: "6381"},
},
},
"ClusterNodesResponseB": {
`4ce37a099986f2d0465955e2e66937d6893aa0e1 10.64.82.45:11006@16379 myself,master - 0 1713739012000 5 connected 5462-10922
d6eb119a1f050982cc901ae663e7448867e49f7c 10.64.82.46:11005@16379 master - 0 1713739011916 4 connected 10923-16383
3a386fb6930d8f6c1a6536082071eb2f32590d31 10.64.82.46:11007@16379 master - 0 1713739012922 6 connected 0-5461`,
[]redisClusterNode{
{nodeId: "4ce37a099986f2d0465955e2e66937d6893aa0e1", host: "10.64.82.45", port: "11006"},
{nodeId: "d6eb119a1f050982cc901ae663e7448867e49f7c", host: "10.64.82.46", port: "11005"},
{nodeId: "3a386fb6930d8f6c1a6536082071eb2f32590d31", host: "10.64.82.46", port: "11007"},
},
},
}

for tname, tt := range tests {
testResponse, _ := parseClusterNodes(tt.clusterNodesResponse)
if !reflect.DeepEqual(testResponse, tt.expectedNodes) {
// t.Errorf("%s fail!", tname)
t.Error(tname, "fail! Got:\n", testResponse, "\nExpected:\n", tt.expectedNodes)
}
}
}

func TestRedis_Commands(t *testing.T) {
const redisListKey = "test-list-length"
var acc testutil.Accumulator
Expand Down

0 comments on commit a8bbaf7

Please sign in to comment.