Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): Test utils for streams (#3153)
Adds the remaining Pub/Sub Lite streaming RPCs to the mock server. Adds utilities for crafting requests and responses.
  • Loading branch information
tmdiep committed Nov 6, 2020
1 parent f0cfd05 commit 5bb2b02
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 23 deletions.
95 changes: 91 additions & 4 deletions pubsublite/internal/test/mock.go
Expand Up @@ -47,6 +47,13 @@ type MockServer interface {
OnTestEnd()
// AddPublishStream adds a verifier for a publish stream of a topic partition.
AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
// AddSubscribeStream adds a verifier for a subscribe stream of a partition.
AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
// AddCommitStream adds a verifier for a commit stream of a partition.
AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
// AddAssignmentStream adds a verifier for a partition assignment stream for a
// subscription.
AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
}

// NewServer creates a new mock Pub/Sub Lite server.
Expand All @@ -58,6 +65,9 @@ func NewServer() (*Server, error) {
liteServer := newMockLiteServer()
pb.RegisterAdminServiceServer(srv.Gsrv, liteServer)
pb.RegisterPublisherServiceServer(srv.Gsrv, liteServer)
pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer)
pb.RegisterCursorServiceServer(srv.Gsrv, liteServer)
pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer)
srv.Start()
return &Server{LiteServer: liteServer, gRPCServer: srv}, nil
}
Expand All @@ -81,15 +91,21 @@ type streamHolder struct {
type mockLiteServer struct {
pb.AdminServiceServer
pb.PublisherServiceServer
pb.SubscriberServiceServer
pb.CursorServiceServer
pb.PartitionAssignmentServiceServer

mu sync.Mutex

// Global list of verifiers for all unary RPCs. This should be set before the
// test begins.
globalVerifier *RPCVerifier

// Publish stream verifiers by topic & partition.
publishVerifiers *keyedStreamVerifiers
// Stream verifiers by key.
publishVerifiers *keyedStreamVerifiers
subscribeVerifiers *keyedStreamVerifiers
commitVerifiers *keyedStreamVerifiers
assignmentVerifiers *keyedStreamVerifiers

nextStreamID int
activeStreams map[int]*streamHolder
Expand All @@ -102,8 +118,11 @@ func key(path string, partition int) string {

func newMockLiteServer() *mockLiteServer {
return &mockLiteServer{
publishVerifiers: newKeyedStreamVerifiers(),
activeStreams: make(map[int]*streamHolder),
publishVerifiers: newKeyedStreamVerifiers(),
subscribeVerifiers: newKeyedStreamVerifiers(),
commitVerifiers: newKeyedStreamVerifiers(),
assignmentVerifiers: newKeyedStreamVerifiers(),
activeStreams: make(map[int]*streamHolder),
}
}

Expand Down Expand Up @@ -188,6 +207,9 @@ func (s *mockLiteServer) OnTestStart(globalVerifier *RPCVerifier) {
s.testActive = true
s.globalVerifier = globalVerifier
s.publishVerifiers.Reset()
s.subscribeVerifiers.Reset()
s.commitVerifiers.Reset()
s.assignmentVerifiers.Reset()
s.activeStreams = make(map[int]*streamHolder)
}

Expand All @@ -211,6 +233,24 @@ func (s *mockLiteServer) AddPublishStream(topic string, partition int, streamVer
s.publishVerifiers.Push(key(topic, partition), streamVerifier)
}

func (s *mockLiteServer) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.subscribeVerifiers.Push(key(subscription, partition), streamVerifier)
}

func (s *mockLiteServer) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.commitVerifiers.Push(key(subscription, partition), streamVerifier)
}

func (s *mockLiteServer) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.assignmentVerifiers.Push(subscription, streamVerifier)
}

// PublisherService implementation.

func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error {
Expand All @@ -227,6 +267,53 @@ func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error
return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k, s.publishVerifiers)
}

// SubscriberService implementation.

func (s *mockLiteServer) Subscribe(stream pb.SubscriberService_SubscribeServer) error {
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
}
if len(req.GetInitial().GetSubscription()) == 0 {
return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial subscribe request: %v", req)
}

initReq := req.GetInitial()
k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k, s.subscribeVerifiers)
}

// CursorService implementation.

func (s *mockLiteServer) StreamingCommitCursor(stream pb.CursorService_StreamingCommitCursorServer) error {
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
}
if len(req.GetInitial().GetSubscription()) == 0 {
return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial streaming commit cursor request: %v", req)
}

initReq := req.GetInitial()
k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k, s.commitVerifiers)
}

// PartitionAssignmentService implementation.

func (s *mockLiteServer) AssignPartitions(stream pb.PartitionAssignmentService_AssignPartitionsServer) error {
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err)
}
if len(req.GetInitial().GetSubscription()) == 0 {
return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial partition assignment request: %v", req)
}

k := req.GetInitial().GetSubscription()
return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k, s.assignmentVerifiers)
}

// AdminService implementation.

func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopicPartitionsRequest) (*pb.TopicPartitions, error) {
Expand Down
7 changes: 7 additions & 0 deletions pubsublite/internal/test/util.go
Expand Up @@ -14,6 +14,8 @@
package test

import (
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/codes"
Expand All @@ -36,6 +38,11 @@ func ErrorHasCode(got error, wantCode codes.Code) bool {
return false
}

// ErrorHasMsg returns true if an error message contains the desired substring.
func ErrorHasMsg(got error, wantStr string) bool {
return strings.Index(got.Error(), wantStr) >= 0
}

// FakeSource is a fake source that returns a configurable constant.
type FakeSource struct {
Ret int64
Expand Down
19 changes: 0 additions & 19 deletions pubsublite/internal/wire/flow_control_test.go
Expand Up @@ -25,25 +25,6 @@ import (
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest {
return &pb.FlowControlRequest{
AllowedBytes: tokens.Bytes,
AllowedMessages: tokens.Messages,
}
}

func seqMsgWithOffset(offset int64) *pb.SequencedMessage {
return &pb.SequencedMessage{
Cursor: &pb.Cursor{Offset: offset},
}
}

func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage {
return &pb.SequencedMessage{
SizeBytes: size,
}
}

func TestTokenCounterAdd(t *testing.T) {
// Note: tests are applied to this counter instance in order.
counter := tokenCounter{}
Expand Down

0 comments on commit 5bb2b02

Please sign in to comment.