Skip to content

Commit

Permalink
Merge branch 'master' into bttest-dropfam
Browse files Browse the repository at this point in the history
  • Loading branch information
dragonsinth committed Jun 3, 2021
2 parents 0a938ab + 57b3879 commit d2d1090
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 23 deletions.
12 changes: 12 additions & 0 deletions pubsublite/internal/wire/flow_control.go
Expand Up @@ -95,6 +95,13 @@ func exceedsExpediteRatio(pending, client int64) bool {
return client > 0 && (float64(pending)/float64(client)) >= expediteBatchRequestRatio
}

// Reset client tokens to the given values and reset pending tokens.
func (fc *flowControlBatcher) Reset(tokens flowControlTokens) {
fc.clientTokens.Reset()
fc.clientTokens.Add(tokens)
fc.pendingTokens.Reset()
}

// OnClientFlow increments flow control tokens. This occurs when:
// - Initialization from ReceiveSettings.
// - The user acks messages.
Expand Down Expand Up @@ -147,6 +154,11 @@ type subscriberOffsetTracker struct {
minNextOffset int64
}

// Reset the offset tracker to the initial state.
func (ot *subscriberOffsetTracker) Reset() {
ot.minNextOffset = 0
}

// RequestForRestart returns the seek request to send when a new subscribe
// stream reconnects. Returns nil if the subscriber has just started, in which
// case the server returns the offset of the last committed cursor.
Expand Down
22 changes: 22 additions & 0 deletions pubsublite/internal/wire/flow_control_test.go
Expand Up @@ -222,6 +222,28 @@ func TestFlowControlBatcher(t *testing.T) {
})
}

func TestFlowControlBatcherReset(t *testing.T) {
var batcher flowControlBatcher

initialTokens := flowControlTokens{Bytes: 400, Messages: 40}
batcher.OnClientFlow(initialTokens)
if got, want := batcher.clientTokens.ToFlowControlRequest(), flowControlReq(initialTokens); !proto.Equal(got, want) {
t.Errorf("flowControlBatcher.clientTokens.ToFlowControlRequest(): got %v, want %v", got, want)
}
if got, want := batcher.pendingTokens.ToFlowControlRequest(), flowControlReq(initialTokens); !proto.Equal(got, want) {
t.Errorf("flowControlBatcher.pendingTokens.ToFlowControlRequest(): got %v, want %v", got, want)
}

updatedTokens := flowControlTokens{Bytes: 500, Messages: 50}
batcher.Reset(updatedTokens)
if got, want := batcher.clientTokens.ToFlowControlRequest(), flowControlReq(updatedTokens); !proto.Equal(got, want) {
t.Errorf("flowControlBatcher.clientTokens.ToFlowControlRequest(): got %v, want %v", got, want)
}
if got, want := batcher.pendingTokens.ToFlowControlRequest(), (*pb.FlowControlRequest)(nil); !proto.Equal(got, want) {
t.Errorf("flowControlBatcher.pendingTokens.ToFlowControlRequest(): got %v, want %v", got, want)
}
}

func TestOffsetTrackerRequestForRestart(t *testing.T) {
for _, tc := range []struct {
desc string
Expand Down
62 changes: 40 additions & 22 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"errors"
"reflect"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -46,57 +47,74 @@ type ReceivedMessage struct {
type MessageReceiverFunc func(*ReceivedMessage)

// messageDeliveryQueue delivers received messages to the client-provided
// MessageReceiverFunc sequentially.
// MessageReceiverFunc sequentially. It is only accessed by the subscribeStream.
type messageDeliveryQueue struct {
receiver MessageReceiverFunc
messagesC chan *ReceivedMessage
stopC chan struct{}
acks *ackTracker
status serviceStatus
bufferSize int
acks *ackTracker
receiver MessageReceiverFunc
messagesC chan *ReceivedMessage
stopC chan struct{}
active sync.WaitGroup
}

func newMessageDeliveryQueue(acks *ackTracker, receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
return &messageDeliveryQueue{
acks: acks,
receiver: receiver,
messagesC: make(chan *ReceivedMessage, bufferSize),
stopC: make(chan struct{}),
bufferSize: bufferSize,
acks: acks,
receiver: receiver,
}
}

// Start the message delivery, if not already started.
func (mq *messageDeliveryQueue) Start() {
if mq.status == serviceUninitialized {
go mq.deliverMessages()
mq.status = serviceActive
if mq.stopC != nil {
return
}

mq.stopC = make(chan struct{})
mq.messagesC = make(chan *ReceivedMessage, mq.bufferSize)
mq.active.Add(1)
go mq.deliverMessages(mq.messagesC, mq.stopC)
}

// Stop message delivery and discard undelivered messages.
func (mq *messageDeliveryQueue) Stop() {
if mq.status < serviceTerminated {
close(mq.stopC)
mq.status = serviceTerminated
if mq.stopC == nil {
return
}

close(mq.stopC)
mq.stopC = nil
mq.messagesC = nil
}

// Wait until the message delivery goroutine has terminated.
func (mq *messageDeliveryQueue) Wait() {
mq.active.Wait()
}

func (mq *messageDeliveryQueue) Add(msg *ReceivedMessage) {
if mq.status == serviceActive {
if mq.messagesC != nil {
mq.messagesC <- msg
}
}

func (mq *messageDeliveryQueue) deliverMessages() {
func (mq *messageDeliveryQueue) deliverMessages(messagesC chan *ReceivedMessage, stopC chan struct{}) {
// Notify the wait group that the goroutine has terminated upon exit.
defer mq.active.Done()

for {
// stopC has higher priority.
select {
case <-mq.stopC:
case <-stopC:
return // Ends the goroutine.
default:
}

select {
case <-mq.stopC:
case <-stopC:
return // Ends the goroutine.
case msg := <-mq.messagesC:
case msg := <-messagesC:
// Register outstanding acks, which are primarily handled by the
// `committer`.
mq.acks.Push(msg.Ack.(*ackConsumer))
Expand Down Expand Up @@ -175,7 +193,7 @@ func (s *subscribeStream) Start() {
s.pollFlowControl.Start()
s.messageQueue.Start()

s.flowControl.OnClientFlow(flowControlTokens{
s.flowControl.Reset(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})
Expand Down
48 changes: 47 additions & 1 deletion pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -154,7 +154,7 @@ func (tr *testBlockingMessageReceiver) Return() {
tr.blockReceive <- void
}

func TestMessageDeliveryQueue(t *testing.T) {
func TestMessageDeliveryQueueStartStop(t *testing.T) {
acks := newAckTracker()
receiver := newTestMessageReceiver(t)
messageQueue := newMessageDeliveryQueue(acks, receiver.onMessage, 10)
Expand Down Expand Up @@ -189,9 +189,55 @@ func TestMessageDeliveryQueue(t *testing.T) {
messageQueue.Stop()
messageQueue.Stop() // Check duplicate stop
messageQueue.Add(&ReceivedMessage{Msg: msg4, Ack: ack4})
messageQueue.Wait()

receiver.VerifyNoMsgs()
})

t.Run("Restart", func(t *testing.T) {
msg5 := seqMsgWithOffset(5)
ack5 := newAckConsumer(5, 0, nil)

messageQueue.Start()
messageQueue.Add(&ReceivedMessage{Msg: msg5, Ack: ack5})

receiver.ValidateMsg(msg5)
})

t.Run("Stop", func(t *testing.T) {
messageQueue.Stop()
messageQueue.Wait()

receiver.VerifyNoMsgs()
})
}

func TestMessageDeliveryQueueDiscardMessages(t *testing.T) {
acks := newAckTracker()
blockingReceiver := newTestBlockingMessageReceiver(t)
messageQueue := newMessageDeliveryQueue(acks, blockingReceiver.onMessage, 10)

msg1 := seqMsgWithOffset(1)
ack1 := newAckConsumer(1, 0, nil)
msg2 := seqMsgWithOffset(2)
ack2 := newAckConsumer(2, 0, nil)

messageQueue.Start()
messageQueue.Add(&ReceivedMessage{Msg: msg1, Ack: ack1})
messageQueue.Add(&ReceivedMessage{Msg: msg2, Ack: ack2})

// The blocking receiver suspends after receiving msg1.
blockingReceiver.ValidateMsg(msg1)
// Stopping the message queue should discard undelivered msg2.
messageQueue.Stop()

// Unsuspend the blocking receiver and verify msg2 is not received.
blockingReceiver.Return()
messageQueue.Wait()
blockingReceiver.VerifyNoMsgs()
if got, want := acks.outstandingAcks.Len(), 1; got != want {
t.Errorf("ackTracker.outstandingAcks.Len() got %v, want %v", got, want)
}
}

// testSubscribeStream wraps a subscribeStream for ease of testing.
Expand Down
17 changes: 17 additions & 0 deletions spanner/CHANGES.md
@@ -1,5 +1,22 @@
# Changes

## [1.19.0](https://www.github.com/googleapis/google-cloud-go/compare/spanner/v1.18.0...spanner/v1.19.0) (2021-06-03)


### Features

* **spanner/spannertest:** support multiple aggregations ([#3965](https://www.github.com/googleapis/google-cloud-go/issues/3965)) ([1265dc3](https://www.github.com/googleapis/google-cloud-go/commit/1265dc3289693f79fcb9c5785a424eb510a50007))
* **spanner/spansql:** case insensitive parsing of keywords and functions ([#4034](https://www.github.com/googleapis/google-cloud-go/issues/4034)) ([ddb09d2](https://www.github.com/googleapis/google-cloud-go/commit/ddb09d22a737deea0d0a9ab58cd5d337164bbbfe))
* **spanner:** add a database name getter to client ([#4190](https://www.github.com/googleapis/google-cloud-go/issues/4190)) ([7fce29a](https://www.github.com/googleapis/google-cloud-go/commit/7fce29af404f0623b483ca6d6f2af4c726105fa6))
* **spanner:** add custom instance config to tests ([#4194](https://www.github.com/googleapis/google-cloud-go/issues/4194)) ([e935345](https://www.github.com/googleapis/google-cloud-go/commit/e9353451237e658bde2e41b30e8270fbc5987b39))


### Bug Fixes

* **spanner:** add missing NUMERIC type to the doc for Row ([#4116](https://www.github.com/googleapis/google-cloud-go/issues/4116)) ([9a3b416](https://www.github.com/googleapis/google-cloud-go/commit/9a3b416221f3c8b3793837e2a459b1d7cd9c479f))
* **spanner:** indent code example for Encoder and Decoder ([#4128](https://www.github.com/googleapis/google-cloud-go/issues/4128)) ([7c1f48f](https://www.github.com/googleapis/google-cloud-go/commit/7c1f48f307284c26c10cd5787dbc94136a2a36a6))
* **spanner:** mark SessionPoolConfig.MaxBurst deprecated ([#4115](https://www.github.com/googleapis/google-cloud-go/issues/4115)) ([d60a686](https://www.github.com/googleapis/google-cloud-go/commit/d60a68649f85f1edfbd8f11673bb280813c2b771))

## [1.18.0](https://www.github.com/googleapis/google-cloud-go/compare/spanner/v1.17.0...spanner/v1.18.0) (2021-04-29)


Expand Down
6 changes: 6 additions & 0 deletions spanner/client.go
Expand Up @@ -82,6 +82,12 @@ type Client struct {
qo QueryOptions
}

// DatabaseName returns the full name of a database, e.g.,
// "projects/spanner-cloud-test/instances/foo/databases/foodb".
func (c *Client) DatabaseName() string {
return c.sc.database
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
// NumChannels is the number of gRPC channels.
Expand Down
13 changes: 13 additions & 0 deletions spanner/client_test.go
Expand Up @@ -416,6 +416,19 @@ func TestClient_Single_QueryOptions(t *testing.T) {
}
}

func TestClient_ReturnDatabaseName(t *testing.T) {
t.Parallel()

_, client, teardown := setupMockedTestServer(t)
defer teardown()

got := client.DatabaseName()
want := "projects/[PROJECT]/instances/[INSTANCE]/databases/[DATABASE]"
if got != want {
t.Fatalf("Incorrect database name returned, got: %s, want: %s", got, want)
}
}

func testQueryOptions(t *testing.T, iter *RowIterator, server InMemSpannerServer, qo QueryOptions) {
defer iter.Stop()

Expand Down

0 comments on commit d2d1090

Please sign in to comment.