Skip to content

Commit

Permalink
grpc: AsyncInvoke implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
olegbespalov committed Apr 24, 2024
1 parent a98a065 commit 666b236
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 1 deletion.
31 changes: 31 additions & 0 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,37 @@ func (c *Client) Invoke(
return c.conn.Invoke(c.vu.Context(), grpcReq)
}

// AsyncInvoke creates and calls a unary RPC by fully qualified method name asynchronously
func (c *Client) AsyncInvoke(
method string,
req goja.Value,
params goja.Value,
) *goja.Promise {
grpcReq, err := c.buildInvokeRequest(method, req, params)

promise, resolve, reject := c.vu.Runtime().NewPromise()
if err != nil {
reject(err)
return promise
}

callback := c.vu.RegisterCallback()
go func() {
res, err := c.conn.Invoke(c.vu.Context(), grpcReq)

callback(func() error {
if err != nil {
reject(err)
return nil //nolint:nilerr // we don't want to return the error
}
resolve(res)
return nil
})
}()

return promise
}

// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters
func (c *Client) buildInvokeRequest(
method string,
Expand Down
102 changes: 101 additions & 1 deletion js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,22 @@ func TestClient(t *testing.T) {
err: `unknown param: "void"`,
},
},
{
name: "AsyncInvokeInvalidParam",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { void: true }).then(function(resp) {
throw new Error("should not be here")
}, (err) => {
throw new Error(err)
})`,
err: `unknown param: "void"`,
},
},
{
name: "InvokeNilRequest",
initString: codeBlock{code: `
Expand Down Expand Up @@ -317,6 +333,33 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncInvoke",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
}, (err) => {
throw new Error("unexpected error: " + err)
})
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "InvokeAnyProto",
initString: codeBlock{code: `
Expand Down Expand Up @@ -387,6 +430,32 @@ func TestClient(t *testing.T) {
throw new Error("server did not receive the correct request message")
}`},
},
{
name: "AsyncRequestMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(_ context.Context, req *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
if req.Payload == nil || string(req.Payload.Body) != "负载测试" {
return nil, status.Error(codes.InvalidArgument, "")
}
return &grpc_testing.SimpleResponse{}, nil
}
},
vuString: codeBlock{code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", { payload: { body: "6LSf6L295rWL6K+V"} }).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("server did not receive the correct request message")
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`},
},
{
name: "RequestHeaders",
initString: codeBlock{
Expand Down Expand Up @@ -464,6 +533,37 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncResponseMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.UnaryCallFunc = func(context.Context, *grpc_testing.SimpleRequest) (*grpc_testing.SimpleResponse, error) {
return &grpc_testing.SimpleResponse{
OauthScope: "水",
}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/UnaryCall", {}).then(function(resp) {
if (!resp.message || resp.message.username !== "" || resp.message.oauthScope !== "水") {
throw new Error("unexpected response message: " + JSON.stringify(resp.message))
}
}, (err) => {
throw new Error("unexpected error: " + err)
});
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/UnaryCall"))
},
},
},
{
name: "ResponseError",
initString: codeBlock{
Expand Down Expand Up @@ -973,7 +1073,7 @@ func TestClient(t *testing.T) {
assertResponse(t, tt.initString, err, val, ts)

ts.ToVUContext()
val, err = ts.Run(tt.vuString.code)
val, err = ts.RunOnEventLoop(tt.vuString.code)
assertResponse(t, tt.vuString, err, val, ts)
})
}
Expand Down

0 comments on commit 666b236

Please sign in to comment.