Skip to content

Commit

Permalink
Merge pull request #210 from graphite-ng/more-configurables
Browse files Browse the repository at this point in the history
more configurables
  • Loading branch information
Dieterbe committed Aug 16, 2017
2 parents 37b25f9 + 4189eab commit 724ba5e
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 54 deletions.
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,16 @@ commands:
reconn=<int> reconnection interval in ms
pickle={true,false} pickle output format instead of the default text protocol
spool={true,false} enable spooling for this endpoint
connbuf=<int> connection buffer (how many metrics can be queued, not written into network conn). default 30k
iobuf=<int> buffered io connection buffer in bytes. default: 2M
spoolbuf=<int> num of metrics to buffer across disk-write stalls. practically, tune this to number of metrics in a second. default: 10000
spoolmaxbytesperfile=<int> max filesize for spool files. default: 200MiB (200 * 1024 * 1024)
spoolsyncevery=<int> sync spool to disk every this many metrics. default: 10000
spoolsyncperiod=<int> sync spool to disk every this many milliseconds. default 1000
spoolsleep=<int> sleep this many microseconds(!) in between ingests from bulkdata/redo buffers into spool. default 500
unspoolsleep=<int> sleep this many microseconds(!) in between reads from the spool, when replaying spooled data. default 10



addDest <routeKey> <dest> not implemented yet

Expand Down
11 changes: 3 additions & 8 deletions destination/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"github.com/graphite-ng/carbon-relay-ng/util"
)

var bufio_buffer_size = 2000000 // in bytes. 4096 is go default

// to make sure writes to In are fast until we really can't keep up
var conn_in_buffer = 30000 // in metrics. (each metric line is typically about 70 bytes)

var keepsafe_initial_cap = 100000 // not very important

// this interval should be long enough to capture all failure modes
Expand Down Expand Up @@ -55,7 +50,7 @@ type Conn struct {
numDropBadPickle metrics.Counter
}

func NewConn(addr string, dest *Destination, periodFlush time.Duration, pickle bool) (*Conn, error) {
func NewConn(addr string, dest *Destination, periodFlush time.Duration, pickle bool, connBufSize, ioBufSize int) (*Conn, error) {
raddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
Expand All @@ -68,9 +63,9 @@ func NewConn(addr string, dest *Destination, periodFlush time.Duration, pickle b
cleanAddr := util.AddrToPath(addr)
connObj := &Conn{
conn: conn,
buffered: NewWriter(conn, bufio_buffer_size, cleanAddr),
buffered: NewWriter(conn, ioBufSize, cleanAddr),
shutdown: make(chan bool, 1), // when we write here, HandleData() may not be running anymore to read from the chan
In: make(chan []byte, conn_in_buffer),
In: make(chan []byte, connBufSize),
dest: dest,
up: true,
pickle: pickle,
Expand Down
51 changes: 39 additions & 12 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ type Destination struct {
cleanAddr string
periodFlush time.Duration
periodReConn time.Duration
connBufSize int // in metrics. (each metric line is typically about 70 bytes). default 30k. to make sure writes to In are fast until conn flushing can't keep up
ioBufSize int // conn io buffer in bytes. 4096 is go default. 2M is our default

SpoolBufSize int
SpoolMaxBytesPerFile int64
SpoolSyncEvery int64
SpoolSyncPeriod time.Duration
SpoolSleep time.Duration // how long to wait between stores to spool
UnspoolSleep time.Duration // how long to wait between loads from spool

// set in/via Run()
In chan []byte `json:"-"` // incoming metrics
Expand All @@ -58,23 +67,31 @@ type Destination struct {
}

// New creates a destination object. Note that it still needs to be told to run via Run().
func New(prefix, sub, regex, addr, spoolDir string, spool, pickle bool, periodFlush, periodReConn time.Duration) (*Destination, error) {
func New(prefix, sub, regex, addr, spoolDir string, spool, pickle bool, periodFlush, periodReConn time.Duration, connBufSize, ioBufSize, spoolBufSize int, spoolMaxBytesPerFile, spoolSyncEvery int64, spoolSyncPeriod, spoolSleep, unspoolSleep time.Duration) (*Destination, error) {
m, err := matcher.New(prefix, sub, regex)
if err != nil {
return nil, err
}
addr, instance := addrInstanceSplit(addr)
cleanAddr := util.AddrToPath(addr)
dest := &Destination{
Matcher: *m,
Addr: addr,
Instance: instance,
SpoolDir: spoolDir,
Spool: spool,
Pickle: pickle,
cleanAddr: cleanAddr,
periodFlush: periodFlush,
periodReConn: periodReConn,
Matcher: *m,
Addr: addr,
Instance: instance,
SpoolDir: spoolDir,
Spool: spool,
Pickle: pickle,
cleanAddr: cleanAddr,
periodFlush: periodFlush,
periodReConn: periodReConn,
connBufSize: connBufSize,
ioBufSize: ioBufSize,
SpoolBufSize: spoolBufSize,
SpoolMaxBytesPerFile: spoolMaxBytesPerFile,
SpoolSyncEvery: spoolSyncEvery,
SpoolSyncPeriod: spoolSyncPeriod,
SpoolSleep: spoolSleep,
UnspoolSleep: unspoolSleep,
}
dest.setMetrics()
return dest, nil
Expand Down Expand Up @@ -168,7 +185,17 @@ func (dest *Destination) Run() {
dest.flushErr = make(chan error)
dest.setSignalConnOnline = make(chan chan struct{})
if dest.Spool {
dest.spool = NewSpool(dest.cleanAddr, dest.SpoolDir) // TODO better naming for spool, because it won't update when addr changes
// TODO better naming for spool, because it won't update when addr changes
dest.spool = NewSpool(
dest.cleanAddr,
dest.SpoolDir,
dest.SpoolBufSize,
dest.SpoolMaxBytesPerFile,
dest.SpoolSyncEvery,
dest.SpoolSyncPeriod,
dest.SpoolSleep,
dest.UnspoolSleep,
)
}
dest.tasks = sync.WaitGroup{}
go dest.relay()
Expand All @@ -193,7 +220,7 @@ func (dest *Destination) updateConn(addr string) {
dest.inConnUpdate <- true
defer func() { dest.inConnUpdate <- false }()
addr, instance := addrInstanceSplit(addr)
conn, err := NewConn(addr, dest, dest.periodFlush, dest.Pickle)
conn, err := NewConn(addr, dest, dest.periodFlush, dest.Pickle, dest.connBufSize, dest.ioBufSize)
if err != nil {
log.Debug("dest %v: %v\n", dest.Addr, err.Error())
return
Expand Down
20 changes: 6 additions & 14 deletions destination/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,12 @@ type Spool struct {
// parameters should be tuned so that:
// can buffer packets for the duration of 1 sync
// buffer no more then needed, esp if we know the queue is slower then the ingest rate
func NewSpool(key, spoolDir string) *Spool {
func NewSpool(key, spoolDir string, bufSize int, maxBytesPerFile, syncEvery int64, syncPeriod, spoolSleep, unspoolSleep time.Duration) *Spool {
dqName := "spool_" + key
// on our virtualized box i see mean write of around 100 micros up to 250 micros, max up to 200 millis.
// in 200 millis we can get up to 10k metrics, so let's make that our queueBuffer size
// for bulk, leaving 500 micros in between every metric should be enough.
// TODO make all these configurable:
queueBuffer := 10000
maxBytesPerFile := int64(200 * 1024 * 1024)
syncEvery := int64(10000)
periodSync := 1 * time.Second
queue := nsqd.NewDiskQueue(dqName, spoolDir, maxBytesPerFile, syncEvery, periodSync).(*nsqd.DiskQueue)

spoolSleep := time.Duration(500) * time.Microsecond
unspoolSleep := time.Duration(10) * time.Microsecond
// bufSize should be tuned to be able to hold the max amount of metrics that can be received
// while the disk subsystem is doing a write/sync. Basically set it to the amount of metrics
// you receive in a second.
queue := nsqd.NewDiskQueue(dqName, spoolDir, maxBytesPerFile, syncEvery, syncPeriod).(*nsqd.DiskQueue)
s := Spool{
key: key,
InRT: make(chan []byte, 10),
Expand All @@ -58,7 +50,7 @@ func NewSpool(key, spoolDir string) *Spool {
spoolSleep: spoolSleep,
unspoolSleep: unspoolSleep,
queue: queue,
queueBuffer: make(chan []byte, queueBuffer),
queueBuffer: make(chan []byte, bufSize),
durationWrite: stats.Timer("spool=" + key + ".operation=write"),
durationBuffer: stats.Timer("spool=" + key + ".operation=buffer"),
numBuffered: stats.Gauge("spool=" + key + ".unit=Metric.status=buffered"),
Expand Down
96 changes: 95 additions & 1 deletion imperatives/imperatives.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ const (
optRegex
optFlush
optReconn
optConnBufSize
optIoBufSize
optSpoolBufSize
optSpoolMaxBytesPerFile
optSpoolSyncEvery
optSpoolSyncPeriod
optSpoolSleep
optUnspoolSleep
optPickle
optSpool
optTrue
Expand Down Expand Up @@ -80,6 +88,14 @@ var tokens = []toki.Def{
{Token: optRegex, Pattern: "regex="},
{Token: optFlush, Pattern: "flush="},
{Token: optReconn, Pattern: "reconn="},
{Token: optConnBufSize, Pattern: "connbuf="},
{Token: optIoBufSize, Pattern: "iobuf="},
{Token: optSpoolBufSize, Pattern: "spoolbuf="},
{Token: optSpoolMaxBytesPerFile, Pattern: "spoolmaxbytesperfile="},
{Token: optSpoolSyncEvery, Pattern: "spoolsyncevery="},
{Token: optSpoolSyncPeriod, Pattern: "spoolsyncperiod="},
{Token: optSpoolSleep, Pattern: "spoolsleep="},
{Token: optUnspoolSleep, Pattern: "unspoolsleep="},
{Token: optPickle, Pattern: "pickle="},
{Token: optSpool, Pattern: "spool="},
{Token: optTrue, Pattern: "true"},
Expand Down Expand Up @@ -707,8 +723,17 @@ func readDestination(s *toki.Scanner, table Table, allowMatcher bool) (dest *des
var spool, pickle bool
flush := 1000
reconn := 10000
connBufSize := 30000
ioBufSize := 2000000
spoolDir = table.GetSpoolDir()

spoolBufSize := 10000
spoolMaxBytesPerFile := int64(200 * 1024 * 1024)
spoolSyncEvery := int64(10000)
spoolSyncPeriod := time.Second
spoolSleep := time.Duration(500) * time.Microsecond
unspoolSleep := time.Duration(10) * time.Microsecond

t := s.Next()
if t.Token != word {
return nil, errors.New("addr not set for endpoint")
Expand Down Expand Up @@ -765,6 +790,75 @@ func readDestination(s *toki.Scanner, table Table, allowMatcher bool) (dest *des
if err != nil {
return nil, fmt.Errorf("unrecognized spool value '%s'", t)
}
case optConnBufSize:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
connBufSize, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
case optIoBufSize:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
ioBufSize, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
case optSpoolBufSize:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
spoolBufSize, err = strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
case optSpoolMaxBytesPerFile:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
tmp, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
spoolMaxBytesPerFile = int64(tmp)
case optSpoolSyncEvery:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
tmp, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
spoolSyncEvery = int64(tmp)
case optSpoolSyncPeriod:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
tmp, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
spoolSyncPeriod = time.Duration(tmp) * time.Millisecond
case optSpoolSleep:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
tmp, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
spoolSleep = time.Duration(tmp) * time.Microsecond
case optUnspoolSleep:
if t = s.Next(); t.Token != num {
return nil, errFmtAddRoute
}
tmp, err := strconv.Atoi(strings.TrimSpace(string(t.Value)))
if err != nil {
return nil, err
}
unspoolSleep = time.Duration(tmp) * time.Microsecond
case toki.EOF:
case sep:
break
Expand All @@ -779,7 +873,7 @@ func readDestination(s *toki.Scanner, table Table, allowMatcher bool) (dest *des
return nil, fmt.Errorf("matching options (prefix, sub, and regex) not allowed for this route type")
}

return destination.New(prefix, sub, regex, addr, spoolDir, spool, pickle, periodFlush, periodReConn)
return destination.New(prefix, sub, regex, addr, spoolDir, spool, pickle, periodFlush, periodReConn, connBufSize, ioBufSize, spoolBufSize, spoolMaxBytesPerFile, spoolSyncEvery, spoolSyncPeriod, spoolSleep, unspoolSleep)
}

func ParseDestinations(destinationConfigs []string, table Table, allowMatcher bool) (destinations []*destination.Destination, err error) {
Expand Down
62 changes: 43 additions & 19 deletions ui/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,37 +154,61 @@ func removeRoute(w http.ResponseWriter, r *http.Request) (interface{}, *handlerE
return make(map[string]string), nil
}
func parseRouteRequest(r *http.Request) (route.Route, *handlerError) {
var request struct {
Address string
Key string
Pickle bool
Spool bool
Type string
Substring string
Prefix string
Regex string
var req struct {
Key string
Type string
Prefix string
Substring string
Regex string
Address string
Spool bool
Pickle bool
periodFlush int
periodReconn int
ConnBufSize int
ConnIoBufSize int
SpoolBufSize int
SpoolMaxBytesPerFile int
SpoolSyncEvery int
spoolSyncPeriod int
SpoolSleep int
UnspoolSleep int
}
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, &handlerError{err, "Couldn't parse json", http.StatusBadRequest}
}
// use hard coded defaults for flush and reconn as specified in
// readDestinations
periodFlush := time.Duration(1000) * time.Millisecond
periodReconn := time.Duration(10000) * time.Millisecond
dest, err := destination.New("", "", "", request.Address, table.SpoolDir, request.Spool, request.Pickle, periodFlush, periodReconn)
dest, err := destination.New(
"",
"",
"",
req.Address,
table.SpoolDir,
req.Spool,
req.Pickle,
time.Duration(req.periodFlush)*time.Millisecond,
time.Duration(req.periodReconn)*time.Millisecond,
req.ConnBufSize,
req.ConnIoBufSize,
req.SpoolBufSize,
int64(req.SpoolMaxBytesPerFile),
int64(req.SpoolSyncEvery),
time.Duration(req.spoolSyncPeriod)*time.Millisecond,
time.Duration(req.SpoolSleep)*time.Microsecond,
time.Duration(req.UnspoolSleep)*time.Microsecond,
)
if err != nil {
return nil, &handlerError{err, "unable to create destination", http.StatusBadRequest}
}

var ro route.Route
var e error
switch request.Type {
switch req.Type {
case "sendAllMatch":
ro, e = route.NewSendAllMatch(request.Key, request.Prefix, request.Substring, request.Regex, []*destination.Destination{dest})
ro, e = route.NewSendAllMatch(req.Key, req.Prefix, req.Substring, req.Regex, []*destination.Destination{dest})
case "sendFirstMatch":
ro, e = route.NewSendFirstMatch(request.Key, request.Prefix, request.Substring, request.Regex, []*destination.Destination{dest})
ro, e = route.NewSendFirstMatch(req.Key, req.Prefix, req.Substring, req.Regex, []*destination.Destination{dest})
default:
return nil, &handlerError{nil, "unknown route type: " + request.Type, http.StatusBadRequest}
return nil, &handlerError{nil, "unknown route type: " + req.Type, http.StatusBadRequest}
}
if e != nil {
return nil, &handlerError{e, "could not create route", http.StatusBadRequest}
Expand Down

0 comments on commit 724ba5e

Please sign in to comment.