Skip to content

Commit

Permalink
refactor error tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Aug 23, 2023
1 parent ca01559 commit 567ed2d
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 62 deletions.
21 changes: 3 additions & 18 deletions go.work.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
cloud.google.com/go/gaming v1.9.0 h1:7vEhFnZmd931Mo7sZ6pJy7uQPDxF7m7v8xtBheG08tc=
cloud.google.com/go/gaming v1.10.1/go.mod h1:XQQvtfP8Rb9Rxnxm5wFVpAp9zCQkJi2bLIb7iHGwB3s=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
Expand All @@ -8,33 +7,19 @@ github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObk
github.com/elazarl/goproxy v0.0.0-20221015165544-a0805db90819/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/gliderlabs/ssh v0.3.5/go.mod h1:8XB4KraRrX39qHhT6yxPsHedjA08I/uBVwj4xC+/+z4=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20230305113008-0c11038e723f/go.mod h1:8LHG1a3SRW71ettAD/jW13h8c6AqjVSeL11RAdgaqpo=
github.com/google/go-pkcs11 v0.2.0/go.mod h1:6eQoGcuNJpa7jnd5pMGdkSaQpNDYvPlXWMcjXXThLlY=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/google/s2a-go v0.1.3/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
github.com/googleapis/enterprise-certificate-proxy v0.2.4/go.mod h1:AwSRAtLfXpU5Nm3pW+v7rGDHp09LsPtGY9MduiEsR9k=
github.com/googleapis/gax-go/v2 v2.9.1/go.mod h1:4FG3gMrVZlyMp5itSYKMU9z/lBE7+SbnUOvzH2HqbEY=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I=
go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU=
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ=
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.9.0/go.mod h1:M6DEAAIenWoTxdKrOltXcmDY3rSplQUkrvaDU5FcQyo=
golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
google.golang.org/api v0.123.0/go.mod h1:gcitW0lvnyWjSp9nKxAbdHKIZ6vF4aajGueeslZOyms=
google.golang.org/api v0.128.0/go.mod h1:Y611qgqaE92On/7g65MQgxYul3c0rEB894kniWLY750=
google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk=
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230629202037-9506855d4529/go.mod h1:ylj+BE99M198VPbBh6A8d9n3w8fChvyLK3wwBOjXBFA=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230711160842-782d3b101e98/go.mod h1:3QoBVwTHkXbY1oRGzlhwhOykfcATQN43LJ6iT8Wy8kE=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20230720185612-659f7aaaa771/go.mod h1:3QoBVwTHkXbY1oRGzlhwhOykfcATQN43LJ6iT8Wy8kE=
google.golang.org/grpc v1.52.3/go.mod h1:pu6fVzoFb+NBYNAvQL08ic+lvB2IojljRYuun5vorUY=
google.golang.org/grpc v1.56.1/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s=
170 changes: 126 additions & 44 deletions pubsub/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestTrace_PublishSpan(t *testing.T) {
attribute.String(orderingAttribute, m.OrderingKey),
semconv.MessagingSystemKey.String("pubsub"),
},
ChildSpanCount: 1,
ChildSpanCount: 2,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
Expand All @@ -98,7 +98,7 @@ func TestTrace_PublishSpan(t *testing.T) {
},
},
tracetest.SpanStub{
Name: publishSchedulerSpanName,
Name: publishBatcherSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
Expand Down Expand Up @@ -130,13 +130,13 @@ func TestTrace_PublishSpan(t *testing.T) {
}
defer topic.Stop()

spans := getSpans(e)
got := getSpans(e)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
cmpopts.SortSlices(sortSpanStub),
}
if diff := testutil.Diff(spans, expectedSpans, opts...); diff != "" {
log.Printf("print spans: %+v\n", spans)
if diff := testutil.Diff(got, expectedSpans, opts...); diff != "" {
log.Printf("print spans: %+v\n", got)
log.Printf("\n\nprint expected spans: %+v\n", expectedSpans)
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
Expand All @@ -155,7 +155,7 @@ func TestTrace_PublishSpanError(t *testing.T) {

m := &Message{
Data: []byte("test"),
OrderingKey: "my-key",
OrderingKey: "m",
}

msgSize := proto.Size(&pb.PubsubMessage{
Expand All @@ -167,50 +167,91 @@ func TestTrace_PublishSpanError(t *testing.T) {
topicID := "t"
topicName := fmt.Sprintf("projects/P/topics/%s", topicID)

expectedSpans := tracetest.SpanStubs{
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", topicName, publisherSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topicName),
semconv.MessagingMessageIDKey.String(""),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(orderingAttribute, m.OrderingKey),
semconv.MessagingSystemKey.String("pubsub"),
},
ChildSpanCount: 0,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Status: sdktrace.Status{
Code: codes.Error,
Description: errTopicOrderingNotEnabled.Error(),
},
},
}

topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}

r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}
defer topic.Stop()
// Publishing a message with an ordering key without enabling ordering topic ordering
// should fail.
t.Run("no ordering key", func(t *testing.T) {
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}

want := getPublishSpanStubsWithError(topicName, m, msgSize, errTopicOrderingNotEnabled)

got := getSpans(e)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
cmpopts.SortSlices(sortSpanStub),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
e.Reset()
topic.ResumePublish(m.OrderingKey)
})

t.Run("stopped topic", func(t *testing.T) {
// Publishing a message with a stopped publisher should fail too
topic.ResumePublish(m.OrderingKey)
topic.EnableMessageOrdering = true
topic.Stop()
r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}

got := getSpans(e)
want := getPublishSpanStubsWithError(topicName, m, msgSize, ErrTopicStopped)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
cmpopts.SortSlices(sortSpanStub),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
e.Reset()
topic.ResumePublish(m.OrderingKey)
})

t.Run("flow control error", func(t *testing.T) {
// Use a different topic here than above since
// we need to adjust the flow control settings,
// which are immutable after publish.
topicID := "t2"

topic, err := c.CreateTopic(ctx, topicID)
if err != nil {
t.Fatalf("failed to create topic: %v", err)
}
topic.EnableMessageOrdering = true
topic.PublishSettings.FlowControlSettings = FlowControlSettings{
LimitExceededBehavior: FlowControlSignalError,
MaxOutstandingBytes: 1,
}

r := topic.Publish(ctx, m)
_, err = r.Get(ctx)
if err == nil {
t.Fatal("expected err, got nil")
}

got := getSpans(e)
want := getFlowControlSpanStubs(ErrFlowControllerMaxOutstandingBytes)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
cmpopts.SortSlices(sortSpanStub),
}
if diff := testutil.Diff(got, want, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
})

spans := getSpans(e)
opts := []cmp.Option{
cmp.Comparer(spanStubComparer),
cmpopts.SortSlices(sortSpanStub),
}
if diff := testutil.Diff(spans, expectedSpans, opts...); diff != "" {
t.Errorf("diff: -got, +want:\n%s\n", diff)
}
}

func spanStubComparer(a, b tracetest.SpanStub) bool {
Expand Down Expand Up @@ -245,3 +286,44 @@ func getSpans(e *tracetest.InMemoryExporter) tracetest.SpanStubs {

return e.GetSpans()
}

func getPublishSpanStubsWithError(topicName string, m *Message, msgSize int, err error) tracetest.SpanStubs {
return tracetest.SpanStubs{
tracetest.SpanStub{
Name: fmt.Sprintf("%s %s", topicName, publisherSpanName),
SpanKind: trace.SpanKindProducer,
Attributes: []attribute.KeyValue{
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationKey.String(topicName),
semconv.MessagingMessageIDKey.String(""),
semconv.MessagingMessagePayloadSizeBytesKey.Int(msgSize),
attribute.String(orderingAttribute, m.OrderingKey),
semconv.MessagingSystemKey.String("pubsub"),
},
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Status: sdktrace.Status{
Code: codes.Error,
Description: err.Error(),
},
},
}
}

func getFlowControlSpanStubs(err error) tracetest.SpanStubs {
return tracetest.SpanStubs{
tracetest.SpanStub{
Name: publishFlowControlSpanName,
InstrumentationLibrary: instrumentation.Scope{
Name: "cloud.google.com/go/pubsub",
Version: internal.Version,
},
Status: sdktrace.Status{
Code: codes.Error,
Description: err.Error(),
},
},
}
}

0 comments on commit 567ed2d

Please sign in to comment.