-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(inputs.redis): Add node discovery for clusters #15209
Changes from all commits
ff4bab0
efac4c7
75b2d57
479b7e7
61509bf
2b045c2
0a424c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -32,10 +32,11 @@ type RedisCommand struct { | |||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
type Redis struct { | ||||||||||||||||||||
Commands []*RedisCommand `toml:"commands"` | ||||||||||||||||||||
Servers []string `toml:"servers"` | ||||||||||||||||||||
Username string `toml:"username"` | ||||||||||||||||||||
Password string `toml:"password"` | ||||||||||||||||||||
Commands []*RedisCommand `toml:"commands"` | ||||||||||||||||||||
Servers []string `toml:"servers"` | ||||||||||||||||||||
Username string `toml:"username"` | ||||||||||||||||||||
Password string `toml:"password"` | ||||||||||||||||||||
NodeDiscovery bool `toml:"node_discovery"` | ||||||||||||||||||||
|
||||||||||||||||||||
tls.ClientConfig | ||||||||||||||||||||
|
||||||||||||||||||||
|
@@ -57,6 +58,12 @@ type RedisClient struct { | |||||||||||||||||||
tags map[string]string | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
type redisClusterNode struct { | ||||||||||||||||||||
nodeID string | ||||||||||||||||||||
host string | ||||||||||||||||||||
port string | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// RedisFieldTypes defines the types expected for each of the fields redis reports on | ||||||||||||||||||||
type RedisFieldTypes struct { | ||||||||||||||||||||
ActiveDefragHits int64 `json:"active_defrag_hits"` | ||||||||||||||||||||
|
@@ -222,7 +229,7 @@ func (r *Redis) Init() error { | |||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func (r *Redis) connect() error { | ||||||||||||||||||||
if r.connected { | ||||||||||||||||||||
if r.connected && !r.NodeDiscovery { | ||||||||||||||||||||
return nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
|
@@ -281,18 +288,56 @@ func (r *Redis) connect() error { | |||||||||||||||||||
}, | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
tags := map[string]string{} | ||||||||||||||||||||
if u.Scheme == "unix" { | ||||||||||||||||||||
tags["socket"] = u.Path | ||||||||||||||||||||
if r.NodeDiscovery { | ||||||||||||||||||||
nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)}) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return err | ||||||||||||||||||||
} | ||||||||||||||||||||
if nodes == nil { | ||||||||||||||||||||
return fmt.Errorf("unable to discover nodes from %s", address) | ||||||||||||||||||||
} | ||||||||||||||||||||
for _, node := range nodes { | ||||||||||||||||||||
nodeAddress := address | ||||||||||||||||||||
if node.host != "" { | ||||||||||||||||||||
nodeAddress = node.host + ":" + node.port | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
discoveredClient := redis.NewClient( | ||||||||||||||||||||
&redis.Options{ | ||||||||||||||||||||
Addr: nodeAddress, | ||||||||||||||||||||
Username: username, | ||||||||||||||||||||
Password: password, | ||||||||||||||||||||
Network: u.Scheme, | ||||||||||||||||||||
PoolSize: 1, | ||||||||||||||||||||
TLSConfig: tlsConfig, | ||||||||||||||||||||
}, | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
tags := map[string]string{ | ||||||||||||||||||||
"server": u.Hostname(), | ||||||||||||||||||||
"port": node.port, | ||||||||||||||||||||
"nodeID": node.nodeID, | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||
client: discoveredClient, | ||||||||||||||||||||
tags: tags, | ||||||||||||||||||||
}) | ||||||||||||||||||||
} | ||||||||||||||||||||
} else { | ||||||||||||||||||||
tags["server"] = u.Hostname() | ||||||||||||||||||||
tags["port"] = u.Port() | ||||||||||||||||||||
} | ||||||||||||||||||||
tags := map[string]string{} | ||||||||||||||||||||
if u.Scheme == "unix" { | ||||||||||||||||||||
tags["socket"] = u.Path | ||||||||||||||||||||
} else { | ||||||||||||||||||||
tags["server"] = u.Hostname() | ||||||||||||||||||||
tags["port"] = u.Port() | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||
client: client, | ||||||||||||||||||||
tags: tags, | ||||||||||||||||||||
}) | ||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||
client: client, | ||||||||||||||||||||
tags: tags, | ||||||||||||||||||||
}) | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
r.connected = true | ||||||||||||||||||||
|
@@ -302,7 +347,7 @@ func (r *Redis) connect() error { | |||||||||||||||||||
// Reads stats from all configured servers accumulates stats. | ||||||||||||||||||||
// Returns one of the errors encountered while gather stats (if any). | ||||||||||||||||||||
func (r *Redis) Gather(acc telegraf.Accumulator) error { | ||||||||||||||||||||
if !r.connected { | ||||||||||||||||||||
if !r.connected || r.NodeDiscovery { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm if we re-create the slice of clients during each gather, should we also be calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can test this out. I haven't seen any rise in client connectivity in my deployed tests, but this would add some certainty. |
||||||||||||||||||||
err := r.connect() | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return err | ||||||||||||||||||||
|
@@ -344,6 +389,20 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err | |||||||||||||||||||
return nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func discoverNodes(client Client) ([]redisClusterNode, error) { | ||||||||||||||||||||
val, err := client.Do("string", "cluster", "nodes") | ||||||||||||||||||||
|
||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
return nil, err | ||||||||||||||||||||
} | ||||||||||||||||||||
Comment on lines
+393
to
+397
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
|
||||||||||||||||||||
str, ok := val.(string) | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
return nil, fmt.Errorf("could not discover nodes: %w", err) | ||||||||||||||||||||
} | ||||||||||||||||||||
return parseClusterNodes(str) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { | ||||||||||||||||||||
info, err := client.Info().Result() | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
|
@@ -354,6 +413,49 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { | |||||||||||||||||||
return gatherInfoOutput(rdr, acc, client.BaseTags()) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Parse the list of nodes from a `cluster nodes` command | ||||||||||||||||||||
// This response looks like: | ||||||||||||||||||||
// | ||||||||||||||||||||
// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364 | ||||||||||||||||||||
// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729 | ||||||||||||||||||||
// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095 | ||||||||||||||||||||
// | ||||||||||||||||||||
// Per Redis docs: | ||||||||||||||||||||
// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/) | ||||||||||||||||||||
// | ||||||||||||||||||||
// In the above listing the different fields are in order: | ||||||||||||||||||||
// node id, address:port, flags, last ping sent, last pong received, | ||||||||||||||||||||
// configuration epoch, link state, slots. | ||||||||||||||||||||
func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) { | ||||||||||||||||||||
lines := strings.Split(nodesResponse, "\n") | ||||||||||||||||||||
nodes := make([]redisClusterNode, 0, len(lines)) | ||||||||||||||||||||
|
||||||||||||||||||||
for _, line := range lines { | ||||||||||||||||||||
fields := strings.Fields(line) | ||||||||||||||||||||
|
||||||||||||||||||||
if len(fields) == 0 { | ||||||||||||||||||||
continue | ||||||||||||||||||||
} | ||||||||||||||||||||
if len(fields) < 8 { | ||||||||||||||||||||
return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
endpointParts := strings.FieldsFunc(fields[1], func(r rune) bool { | ||||||||||||||||||||
return strings.ContainsRune(":@", r) | ||||||||||||||||||||
}) | ||||||||||||||||||||
if string(fields[1][0]) == ":" { | ||||||||||||||||||||
endpointParts = append([]string{""}, endpointParts...) | ||||||||||||||||||||
} | ||||||||||||||||||||
nodes = append(nodes, redisClusterNode{ | ||||||||||||||||||||
nodeID: fields[0], | ||||||||||||||||||||
host: endpointParts[0], | ||||||||||||||||||||
port: endpointParts[1], | ||||||||||||||||||||
}) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
return nodes, nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// gatherInfoOutput gathers | ||||||||||||||||||||
func gatherInfoOutput( | ||||||||||||||||||||
rdr io.Reader, | ||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -27,6 +27,12 @@ | |||||
# username = "" | ||||||
# password = "" | ||||||
|
||||||
## Optional Node Discovery for Redis Cluster | ||||||
## Enabling this feature triggers the execution of a `cluster nodes` command | ||||||
## at each metric gathering interval. This command automatically detects | ||||||
## all cluster nodes and retrieves metrics from each of them. | ||||||
# node_discovery = true | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
## Optional TLS Config | ||||||
## Check tls/config.go ClientConfig for more options | ||||||
# tls_enable = true | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config should show the default values.