diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 39cea2beb9f..221f1e62b6c 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1104,7 +1104,7 @@ func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) { func TestIntegration_OrderedKeys_Basic(t *testing.T) { ctx := context.Background() - client := integrationTestClient(ctx, t) + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() topic, err := client.CreateTopic(ctx, topicIDs.New()) @@ -1183,9 +1183,8 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { } func TestIntegration_OrderedKeys_JSON(t *testing.T) { - t.Skip("Flaky, see https://github.com/googleapis/google-cloud-go/issues/1872") ctx := context.Background() - client := integrationTestClient(ctx, t) + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() topic, err := client.CreateTopic(ctx, topicIDs.New()) @@ -1278,8 +1277,8 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { select { case <-done: - case <-time.After(30 * time.Second): - t.Fatal("timed out after 30s waiting for all messages to be received") + case <-time.After(60 * time.Second): + t.Fatal("timed out after 60s waiting for all messages to be received") } mu.Lock() @@ -1291,9 +1290,8 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { } func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { - t.Skip("kokoro failing in https://github.com/googleapis/google-cloud-go/issues/1850") ctx := context.Background() - client := integrationTestClient(ctx, t) + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() topic, err := client.CreateTopic(ctx, topicIDs.New()) @@ -1310,15 +1308,14 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { t.Fatalf("topic %v should exist, but it doesn't", topic) } - topic.PublishSettings.DelayThreshold = time.Second + topic.PublishSettings.BufferedByteLimit = 100 topic.EnableMessageOrdering = true orderingKey := "some-ordering-key2" // Publish a message that is too large so we'll get an error that // pauses publishing for this ordering key. r := topic.Publish(ctx, &Message{ - ID: "1", - Data: bytes.Repeat([]byte("A"), 1e10), + Data: bytes.Repeat([]byte("A"), 1000), OrderingKey: orderingKey, }) <-r.ready @@ -1328,8 +1325,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { // Publish a normal sized message now, which should fail // since publishing on this ordering key is paused. r = topic.Publish(ctx, &Message{ - ID: "2", - Data: []byte("failed message"), + Data: []byte("should fail"), OrderingKey: orderingKey, }) <-r.ready @@ -1340,8 +1336,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { // Lastly, call ResumePublish and make sure subsequent publishes succeed. topic.ResumePublish(orderingKey) r = topic.Publish(ctx, &Message{ - ID: "4", - Data: []byte("normal message"), + Data: []byte("should succeed"), OrderingKey: orderingKey, }) <-r.ready