Skip to content

Commit

Permalink
Merge pull request #194 from guillaumeautran/ga_delta_agg_189
Browse files Browse the repository at this point in the history
New aggregators: `delta` and `stdev`
  • Loading branch information
Dieterbe committed Jun 19, 2017
2 parents 051dae3 + bc5f887 commit 2dca812
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 36 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ build:
find . -name '*.go' | grep -v '^\.\/vendor' | xargs gofmt -w -s
CGO_ENABLED=0 go build ./cmd/carbon-relay-ng

test:
go test ./...

docker: build
docker build --tag=raintank/carbon-relay-ng:latest .

Expand Down Expand Up @@ -80,7 +83,7 @@ rpm: build
--license BSD \
--url https://github.com/graphite-ng/carbon-relay-ng \
-C redhat .
rm -rf redhat
rm -rf redhat

rpm-centos6: build
mkdir build/centos-6
Expand Down Expand Up @@ -131,4 +134,4 @@ run: build
run-docker:
docker run --rm -p 2003:2003 -p 2004:2004 -p 8081:8081 -v $(pwd)/examples:/conf -v $(pwd)/spool:/spool raintank/carbon-relay-ng

.PHONY: all deb gh-pages install man
.PHONY: all deb gh-pages install man test
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Note:
if you set the interval to the period between each incoming packet of a given key, and the fmt yields the same key for different input metric keys
- aggregation of individual metrics, i.e. packets for the same key, with different timestamps. For example if you receive values for the same key every second, you can aggregate into minutely buckets by setting interval to 60, and have the fmt yield a unique key for every input metric key. (~ graphite rollups)
- the combination: compute aggregates from values seen with different keys, and at multiple points in time.
* functions currently available: avg, sum, min, max, last
* functions currently available: avg, delta, last, max, min, stdev, sum
* aggregation output is routed via the routing table just like all other metrics. Note that aggregation output will never go back into aggregators (to prevent loops) and also bypasses the validation and blacklist.
* see the included ini for examples

Expand Down Expand Up @@ -214,8 +214,12 @@ commands:

addAgg <func> <regex> <fmt> <interval> <wait> add a new aggregation rule.
<func>: aggregation function to use
sum
avg
delta
last
max
min
stdev
sum
<regex> regex to match incoming metrics. supports groups (numbered, see fmt)
<fmt> format of output metric. you can use $1, $2, etc to refer to numbered groups
<interval> align odd timestamps of metrics into buckets by this interval in seconds.
Expand Down
71 changes: 54 additions & 17 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,45 @@ package aggregator
import (
"bytes"
"fmt"
"math"
"regexp"
"strconv"
"time"
)

type Func func(in []float64) float64

// Sum adds up every value of in, front to back, and returns the total.
// An empty (or nil) input yields 0.
func Sum(in []float64) float64 {
	var total float64
	for i := 0; i < len(in); i++ {
		total += in[i]
	}
	return total
}

// Avg returns the arithmetic mean of in, i.e. Sum(in) divided by the
// number of terms. It panics when in is empty.
func Avg(in []float64) float64 {
	count := len(in)
	if count == 0 {
		panic("avg() called in aggregator with 0 terms")
	}
	total := Sum(in)
	return total / float64(count)
}

// Delta returns the difference between the largest and the smallest value
// found in in. It panics when in is empty.
func Delta(in []float64) float64 {
	if len(in) == 0 {
		panic("delta() called in aggregator with 0 terms")
	}
	// Both bounds start at the first term; the remaining terms can only
	// widen the range in one direction at a time.
	lo, hi := in[0], in[0]
	for _, v := range in[1:] {
		switch {
		case v > hi:
			hi = v
		case v < lo:
			lo = v
		}
	}
	return hi - lo
}

// Last returns the final element of in. It panics when in is empty.
func Last(in []float64) float64 {
	count := len(in)
	if count == 0 {
		panic("last() called in aggregator with 0 terms")
	}
	return in[count-1]
}

func Max(in []float64) float64 {
if len(in) == 0 {
panic("max() called in aggregator with 0 terms")
Expand All @@ -51,20 +68,40 @@ func Min(in []float64) float64 {
return min
}

func Last(in []float64) float64 {
func Stdev(in []float64) float64 {
if len(in) == 0 {
panic("last() called in aggregator with 0 terms")
panic("stdev() called in aggregator with 0 terms")
}
last := in[len(in)-1]
return last
// Get the average (or mean) of the series
mean := Avg(in)

// Calculate the variance
variance := float64(0)
for _, term := range in {
variance += math.Pow((float64(term) - mean), float64(2))
}
variance /= float64(len(in))

// Calculate the standard deviation
return math.Sqrt(variance)
}

// Sum returns the total of all values in in, accumulated in slice order.
// The result is 0 when in has no elements.
func Sum(in []float64) float64 {
	var acc float64
	for i := range in {
		acc += in[i]
	}
	return acc
}

var Funcs = map[string]Func{
"sum": Sum,
"avg": Avg,
"max": Max,
"min": Min,
"last": Last,
"avg": Avg,
"delta": Delta,
"last": Last,
"max": Max,
"min": Min,
"stdev": Stdev,
"sum": Sum,
}

type Aggregator struct {
Expand Down
78 changes: 78 additions & 0 deletions aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package aggregator

import (
"testing"
)

// TestScanner verifies every aggregation function (Avg, Delta, Last, Max,
// Min, Stdev, Sum) against hand-computed results for a few input series.
// Despite its name it does not exercise a scanner; the name is kept so
// existing `go test -run` invocations keep working.
//
// Mismatches are reported with t.Errorf rather than t.Fatalf so that a
// single broken aggregator does not hide the results of the remaining
// functions and cases.
func TestScanner(t *testing.T) {
	cases := []struct {
		in    []float64
		avg   float64
		delta float64
		last  float64
		max   float64
		min   float64
		stdev float64
		sum   float64
	}{
		{
			in:    []float64{5, 4, 7, 4, 2, 5, 4, 9},
			avg:   5,
			delta: 7,
			last:  9,
			max:   9,
			min:   2,
			stdev: 2,
			sum:   40,
		},
		{
			in:    []float64{6, 2, 3, 1},
			avg:   3,
			delta: 5,
			last:  1,
			max:   6,
			min:   1,
			stdev: 1.8708286933869707,
			sum:   12,
		},
	}
	for i, e := range cases {
		// One entry per aggregator: the label used in failure messages,
		// the function under test and the value it must return for e.in.
		checks := []struct {
			name     string
			fn       func([]float64) float64
			expected float64
		}{
			{"AVG", Avg, e.avg},
			{"DELTA", Delta, e.delta},
			{"LAST", Last, e.last},
			{"MAX", Max, e.max},
			{"MIN", Min, e.min},
			{"STDEV", Stdev, e.stdev},
			{"SUM", Sum, e.sum},
		}
		for _, c := range checks {
			if actual := c.fn(e.in); actual != c.expected {
				t.Errorf("case %d %s - expected %v, actual %v", i, c.name, c.expected, actual)
			}
		}
	}
}
12 changes: 8 additions & 4 deletions imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ const (
str
sep
avgFn
deltaFn
lastFn
maxFn
minFn
stdevFn
sumFn
lastFn
num
optPrefix
optAddr
Expand Down Expand Up @@ -96,14 +98,16 @@ var tokens = []toki.Def{
{Token: minFn, Pattern: "min"},
{Token: sumFn, Pattern: "sum"},
{Token: lastFn, Pattern: "last"},
{Token: deltaFn, Pattern: "delta"},
{Token: stdevFn, Pattern: "stdev"},
{Token: num, Pattern: "[0-9]+( |$)"}, // unfortunately we need the 2nd piece cause otherwise it would match the first of ip addresses etc. this means we need to TrimSpace later
{Token: word, Pattern: "[^ ]+"},
}

// note the two spaces between a route and endpoints
// match options can't have spaces for now. sorry
var errFmtAddBlack = errors.New("addBlack <prefix|sub|regex> <pattern>")
var errFmtAddAgg = errors.New("addAgg <avg|max|min|sum|last> <regex> <fmt> <interval> <wait>")
var errFmtAddAgg = errors.New("addAgg <avg|delta|last|max|min|stdev|sum> <regex> <fmt> <interval> <wait>")
var errFmtAddRoute = errors.New("addRoute <type> <key> [prefix/sub/regex=,..] <dest> [<dest>[...]] where <dest> is <addr> [prefix/sub,regex,flush,reconn,pickle,spool=...]") // note flush and reconn are ints, pickle and spool are true/false. other options are strings
var errFmtAddRouteGrafanaNet = errors.New("addRoute grafanaNet key [prefix/sub/regex] addr apiKey schemasFile [spool=true/false sslverify=true/false bufSize=int flushMaxNum=int flushMaxWait=int timeout=int concurrency=int orgId=int]")
var errFmtAddRouteKafkaMdm = errors.New("addRoute kafkaMdm key [prefix/sub/regex] broker topic codec schemasFile partitionBy orgId [bufSize=int flushMaxNum=int flushMaxWait=int timeout=int]")
Expand Down Expand Up @@ -160,8 +164,8 @@ func Apply(table Table, cmd string) error {

func readAddAgg(s *toki.Scanner, table Table) error {
t := s.Next()
if t.Token != sumFn && t.Token != avgFn && t.Token != minFn && t.Token != maxFn && t.Token != lastFn {
return errors.New("invalid function. need avg/max/min/sum/last")
if t.Token != sumFn && t.Token != avgFn && t.Token != minFn && t.Token != maxFn && t.Token != lastFn && t.Token != deltaFn && t.Token != stdevFn {
return errors.New("invalid function. need avg/max/min/sum/last/delta/stdev")
}
fun := string(t.Value)

Expand Down
2 changes: 2 additions & 0 deletions ui/telnet/telnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ commands:
addAgg <func> <regex> <fmt> <interval> <wait> add a new aggregation rule.
<func>: aggregation function to use
avg
delta
last
max
min
stdev
sum
<regex> regex to match incoming metrics. supports groups (numbered, see fmt)
<fmt> format of output metric. you can use $1, $2, etc to refer to numbered groups
Expand Down
15 changes: 11 additions & 4 deletions ui/web/admin_http_assets/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,28 @@ app.controller("MainCtl", ["$scope", "$resource", "$modal", function($scope, $re
$scope.validAggFunc = (function() {
return {
test: function(value) {
if (value == "sum") {
if (value == "avg") {
return true;
}
if (value == "min") {
if (value == "delta") {
return true;
}
if (value == "avg") {
if (value == "last") {
return true;
}
if (value == "max") {
return true;
}
if (value == "last") {
if (value == "min") {
return true;
}
if (value == "stdev") {
return true;
}
if (value == "sum") {
return true;
}

return false;
}
};
Expand Down
2 changes: 1 addition & 1 deletion ui/web/admin_http_assets/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ <h2>Aggregators</h2>
<td class="form-group has-feedback">
<input ng-model="newAgg.Fun" name="fun" class="form-control" placeholder="Function" required ng-pattern="validAggFunc">
<div ng-show="aggForm.fun.$invalid">
<span ng-show="aggForm.fun.$error.pattern">Expected aggregation function: sum, min, max, avg or last</span>
<span ng-show="aggForm.fun.$error.pattern">Expected aggregation function: avg, delta, last, max, min, stdev or sum</span>
</div>
</td>
<td class="form-group has-feedback">
Expand Down

0 comments on commit 2dca812

Please sign in to comment.