Skip to content

Commit

Permalink
Merge pull request #254 from bojand/stream_close_data_perf
Browse files Browse the repository at this point in the history
Additional stream options
  • Loading branch information
bojand committed Jan 2, 2021
2 parents 33676b4 + ac7339a commit 37484c7
Show file tree
Hide file tree
Showing 21 changed files with 3,025 additions and 617 deletions.
112 changes: 58 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
[![Donate](https://img.shields.io/badge/Donate-PayPal-green.svg?style=flat-square)](https://www.paypal.me/bojandj)
[![Buy me a coffee](https://img.shields.io/badge/buy%20me-a%20coffee-orange.svg?style=flat-square)](https://www.buymeacoffee.com/bojand)

Simple [gRPC](http://grpc.io/) benchmarking and load testing tool inspired by [hey](https://github.com/rakyll/hey/) and [grpcurl](https://github.com/fullstorydev/grpcurl).
[gRPC](http://grpc.io/) benchmarking and load testing tool.

## Documentation

Expand Down Expand Up @@ -59,62 +59,66 @@ go build .
usage: ghz [<flags>] [<host>]
Flags:
-h, --help Show context-sensitive help (also try --help-long and --help-man).
--config= Path to the JSON or TOML config file that specifies all the test run settings.
--proto= The Protocol Buffer .proto file.
--protoset= The compiled protoset file. Alternative to proto. -proto takes precedence.
--call= A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.
-i, --import-paths= Comma separated list of proto import paths. The current working directory and the directory of the protocol buffer file are automatically added to the import list.
--cacert= File containing trusted root certificates for verifying the server.
--cert= File containing client certificate (public key), to present to the server. Must also provide -key option.
--key= File containing client private key, to present to the server. Must also provide -cert option.
--cname= Server name override when validating TLS certificate - useful for self signed certs.
--skipTLS Skip TLS client verification of the server's certificate chain and host name.
--insecure Use plaintext and insecure connection.
--authority= Value to be used as the :authority pseudo-header. Only works if -insecure is used.
--async Make requests asynchronous as soon as possible. Does not wait for request to finish before sending next one.
-r, --rps=0 Requests per second (RPS) rate limit for constant load schedule. Default is no rate limit.
--load-schedule="const" Specifies the load schedule. Options are const, step, or line. Default is const.
--load-start=0 Specifies the RPS load start value for step or line schedules.
--load-step=0 Specifies the load step value or slope value.
--load-end=0 Specifies the load end value for step or line load schedules.
--load-step-duration=0 Specifies the load step duration value for step load schedule.
--load-max-duration=0 Specifies the max load duration value for step or line load schedule.
-c, --concurrency=50 Number of request workers to run concurrently for const concurrency schedule. Default is 50.
-h, --help Show context-sensitive help (also try --help-long and --help-man).
--config= Path to the JSON or TOML config file that specifies all the test run settings.
--proto= The Protocol Buffer .proto file.
--protoset= The compiled protoset file. Alternative to proto. -proto takes precedence.
--call= A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.
-i, --import-paths= Comma separated list of proto import paths. The current working directory and the directory of the protocol buffer file are automatically added to the import list.
--cacert= File containing trusted root certificates for verifying the server.
--cert= File containing client certificate (public key), to present to the server. Must also provide -key option.
--key= File containing client private key, to present to the server. Must also provide -cert option.
--cname= Server name override when validating TLS certificate - useful for self signed certs.
--skipTLS Skip TLS client verification of the server's certificate chain and host name.
--insecure Use plaintext and insecure connection.
--authority= Value to be used as the :authority pseudo-header. Only works if -insecure is used.
--async Make requests asynchronous as soon as possible. Does not wait for request to finish before sending next one.
-r, --rps=0 Requests per second (RPS) rate limit for constant load schedule. Default is no rate limit.
--load-schedule="const" Specifies the load schedule. Options are const, step, or line. Default is const.
--load-start=0 Specifies the RPS load start value for step or line schedules.
--load-step=0 Specifies the load step value or slope value.
--load-end=0 Specifies the load end value for step or line load schedules.
--load-step-duration=0 Specifies the load step duration value for step load schedule.
--load-max-duration=0 Specifies the max load duration value for step or line load schedule.
-c, --concurrency=50 Number of request workers to run concurrently for const concurrency schedule. Default is 50.
--concurrency-schedule="const"
Concurrency change schedule. Options are const, step, or line. Default is const.
--concurrency-start=0 Concurrency start value for step and line concurrency schedules.
--concurrency-end=0 Concurrency end value for step and line concurrency schedules.
--concurrency-step=1 Concurrency step / slope value for step and line concurrency schedules.
Concurrency change schedule. Options are const, step, or line. Default is const.
--concurrency-start=0 Concurrency start value for step and line concurrency schedules.
--concurrency-end=0 Concurrency end value for step and line concurrency schedules.
--concurrency-step=1 Concurrency step / slope value for step and line concurrency schedules.
--concurrency-step-duration=0
Specifies the concurrency step duration value for step concurrency schedule.
Specifies the concurrency step duration value for step concurrency schedule.
--concurrency-max-duration=0
Specifies the max concurrency adjustment duration value for step or line concurrency schedule.
-n, --total=200 Number of requests to run. Default is 200.
-t, --timeout=20s Timeout for each request. Default is 20s, use 0 for infinite.
-z, --duration=0 Duration of application to send requests. When duration is reached, application stops and exits. If duration is specified, n is ignored. Examples: -z 10s -z 3m.
-x, --max-duration=0 Maximum duration of application to send requests with n setting respected. If duration is reached before n requests are completed, application stops and exits. Examples: -x 10s -x 3m.
--duration-stop="close" Specifies how duration stop is reported. Options are close, wait or ignore. Default is close.
-d, --data= The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.
-D, --data-file= File path for call data JSON file. Examples: /home/user/file.json or ./file.json.
-b, --binary The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.
-B, --binary-file= File path for the call data as serialized binary message or multiple count-prefixed messages.
-m, --metadata= Request metadata as stringified JSON.
-M, --metadata-file= File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.
--stream-interval=0 Interval for stream requests between message sends.
--reflect-metadata= Reflect metadata as stringified JSON used only for reflection request.
-o, --output= Output path. If none provided stdout is used.
-O, --format= Output format. One of: summary, csv, json, pretty, html, influx-summary, influx-details. Default is summary.
--skipFirst=0 Skip the first X requests when doing the results tally.
--connections=1 Number of connections to use. Concurrency is distributed evenly among all the connections. Default is 1.
--connect-timeout=10s Connection timeout for the initial connection dial. Default is 10s.
--keepalive=0 Keepalive time duration. Only used if present and above 0.
--name= User specified name for the test.
--tags= JSON representation of user-defined string tags.
--cpus=12 Number of cpu cores to use.
--debug= The path to debug log file.
-e, --enable-compression Enable Gzip compression on requests.
-v, --version Show application version.
Specifies the max concurrency adjustment duration value for step or line concurrency schedule.
-n, --total=200 Number of requests to run. Default is 200.
-t, --timeout=20s Timeout for each request. Default is 20s, use 0 for infinite.
-z, --duration=0 Duration of application to send requests. When duration is reached, application stops and exits. If duration is specified, n is ignored. Examples: -z 10s -z 3m.
-x, --max-duration=0 Maximum duration of application to send requests with n setting respected. If duration is reached before n requests are completed, application stops and exits. Examples: -x 10s -x 3m.
--duration-stop="close" Specifies how duration stop is reported. Options are close, wait or ignore. Default is close.
-d, --data= The call data as stringified JSON. If the value is '@' then the request contents are read from stdin.
-D, --data-file= File path for call data JSON file. Examples: /home/user/file.json or ./file.json.
-b, --binary The call data comes as serialized binary message or multiple count-prefixed messages read from stdin.
-B, --binary-file= File path for the call data as serialized binary message or multiple count-prefixed messages.
-m, --metadata= Request metadata as stringified JSON.
-M, --metadata-file= File path for call metadata JSON file. Examples: /home/user/metadata.json or ./metadata.json.
--stream-interval=0 Interval for stream requests between message sends.
--stream-call-duration=0 Duration after which client will close the stream in each streaming call.
--stream-call-count=0 Count of messages sent, after which client will close the stream in each streaming call.
--stream-dynamic-messages In streaming calls, regenerate and apply call template data on every message send.
--reflect-metadata= Reflect metadata as stringified JSON used only for reflection request.
-o, --output= Output path. If none provided stdout is used.
-O, --format= Output format. One of: summary, csv, json, pretty, html, influx-summary, influx-details. Default is summary.
--skipFirst=0 Skip the first X requests when doing the results tally.
--count-errors Count erroneous (non-OK) resoponses in stats calculations.
--connections=1 Number of connections to use. Concurrency is distributed evenly among all the connections. Default is 1.
--connect-timeout=10s Connection timeout for the initial connection dial. Default is 10s.
--keepalive=0 Keepalive time duration. Only used if present and above 0.
--name= User specified name for the test.
--tags= JSON representation of user-defined string tags.
--cpus=12 Number of cpu cores to use.
--debug= The path to debug log file.
-e, --enable-compression Enable Gzip compression on requests.
-v, --version Show application version.
Args:
[<host>] Host and port to test.
Expand Down
44 changes: 40 additions & 4 deletions cmd/ghz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ var (
si = kingpin.Flag("stream-interval", "Interval for stream requests between message sends.").
Default("0").IsSetByUser(&isSISet).Duration()

isSCSet = false
scd = kingpin.Flag("stream-call-duration", "Duration after which client will close the stream in each streaming call.").
Default("0").IsSetByUser(&isSCSet).Duration()

isSCCSet = false
scc = kingpin.Flag("stream-call-count", "Count of messages sent, after which client will close the stream in each streaming call.").
Default("0").IsSetByUser(&isSCCSet).Uint()

isSDMSet = false
sdm = kingpin.Flag("stream-dynamic-messages", "In streaming calls, regenerate and apply call template data on every message send.").
Default("false").IsSetByUser(&isSDMSet).Bool()

isRMDSet = false
rmd = kingpin.Flag("reflect-metadata", "Reflect metadata as stringified JSON used only for reflection request.").
PlaceHolder(" ").IsSetByUser(&isRMDSet).String()
Expand All @@ -201,6 +213,10 @@ var (
skipFirst = kingpin.Flag("skipFirst", "Skip the first X requests when doing the results tally.").
Default("0").IsSetByUser(&isSkipFirstSet).Uint()

isCESet = false
countErrors = kingpin.Flag("count-errors", "Count erroneous (non-OK) resoponses in stats calculations.").
Default("false").IsSetByUser(&isCESet).Bool()

// Connection
isConnSet = false
conns = kingpin.Flag("connections", "Number of connections to use. Concurrency is distributed evenly among all the connections. Default is 1.").
Expand Down Expand Up @@ -426,6 +442,9 @@ func createConfigFromArgs(cfg *runner.Config) error {
cfg.Metadata = metadata
cfg.MetadataPath = *mdPath
cfg.SI = runner.Duration(*si)
cfg.StreamCallDuration = runner.Duration(*scd)
cfg.StreamCallCount = *scc
cfg.StreamDynamicMessages = *sdm
cfg.Output = *output
cfg.Format = *format
cfg.ImportPaths = iPaths
Expand All @@ -451,6 +470,7 @@ func createConfigFromArgs(cfg *runner.Config) error {
cfg.CEnd = *cEnd
cfg.CStepDuration = runner.Duration(*cStepDuration)
cfg.CMaxDuration = runner.Duration(*cMaxDuration)
cfg.CountErrors = *countErrors

return nil
}
Expand Down Expand Up @@ -492,10 +512,6 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error {
dest.SkipTLSVerify = src.SkipTLSVerify
}

if isSkipFirstSet {
dest.SkipFirst = src.SkipFirst
}

if isInsecSet {
dest.Insecure = src.Insecure
}
Expand All @@ -508,6 +524,14 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error {
dest.CName = src.CName
}

if isSkipFirstSet {
dest.SkipFirst = src.SkipFirst
}

if isCESet {
dest.CountErrors = src.CountErrors
}

// run

if isNSet {
Expand Down Expand Up @@ -562,6 +586,18 @@ func mergeConfig(dest *runner.Config, src *runner.Config) error {
dest.SI = src.SI
}

if isSCSet {
dest.StreamCallDuration = src.StreamCallDuration
}

if isSCCSet {
dest.StreamCallCount = src.StreamCallCount
}

if isSDMSet {
dest.StreamDynamicMessages = src.StreamDynamicMessages
}

if isOutputSet {
dest.Output = src.Output
}
Expand Down
84 changes: 72 additions & 12 deletions internal/helloworld/greeter_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

context "golang.org/x/net/context"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

Expand All @@ -30,17 +31,20 @@ var Bidi CallType = "bidi"

// Greeter implements the GreeterServer for tests
type Greeter struct {
streamData []*HelloReply
StreamData []*HelloReply

Stats *HWStatsHandler

mutex *sync.RWMutex
callCounts map[CallType]int
calls map[CallType][][]*HelloRequest

sendMutex *sync.RWMutex
sendCounts map[CallType]map[int]int
}

func randomSleep() {
msCount := rand.Intn(4) + 1
func randomSleep(max int) {
msCount := rand.Intn(max) + 1
time.Sleep(time.Millisecond * time.Duration(msCount))
}

Expand All @@ -62,12 +66,38 @@ func (s *Greeter) recordMessage(ct CallType, callIdx int, msg *HelloRequest) {
s.calls[ct][callIdx] = append(s.calls[ct][callIdx], msg)
}

func (s *Greeter) recordStreamSendCounter(ct CallType, callIdx int) {
s.sendMutex.Lock()
defer s.sendMutex.Unlock()

s.sendCounts[ct][callIdx] = s.sendCounts[ct][callIdx] + 1
}

// SayHello implements helloworld.GreeterServer
func (s *Greeter) SayHello(ctx context.Context, in *HelloRequest) (*HelloReply, error) {
callIdx := s.recordCall(Unary)
s.recordMessage(Unary, callIdx, in)

randomSleep()
if in.GetName() == "__record_metadata__" {
mdval := ""
md, ok := metadata.FromIncomingContext(ctx)
if ok {
for k, v := range md {
if k == "token" {
mdval = mdval + k + ":"
for _, vv := range v {
mdval = mdval + vv
}
}
}
}

newReq := &HelloRequest{Name: in.GetName() + "||" + mdval}
s.recordMessage(Unary, callIdx, newReq)
} else {
s.recordMessage(Unary, callIdx, in)
}

randomSleep(4)

return &HelloReply{Message: "Hello " + in.Name}, nil
}
Expand All @@ -77,12 +107,14 @@ func (s *Greeter) SayHellos(req *HelloRequest, stream Greeter_SayHellosServer) e
callIdx := s.recordCall(ServerStream)
s.recordMessage(ServerStream, callIdx, req)

randomSleep()

for _, msg := range s.streamData {
for _, msg := range s.StreamData {
if err := stream.Send(msg); err != nil {
return err
}

randomSleep(4)

s.recordStreamSendCounter(ServerStream, callIdx)
}

return nil
Expand All @@ -92,7 +124,7 @@ func (s *Greeter) SayHellos(req *HelloRequest, stream Greeter_SayHellosServer) e
func (s *Greeter) SayHelloCS(stream Greeter_SayHelloCSServer) error {
callIdx := s.recordCall(ClientStream)

randomSleep()
randomSleep(4)

msgCount := 0

Expand All @@ -114,8 +146,6 @@ func (s *Greeter) SayHelloCS(stream Greeter_SayHelloCSServer) error {
func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
callIdx := s.recordCall(Bidi)

randomSleep()

for {
in, err := stream.Recv()
if err == io.EOF {
Expand All @@ -126,10 +156,13 @@ func (s *Greeter) SayHelloBidi(stream Greeter_SayHelloBidiServer) error {
}

s.recordMessage(Bidi, callIdx, in)

msg := "Hello " + in.Name
if err := stream.Send(&HelloReply{Message: msg}); err != nil {
return err
}

s.recordStreamSendCounter(ServerStream, callIdx)
}
}

Expand All @@ -151,6 +184,14 @@ func (s *Greeter) ResetCounters() {

s.mutex.Unlock()

s.sendMutex.Lock()
s.sendCounts = make(map[CallType]map[int]int)
s.sendCounts[Unary] = make(map[int]int)
s.sendCounts[ServerStream] = make(map[int]int)
s.sendCounts[ClientStream] = make(map[int]int)
s.sendCounts[Bidi] = make(map[int]int)
s.sendMutex.Unlock()

if s.Stats != nil {
s.Stats.mutex.Lock()
s.Stats.connCount = 0
Expand Down Expand Up @@ -181,6 +222,25 @@ func (s *Greeter) GetCalls(key CallType) [][]*HelloRequest {
return nil
}

// GetSendCounts gets the stream send counts
func (s *Greeter) GetSendCounts(key CallType) map[int]int {
s.sendMutex.RLock()
defer s.sendMutex.RUnlock()

val, ok := s.sendCounts[key]

if ok {
cm := map[int]int{}
for k, v := range val {
cm[k] = v
}

return cm
}

return nil
}

// GetConnectionCount gets the connection count
func (s *Greeter) GetConnectionCount() int {
return s.Stats.GetConnectionCount()
Expand All @@ -195,7 +255,7 @@ func NewGreeter() *Greeter {
{Message: "Hello Sara"},
}

greeter := &Greeter{streamData: streamData, mutex: &sync.RWMutex{}}
greeter := &Greeter{StreamData: streamData, mutex: &sync.RWMutex{}, sendMutex: &sync.RWMutex{}}
greeter.ResetCounters()

return greeter
Expand Down

0 comments on commit 37484c7

Please sign in to comment.