From 5bb2b0218d355bc558b03f24db1a0786a3489cac Mon Sep 17 00:00:00 2001 From: tmdiep Date: Sat, 7 Nov 2020 06:02:05 +1100 Subject: [PATCH] feat(pubsublite): Test utils for streams (#3153) Adds the remaining Pub/Sub Lite streaming RPCs to the mock server. Adds utilities for crafting requests and responses. --- pubsublite/internal/test/mock.go | 95 +++++++- pubsublite/internal/test/util.go | 7 + pubsublite/internal/wire/flow_control_test.go | 19 -- pubsublite/internal/wire/requests_test.go | 218 ++++++++++++++++++ 4 files changed, 316 insertions(+), 23 deletions(-) create mode 100644 pubsublite/internal/wire/requests_test.go diff --git a/pubsublite/internal/test/mock.go b/pubsublite/internal/test/mock.go index 0c8c5a4aa7d..d27a437ae88 100644 --- a/pubsublite/internal/test/mock.go +++ b/pubsublite/internal/test/mock.go @@ -47,6 +47,13 @@ type MockServer interface { OnTestEnd() // AddPublishStream adds a verifier for a publish stream of a topic partition. AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier) + // AddSubscribeStream adds a verifier for a subscribe stream of a partition. + AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) + // AddCommitStream adds a verifier for a commit stream of a partition. + AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) + // AddAssignmentStream adds a verifier for a partition assignment stream for a + // subscription. + AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) } // NewServer creates a new mock Pub/Sub Lite server. @@ -58,6 +65,9 @@ func NewServer() (*Server, error) { liteServer := newMockLiteServer() pb.RegisterAdminServiceServer(srv.Gsrv, liteServer) pb.RegisterPublisherServiceServer(srv.Gsrv, liteServer) + pb.RegisterSubscriberServiceServer(srv.Gsrv, liteServer) + pb.RegisterCursorServiceServer(srv.Gsrv, liteServer) + pb.RegisterPartitionAssignmentServiceServer(srv.Gsrv, liteServer) srv.Start() return &Server{LiteServer: liteServer, gRPCServer: srv}, nil } @@ -81,6 +91,9 @@ type streamHolder struct { type mockLiteServer struct { pb.AdminServiceServer pb.PublisherServiceServer + pb.SubscriberServiceServer + pb.CursorServiceServer + pb.PartitionAssignmentServiceServer mu sync.Mutex @@ -88,8 +101,11 @@ type mockLiteServer struct { // test begins. globalVerifier *RPCVerifier - // Publish stream verifiers by topic & partition. - publishVerifiers *keyedStreamVerifiers + // Stream verifiers by key. + publishVerifiers *keyedStreamVerifiers + subscribeVerifiers *keyedStreamVerifiers + commitVerifiers *keyedStreamVerifiers + assignmentVerifiers *keyedStreamVerifiers nextStreamID int activeStreams map[int]*streamHolder @@ -102,8 +118,11 @@ func key(path string, partition int) string { func newMockLiteServer() *mockLiteServer { return &mockLiteServer{ - publishVerifiers: newKeyedStreamVerifiers(), - activeStreams: make(map[int]*streamHolder), + publishVerifiers: newKeyedStreamVerifiers(), + subscribeVerifiers: newKeyedStreamVerifiers(), + commitVerifiers: newKeyedStreamVerifiers(), + assignmentVerifiers: newKeyedStreamVerifiers(), + activeStreams: make(map[int]*streamHolder), } } @@ -188,6 +207,9 @@ func (s *mockLiteServer) OnTestStart(globalVerifier *RPCVerifier) { s.testActive = true s.globalVerifier = globalVerifier s.publishVerifiers.Reset() + s.subscribeVerifiers.Reset() + s.commitVerifiers.Reset() + s.assignmentVerifiers.Reset() s.activeStreams = make(map[int]*streamHolder) } @@ -211,6 +233,24 @@ func (s *mockLiteServer) AddPublishStream(topic string, partition int, streamVer s.publishVerifiers.Push(key(topic, partition), streamVerifier) } +func (s *mockLiteServer) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier) { + s.mu.Lock() + defer s.mu.Unlock() + s.subscribeVerifiers.Push(key(subscription, partition), streamVerifier) +} + +func (s *mockLiteServer) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier) { + s.mu.Lock() + defer s.mu.Unlock() + s.commitVerifiers.Push(key(subscription, partition), streamVerifier) +} + +func (s *mockLiteServer) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier) { + s.mu.Lock() + defer s.mu.Unlock() + s.assignmentVerifiers.Push(subscription, streamVerifier) +} + // PublisherService implementation. func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error { @@ -227,6 +267,53 @@ func (s *mockLiteServer) Publish(stream pb.PublisherService_PublishServer) error return s.handleStream(stream, req, reflect.TypeOf(pb.PublishRequest{}), k, s.publishVerifiers) } +// SubscriberService implementation. + +func (s *mockLiteServer) Subscribe(stream pb.SubscriberService_SubscribeServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err) + } + if len(req.GetInitial().GetSubscription()) == 0 { + return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial subscribe request: %v", req) + } + + initReq := req.GetInitial() + k := key(initReq.GetSubscription(), int(initReq.GetPartition())) + return s.handleStream(stream, req, reflect.TypeOf(pb.SubscribeRequest{}), k, s.subscribeVerifiers) +} + +// CursorService implementation. + +func (s *mockLiteServer) StreamingCommitCursor(stream pb.CursorService_StreamingCommitCursorServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err) + } + if len(req.GetInitial().GetSubscription()) == 0 { + return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial streaming commit cursor request: %v", req) + } + + initReq := req.GetInitial() + k := key(initReq.GetSubscription(), int(initReq.GetPartition())) + return s.handleStream(stream, req, reflect.TypeOf(pb.StreamingCommitCursorRequest{}), k, s.commitVerifiers) +} + +// PartitionAssignmentService implementation. + +func (s *mockLiteServer) AssignPartitions(stream pb.PartitionAssignmentService_AssignPartitionsServer) error { + req, err := stream.Recv() + if err != nil { + return status.Errorf(codes.FailedPrecondition, "mockserver: stream recv error before initial request: %v", err) + } + if len(req.GetInitial().GetSubscription()) == 0 { + return status.Errorf(codes.InvalidArgument, "mockserver: received invalid initial partition assignment request: %v", req) + } + + k := req.GetInitial().GetSubscription() + return s.handleStream(stream, req, reflect.TypeOf(pb.PartitionAssignmentRequest{}), k, s.assignmentVerifiers) +} + // AdminService implementation. func (s *mockLiteServer) GetTopicPartitions(ctx context.Context, req *pb.GetTopicPartitionsRequest) (*pb.TopicPartitions, error) { diff --git a/pubsublite/internal/test/util.go b/pubsublite/internal/test/util.go index 5486c13100e..979c8cc012e 100644 --- a/pubsublite/internal/test/util.go +++ b/pubsublite/internal/test/util.go @@ -14,6 +14,8 @@ package test import ( + "strings" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/codes" @@ -36,6 +38,11 @@ func ErrorHasCode(got error, wantCode codes.Code) bool { return false } +// ErrorHasMsg returns true if an error message contains the desired substring. +func ErrorHasMsg(got error, wantStr string) bool { + return strings.Index(got.Error(), wantStr) >= 0 +} + // FakeSource is a fake source that returns a configurable constant. type FakeSource struct { Ret int64 diff --git a/pubsublite/internal/wire/flow_control_test.go b/pubsublite/internal/wire/flow_control_test.go index ca6d0661510..f38255b3ee4 100644 --- a/pubsublite/internal/wire/flow_control_test.go +++ b/pubsublite/internal/wire/flow_control_test.go @@ -25,25 +25,6 @@ import ( pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) -func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest { - return &pb.FlowControlRequest{ - AllowedBytes: tokens.Bytes, - AllowedMessages: tokens.Messages, - } -} - -func seqMsgWithOffset(offset int64) *pb.SequencedMessage { - return &pb.SequencedMessage{ - Cursor: &pb.Cursor{Offset: offset}, - } -} - -func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage { - return &pb.SequencedMessage{ - SizeBytes: size, - } -} - func TestTokenCounterAdd(t *testing.T) { // Note: tests are applied to this counter instance in order. counter := tokenCounter{} diff --git a/pubsublite/internal/wire/requests_test.go b/pubsublite/internal/wire/requests_test.go new file mode 100644 index 00000000000..94944a9066b --- /dev/null +++ b/pubsublite/internal/wire/requests_test.go @@ -0,0 +1,218 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + +// AdminService + +func topicPartitionsReq(topicPath string) *pb.GetTopicPartitionsRequest { + return &pb.GetTopicPartitionsRequest{Name: topicPath} +} + +func topicPartitionsResp(count int) *pb.TopicPartitions { + return &pb.TopicPartitions{PartitionCount: int64(count)} +} + +// CursorService + +func initCommitReq(subscription subscriptionPartition) *pb.StreamingCommitCursorRequest { + return &pb.StreamingCommitCursorRequest{ + Request: &pb.StreamingCommitCursorRequest_Initial{ + Initial: &pb.InitialCommitCursorRequest{ + Subscription: subscription.Path, + Partition: int64(subscription.Partition), + }, + }, + } +} + +func initCommitResp() *pb.StreamingCommitCursorResponse { + return &pb.StreamingCommitCursorResponse{ + Request: &pb.StreamingCommitCursorResponse_Initial{ + Initial: &pb.InitialCommitCursorResponse{}, + }, + } +} + +func commitReq(offset int64) *pb.StreamingCommitCursorRequest { + return &pb.StreamingCommitCursorRequest{ + Request: &pb.StreamingCommitCursorRequest_Commit{ + Commit: &pb.SequencedCommitCursorRequest{ + Cursor: &pb.Cursor{Offset: offset}, + }, + }, + } +} + +func commitResp(numAck int) *pb.StreamingCommitCursorResponse { + return &pb.StreamingCommitCursorResponse{ + Request: &pb.StreamingCommitCursorResponse_Commit{ + Commit: &pb.SequencedCommitCursorResponse{ + AcknowledgedCommits: int64(numAck), + }, + }, + } +} + +// PartitionAssignmentService + +func initAssignmentReq(subscription string, clientID []byte) *pb.PartitionAssignmentRequest { + return &pb.PartitionAssignmentRequest{ + Request: &pb.PartitionAssignmentRequest_Initial{ + Initial: &pb.InitialPartitionAssignmentRequest{ + Subscription: subscription, + ClientId: clientID, + }, + }, + } +} + +func assignmentAckReq() *pb.PartitionAssignmentRequest { + return &pb.PartitionAssignmentRequest{ + Request: &pb.PartitionAssignmentRequest_Ack{ + Ack: &pb.PartitionAssignmentAck{}, + }, + } +} + +func assignmentResp(partitions []int64) *pb.PartitionAssignment { + return &pb.PartitionAssignment{ + Partitions: partitions, + } +} + +// PublisherService + +func initPubReq(topic topicPartition) *pb.PublishRequest { + return &pb.PublishRequest{ + RequestType: &pb.PublishRequest_InitialRequest{ + InitialRequest: &pb.InitialPublishRequest{ + Topic: topic.Path, + Partition: int64(topic.Partition), + }, + }, + } +} + +func initPubResp() *pb.PublishResponse { + return &pb.PublishResponse{ + ResponseType: &pb.PublishResponse_InitialResponse{ + InitialResponse: &pb.InitialPublishResponse{}, + }, + } +} + +func msgPubReq(msgs ...*pb.PubSubMessage) *pb.PublishRequest { + return &pb.PublishRequest{ + RequestType: &pb.PublishRequest_MessagePublishRequest{ + MessagePublishRequest: &pb.MessagePublishRequest{Messages: msgs}, + }, + } +} + +func msgPubResp(cursor int64) *pb.PublishResponse { + return &pb.PublishResponse{ + ResponseType: &pb.PublishResponse_MessageResponse{ + MessageResponse: &pb.MessagePublishResponse{ + StartCursor: &pb.Cursor{Offset: cursor}, + }, + }, + } +} + +// SubscriberService + +func initSubReq(subscription subscriptionPartition) *pb.SubscribeRequest { + return &pb.SubscribeRequest{ + Request: &pb.SubscribeRequest_Initial{ + Initial: &pb.InitialSubscribeRequest{ + Subscription: subscription.Path, + Partition: int64(subscription.Partition), + }, + }, + } +} + +func initSubResp() *pb.SubscribeResponse { + return &pb.SubscribeResponse{ + Response: &pb.SubscribeResponse_Initial{ + Initial: &pb.InitialSubscribeResponse{}, + }, + } +} + +func seekReq(offset int64) *pb.SubscribeRequest { + return &pb.SubscribeRequest{ + Request: &pb.SubscribeRequest_Seek{ + Seek: &pb.SeekRequest{ + Target: &pb.SeekRequest_Cursor{ + Cursor: &pb.Cursor{Offset: offset}, + }, + }, + }, + } +} + +func seekResp(offset int64) *pb.SubscribeResponse { + return &pb.SubscribeResponse{ + Response: &pb.SubscribeResponse_Seek{ + Seek: &pb.SeekResponse{ + Cursor: &pb.Cursor{Offset: offset}, + }, + }, + } +} + +func flowControlReq(tokens flowControlTokens) *pb.FlowControlRequest { + return &pb.FlowControlRequest{ + AllowedBytes: tokens.Bytes, + AllowedMessages: tokens.Messages, + } +} + +func flowControlSubReq(tokens flowControlTokens) *pb.SubscribeRequest { + return &pb.SubscribeRequest{ + Request: &pb.SubscribeRequest_FlowControl{ + FlowControl: flowControlReq(tokens), + }, + } +} + +func seqMsgWithOffset(offset int64) *pb.SequencedMessage { + return &pb.SequencedMessage{ + Cursor: &pb.Cursor{Offset: offset}, + } +} + +func seqMsgWithSizeBytes(size int64) *pb.SequencedMessage { + return &pb.SequencedMessage{ + SizeBytes: size, + } +} + +func seqMsgWithOffsetAndSize(offset, size int64) *pb.SequencedMessage { + return &pb.SequencedMessage{ + Cursor: &pb.Cursor{Offset: offset}, + SizeBytes: size, + } +} + +func msgSubResp(msgs ...*pb.SequencedMessage) *pb.SubscribeResponse { + return &pb.SubscribeResponse{ + Response: &pb.SubscribeResponse_Messages{ + Messages: &pb.MessageResponse{Messages: msgs}, + }, + } +}