Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): remove publish error translation #3843

Merged
merged 3 commits into from Mar 25, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions pubsublite/internal/wire/publish_batcher_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp"
"golang.org/x/xerrors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -187,8 +188,8 @@ func TestPublishBatcherAddMessage(t *testing.T) {

t.Run("oversized message", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'0'}, MaxPublishRequestBytes)}
if gotErr, wantMsg := batcher.AddMessage(msg, nil), "MaxPublishRequestBytes"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("AddMessage(%v) got err: %v, want err msg: %q", msg, gotErr, wantMsg)
if gotErr := batcher.AddMessage(msg, nil); !xerrors.Is(gotErr, ErrOversizedMessage) {
t.Errorf("AddMessage(%v) got err: %v, want err: %q", msg, gotErr, ErrOversizedMessage)
}
})

Expand Down
24 changes: 5 additions & 19 deletions pubsublite/pscompat/publisher.go
Expand Up @@ -19,41 +19,28 @@ import (

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"

ipubsub "cloud.google.com/go/internal/pubsub"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
// ErrOverflow is set for a PublishResult when publish buffers overflow.
ErrOverflow = bundler.ErrOverflow
// Use errors.Is for comparing errors.
ErrOverflow = wire.ErrOverflow

// ErrOversizedMessage is set for a PublishResult when a published message
// exceeds MaxPublishRequestBytes.
ErrOversizedMessage = bundler.ErrOversizedItem
// exceeds MaxPublishRequestBytes. Use errors.Is for comparing errors.
ErrOversizedMessage = wire.ErrOversizedMessage

// ErrPublisherStopped is set for a PublishResult when a message cannot be
// published because the publisher client has stopped or is in the process of
// stopping. PublisherClient.Error() returns the error that caused the
// publisher client to terminate (if any).
// publisher client to terminate (if any). Use errors.Is for comparing errors.
ErrPublisherStopped = wire.ErrServiceStopped
)

// translateError transforms a subset of errors to what would be returned by the
// pubsub package.
func translateError(err error) error {
if xerrors.Is(err, wire.ErrOversizedMessage) {
return ErrOversizedMessage
}
if xerrors.Is(err, wire.ErrOverflow) {
return ErrOverflow
}
return err
}

// PublisherClient is a Pub/Sub Lite client to publish messages to a given
// topic. A PublisherClient is safe to use from multiple goroutines.
//
Expand Down Expand Up @@ -130,7 +117,6 @@ func (p *PublisherClient) Publish(ctx context.Context, msg *pubsub.Message) *pub
}

p.wirePub.Publish(msgpb, func(metadata *wire.MessageMetadata, err error) {
err = translateError(err)
if metadata != nil {
ipubsub.SetPublishResult(result, metadata.String(), err)
} else {
Expand Down
61 changes: 0 additions & 61 deletions pubsublite/pscompat/publisher_test.go
Expand Up @@ -21,8 +21,6 @@ import (
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"golang.org/x/xerrors"
"google.golang.org/api/support/bundler"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
Expand Down Expand Up @@ -176,62 +174,3 @@ func TestPublisherClientTransformMessageError(t *testing.T) {
t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
}
}

func TestPublisherClientTranslatePublishResultErrors(t *testing.T) {
ctx := context.Background()
input := &pubsub.Message{
Data: []byte("data"),
OrderingKey: "ordering_key",
}
wantMsg := &pb.PubSubMessage{
Data: []byte("data"),
Key: []byte("ordering_key"),
}

for _, tc := range []struct {
desc string
wireErr error
wantErr error
}{
{
desc: "oversized message",
wireErr: wire.ErrOversizedMessage,
wantErr: bundler.ErrOversizedItem,
},
{
desc: "oversized message wrapped",
wireErr: xerrors.Errorf("placeholder error message: %w", wire.ErrOversizedMessage),
wantErr: bundler.ErrOversizedItem,
},
{
desc: "buffer overflow",
wireErr: wire.ErrOverflow,
wantErr: bundler.ErrOverflow,
},
{
desc: "service stopped",
wireErr: wire.ErrServiceStopped,
wantErr: wire.ErrServiceStopped,
},
} {
t.Run(tc.desc, func(t *testing.T) {
verifier := test.NewRPCVerifier(t)
verifier.Push(wantMsg, nil, tc.wireErr)
defer verifier.Flush()

pubClient := newTestPublisherClient(verifier, DefaultPublishSettings)
result := pubClient.Publish(ctx, input)

_, gotErr := result.Get(ctx)
if !test.ErrorEqual(gotErr, tc.wantErr) {
t.Errorf("Publish() got err: (%v), want err: (%v)", gotErr, tc.wantErr)
}
if !test.ErrorEqual(pubClient.Error(), tc.wireErr) {
t.Errorf("PublisherClient.Error() got: (%v), want: (%v)", pubClient.Error(), tc.wireErr)
}
if got, want := pubClient.wirePub.(*mockWirePublisher).Stopped, false; got != want {
t.Errorf("Publisher.Stopped: got %v, want %v", got, want)
}
})
}
}