diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index 014bd0d6c05..9ace683cd25 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsublite/internal/test" "github.com/google/go-cmp/cmp" + "golang.org/x/xerrors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -187,8 +188,8 @@ func TestPublishBatcherAddMessage(t *testing.T) { t.Run("oversized message", func(t *testing.T) { msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)} - if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) { - t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg) + if gotErr := batcher.AddMessage(msg, nil); !xerrors.Is(gotErr, ErrOversizedMessage) { + t.Errorf("AddMessage(%v) got err: %v, want err: %q", msg, gotErr, ErrOversizedMessage) } }) diff --git a/pubsublite/pscompat/publisher.go b/pubsublite/pscompat/publisher.go index 06ba362e9a1..8506912d3a1 100644 --- a/pubsublite/pscompat/publisher.go +++ b/pubsublite/pscompat/publisher.go @@ -19,9 +19,7 @@ import ( "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/internal/wire" - "golang.org/x/xerrors" "google.golang.org/api/option" - "google.golang.org/api/support/bundler" ipubsub "cloud.google.com/go/internal/pubsub" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" @@ -29,31 +27,20 @@ import ( var ( // ErrOverflow is set for a PublishResult when publish buffers overflow. - ErrOverflow = bundler.ErrOverflow + // Use errors.Is for comparing errors. + ErrOverflow = wire.ErrOverflow // ErrOversizedMessage is set for a PublishResult when a published message - // exceeds MaxPublishRequestBytes. - ErrOversizedMessage = bundler.ErrOversizedItem + // exceeds MaxPublishRequestBytes. Use errors.Is for comparing errors. + ErrOversizedMessage = wire.ErrOversizedMessage // ErrPublisherStopped is set for a PublishResult when a message cannot be // published because the publisher client has stopped or is in the process of // stopping. PublisherClient.Error() returns the error that caused the - // publisher client to terminate (if any). + // publisher client to terminate (if any). Use errors.Is for comparing errors. ErrPublisherStopped = wire.ErrServiceStopped ) -// translateError transforms a subset of errors to what would be returned by the -// pubsub package. -func translateError(err error) error { - if xerrors.Is(err, wire.ErrOversizedMessage) { - return ErrOversizedMessage - } - if xerrors.Is(err, wire.ErrOverflow) { - return ErrOverflow - } - return err -} - // PublisherClient is a Pub/Sub Lite client to publish messages to a given // topic. A PublisherClient is safe to use from multiple goroutines. // @@ -130,7 +117,6 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub } p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) { - err = translateError(err) if metadata != nil { ipubsub.SetPublishResult(result, metadata.String(), err) } else { diff --git a/pubsublite/pscompat/publisher_test.go b/pubsublite/pscompat/publisher_test.go index d794b469b14..4dfe7dfc575 100644 --- a/pubsublite/pscompat/publisher_test.go +++ b/pubsublite/pscompat/publisher_test.go @@ -21,8 +21,6 @@ import ( "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/internal/test" "cloud.google.com/go/pubsublite/internal/wire" - "golang.org/x/xerrors" - "google.golang.org/api/support/bundler" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) @@ -176,62 +174,3 @@ func TestPublisherClientTransformMessageError(t *testing.T) { t.Errorf("Publisher.Stopped: got %v, want %v", got, want) } } - -func TestPublisherClientTranslatePublishResultErrors(t *testing.T) { - ctx := context.Background() - input := &pubsub.Message{ - Data: []byte("data"), - OrderingKey: "ordering_key", - } - wantMsg := &pb.PubSubMessage{ - Data: []byte("data"), - Key: []byte("ordering_key"), - } - - for _, tc := range []struct { - desc string - wireErr error - wantErr error - }{ - { - desc: "oversized message", - wireErr: wire.ErrOversizedMessage, - wantErr: bundler.ErrOversizedItem, - }, - { - desc: "oversized message wrapped", - wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage), - wantErr: bundler.ErrOversizedItem, - }, - { - desc: "buffer overflow", - wireErr: wire.ErrOverflow, - wantErr: bundler.ErrOverflow, - }, - { - desc: "service stopped", - wireErr: wire.ErrServiceStopped, - wantErr: wire.ErrServiceStopped, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - verifier := test.NewRPCVerifier(t) - verifier.Push(wantMsg, nil, tc.wireErr) - defer verifier.Flush() - - pubClient := newTestPublisherClient(verifier, DefaultPublishSettings) - result := pubClient.Publish(ctx, input) - - _, gotErr := result.Get(ctx) - if !test.ErrorEqual(gotErr, tc.wantErr) { - t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr) - } - if !test.ErrorEqual(pubClient.Error(), tc.wireErr) { - t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr) - } - if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want { - t.Errorf("Publisher.Stopped: got %v, want %v", got, want) - } - }) - } -}