Skip to content

Commit

Permalink
add max message size options (#296)
Browse files Browse the repository at this point in the history
* add max message size options

* remove ghz.log

* improvments for max size and docs

* working on max size in cli

* fix max size dail options

* main comment

* remove test files
  • Loading branch information
bojand committed Aug 8, 2021
1 parent a73afa9 commit d362856
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 56 deletions.
51 changes: 44 additions & 7 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/alecthomas/kingpin"
"github.com/dustin/go-humanize"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -248,16 +249,26 @@ var (
debug = kingpin.Flag("debug", "The path to debug log file.").
PlaceHolder(" ").IsSetByUser(&isDebugSet).String()

isHostSet = false
host = kingpin.Arg("host", "Host and port to test.").String()

isEnableCompressionSet = false
enableCompression = kingpin.Flag("enable-compression", "Enable Gzip compression on requests.").
Short('e').Default("false").IsSetByUser(&isEnableCompressionSet).Bool()

isLBStrategySet = false
lbStrategy = kingpin.Flag("lb-strategy", "Client load balancing strategy.").
PlaceHolder(" ").IsSetByUser(&isLBStrategySet).String()

// message size
isMaxRecvMsgSizeSet = false
maxRecvMsgSize = kingpin.Flag("max-recv-message-size", "Maximum message size the client can receive.").
PlaceHolder(" ").IsSetByUser(&isMaxRecvMsgSizeSet).String()

isMaxSendMsgSizeSet = false
maxSendMsgSize = kingpin.Flag("max-send-message-size", "Maximum message size the client can send.").
PlaceHolder(" ").IsSetByUser(&isMaxSendMsgSizeSet).String()

// host main argument
isHostSet = false
host = kingpin.Arg("host", "Host and port to test.").String()
)

func main() {
Expand Down Expand Up @@ -397,30 +408,44 @@ func createConfigFromArgs(cfg *runner.Config) error {
*md = strings.TrimSpace(*md)
if *md != "" {
if err := json.Unmarshal([]byte(*md), &metadata); err != nil {
return fmt.Errorf("Error unmarshaling metadata '%v': %v", *md, err.Error())
return fmt.Errorf("error unmarshaling metadata '%v': %v", *md, err.Error())
}
}

var dataObj interface{}
if *data != "@" && strings.TrimSpace(*data) != "" {
if err := json.Unmarshal([]byte(*data), &dataObj); err != nil {
return fmt.Errorf("Error unmarshaling data '%v': %v", *data, err.Error())
return fmt.Errorf("error unmarshaling data '%v': %v", *data, err.Error())
}
}

var tagsMap map[string]string
*tags = strings.TrimSpace(*tags)
if *tags != "" {
if err := json.Unmarshal([]byte(*tags), &tagsMap); err != nil {
return fmt.Errorf("Error unmarshaling tags '%v': %v", *tags, err.Error())
return fmt.Errorf("error unmarshaling tags '%v': %v", *tags, err.Error())
}
}

var rmdMap map[string]string
*rmd = strings.TrimSpace(*rmd)
if *rmd != "" {
if err := json.Unmarshal([]byte(*rmd), &rmdMap); err != nil {
return fmt.Errorf("Error unmarshaling reflection metadata '%v': %v", *rmd, err.Error())
return fmt.Errorf("error unmarshaling reflection metadata '%v': %v", *rmd, err.Error())
}
}

if isMaxRecvMsgSizeSet {
_, err := humanize.ParseBytes(*maxRecvMsgSize)
if err != nil {
return errors.New("invalid max call recv message size: " + err.Error())
}
}

if isMaxSendMsgSizeSet {
_, err := humanize.ParseBytes(*maxSendMsgSize)
if err != nil {
return errors.New("invalid max call send message size: " + err.Error())
}
}

Expand Down Expand Up @@ -480,6 +505,8 @@ func createConfigFromArgs(cfg *runner.Config) error {
cfg.CMaxDuration = runner.Duration(*cMaxDuration)
cfg.CountErrors = *countErrors
cfg.LBStrategy = *lbStrategy
cfg.MaxCallRecvMsgSize = *maxRecvMsgSize
cfg.MaxCallSendMsgSize = *maxSendMsgSize

return nil
}
Expand Down Expand Up @@ -723,6 +750,16 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error {
dest.CMaxDuration = src.CMaxDuration
}

// message size

if isMaxRecvMsgSizeSet {
dest.MaxCallRecvMsgSize = src.MaxCallRecvMsgSize
}

if isMaxSendMsgSizeSet {
dest.MaxCallSendMsgSize = src.MaxCallSendMsgSize
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/alecthomas/kingpin v1.3.8-0.20191105203113-8c96d1c22481
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751
github.com/bojand/hri v1.1.0
github.com/dustin/go-humanize v1.0.0
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/go-playground/validator v9.30.0+incompatible
Expand All @@ -29,6 +30,6 @@ require (
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/tools v0.0.0-20200812195022-5ae4c3c160a0
google.golang.org/grpc v1.34.0
google.golang.org/protobuf v1.25.0 // indirect
google.golang.org/protobuf v1.25.0
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
)
23 changes: 2 additions & 21 deletions go.sum

Large diffs are not rendered by default.

26 changes: 22 additions & 4 deletions runner/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/jinzhu/configor"

humanize "github.com/dustin/go-humanize"
)

// Duration is our duration with TOML support
Expand Down Expand Up @@ -112,22 +114,24 @@ type Config struct {
LoadStepDuration Duration `json:"load-step-duration" toml:"load-step-duration" yaml:"load-step-duration"`
LoadMaxDuration Duration `json:"load-max-duration" toml:"load-max-duration" yaml:"load-max-duration"`
LBStrategy string `json:"lb-strategy" toml:"lb-strategy" yaml:"lb-strategy"`
MaxCallRecvMsgSize string `json:"max-recv-message-size" toml:"max-recv-message-size" yaml:"max-recv-message-size"`
MaxCallSendMsgSize string `json:"max-send-message-size" toml:"max-send-message-size" yaml:"max-send-message-size"`
}

func checkData(data interface{}) error {
_, isObjData := data.(map[string]interface{})
if !isObjData {
arrData, isArrData := data.([]interface{})
if !isArrData {
return errors.New("Unsupported type for Data")
return errors.New("unsupported type for Data")
}
if len(arrData) == 0 {
return errors.New("Data array must not be empty")
return errors.New("data array must not be empty")
}
for _, elem := range arrData {
_, isObjData = elem.(map[string]interface{})
if !isObjData {
return errors.New("Data array contains unsupported type")
return errors.New("data array contains unsupported type")
}
}

Expand All @@ -152,7 +156,7 @@ func LoadConfig(p string, c *Config) error {
for k, v := range objData {
sk, isString := k.(string)
if !isString {
return errors.New("Data key must string")
return errors.New("data key must string")
}
if len(sk) > 0 {
nd[sk] = v
Expand All @@ -174,5 +178,19 @@ func LoadConfig(p string, c *Config) error {
c.ZStop = "close"
}

if c.MaxCallRecvMsgSize != "" {
_, err = humanize.ParseBytes(c.MaxCallRecvMsgSize)
if err != nil {
return errors.New("invalid max call recv message size: " + err.Error())
}
}

if c.MaxCallSendMsgSize != "" {
_, err = humanize.ParseBytes(c.MaxCallSendMsgSize)
if err != nil {
return errors.New("invalid max call send message size: " + err.Error())
}
}

return nil
}
17 changes: 12 additions & 5 deletions runner/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,21 @@ func TestConfig_Load(t *testing.T) {
Data: map[string]interface{}{
"f_strings": []interface{}{"123", "456"},
},
Format: "summary",
DialTimeout: Duration(10 * time.Second),
LoadSchedule: "const",
CSchedule: "const",
CStart: 1,
Format: "summary",
DialTimeout: Duration(10 * time.Second),
LoadSchedule: "const",
CSchedule: "const",
CStart: 1,
MaxCallRecvMsgSize: "1024mb",
MaxCallSendMsgSize: "2000mib",
},
true,
},
{
"invalid message size",
&Config{},
false,
},
}

for i, tt := range tests {
Expand Down
50 changes: 43 additions & 7 deletions runner/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ import (
"github.com/bojand/ghz/load"
"github.com/jhump/protoreflect/desc"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"

humanize "github.com/dustin/go-humanize"
)

// BinaryDataFunc is a function that can be used for provide binary data for request programatically.
Expand All @@ -38,13 +41,14 @@ const ScheduleLine = "line"
// RunConfig represents the request Configs
type RunConfig struct {
// call settings
call string
host string
proto string
importPaths []string
protoset string
protosetBinary []byte
enableCompression bool
call string
host string
proto string
importPaths []string
protoset string
protosetBinary []byte
enableCompression bool
defaultCallOptions []grpc.CallOption

// security settings
creds credentials.TransportCredentials
Expand Down Expand Up @@ -1044,6 +1048,15 @@ func WithStreamMessageProvider(fn StreamMessageProviderFunc) Option {
}
}

// WithDefaultCallOptions sets the default CallOptions for calls over the connection.
func WithDefaultCallOptions(opts []grpc.CallOption) Option {
return func(o *RunConfig) error {
o.defaultCallOptions = opts

return nil
}
}

func createClientTransportCredentials(skipVerify bool, cacertFile, clientCertFile, clientKeyFile, cname string) (credentials.TransportCredentials, error) {
var tlsConf tls.Config

Expand Down Expand Up @@ -1146,6 +1159,29 @@ func fromConfig(cfg *Config) []Option {
},
)

var defaultCallOptions []grpc.CallOption
if cfg.MaxCallRecvMsgSize != "" {
v, err := humanize.ParseBytes(cfg.MaxCallRecvMsgSize)
if err != nil {
return nil
}

defaultCallOptions = append(defaultCallOptions, grpc.MaxCallRecvMsgSize(int(v)))
}

if cfg.MaxCallSendMsgSize != "" {
v, err := humanize.ParseBytes(cfg.MaxCallSendMsgSize)
if err != nil {
return nil
}

defaultCallOptions = append(defaultCallOptions, grpc.MaxCallSendMsgSize(int(v)))
}

if len(defaultCallOptions) > 0 {
options = append(options, WithDefaultCallOptions(defaultCallOptions))
}

if strings.TrimSpace(cfg.MetadataPath) != "" {
options = append(options, WithMetadataFromFile(strings.TrimSpace(cfg.MetadataPath)))
}
Expand Down
20 changes: 20 additions & 0 deletions runner/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)

func TestRunConfig_newRunConfig(t *testing.T) {
Expand Down Expand Up @@ -496,6 +497,25 @@ func TestRunConfig_newRunConfig(t *testing.T) {
assert.Error(t, err)
})

t.Run("with max size", func(t *testing.T) {
c, err := NewConfig(" call ", " localhost:50050 ",
WithProtoFile("testdata/data.proto", []string{}),
WithDefaultCallOptions(
[]grpc.CallOption{
grpc.MaxCallSendMsgSize(1024000),
grpc.MaxCallRecvMsgSize(2048000),
},
),
)

assert.NoError(t, err)

assert.Equal(t, []grpc.CallOption{
grpc.MaxCallSendMsgSize(1024000),
grpc.MaxCallRecvMsgSize(2048000),
}, c.defaultCallOptions)
})

t.Run("with load step", func(t *testing.T) {
t.Run("no step", func(t *testing.T) {
_, err := NewConfig(" call ", " localhost:50050 ",
Expand Down
19 changes: 12 additions & 7 deletions runner/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,18 @@ func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, erro
opts = append(opts, grpc.WithAuthority(b.config.authority))
}

if len(b.config.defaultCallOptions) > 0 {
opts = append(opts, grpc.WithDefaultCallOptions(b.config.defaultCallOptions...))
} else {
// increase max receive and send message sizes
opts = append(opts,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32),
))

}

ctx := context.Background()
ctx, _ = context.WithTimeout(ctx, b.config.dialTimeout)
// cancel is ignored here as connection.Close() is used.
Expand Down Expand Up @@ -327,13 +339,6 @@ func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, erro
b.config.log.Debugw("Creating client connection", "options", opts)
}

// increase max receive and send message sizes
opts = append(opts,
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(math.MaxInt32),
grpc.MaxCallSendMsgSize(math.MaxInt32),
))

if b.config.lbStrategy != "" {
opts = append(opts, grpc.WithBalancerName(b.config.lbStrategy))
}
Expand Down
10 changes: 6 additions & 4 deletions testdata/config/config5.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
"proto": "grpcbin.proto",
"call": "grpcbin.GRPCBin.DummyUnary",
"host": "127.0.0.1:9000",
"duration":"20s",
"max-duration":"60s",
"stream-interval":"25s",
"timeout":"30s",
"duration": "20s",
"max-duration": "60s",
"stream-interval": "25s",
"timeout": "30s",
"max-recv-message-size": "1024mb",
"max-send-message-size": "2000mib",
"data": {
"f_strings": [
"123",
Expand Down
2 changes: 2 additions & 0 deletions testdata/config/config5.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ duration = "20s"
max-duration = "60s"
stream-interval = "25s"
timeout = "30s"
max-recv-message-size = "1024mb"
max-send-message-size = "2000mib"

[data]
f_strings = [
Expand Down

0 comments on commit d362856

Please sign in to comment.