Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.redis): Add node discovery for clusters #15209

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# username = ""
# password = ""

## Optional Node Discovery for Redis Cluster
## Enabling this feature triggers the execution of a `cluster nodes` command
## at each metric gathering interval. This command automatically detects
## all cluster nodes and retrieves metrics from each of them.
# node_discovery = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config should show the default values.

Suggested change
# node_discovery = true
# node_discovery = false


## Optional TLS Config
## Check tls/config.go ClientConfig for more options
# tls_enable = true
Expand Down
134 changes: 118 additions & 16 deletions plugins/inputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type RedisCommand struct {
}

type Redis struct {
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
NodeDiscovery bool `toml:"node_discovery"`

tls.ClientConfig

Expand All @@ -57,6 +58,12 @@ type RedisClient struct {
tags map[string]string
}

// redisClusterNode describes one node listed in the reply of a
// `cluster nodes` command.
type redisClusterNode struct {
	// nodeID is the first field of the node's line in the reply.
	nodeID string
	// host and port are taken from the address field of the node's line;
	// host may be empty when the node reports no address (e.g. ":6379@16379").
	host, port string
}

// RedisFieldTypes defines the types expected for each of the fields redis reports on
type RedisFieldTypes struct {
ActiveDefragHits int64 `json:"active_defrag_hits"`
Expand Down Expand Up @@ -222,7 +229,7 @@ func (r *Redis) Init() error {
}

func (r *Redis) connect() error {
if r.connected {
if r.connected && !r.NodeDiscovery {
return nil
}

Expand Down Expand Up @@ -281,18 +288,56 @@ func (r *Redis) connect() error {
},
)

tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
if r.NodeDiscovery {
nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)})
if err != nil {
return err
}
if nodes == nil {
return fmt.Errorf("unable to discover nodes from %s", address)
}
for _, node := range nodes {
nodeAddress := address
if node.host != "" {
nodeAddress = node.host + ":" + node.port
}

discoveredClient := redis.NewClient(
&redis.Options{
Addr: nodeAddress,
Username: username,
Password: password,
Network: u.Scheme,
PoolSize: 1,
TLSConfig: tlsConfig,
},
)

tags := map[string]string{
"server": u.Hostname(),
"port": node.port,
"nodeID": node.nodeID,
}

r.clients = append(r.clients, &RedisClient{
client: discoveredClient,
tags: tags,
})
}
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}
tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}

r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
}
}

r.connected = true
Expand All @@ -302,7 +347,7 @@ func (r *Redis) connect() error {
// Gather reads stats from all configured servers and accumulates them.
// Returns one of the errors encountered while gathering stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.connected {
if !r.connected || r.NodeDiscovery {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm if we re-create the slice of clients during each gather, should we also be calling client.Close() on the existing clients first inside connect()?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can test this out. I haven't seen any rise in client connectivity in my deployed tests, but this would add some certainty.

err := r.connect()
if err != nil {
return err
Expand Down Expand Up @@ -344,6 +389,20 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err
return nil
}

func discoverNodes(client Client) ([]redisClusterNode, error) {
val, err := client.Do("string", "cluster", "nodes")

if err != nil {
return nil, err
}
Comment on lines +393 to +397
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val, err := client.Do("string", "cluster", "nodes")
if err != nil {
return nil, err
}
val, err := client.Do("string", "cluster", "nodes")
if err != nil {
return nil, err
}


str, ok := val.(string)
if !ok {
return nil, fmt.Errorf("could not discover nodes: %w", err)
}
return parseClusterNodes(str)
}

func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
info, err := client.Info().Result()
if err != nil {
Expand All @@ -354,6 +413,49 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
return gatherInfoOutput(rdr, acc, client.BaseTags())
}

// parseClusterNodes parses the reply of a `cluster nodes` command into the
// list of nodes it describes.
//
// The response looks like:
//
// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095
//
// Per Redis docs:
// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/)
//
// In the above listing the different fields are in order:
// node id, address:port, flags, last ping sent, last pong received,
// configuration epoch, link state, slots.
//
// Blank lines are skipped. The address field may carry an "@..." suffix
// (e.g. "10.64.82.45:11006@16379"); everything from the '@' on is ignored.
// The host part may be empty (e.g. ":6379@16379"), in which case the returned
// node has an empty host.
//
// An error is returned for any non-blank line with fewer than eight fields or
// whose address field does not contain a port.
func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) {
	lines := strings.Split(nodesResponse, "\n")
	nodes := make([]redisClusterNode, 0, len(lines))

	for _, line := range lines {
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue
		}
		if len(fields) < 8 {
			return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line)
		}

		// Drop the "@..." suffix so only "host:port" remains.
		endpoint := fields[1]
		if at := strings.IndexByte(endpoint, '@'); at >= 0 {
			endpoint = endpoint[:at]
		}

		// Split on the LAST colon so hosts that themselves contain colons
		// (IPv6 literals) keep their full address, and reject endpoints
		// without a port instead of indexing out of range.
		sep := strings.LastIndexByte(endpoint, ':')
		if sep < 0 || sep == len(endpoint)-1 {
			return nil, fmt.Errorf("unexpected cluster node address %q in %q", fields[1], line)
		}

		nodes = append(nodes, redisClusterNode{
			nodeID: fields[0],
			host:   endpoint[:sep],
			port:   endpoint[sep+1:],
		})
	}

	return nodes, nil
}

// gatherInfoOutput gathers
func gatherInfoOutput(
rdr io.Reader,
Expand Down
79 changes: 79 additions & 0 deletions plugins/inputs/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"bufio"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -62,6 +63,84 @@ func TestRedisConnectIntegration(t *testing.T) {
require.NoError(t, err)
}

// TestRedisConnectWithNodeDiscoveryIntegration starts a single cluster-enabled
// Redis container and checks that one gather cycle with node discovery
// enabled completes without error. It is skipped in short mode.
func TestRedisConnectWithNodeDiscoveryIntegration(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	const servicePort = "6379"

	// Run redis-server in cluster mode so that `cluster nodes` is available.
	serverArgs := []string{
		"redis-server",
		"--port", servicePort,
		"--appendonly", "yes",
		"--cluster-enabled", "yes",
		"--cluster-config-file", "nodes.conf",
		"--cluster-node-timeout", "5000",
	}
	container := testutil.Container{
		Image:        "redis:alpine",
		ExposedPorts: []string{servicePort},
		WaitingFor:   wait.ForListeningPort(nat.Port(servicePort)),
		Cmd:          serverArgs,
	}
	require.NoError(t, container.Start(), "failed to start container")
	defer container.Terminate()

	plugin := &Redis{
		Log: testutil.Logger{},
		Servers: []string{
			fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort]),
		},
		NodeDiscovery: true,
	}

	var acc testutil.Accumulator
	require.NoError(t, acc.GatherError(plugin.Gather))
}

// TestParseClusterNodes checks that parseClusterNodes extracts the node ID,
// host and port from representative `cluster nodes` replies. Each case runs as
// its own subtest, and a parse error fails the case instead of being ignored.
func TestParseClusterNodes(t *testing.T) {
	tests := map[string]struct {
		clusterNodesResponse string
		expectedNodes        []redisClusterNode
	}{
		"SingleContainerCluster": {
			"5998443a50112d5a7fa619c0b044451df052974e :6379@16379 myself,master - 0 0 0 connected\n",
			[]redisClusterNode{
				{nodeID: "5998443a50112d5a7fa619c0b044451df052974e", host: "", port: "6379"},
			},
		},
		"ClusterNodesResponseA": {
			`d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095`,
			[]redisClusterNode{
				{nodeID: "d1861060fe6a534d42d8a19aeb36600e18785e04", host: "127.0.0.1", port: "6379"},
				{nodeID: "3886e65cc906bfd9b1f7e7bde468726a052d1dae", host: "127.0.0.1", port: "6380"},
				{nodeID: "d289c575dcbc4bdd2931585fd4339089e461a27d", host: "127.0.0.1", port: "6381"},
			},
		},
		"ClusterNodesResponseB": {
			`4ce37a099986f2d0465955e2e66937d6893aa0e1 10.64.82.45:11006@16379 myself,master - 0 1713739012000 5 connected 5462-10922
d6eb119a1f050982cc901ae663e7448867e49f7c 10.64.82.46:11005@16379 master - 0 1713739011916 4 connected 10923-16383
3a386fb6930d8f6c1a6536082071eb2f32590d31 10.64.82.46:11007@16379 master - 0 1713739012922 6 connected 0-5461`,
			[]redisClusterNode{
				{nodeID: "4ce37a099986f2d0465955e2e66937d6893aa0e1", host: "10.64.82.45", port: "11006"},
				{nodeID: "d6eb119a1f050982cc901ae663e7448867e49f7c", host: "10.64.82.46", port: "11005"},
				{nodeID: "3a386fb6930d8f6c1a6536082071eb2f32590d31", host: "10.64.82.46", port: "11007"},
			},
		},
	}

	for tname, tt := range tests {
		// Subtests run synchronously here, so capturing tt is safe.
		t.Run(tname, func(t *testing.T) {
			actual, err := parseClusterNodes(tt.clusterNodesResponse)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			if !reflect.DeepEqual(actual, tt.expectedNodes) {
				t.Errorf("got:\n%v\nexpected:\n%v", actual, tt.expectedNodes)
			}
		})
	}
}

func TestRedis_Commands(t *testing.T) {
const redisListKey = "test-list-length"
var acc testutil.Accumulator
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/redis/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
# username = ""
# password = ""

## Optional Node Discovery for Redis Cluster
## Enabling this feature triggers the execution of a `cluster nodes` command
## at each metric gathering interval. This command automatically detects
## all cluster nodes and retrieves metrics from each of them.
# node_discovery = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# node_discovery = true
# node_discovery = false


## Optional TLS Config
## Check tls/config.go ClientConfig for more options
# tls_enable = true
Expand Down