Skip to content

Commit

Permalink
add slowlog file rotate (#107)
Browse files Browse the repository at this point in the history
* - add slowlog file rotate based on size
- add slowlog file total limit

* - increasing node connection inputs size
- update changelog
- update version

* - update slowlog config to command line flag
  • Loading branch information
liuhao1024 authored and Yiming Yu committed Sep 17, 2019
1 parent 00d867e commit 73ec50d
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 51 deletions.
22 changes: 13 additions & 9 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ import (
)

var (
check bool
stat string
metrics bool
confFile string
clusterConfFile string
reload bool
slowlogFile string
slowlogSlowerThan int
check bool
stat string
metrics bool
confFile string
clusterConfFile string
reload bool
slowlogFile string
slowlogSlowerThan int
slowlogMaxBytes int
slowlogBackupCount int
)

type clustersFlag []string
Expand Down Expand Up @@ -54,6 +56,8 @@ func init() {
flag.BoolVar(&reload, "reload", false, "reloading the servers in cluster config file.")
flag.StringVar(&slowlogFile, "slowlog", "", "slowlog is the file where slowlog output")
flag.IntVar(&slowlogSlowerThan, "slower-than", 0, "slower-than is the microseconds which slowlog must slower than.")
flag.IntVar(&slowlogMaxBytes, "slower-max-bytes", 500000000, "slower-max-bytes is maximum size of slow log file.")
flag.IntVar(&slowlogBackupCount, "slower-backup-count", 7, "slower-backup-count is maximum backup count of slow log file.")
}

func main() {
Expand All @@ -71,7 +75,7 @@ func main() {
defer log.Close()
}
// init slowlog if need
err := slowlog.Init(slowlogFile)
err := slowlog.Init(slowlogFile, slowlogMaxBytes, slowlogBackupCount)
if err != nil {
log.Errorf("fail to init slowlog due %s", err)
}
Expand Down
5 changes: 5 additions & 0 deletions proxy/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Overlord-proxy

## Version 1.8.5
1. add slowlog file rotate based on size
2. add slowlog file total limit
3. increasing node connection inputs size

## Version 1.8.4
1. fix time record MarkEndInput, which reduce memory consumption by 60%
2. try fetch cluster nodes when key moved
Expand Down
2 changes: 1 addition & 1 deletion proxy/proto/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewNodeConnPipe(conns int32, newNc func() NodeConn) (ncp *NodeConnPipe) {
errCh: make(chan error, 1),
}
for i := int32(0); i < ncp.conns; i++ {
ncp.inputs[i] = make(chan *Message, pipeMaxCount*pipeMaxCount)
ncp.inputs[i] = make(chan *Message, pipeMaxCount*pipeMaxCount*16)
ncp.mps[i] = newMsgPipe(ncp.inputs[i], newNc, ncp.errCh)
}
return
Expand Down
118 changes: 86 additions & 32 deletions proxy/slowlog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package slowlog

import (
"bufio"
"encoding/json"
"fmt"
"os"
"time"

"encoding/json"
"overlord/pkg/log"
"overlord/proxy/proto"
)
Expand All @@ -19,6 +20,11 @@ type fileHandler struct {
encoder *json.Encoder
exchange chan *proto.SlowlogEntry
flushInterval time.Duration

fileName string
curBytes int
maxBytes int
backupCount int
}

func (f *fileHandler) save(cluster string, entry *proto.SlowlogEntry) {
Expand All @@ -29,63 +35,111 @@ func (f *fileHandler) save(cluster string, entry *proto.SlowlogEntry) {
}
}

func (f *fileHandler) openFile(file string) error {
var (
fd *os.File
err error
)
if _, err = os.Stat(file); os.IsNotExist(err) {
func (f *fileHandler) openFile() error {
if _, err := os.Stat(f.fileName); os.IsNotExist(err) {
// path/to/whatever does not exist
fd, err = os.Create(file)
f.fd, err = os.Create(f.fileName)
if err != nil {
return err
}
} else {
fd, err = os.OpenFile(file, os.O_APPEND|os.O_WRONLY, 0755)
f.fd, err = os.OpenFile(f.fileName, os.O_APPEND|os.O_WRONLY, 0755)
if err != nil {
return err
}
}

fdStat, err := f.fd.Stat()
if err != nil {
return err
}
f.curBytes = int(fdStat.Size())

f.fd = fd
f.wr = bufio.NewWriter(f.fd)
f.wr = bufio.NewWriterSize(f.fd, 40960)
f.encoder = json.NewEncoder(f.wr)

go func() {
defer f.fd.Close()
var ticker = time.NewTicker(f.flushInterval)
return nil
}

func (f *fileHandler) rotate() {
fdStat, err := f.fd.Stat()
if err != nil {
return
}

if f.maxBytes <= 0 {
return
} else if fdStat.Size() < int64(f.maxBytes) {
f.curBytes = int(fdStat.Size())
return
}

if f.backupCount > 0 {
_ = f.fd.Close()

for i := f.backupCount - 1; i > 0; i-- {
sfn := fmt.Sprintf("%s.%d", f.fileName, i)
dfn := fmt.Sprintf("%s.%d", f.fileName, i+1)
_ = os.Rename(sfn, dfn)
}

dfn := fmt.Sprintf("%s.1", f.fileName)
_ = os.Rename(f.fileName, dfn)

err := f.openFile()
if err != nil {
return
}
}
}

func (f *fileHandler) close() error {
if f.fd != nil {
return f.fd.Close()
}
return nil
}

for {
select {
case entry := <-f.exchange:
err := f.encoder.Encode(entry)
func (f *fileHandler) run() {
defer f.close()
ticker := time.NewTicker(f.flushInterval)

for {
select {
case entry := <-f.exchange:
err := f.encoder.Encode(entry)
if err != nil {
log.Errorf("fail to write slowlog into file due %s", err)
return
}
case <-ticker.C:
f.rotate() // check slowlog file size and rotate slowlog file
if f.wr.Buffered() > 0 {
err := f.wr.Flush()
if err != nil {
log.Errorf("fail to write slowlog into file due %s", err)
log.Errorf("fail to flush slowlog due %s", err)
return
}
case <-ticker.C:
if f.wr.Size() > 0 {
err := f.wr.Flush()
if err != nil {
log.Errorf("fail to flush slowlog due %s", err)
return
}
}
}
}
}()

return nil
}
}

var fh *fileHandler

// initFileHandler will init the file handler to the given file
func initFileHandler(file string) error {
func initFileHandler(fileName string, maxBytes int, backupCount int) error {
fh = &fileHandler{
exchange: make(chan *proto.SlowlogEntry, 2048),
flushInterval: time.Second * 5,
maxBytes: maxBytes,
backupCount: backupCount,
fileName: fileName,
}
err := fh.openFile()
if err != nil {
return err
}
return fh.openFile(file)
go fh.run()
return err
}
13 changes: 7 additions & 6 deletions proxy/slowlog/slowlog.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package slowlog

import (
"overlord/pkg/log"
"overlord/proxy/proto"
"sync"
"sync/atomic"

"overlord/pkg/log"
"overlord/proxy/proto"
)

const slowlogMaxCount = 1024
Expand Down Expand Up @@ -89,11 +90,11 @@ func Get(name string) Handler {
}

// Init slowlog with file and http
func Init(file string) error {
func Init(fileName string, maxBytes int, backupCount int) error {
registerSlowlogHTTP()
if file == "" {
if fileName == "" {
return nil
}
log.Infof("setup slowlog for file [%s]", file)
return initFileHandler(file)
log.Infof("setup slowlog for file [%s]", fileName)
return initFileHandler(fileName, maxBytes, backupCount)
}
6 changes: 3 additions & 3 deletions version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
const (
OverlordMajor = 1
OverlordMinor = 8
OverlordPatch = 4
OverlordPatch = 5
)

var (
showVersion bool
vstr string
vbytes []byte
vstr string
vbytes []byte
)

func init() {
Expand Down

0 comments on commit 73ec50d

Please sign in to comment.