Skip to content

Commit

Permalink
Merge pull request #161 from graphite-ng/route-kafka-mdm
Browse files Browse the repository at this point in the history
Route kafka mdm
  • Loading branch information
Dieterbe committed Feb 16, 2017
2 parents 895d0d1 + 67efb37 commit 06bef07
Show file tree
Hide file tree
Showing 137 changed files with 19,889 additions and 71 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ Note:
if you set the interval to the period between each incoming packet of a given key, and the fmt yields the same key for different input metric keys
- aggregation of individual metrics, i.e. packets for the same key, with different timestamps. For example if you receive values for the same key every second, you can aggregate into minutely buckets by setting interval to 60, and have the fmt yield a unique key for every input metric key. (~ graphite rollups)
- the combination: compute aggregates from values seen with different keys, and at multiple points in time.
* functions currently available: avg and sum
* functions currently available: avg, sum, min, max
* aggregation output is routed via the routing table just like all other metrics. Note that aggregation output will never go back into aggregators (to prevent loops) and also bypasses the validation and blacklist.
* see the included ini for examples

Expand Down
29 changes: 29 additions & 0 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,37 @@ func Avg(in []float64) float64 {
return Sum(in) / float64(len(in))
}

// Max returns the largest value found in in.
// It panics when in is empty, because no maximum exists in that case.
func Max(in []float64) float64 {
	if len(in) == 0 {
		panic("max() called in aggregator with 0 terms")
	}
	largest := in[0]
	// The first element is already the candidate, so start at the second.
	for i := 1; i < len(in); i++ {
		if in[i] > largest {
			largest = in[i]
		}
	}
	return largest
}

// Min returns the smallest value found in in.
// It panics when in is empty, because no minimum exists in that case.
func Min(in []float64) float64 {
	if len(in) == 0 {
		panic("min() called in aggregator with 0 terms")
	}
	smallest := in[0]
	// The first element is already the candidate, so start at the second.
	for i := 1; i < len(in); i++ {
		if in[i] < smallest {
			smallest = in[i]
		}
	}
	return smallest
}

// Funcs indexes the available aggregation functions by name.
var Funcs = map[string]Func{
	"avg": Avg,
	"max": Max,
	"min": Min,
	"sum": Sum,
}

type Aggregator struct {
Expand Down Expand Up @@ -132,6 +160,7 @@ func (a *Aggregator) Flush(ts uint) {
if agg.ts < ts {
result := a.fn(agg.values)
metric := fmt.Sprintf("%s %f %d", string(agg.key), result, agg.ts)
log.Debug("aggregator emitting %q", metric)
a.out <- []byte(metric)
} else {
aggregations2 = append(aggregations2, agg)
Expand Down
9 changes: 9 additions & 0 deletions aggregator/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package aggregator

import logging "github.com/op/go-logging"

var log = logging.MustGetLogger("aggregator") // for tests. overridden by main

// SetLogger replaces the package-level logger with l.
func SetLogger(l *logging.Logger) {
log = l
}
2 changes: 2 additions & 0 deletions cmd/carbon-relay-ng/carbon-relay-ng.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/Dieterbe/go-metrics"
"github.com/graphite-ng/carbon-relay-ng/aggregator"
"github.com/graphite-ng/carbon-relay-ng/badmetrics"
"github.com/graphite-ng/carbon-relay-ng/cfg"
"github.com/graphite-ng/carbon-relay-ng/destination"
Expand Down Expand Up @@ -57,6 +58,7 @@ func init() {
destination.SetLogger(log)
telnet.SetLogger(log)
web.SetLogger(log)
aggregator.SetLogger(log)
}

func usage() {
Expand Down
2 changes: 1 addition & 1 deletion docs/grafana-net.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ notes:
* by specifying a prefix, sub or regex you can only send a subset of your metrics to grafana.net hosted metrics

```
addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]")
addRoute grafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]
```


Expand Down
138 changes: 132 additions & 6 deletions imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,18 @@ const (
addRouteSendFirstMatch
addRouteConsistentHashing
addRouteGrafanaNet
addRouteKafkaMdm
addDest
addRewriter
delRoute
modDest
modRoute
str
sep
sumFn
avgFn
maxFn
minFn
sumFn
num
optPrefix
optAddr
Expand Down Expand Up @@ -63,6 +66,7 @@ var tokens = []toki.Def{
{Token: addRouteSendFirstMatch, Pattern: "addRoute sendFirstMatch"},
{Token: addRouteConsistentHashing, Pattern: "addRoute consistentHashing"},
{Token: addRouteGrafanaNet, Pattern: "addRoute grafanaNet"},
{Token: addRouteKafkaMdm, Pattern: "addRoute kafkaMdm"},
{Token: addDest, Pattern: "addDest"},
{Token: addRewriter, Pattern: "addRewriter"},
{Token: delRoute, Pattern: "delRoute"},
Expand All @@ -87,18 +91,21 @@ var tokens = []toki.Def{
{Token: optOrgId, Pattern: "orgId="},
{Token: str, Pattern: "\".*\""},
{Token: sep, Pattern: "##"},
{Token: sumFn, Pattern: "sum"},
{Token: avgFn, Pattern: "avg"},
{Token: maxFn, Pattern: "max"},
{Token: minFn, Pattern: "min"},
{Token: sumFn, Pattern: "sum"},
{Token: num, Pattern: "[0-9]+( |$)"}, // unfortunately we need the 2nd piece cause otherwise it would match the first of ip addresses etc. this means we need to TrimSpace later
{Token: word, Pattern: "[^ ]+"},
}

// note the two spaces between a route and endpoints
// match options can't have spaces for now. sorry
var errFmtAddBlack = errors.New("addBlack <prefix|sub|regex> <pattern>")
var errFmtAddAgg = errors.New("addAgg <sum|avg> <regex> <fmt> <interval> <wait>")
var errFmtAddAgg = errors.New("addAgg <avg|max|min|sum> <regex> <fmt> <interval> <wait>")
var errFmtAddRoute = errors.New("addRoute <type> <key> [prefix/sub/regex=,..] <dest> [<dest>[...]] where <dest> is <addr> [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings
var errFmtAddRouteGrafanaNet = errors.New("addRoute GrafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]")
var errFmtAddRouteGrafanaNet = errors.New("addRoute grafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]")
var errFmtAddRouteKafkaMdm = errors.New("addRoute kafkaMdm key [prefix/sub/regex] broker topic codec schemasFile partitionBy orgId [bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]")
var errFmtAddDest = errors.New("addDest <routeKey> <dest>") // not implemented yet
var errFmtAddRewriter = errors.New("addRewriter <old> <new> <max>")
var errFmtModDest = errors.New("modDest <routeKey> <dest> <addr/prefix/sub/regex=>") // one or more can be specified at once
Expand All @@ -123,6 +130,8 @@ func Apply(table *tbl.Table, cmd string) error {
return readAddRouteConsistentHashing(s, table)
case addRouteGrafanaNet:
return readAddRouteGrafanaNet(s, table)
case addRouteKafkaMdm:
return readAddRouteKafkaMdm(s, table)
case addRewriter:
return readAddRewriter(s, table)
case delRoute:
Expand All @@ -138,8 +147,8 @@ func Apply(table *tbl.Table, cmd string) error {

func readAddAgg(s *toki.Scanner, table *tbl.Table) error {
t := s.Next()
if t.Token != sumFn && t.Token != avgFn {
return errors.New("invalid function. need sum/avg")
if t.Token != sumFn && t.Token != avgFn && t.Token != minFn && t.Token != maxFn {
return errors.New("invalid function. need avg/max/min/sum")
}
fun := string(t.Value)

Expand Down Expand Up @@ -403,6 +412,123 @@ func readAddRouteGrafanaNet(s *toki.Scanner, table *tbl.Table) error {
table.AddRoute(route)
return nil
}
// readAddRouteKafkaMdm parses the remainder of an "addRoute kafkaMdm" command
// from s and, on success, adds the resulting route to table.
//
// Expected arguments, in order (see errFmtAddRouteKafkaMdm):
//
//	key [prefix/sub/regex] broker topic codec schemasFile partitionBy orgId [bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]
//
// codec must be one of none/gzip/snappy and partitionBy one of byOrg/bySeries.
// It returns errFmtAddRouteKafkaMdm for a malformed positional argument or
// option value, an "unexpected token" error for an unknown trailing token, and
// passes through errors from readRouteOpts, strconv.Atoi (option values) and
// route.NewKafkaMdm.
func readAddRouteKafkaMdm(s *toki.Scanner, table *tbl.Table) error {
	// nextWord consumes one positional argument, which must be a word token.
	nextWord := func() (string, bool) {
		t := s.Next()
		if t.Token != word {
			return "", false
		}
		return string(t.Value), true
	}

	key, ok := nextWord()
	if !ok {
		return errFmtAddRouteKafkaMdm
	}

	prefix, sub, regex, err := readRouteOpts(s)
	if err != nil {
		return err
	}

	broker, ok := nextWord()
	if !ok {
		return errFmtAddRouteKafkaMdm
	}

	topic, ok := nextWord()
	if !ok {
		return errFmtAddRouteKafkaMdm
	}

	codec, ok := nextWord()
	if !ok || (codec != "none" && codec != "gzip" && codec != "snappy") {
		return errFmtAddRouteKafkaMdm
	}

	schemasFile, ok := nextWord()
	if !ok {
		return errFmtAddRouteKafkaMdm
	}

	partitionBy, ok := nextWord()
	if !ok || (partitionBy != "byOrg" && partitionBy != "bySeries") {
		return errFmtAddRouteKafkaMdm
	}

	// A bare integer is matched by the num pattern, which is listed before
	// word in tokens, so the orgId normally arrives as a num token. Accept
	// both kinds; a num value may carry a trailing space, hence TrimSpace.
	t := s.Next()
	if t.Token != num && t.Token != word {
		return errFmtAddRouteKafkaMdm
	}
	orgId, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
	if err != nil {
		return errFmtAddRouteKafkaMdm
	}

	var bufSize = int(1e7)  // since a message is typically around 100B this is 1GB
	var flushMaxNum = 10000 // number of metrics
	var flushMaxWait = 500  // in ms
	var timeout = 2000      // in ms

	// readInt consumes the value that follows an option token; it must be a num.
	readInt := func() (int, error) {
		v := s.Next()
		if v.Token != num {
			return 0, errFmtAddRouteKafkaMdm
		}
		return strconv.Atoi(strings.TrimSpace(string(v.Value)))
	}

	for t = s.Next(); t.Token != toki.EOF; t = s.Next() {
		// Select which setting the upcoming integer value belongs to.
		var dst *int
		switch t.Token {
		case optBufSize:
			dst = &bufSize
		case optFlushMaxNum:
			dst = &flushMaxNum
		case optFlushMaxWait:
			dst = &flushMaxWait
		case optTimeout:
			dst = &timeout
		default:
			return fmt.Errorf("unexpected token %d %q", t.Token, t.Value)
		}
		v, err := readInt()
		if err != nil {
			return err
		}
		*dst = v
	}

	// Named r rather than route so the route package is not shadowed.
	r, err := route.NewKafkaMdm(key, prefix, sub, regex, broker, topic, codec, schemasFile, partitionBy, bufSize, orgId, flushMaxNum, flushMaxWait, timeout)
	if err != nil {
		return err
	}
	table.AddRoute(r)
	return nil
}

func readAddRewriter(s *toki.Scanner, table *tbl.Table) error {
var t *toki.Result
Expand Down
63 changes: 4 additions & 59 deletions route/grafananet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ package route
import (
"bytes"
"crypto/tls"
"fmt"
"hash/fnv"
"net"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -62,24 +59,10 @@ func NewGrafanaNet(key, prefix, sub, regex, addr, apiKey, schemasFile string, sp
if err != nil {
return nil, err
}
schemas, err := persister.ReadWhisperSchemas(schemasFile)
schemas, err := getSchemas(schemasFile)
if err != nil {
return nil, err
}
var defaultFound bool
for _, schema := range schemas {
if schema.Pattern.String() == ".*" {
defaultFound = true
}
if len(schema.Retentions) == 0 {
return nil, fmt.Errorf("retention setting cannot be empty")
}
}
if !defaultFound {
// good graphite health (not sure what graphite does if there's no .*
// but we definitely need to always be able to determine which interval to use
return nil, fmt.Errorf("storage-conf does not have a default '.*' pattern")
}

cleanAddr := util.AddrToPath(addr)

Expand Down Expand Up @@ -170,8 +153,9 @@ func (route *GrafanaNet) run() {
select {
case buf := <-route.buf:
route.numBuffered.Dec(1)
md := parseMetric(buf, route.schemas, route.orgId)
if md == nil {
md, err := parseMetric(buf, route.schemas, route.orgId)
if err != nil {
log.Error("RouteGrafanaNet: %s", err)
continue
}
md.SetId()
Expand Down Expand Up @@ -249,45 +233,6 @@ func (route *GrafanaNet) flush(shard int) {
route.wg.Done()
}

// parseMetric converts one plaintext metric line of the form
// "<name> <value> <timestamp>" into a schema.MetricData carrying orgId.
//
// It logs the problem and returns nil when the line does not consist of
// exactly three whitespace-separated fields, when the value does not parse as
// a float, or when the timestamp does not parse as an unsigned 32-bit integer.
// The interval is taken from the first retention of the schema matching the
// metric name; it panics if no schema matches.
func parseMetric(buf []byte, schemas persister.WhisperSchemas, orgId int) *schema.MetricData {
	line := strings.TrimSpace(string(buf))

	parts := strings.Fields(line)
	if len(parts) != 3 {
		log.Error("RouteGrafanaNet: %q error: need 3 fields", line)
		return nil
	}
	name := parts[0]

	value, err := strconv.ParseFloat(parts[1], 64)
	if err != nil {
		log.Error("RouteGrafanaNet: %q error: %s", line, err.Error())
		return nil
	}

	ts, err := strconv.ParseUint(parts[2], 10, 32)
	if err != nil {
		log.Error("RouteGrafanaNet: %q error: %s", line, err.Error())
		return nil
	}

	matched, found := schemas.Match(name)
	if !found {
		panic(fmt.Errorf("couldn't find a schema for %q - this is impossible since we asserted there was a default with patt .*", name))
	}

	return &schema.MetricData{
		Name:     name,
		Metric:   name,
		Interval: matched.Retentions[0].SecondsPerPoint(),
		Value:    value,
		Unit:     "unknown",
		Time:     int64(ts),
		Mtype:    "gauge",
		Tags:     []string{},
		OrgId:    orgId, // This may be overwritten by the TSDB-GW if it does not match the orgId of the apiKey used
	}
}

func (route *GrafanaNet) Dispatch(buf []byte) {
//conf := route.config.Load().(Config)
// should return as quickly as possible
Expand Down

0 comments on commit 06bef07

Please sign in to comment.