Skip to content

Commit

Permalink
Merge branch 'master' into admin_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Nov 24, 2020
2 parents 021a088 + 200e261 commit ab7b796
Show file tree
Hide file tree
Showing 6 changed files with 517 additions and 56 deletions.
90 changes: 90 additions & 0 deletions pubsublite/internal/test/msg_tracker.go
@@ -0,0 +1,90 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package test

import (
"fmt"
"sync"
"time"
)

// MsgTracker is a helper for checking whether a set of messages make a full
// round trip from publisher to subscriber.
//
// Add() registers published messages. Remove() should be called when messages
// are received by subscribers. Call Wait() to block until all tracked messages
// are received. The same MsgTracker instance can be reused to repeat this
// sequence for multiple test cycles.
//
// Add() and Remove() calls should not be interleaved.
type MsgTracker struct {
msgMap map[string]bool
done chan struct{}
mu sync.Mutex
}

// NewMsgTracker creates a new message tracker.
func NewMsgTracker() *MsgTracker {
return &MsgTracker{
msgMap: make(map[string]bool),
done: make(chan struct{}, 1),
}
}

// Add a set of tracked messages.
func (mt *MsgTracker) Add(msgs ...string) {
mt.mu.Lock()
defer mt.mu.Unlock()

for _, msg := range msgs {
mt.msgMap[msg] = true
}
}

// Remove and return true if `msg` is tracked. Signals the `done` channel once
// all messages have been received.
func (mt *MsgTracker) Remove(msg string) bool {
mt.mu.Lock()
defer mt.mu.Unlock()

if _, exists := mt.msgMap[msg]; exists {
delete(mt.msgMap, msg)
if len(mt.msgMap) == 0 {
var void struct{}
mt.done <- void
}
return true
}
return false
}

// Wait up to `timeout` to receive all tracked messages.
func (mt *MsgTracker) Wait(timeout time.Duration) error {
mt.mu.Lock()
totalCount := len(mt.msgMap)
mt.mu.Unlock()

select {
case <-time.After(timeout):
mt.mu.Lock()
receivedCount := totalCount - len(mt.msgMap)
err := fmt.Errorf("received %d of %d messages", receivedCount, totalCount)
mt.msgMap = make(map[string]bool)
mt.mu.Unlock()
return err

case <-mt.done:
return nil
}
}
56 changes: 56 additions & 0 deletions pubsublite/internal/test/msg_tracker_test.go
@@ -0,0 +1,56 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package test

import (
"testing"
"time"
)

func TestMsgTrackerWaitSuccess(t *testing.T) {
msgs := []string{"a", "b", "c"}

msgTracker := NewMsgTracker()
msgTracker.Add(msgs...)

for _, msg := range msgs {
if got, want := msgTracker.Remove(msg), true; got != want {
t.Errorf("MsgTracker.Remove(%q) got %v, want %v", msg, got, want)
}
}
for _, msg := range []string{"d", "e"} {
if got, want := msgTracker.Remove(msg), false; got != want {
t.Errorf("MsgTracker.Remove(%q) got %v, want %v", msg, got, want)
}
}
if gotErr := msgTracker.Wait(time.Millisecond); gotErr != nil {
t.Errorf("MsgTracker.Wait() got err: %v", gotErr)
}
}

func TestMsgTrackerWaitTimeout(t *testing.T) {
msgs := []string{"a", "b", "c"}

msgTracker := NewMsgTracker()
msgTracker.Add(msgs...)

for _, msg := range []string{"a", "c"} {
if got, want := msgTracker.Remove(msg), true; got != want {
t.Errorf("MsgTracker.Remove(%q) got %v, want %v", msg, got, want)
}
}
if gotErr, wantMsg := msgTracker.Wait(time.Millisecond), "received 2 of 3 messages"; ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("MsgTracker.Wait() got err: %v, want msg: %q", gotErr, wantMsg)
}
}
181 changes: 181 additions & 0 deletions pubsublite/internal/test/msg_validators.go
@@ -0,0 +1,181 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package test

import (
"fmt"
"strconv"
"strings"
"sync"
)

// OrderingSender generates strings containing a message index to use for
// verifying message ordering. It is used on conjunction with Publishers.
type OrderingSender struct {
TotalMsgCount int64
}

// NewOrderingSender creats a new OrderingSender.
func NewOrderingSender() *OrderingSender {
return new(OrderingSender)
}

// Next generates the next string to publish.
func (os *OrderingSender) Next(prefix string) string {
os.TotalMsgCount++
return fmt.Sprintf("%s/%d", prefix, os.TotalMsgCount)
}

// OrderingReceiver consumes a message string generated by OrderingSender and
// verifies that messages in a partition are ordered. It is used in conjunction
// with Subscribers.
type OrderingReceiver struct {
mu sync.Mutex
// Map of key and last received message index. Messages are only guaranteed to
// be received in order within a partition.
received map[string]int64
}

// NewOrderingReceiver creates a new OrderingReceiver.
func NewOrderingReceiver() *OrderingReceiver {
return &OrderingReceiver{
received: make(map[string]int64),
}
}

func parseMsgIndex(msg string) int64 {
pos := strings.LastIndex(msg, "/")
if pos >= 0 {
if n, err := strconv.ParseInt(msg[pos+1:], 10, 64); err == nil {
return n
}
}
return -1
}

// Receive checks the given message data and key and returns an error if
// unordered messages are detected.
//
// Note: a normal scenario resulting in unordered messages is when the Publish
// stream breaks while there are in-flight batches, which are resent upon
// stream reconnect.
func (or *OrderingReceiver) Receive(data, key string) error {
or.mu.Lock()
defer or.mu.Unlock()

idx := parseMsgIndex(data)
if idx < 0 {
return fmt.Errorf("failed to parse index from message: %q", data)
}

// Verify non-decreasing ordering. Allow duplicates, which can be verified
// with DuplicateMsgDetector.
lastIdx, exists := or.received[key]
if exists && idx < lastIdx {
return fmt.Errorf("message ordering failed for key %s, expected message idx >= %d, got %d", key, lastIdx, idx)
}
or.received[key] = idx
return nil
}

var void struct{}

type msgMetadata struct {
offsets map[int64]struct{}
}

func newMsgMetadata() *msgMetadata {
return &msgMetadata{
offsets: make(map[int64]struct{}),
}
}

func (mm *msgMetadata) ContainsOffset(offset int64) bool {
_, exists := mm.offsets[offset]
return exists
}

func (mm *msgMetadata) AddOffset(offset int64) {
mm.offsets[offset] = void
}

// DuplicateMsgDetector can be used to detect duplicate messages, either due to
// duplicate publishes or receives.
type DuplicateMsgDetector struct {
mu sync.Mutex
// Map of Pub/Sub message data and associated metadata.
msgs map[string]*msgMetadata
duplicatePublishCount int64
duplicateReceiveCount int64
}

// NewDuplicateMsgDetector creates a new DuplicateMsgDetector.
func NewDuplicateMsgDetector() *DuplicateMsgDetector {
return &DuplicateMsgDetector{
msgs: make(map[string]*msgMetadata),
}
}

// Receive checks the given message data and offset.
func (dm *DuplicateMsgDetector) Receive(data string, offset int64) {
dm.mu.Lock()
defer dm.mu.Unlock()

if metadata, exists := dm.msgs[data]; exists {
if metadata.ContainsOffset(offset) {
// If the message contains the same offset, it means it was received
// multiple times. This is not expected within a single test run. But it
// is normal when processes are stopped & restarted without committing
// cursors.
dm.duplicateReceiveCount++
} else {
// If the message contains a different offset, it means a message was
// republished, which can occur when a publish stream reconnects with
// in-flight published messages.
dm.duplicatePublishCount++
metadata.AddOffset(offset)
}
} else {
metadata = newMsgMetadata()
metadata.AddOffset(offset)
dm.msgs[data] = metadata
}
}

// Status returns a non-empty status string if there were duplicates detected.
func (dm *DuplicateMsgDetector) Status() string {
dm.mu.Lock()
defer dm.mu.Unlock()

if (dm.duplicateReceiveCount + dm.duplicatePublishCount) == 0 {
return ""
}
return fmt.Sprintf("duplicate publish count = %d, receive count = %d", dm.duplicatePublishCount, dm.duplicateReceiveCount)
}

// HasPublishDuplicates returns true if duplicate published messages were
// detected.
func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool {
dm.mu.Lock()
defer dm.mu.Unlock()
return dm.duplicatePublishCount > 0
}

// HasReceiveDuplicates returns true if duplicate received messages were
// detected.
func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool {
dm.mu.Lock()
defer dm.mu.Unlock()
return dm.duplicateReceiveCount > 0
}

0 comments on commit ab7b796

Please sign in to comment.