-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(inputs.redis): Add node discovery for clusters #15209
Changes from 5 commits
ff4bab0
efac4c7
75b2d57
479b7e7
61509bf
2b045c2
0a424c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -32,10 +32,11 @@ type RedisCommand struct { | |||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
type Redis struct { | ||||||||||||||||||||||
Commands []*RedisCommand `toml:"commands"` | ||||||||||||||||||||||
Servers []string `toml:"servers"` | ||||||||||||||||||||||
Username string `toml:"username"` | ||||||||||||||||||||||
Password string `toml:"password"` | ||||||||||||||||||||||
Commands []*RedisCommand `toml:"commands"` | ||||||||||||||||||||||
Servers []string `toml:"servers"` | ||||||||||||||||||||||
Username string `toml:"username"` | ||||||||||||||||||||||
Password string `toml:"password"` | ||||||||||||||||||||||
NodeDiscovery bool `toml:"node_discovery"` | ||||||||||||||||||||||
|
||||||||||||||||||||||
tls.ClientConfig | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -57,6 +58,12 @@ type RedisClient struct { | |||||||||||||||||||||
tags map[string]string | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
type redisClusterNode struct { | ||||||||||||||||||||||
nodeID string | ||||||||||||||||||||||
host string | ||||||||||||||||||||||
port string | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
// RedisFieldTypes defines the types expected for each of the fields redis reports on | ||||||||||||||||||||||
type RedisFieldTypes struct { | ||||||||||||||||||||||
ActiveDefragHits int64 `json:"active_defrag_hits"` | ||||||||||||||||||||||
|
@@ -222,7 +229,7 @@ func (r *Redis) Init() error { | |||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
func (r *Redis) connect() error { | ||||||||||||||||||||||
if r.connected { | ||||||||||||||||||||||
if r.connected && !r.NodeDiscovery { | ||||||||||||||||||||||
return nil | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
|
@@ -281,18 +288,58 @@ func (r *Redis) connect() error { | |||||||||||||||||||||
}, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
tags := map[string]string{} | ||||||||||||||||||||||
if u.Scheme == "unix" { | ||||||||||||||||||||||
tags["socket"] = u.Path | ||||||||||||||||||||||
if r.NodeDiscovery { | ||||||||||||||||||||||
nodes, err := discoverNodes(&RedisClient{client, make(map[string]string)}) | ||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||
return err | ||||||||||||||||||||||
} | ||||||||||||||||||||||
if nodes == nil { | ||||||||||||||||||||||
return fmt.Errorf("unable to discover nodes from %s", address) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
for _, node := range nodes { | ||||||||||||||||||||||
nodeAddress := "" | ||||||||||||||||||||||
if node.host == "" { | ||||||||||||||||||||||
nodeAddress = address | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
nodeAddress = fmt.Sprintf("%s:%s", node.host, node.port) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
jeremycampbell-okta marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||
|
||||||||||||||||||||||
discoveredClient := redis.NewClient( | ||||||||||||||||||||||
&redis.Options{ | ||||||||||||||||||||||
Addr: nodeAddress, | ||||||||||||||||||||||
Username: username, | ||||||||||||||||||||||
Password: password, | ||||||||||||||||||||||
Network: u.Scheme, | ||||||||||||||||||||||
PoolSize: 1, | ||||||||||||||||||||||
TLSConfig: tlsConfig, | ||||||||||||||||||||||
}, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
|
||||||||||||||||||||||
tags := map[string]string{ | ||||||||||||||||||||||
"server": u.Hostname(), | ||||||||||||||||||||||
"port": node.port, | ||||||||||||||||||||||
"nodeID": node.nodeID, | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||||
client: discoveredClient, | ||||||||||||||||||||||
tags: tags, | ||||||||||||||||||||||
}) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
tags["server"] = u.Hostname() | ||||||||||||||||||||||
tags["port"] = u.Port() | ||||||||||||||||||||||
} | ||||||||||||||||||||||
tags := map[string]string{} | ||||||||||||||||||||||
if u.Scheme == "unix" { | ||||||||||||||||||||||
tags["socket"] = u.Path | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
tags["server"] = u.Hostname() | ||||||||||||||||||||||
tags["port"] = u.Port() | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||||
client: client, | ||||||||||||||||||||||
tags: tags, | ||||||||||||||||||||||
}) | ||||||||||||||||||||||
r.clients = append(r.clients, &RedisClient{ | ||||||||||||||||||||||
client: client, | ||||||||||||||||||||||
tags: tags, | ||||||||||||||||||||||
}) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
r.connected = true | ||||||||||||||||||||||
|
@@ -302,7 +349,7 @@ func (r *Redis) connect() error { | |||||||||||||||||||||
// Reads stats from all configured servers accumulates stats. | ||||||||||||||||||||||
// Returns one of the errors encountered while gather stats (if any). | ||||||||||||||||||||||
func (r *Redis) Gather(acc telegraf.Accumulator) error { | ||||||||||||||||||||||
if !r.connected { | ||||||||||||||||||||||
if !r.connected || r.NodeDiscovery { | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm if we re-create the slice of clients during each gather, should we also be calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can test this out. I haven't seen any rise in client connectivity in my deployed tests, but this would add some certainty. |
||||||||||||||||||||||
err := r.connect() | ||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||
return err | ||||||||||||||||||||||
|
@@ -344,6 +391,24 @@ func (r *Redis) gatherCommandValues(client Client, acc telegraf.Accumulator) err | |||||||||||||||||||||
return nil | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
func discoverNodes(client Client) ([]redisClusterNode, error) { | ||||||||||||||||||||||
val, err := client.Do("string", "cluster", "nodes") | ||||||||||||||||||||||
|
||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||
if strings.Contains(err.Error(), "unexpected type=") { | ||||||||||||||||||||||
return nil, fmt.Errorf("could not get command result: %w", err) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
jeremycampbell-okta marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||
|
||||||||||||||||||||||
return nil, err | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
str, ok := val.(string) | ||||||||||||||||||||||
if ok { | ||||||||||||||||||||||
return parseClusterNodes(str) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
return nil, fmt.Errorf("could not discover nodes: %w", err) | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer
Suggested change
and maybe fold the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I liked separating parseClusterNodes to make it more easily/directly testable. Can you say more about why you prefer to fold it in? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (To add to this...I expect maintenance of this function may take some work as/if Redis There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's mostly a personal preference but splitting code into two functions, i.e. there is only one caller, where the caller side is very small usually requires the reader to jump between different places and makes following the flow more difficult. Like in this example, the The splitting can be reasonable IMO if a) there are many processing steps, b) a lot of branches or c) complex processing steps. None of those criteria are applicable here as the parsing is fairly "simple" in terms of code... |
||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { | ||||||||||||||||||||||
info, err := client.Info().Result() | ||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||
|
@@ -354,6 +419,49 @@ func (r *Redis) gatherServer(client Client, acc telegraf.Accumulator) error { | |||||||||||||||||||||
return gatherInfoOutput(rdr, acc, client.BaseTags()) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
// Parse the list of nodes from a `cluster nodes` command | ||||||||||||||||||||||
// This response looks like: | ||||||||||||||||||||||
// | ||||||||||||||||||||||
// d1861060fe6a534d42d8a19aeb36600e18785e04 127.0.0.1:6379 myself - 0 1318428930 1 connected 0-1364 | ||||||||||||||||||||||
// 3886e65cc906bfd9b1f7e7bde468726a052d1dae 127.0.0.1:6380 master - 1318428930 1318428931 2 connected 1365-2729 | ||||||||||||||||||||||
// d289c575dcbc4bdd2931585fd4339089e461a27d 127.0.0.1:6381 master - 1318428931 1318428931 3 connected 2730-4095 | ||||||||||||||||||||||
// | ||||||||||||||||||||||
// Per Redis docs: | ||||||||||||||||||||||
// (https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/) | ||||||||||||||||||||||
// | ||||||||||||||||||||||
// In the above listing the different fields are in order: | ||||||||||||||||||||||
// node id, address:port, flags, last ping sent, last pong received, | ||||||||||||||||||||||
// configuration epoch, link state, slots. | ||||||||||||||||||||||
func parseClusterNodes(nodesResponse string) ([]redisClusterNode, error) { | ||||||||||||||||||||||
lines := strings.Split(nodesResponse, "\n") | ||||||||||||||||||||||
var nodes []redisClusterNode | ||||||||||||||||||||||
|
||||||||||||||||||||||
for _, line := range lines { | ||||||||||||||||||||||
fields := strings.Fields(line) | ||||||||||||||||||||||
|
||||||||||||||||||||||
if len(fields) >= 8 { | ||||||||||||||||||||||
endpointParts := strings.FieldsFunc(fields[1], func(r rune) bool { | ||||||||||||||||||||||
return strings.ContainsRune(":@", r) | ||||||||||||||||||||||
}) | ||||||||||||||||||||||
if string(fields[1][0]) == ":" { | ||||||||||||||||||||||
endpointParts = append([]string{""}, endpointParts...) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
nodes = append(nodes, redisClusterNode{ | ||||||||||||||||||||||
nodeID: fields[0], | ||||||||||||||||||||||
host: endpointParts[0], | ||||||||||||||||||||||
port: endpointParts[1], | ||||||||||||||||||||||
}) | ||||||||||||||||||||||
} else if len(fields) == 0 { | ||||||||||||||||||||||
continue | ||||||||||||||||||||||
} else { | ||||||||||||||||||||||
return nil, fmt.Errorf("unexpected cluster node: \"%s\"", line) | ||||||||||||||||||||||
} | ||||||||||||||||||||||
jeremycampbell-okta marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
return nodes, nil | ||||||||||||||||||||||
} | ||||||||||||||||||||||
|
||||||||||||||||||||||
// gatherInfoOutput gathers | ||||||||||||||||||||||
func gatherInfoOutput( | ||||||||||||||||||||||
rdr io.Reader, | ||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -27,6 +27,12 @@ | |||||
# username = "" | ||||||
# password = "" | ||||||
|
||||||
## Optional Node Discovery for Redis Cluster | ||||||
## Enabling this feature triggers the execution of a `cluster nodes` command | ||||||
## at each metric gathering interval. This command automatically detects | ||||||
## all cluster nodes and retrieves metrics from each of them. | ||||||
# node_discovery = true | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
## Optional TLS Config | ||||||
## Check tls/config.go ClientConfig for more options | ||||||
# tls_enable = true | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config should show the default values.