Skip to content

Commit

Permalink
feat(pubsublite): Refactoring and unit tests for retryableStream (#3160)
Browse files Browse the repository at this point in the history
Minor refactoring and unit tests for retryableStream. Eliminated test flakiness in mock server.
  • Loading branch information
tmdiep committed Nov 17, 2020
1 parent 432f2c2 commit 82945ce
Show file tree
Hide file tree
Showing 5 changed files with 682 additions and 163 deletions.
161 changes: 55 additions & 106 deletions pubsublite/internal/test/mock.go
Expand Up @@ -15,12 +15,12 @@ package test

import (
"context"
"fmt"
"io"
"reflect"
"sync"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/internal/uid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -40,20 +40,11 @@ type Server struct {
// This is the interface that should be used by tests.
type MockServer interface {
// OnTestStart must be called at the start of each test to clear any existing
// state and set the verifier for unary RPCs.
OnTestStart(globalVerifier *RPCVerifier)
// state and set the test verifiers.
OnTestStart(*Verifiers)
// OnTestEnd should be called at the end of each test to flush the verifiers
// (i.e. check whether any expected requests were not sent to the server).
OnTestEnd()
// AddPublishStream adds a verifier for a publish stream of a topic partition.
AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)
// AddSubscribeStream adds a verifier for a subscribe stream of a partition.
AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)
// AddCommitStream adds a verifier for a commit stream of a partition.
AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)
// AddAssignmentStream adds a verifier for a partition assignment stream for a
// subscription.
AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)
}

// NewServer creates a new mock Pub/Sub Lite server.
Expand Down Expand Up @@ -97,79 +88,63 @@ type mockLiteServer struct {

mu sync.Mutex

// Global list of verifiers for all unary RPCs. This should be set before the
// test begins.
globalVerifier *RPCVerifier

// Stream verifiers by key.
publishVerifiers *keyedStreamVerifiers
subscribeVerifiers *keyedStreamVerifiers
commitVerifiers *keyedStreamVerifiers
assignmentVerifiers *keyedStreamVerifiers

nextStreamID int
activeStreams map[int]*streamHolder
testActive bool
}

func key(path string, partition int) string {
return fmt.Sprintf("%s:%d", path, partition)
testVerifiers *Verifiers
testIDs *uid.Space
currentTestID string
}

func newMockLiteServer() *mockLiteServer {
return &mockLiteServer{
publishVerifiers: newKeyedStreamVerifiers(),
subscribeVerifiers: newKeyedStreamVerifiers(),
commitVerifiers: newKeyedStreamVerifiers(),
assignmentVerifiers: newKeyedStreamVerifiers(),
activeStreams: make(map[int]*streamHolder),
testIDs: uid.NewSpace("mockLiteServer", nil),
}
}

func (s *mockLiteServer) startStream(stream grpc.ServerStream, verifier *RPCVerifier) (id int) {
s.mu.Lock()
defer s.mu.Unlock()

id = s.nextStreamID
s.nextStreamID++
s.activeStreams[id] = &streamHolder{stream: stream, verifier: verifier}
return
}

func (s *mockLiteServer) endStream(id int) {
func (s *mockLiteServer) popGlobalVerifiers(request interface{}) (interface{}, error) {
s.mu.Lock()
defer s.mu.Unlock()

delete(s.activeStreams, id)
if s.testVerifiers == nil {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: previous test has ended")
}
return s.testVerifiers.GlobalVerifier.Pop(request)
}

func (s *mockLiteServer) popStreamVerifier(key string, keyedVerifiers *keyedStreamVerifiers) (*RPCVerifier, error) {
func (s *mockLiteServer) popStreamVerifier(key string) (*RPCVerifier, error) {
s.mu.Lock()
defer s.mu.Unlock()

return keyedVerifiers.Pop(key)
if s.testVerifiers == nil {
return nil, status.Errorf(codes.FailedPrecondition, "mockserver: previous test has ended")
}
return s.testVerifiers.streamVerifiers.Pop(key)
}

func (s *mockLiteServer) handleStream(stream grpc.ServerStream, req interface{}, requestType reflect.Type, key string, keyedVerifiers *keyedStreamVerifiers) (err error) {
verifier, err := s.popStreamVerifier(key, keyedVerifiers)
func (s *mockLiteServer) handleStream(stream grpc.ServerStream, req interface{}, requestType reflect.Type, key string) (err error) {
testID := s.currentTest()
if testID == "" {
return status.Errorf(codes.FailedPrecondition, "mockserver: previous test has ended")
}
verifier, err := s.popStreamVerifier(key)
if err != nil {
return err
}

id := s.startStream(stream, verifier)

// Verify initial request.
retResponse, retErr := verifier.Pop(req)
var ok bool

for {
// See comments for RPCVerifier.Push for valid stream request/response
// combinations.
if retErr != nil {
err = retErr
break
}
if err = stream.SendMsg(retResponse); err != nil {
err = status.Errorf(codes.FailedPrecondition, "mockserver: stream send error: %v", err)
break
if retResponse != nil {
if err = stream.SendMsg(retResponse); err != nil {
err = status.Errorf(codes.FailedPrecondition, "mockserver: stream send error: %v", err)
break
}
}

// Check whether the next response isn't blocked on a request.
Expand All @@ -185,70 +160,47 @@ func (s *mockLiteServer) handleStream(stream grpc.ServerStream, req interface{},
err = status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error: %v", err)
break
}
if testID != s.currentTest() {
err = status.Errorf(codes.FailedPrecondition, "mockserver: previous test has ended")
break
}
retResponse, retErr = verifier.Pop(req)
}

// Check whether the stream ended prematurely.
verifier.Flush()
s.endStream(id)
if testID == s.currentTest() {
verifier.Flush()
}
return
}

// MockServer implementation.

func (s *mockLiteServer) OnTestStart(globalVerifier *RPCVerifier) {
func (s *mockLiteServer) OnTestStart(verifiers *Verifiers) {
s.mu.Lock()
defer s.mu.Unlock()

if s.testActive {
if s.currentTestID != "" {
panic("mockserver is already in use by another test")
}

s.testActive = true
s.globalVerifier = globalVerifier
s.publishVerifiers.Reset()
s.subscribeVerifiers.Reset()
s.commitVerifiers.Reset()
s.assignmentVerifiers.Reset()
s.activeStreams = make(map[int]*streamHolder)
s.currentTestID = s.testIDs.New()
s.testVerifiers = verifiers
}

func (s *mockLiteServer) OnTestEnd() {
s.mu.Lock()
defer s.mu.Unlock()

s.testActive = false
if s.globalVerifier != nil {
s.globalVerifier.Flush()
}

for _, as := range s.activeStreams {
as.verifier.Flush()
s.currentTestID = ""
if s.testVerifiers != nil {
s.testVerifiers.flush()
}
}

func (s *mockLiteServer) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.publishVerifiers.Push(key(topic, partition), streamVerifier)
}

func (s *mockLiteServer) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.subscribeVerifiers.Push(key(subscription, partition), streamVerifier)
}

func (s *mockLiteServer) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) {
func (s *mockLiteServer) currentTest() string {
s.mu.Lock()
defer s.mu.Unlock()
s.commitVerifiers.Push(key(subscription, partition), streamVerifier)
}

func (s *mockLiteServer) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) {
s.mu.Lock()
defer s.mu.Unlock()
s.assignmentVerifiers.Push(subscription, streamVerifier)
return s.currentTestID
}

// PublisherService implementation.
Expand All @@ -263,8 +215,8 @@ func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error
}

initReq := req.GetInitialRequest()
k := key(initReq.GetTopic(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k, s.publishVerifiers)
k := keyPartition(publishStreamType, initReq.GetTopic(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k)
}

// SubscriberService implementation.
Expand All @@ -279,8 +231,8 @@ func (s *mockLiteServer) Subscribe(stream pb.SubscriberService_SubscribeServer)
}

initReq := req.GetInitial()
k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k, s.subscribeVerifiers)
k := keyPartition(subscribeStreamType, initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k)
}

// CursorService implementation.
Expand All @@ -295,8 +247,8 @@ func (s *mockLiteServer) StreamingCommitCursor(stream pb.CursorService_Streaming
}

initReq := req.GetInitial()
k := key(initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k, s.commitVerifiers)
k := keyPartition(commitStreamType, initReq.GetSubscription(), int(initReq.GetPartition()))
return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k)
}

// PartitionAssignmentService implementation.
Expand All @@ -310,17 +262,14 @@ func (s *mockLiteServer) AssignPartitions(stream pb.PartitionAssignmentService_A
return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial partition assignment request: %v", req)
}

k := req.GetInitial().GetSubscription()
return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k, s.assignmentVerifiers)
k := key(assignmentStreamType, req.GetInitial().GetSubscription())
return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k)
}

// AdminService implementation.

func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopicPartitionsRequest) (*pb.TopicPartitions, error) {
s.mu.Lock()
defer s.mu.Unlock()

retResponse, retErr := s.globalVerifier.Pop(req)
retResponse, retErr := s.popGlobalVerifiers(req)
if retErr != nil {
return nil, retErr
}
Expand Down

0 comments on commit 82945ce

Please sign in to comment.