Skip to content

Commit

Permalink
fix(pubsub): fix ordering key integration tests (#2811)
Browse files Browse the repository at this point in the history
* fix(pubsub): fix ordering key integration tests

* remove commented skip

* use regional enndpoint

* use smaller message size for testing failures

* increase test timeout to 60s
  • Loading branch information
hongalex committed Sep 2, 2020
1 parent 43a0107 commit aa4dea4
Showing 1 changed file with 9 additions and 14 deletions.
23 changes: 9 additions & 14 deletions pubsub/integration_test.go
Expand Up @@ -1104,7 +1104,7 @@ func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) {

func TestIntegration_OrderedKeys_Basic(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t)
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
Expand Down Expand Up @@ -1183,9 +1183,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
}

func TestIntegration_OrderedKeys_JSON(t *testing.T) {
t.Skip("Flaky, see https://github.com/googleapis/google-cloud-go/issues/1872")
ctx := context.Background()
client := integrationTestClient(ctx, t)
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
Expand Down Expand Up @@ -1278,8 +1277,8 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {

select {
case <-done:
case <-time.After(30 * time.Second):
t.Fatal("timed out after 30s waiting for all messages to be received")
case <-time.After(60 * time.Second):
t.Fatal("timed out after 60s waiting for all messages to be received")
}

mu.Lock()
Expand All @@ -1291,9 +1290,8 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
}

func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
t.Skip("kokoro failing in https://github.com/googleapis/google-cloud-go/issues/1850")
ctx := context.Background()
client := integrationTestClient(ctx, t)
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := client.CreateTopic(ctx, topicIDs.New())
Expand All @@ -1310,15 +1308,14 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}

topic.PublishSettings.DelayThreshold = time.Second
topic.PublishSettings.BufferedByteLimit = 100
topic.EnableMessageOrdering = true

orderingKey := "some-ordering-key2"
// Publish a message that is too large so we'll get an error that
// pauses publishing for this ordering key.
r := topic.Publish(ctx, &Message{
ID: "1",
Data: bytes.Repeat([]byte("A"), 1e10),
Data: bytes.Repeat([]byte("A"), 1000),
OrderingKey: orderingKey,
})
<-r.ready
Expand All @@ -1328,8 +1325,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
// Publish a normal sized message now, which should fail
// since publishing on this ordering key is paused.
r = topic.Publish(ctx, &Message{
ID: "2",
Data: []byte("failed message"),
Data: []byte("should fail"),
OrderingKey: orderingKey,
})
<-r.ready
Expand All @@ -1340,8 +1336,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
// Lastly, call ResumePublish and make sure subsequent publishes succeed.
topic.ResumePublish(orderingKey)
r = topic.Publish(ctx, &Message{
ID: "4",
Data: []byte("normal message"),
Data: []byte("should succeed"),
OrderingKey: orderingKey,
})
<-r.ready
Expand Down

0 comments on commit aa4dea4

Please sign in to comment.