Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.redis): Add node discovery for clusters #15209

Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/inputs/redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
# username = ""
# password = ""

## Optional Node Discovery for Redis Cluster
## Enabling this feature triggers the execution of a `cluster nodes` command
## at each metric gathering interval. This command automatically detects
## all cluster nodes and retrieves metrics from each of them.
# node_discovery = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The config should show the default values.

Suggested change
# node_discovery = true
# node_discovery = false


## Optional TLS Config
## Check tls/config.go ClientConfig for more options
# tls_enable = true
Expand Down
140 changes: 124 additions & 16 deletions plugins/inputs/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type RedisCommand struct {
}

type Redis struct {
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
Commands []*RedisCommand `toml:"commands"`
Servers []string `toml:"servers"`
Username string `toml:"username"`
Password string `toml:"password"`
NodeDiscovery bool `toml:"node_discovery"`

tls.ClientConfig

Expand All @@ -57,6 +58,12 @@ type RedisClient struct {
tags map[string]string
}

type redisClusterNode struct {
nodeID string
host string
port string
}

// RedisFieldTypes defines the types expected for each of the fields redis reports on
type RedisFieldTypes struct {
ActiveDefragHits int64 `json:"active_defrag_hits"`
Expand Down Expand Up @@ -222,7 +229,7 @@ func (r *Redis) Init() error {
}

func (r *Redis) connect() error {
if r.connected {
if r.connected && !r.NodeDiscovery {
return nil
}

Expand Down Expand Up @@ -281,18 +288,58 @@ func (r *Redis) connect() error {
},
)

tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
if r.NodeDiscovery {
nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)})
if err != nil {
return err
}
if nodes == nil {
return fmt.Errorf("unable to discover nodes from %s", address)
}
for _, node := range nodes {
nodeAddress := ""
if node.host == "" {
nodeAddress = address
} else {
nodeAddress = fmt.Sprintf("%s:%s", node.host, node.port)
}
jeremycampbell-okta marked this conversation as resolved.
Show resolved Hide resolved

discoveredClient := redis.NewClient(
&redis.Options{
Addr: nodeAddress,
Username: username,
Password: password,
Network: u.Scheme,
PoolSize: 1,
TLSConfig: tlsConfig,
},
)

tags := map[string]string{
"server": u.Hostname(),
"port": node.port,
"nodeID": node.nodeID,
}

r.clients = append(r.clients, &RedisClient{
client: discoveredClient,
tags: tags,
})
}
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}
tags := map[string]string{}
if u.Scheme == "unix" {
tags["socket"] = u.Path
} else {
tags["server"] = u.Hostname()
tags["port"] = u.Port()
}

r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
r.clients = append(r.clients, &RedisClient{
client: client,
tags: tags,
})
}
}

r.connected = true
Expand All @@ -302,7 +349,7 @@ func (r *Redis) connect() error {
// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (r *Redis) Gather(acc telegraf.Accumulator) error {
if !r.connected {
if !r.connected || r.NodeDiscovery {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm if we re-create the slice of clients during each gather, should we also be calling client.Close() on the existing clients first inside connect()?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can test this out. I haven't seen any rise in client connectivity in my deployed tests, but this would add some certainty.

err := r.connect()
if err != nil {
return err
Expand Down Expand Up @@ -344,6 +391,24 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err
return nil
}

func discoverNodes(client Client) ([]redisClusterNode, error) {
val, err := client.Do("string", "cluster", "nodes")

if err != nil {
if strings.Contains(err.Error(), "unexpected type=") {
return nil, fmt.Errorf("could not get command result: %w", err)
}
jeremycampbell-okta marked this conversation as resolved.
Show resolved Hide resolved

return nil, err
}

str, ok := val.(string)
if ok {
return parseClusterNodes(str)
}
return nil, fmt.Errorf("could not discover nodes: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer

Suggested change
str, ok := val.(string)
if ok {
return parseClusterNodes(str)
}
return nil, fmt.Errorf("could not discover nodes: %w", err)
str, ok := val.(string)
if !ok {
return nil, fmt.Errorf("could not discover nodes: %w", err)
}
return parseClusterNodes(str)

and maybe fold the parseClusterNodes function into this one...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked separating parseClusterNodes to make it more easily/directly testable. Can you say more about why you prefer to fold it in?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(To add to this...I expect maintenance of this function may take some work as/if Redis cluster nodes format changes in the future. As you can see from the tests, there are already 3 variations.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly a personal preference but splitting code into two functions, i.e. there is only one caller, where the caller side is very small usually requires the reader to jump between different places and makes following the flow more difficult. Like in this example, the discoverNodes function reads, "do a API call and parse the result". This adds no information to the reader as you cannot see how and what is parsed.

The splitting can be reasonable IMO if a) there are many processing steps, b) a lot of branches or c) complex processing steps. None of those criteria are applicable here as the parsing is fairly "simple" in terms of code...

}

func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
info, err := client.Info().Result()
if err != nil {
Expand All @@ -354,6 +419,49 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error {
return gatherInfoOutput(rdr, acc, client.BaseTags())
}

// Parse the list of nodes from a `cluster nodes` command
// This response looks like:
//
// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095
//
// Per Redis docs:
// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/)
//
// In the above listing the different fields are in order:
// node id, address:port, flags, last ping sent, last pong received,
// configuration epoch, link state, slots.
func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) {
lines := strings.Split(nodesResponse, "\n")
var nodes []redisClusterNode

for _, line := range lines {
fields := strings.Fields(line)

if len(fields) >= 8 {
endpointParts := strings.FieldsFunc(fields[1], func(r rune) bool {
return strings.ContainsRune(":@", r)
})
if string(fields[1][0]) == ":" {
endpointParts = append([]string{""}, endpointParts...)
}

nodes = append(nodes, redisClusterNode{
nodeID: fields[0],
host: endpointParts[0],
port: endpointParts[1],
})
} else if len(fields) == 0 {
continue
} else {
return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line)
}
jeremycampbell-okta marked this conversation as resolved.
Show resolved Hide resolved
}

return nodes, nil
}

// gatherInfoOutput gathers
func gatherInfoOutput(
rdr io.Reader,
Expand Down
79 changes: 79 additions & 0 deletions plugins/inputs/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"bufio"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -62,6 +63,84 @@ func TestRedisConnectIntegration(t *testing.T) {
require.NoError(t, err)
}

func TestRedisConnectWithNodeDiscoveryIntegration(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

servicePort := "6379"
container := testutil.Container{
Image: "redis:alpine",
ExposedPorts: []string{servicePort},
WaitingFor: wait.ForListeningPort(nat.Port(servicePort)),
Cmd: []string{
"redis-server",
"--port", servicePort,
"--appendonly", "yes",
"--cluster-enabled", "yes",
"--cluster-config-file", "nodes.conf",
"--cluster-node-timeout", "5000",
},
}
err := container.Start()
require.NoError(t, err, "failed to start container")
defer container.Terminate()

addr := fmt.Sprintf("tcp://%s:%s", container.Address, container.Ports[servicePort])

r := &Redis{
Log: testutil.Logger{},
Servers: []string{addr},
NodeDiscovery: true,
}

var acc testutil.Accumulator

err = acc.GatherError(r.Gather)
require.NoError(t, err)
}

func TestParseClusterNodes(t *testing.T) {
var tests = map[string]struct {
clusterNodesResponse string
expectedNodes []redisClusterNode
}{
"SingleContainerCluster": {
"5998443a50112d5a7fa619c0b044451df052974e :6379@16379 myself,master - 0 0 0 connected\n",
[]redisClusterNode{
{nodeID: "5998443a50112d5a7fa619c0b044451df052974e", host: "", port: "6379"},
},
},
"ClusterNodesResponseA": {
`d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364
3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729
d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095`,
[]redisClusterNode{
{nodeID: "d1861060fe6a534d42d8a19aeb36600e18785e04", host: "127.0.0.1", port: "6379"},
{nodeID: "3886e65cc906bfd9b1f7e7bde468726a052d1dae", host: "127.0.0.1", port: "6380"},
{nodeID: "d289c575dcbc4bdd2931585fd4339089e461a27d", host: "127.0.0.1", port: "6381"},
},
},
"ClusterNodesResponseB": {
`4ce37a099986f2d0465955e2e66937d6893aa0e1 10.64.82.45:11006@16379 myself,master - 0 1713739012000 5 connected 5462-10922
d6eb119a1f050982cc901ae663e7448867e49f7c 10.64.82.46:11005@16379 master - 0 1713739011916 4 connected 10923-16383
3a386fb6930d8f6c1a6536082071eb2f32590d31 10.64.82.46:11007@16379 master - 0 1713739012922 6 connected 0-5461`,
[]redisClusterNode{
{nodeID: "4ce37a099986f2d0465955e2e66937d6893aa0e1", host: "10.64.82.45", port: "11006"},
{nodeID: "d6eb119a1f050982cc901ae663e7448867e49f7c", host: "10.64.82.46", port: "11005"},
{nodeID: "3a386fb6930d8f6c1a6536082071eb2f32590d31", host: "10.64.82.46", port: "11007"},
},
},
}

for tname, tt := range tests {
testResponse, _ := parseClusterNodes(tt.clusterNodesResponse)
if !reflect.DeepEqual(testResponse, tt.expectedNodes) {
t.Error(tname, "fail! Got:\n", testResponse, "\nExpected:\n", tt.expectedNodes)
}
}
}

func TestRedis_Commands(t *testing.T) {
const redisListKey = "test-list-length"
var acc testutil.Accumulator
Expand Down
6 changes: 6 additions & 0 deletions plugins/inputs/redis/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
# username = ""
# password = ""

## Optional Node Discovery for Redis Cluster
## Enabling this feature triggers the execution of a `cluster nodes` command
## at each metric gathering interval. This command automatically detects
## all cluster nodes and retrieves metrics from each of them.
# node_discovery = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# node_discovery = true
# node_discovery = false


## Optional TLS Config
## Check tls/config.go ClientConfig for more options
# tls_enable = true
Expand Down