Skip to content

Commit

Permalink
add version&noop for memcache binary protocol (#102)
Browse files Browse the repository at this point in the history
* add version&noop for memcache binary protocol

* complete memcache binary protocol.

* fix memcache text request key&data no copy bug.

* fix go sum

* rm go sum for etcd.io checksum
  • Loading branch information
felixhao authored and Yiming Yu committed Sep 29, 2019
1 parent 73ec50d commit eb1e8a6
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 131 deletions.
4 changes: 4 additions & 0 deletions proxy/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Overlord-proxy

## Version 1.9.0
1. complete memcache binary protocol.
2. fix memcache text request key&data no copy bug.

## Version 1.8.5
1. add slowlog file rotate based on size
2. add slowlog file total limit
Expand Down
27 changes: 25 additions & 2 deletions proxy/proto/memcache/binary/node_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,15 @@ func (n *nodeConn) Write(m *proto.Message) (err error) {
err = errors.WithStack(ErrAssertReq)
return
}
if _, ok := noNeedNodeTypes[mcr.respType]; ok {
return
}

_ = n.bw.Write(magicReqBytes)

cmd := mcr.respType
if cmd == RequestTypeGetQ || cmd == RequestTypeGetKQ {
cmd = RequestTypeGetK
if noq, ok := qReplaceNoQTypes[cmd]; ok {
cmd = noq
}
_ = n.bw.Write(cmd.Bytes())
_ = n.bw.Write(mcr.keyLen)
Expand Down Expand Up @@ -100,6 +104,15 @@ func (n *nodeConn) Read(m *proto.Message) (err error) {
return
}
mcr.data = mcr.data[:0]

if _, ok := noNeedNodeTypes[mcr.respType]; ok {
if mcr.respType == RequestTypeVersion {
versionRespHeader(mcr)
mcr.data = append(mcr.data, versionRespBytes...)
}
return
}

REREAD:
var bs []byte
if bs, err = n.br.ReadExact(requestHeaderLen); err == bufio.ErrBufferFull {
Expand Down Expand Up @@ -143,3 +156,13 @@ func (n *nodeConn) Close() error {
func (n *nodeConn) Closed() bool {
return atomic.LoadInt32(&n.state) == closed
}

func versionRespHeader(req *MCRequest) {
req.magic = magicResp
copy(req.keyLen, zeroTwoBytes)
copy(req.extraLen, zeroBytes)
copy(req.status, zeroTwoBytes)
copy(req.bodyLen, versionFourBytes)
copy(req.opaque, zeroFourBytes)
copy(req.cas, zeroEightBytes)
}
22 changes: 16 additions & 6 deletions proxy/proto/memcache/binary/proxy_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
// memcached binary protocol: https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped
const (
requestHeaderLen = 24
proxyReadBufSize = 1024
)

type proxyConn struct {
Expand All @@ -27,7 +28,7 @@ type proxyConn struct {
func NewProxyConn(rw *libnet.Conn) proto.ProxyConn {
p := &proxyConn{
// TODO: optimus zero
br: bufio.NewReader(rw, bufio.Get(1024)),
br: bufio.NewReader(rw, bufio.Get(proxyReadBufSize)),
bw: bufio.NewWriter(rw),
completed: true,
}
Expand Down Expand Up @@ -80,21 +81,30 @@ NEXTGET:
return
}
switch req.respType {
case RequestTypeNoop, RequestTypeVersion, RequestTypeQuit, RequestTypeQuitQ:
req.key = req.key[:0]
req.data = req.data[:0]
return
case RequestTypeSet, RequestTypeAdd, RequestTypeReplace, RequestTypeGet, RequestTypeGetK,
RequestTypeDelete, RequestTypeIncr, RequestTypeDecr, RequestTypeAppend, RequestTypePrepend, RequestTypeTouch, RequestTypeGat:
RequestTypeDelete, RequestTypeIncr, RequestTypeDecr, RequestTypeAppend, RequestTypePrepend,
RequestTypeTouch, RequestTypeGat:
if err = p.decodeCommon(m, req); err == bufio.ErrBufferFull {
p.br.Advance(-requestHeaderLen)
return
}
return
case RequestTypeGetQ, RequestTypeGetKQ:
case RequestTypeGetQ, RequestTypeGetKQ, RequestTypeSetQ, RequestTypeAddQ, RequestTypeReplaceQ,
RequestTypeIncrQ, RequestTypeDecrQ, RequestTypeAppendQ, RequestTypePrependQ:
REQAGAIN:
if err = p.decodeCommon(m, req); err == bufio.ErrBufferFull {
p.br.Advance(-requestHeaderLen)
return
if err = p.br.Read(); err != nil {
return
}
goto REQAGAIN // NOTE: try to read again for this request
}
goto NEXTGET
}
err = errors.Wrapf(ErrBadRequest, "MC decoder unsupport command:%x", req.respType)
err = errors.Wrapf(ErrBadRequest, "MC decoder unsupport command:%d", req.respType)
return
}

Expand Down

0 comments on commit eb1e8a6

Please sign in to comment.