diff --git a/pubsub/iterator.go b/pubsub/iterator.go index dcca633aae4..8b54df8ee34 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -247,6 +247,8 @@ func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, switch { case err == context.Canceled: return nil, nil + case status.Code(err) == codes.Canceled: + return nil, nil case err != nil: return nil, err default: diff --git a/pubsub/iterator_test.go b/pubsub/iterator_test.go index be4f8b4c439..d3c7cf8a5ae 100644 --- a/pubsub/iterator_test.go +++ b/pubsub/iterator_test.go @@ -400,3 +400,24 @@ func TestIterator_ModifyAckContextDeadline(t *testing.T) { t.Fatal(err) } } + +func TestIterator_SynchronousPullCancel(t *testing.T) { + srv := pstest.NewServer() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil) + + _, client, err := initConn(ctx, srv.Addr) + if err != nil { + t.Fatal(err) + } + iter := newMessageIterator(client.subc, fullyQualifiedTopicName, &pullOptions{}) + + // Cancelling the iterator and pulling should not result in any errors. + iter.cancel() + + if _, err := iter.pullMessages(100); err != nil { + t.Fatalf("Got error in pullMessages: %v", err) + } +}