
PubSub: Message ordering appears to cause race conditions and duplicate messages #1974

Open
turneand opened this issue Apr 3, 2024 · 2 comments
Labels
api: pubsub Issues related to the googleapis/java-pubsub API.

Comments


turneand commented Apr 3, 2024

Environment details

  1. Specify the API at the beginning of the title (for example, "BigQuery: ...").
    General, Core, and Other are also allowed as types.
  2. OS type and version: Windows/Unix
  3. Java version: 17
  4. version(s): 1.126.4 and 1.127.3 (issue confirmed on both versions)

Steps to reproduce

The steps are similar to those in #1889.

  1. Create a new Pub/Sub topic and a subscription with message ordering and exactly-once delivery enabled.
  2. Create a subscriber with random sleeps, a parallel pull count of 2, and a max outstanding element count of 1000.
  3. Publish 500 messages, with random sleeps between them.
  4. All messages should be processed in order, which is mostly the case.
  5. However, we are frequently flooded with "failed to send operations" errors: "Some acknowledgement ids in the request were sent out of order".
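To check a run like the one described in the steps above for duplicates and out-of-order deliveries, a small stdlib-only helper can parse the sequence number from each payload. This is a hypothetical class (DeliveryAudit), assuming payloads of the form "prefix,<timestamp>,<i>" as published by the code example; the receiver would call record(message.getData().toStringUtf8()) before acking. The methods are synchronized because, with a parallel pull count of 2, the callback may run on several threads.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: audits payloads of the form "prefix,<timestamp>,<i>". */
public class DeliveryAudit {
    private final Set<Integer> seen = new HashSet<>();
    private int highest = -1;
    private int duplicates = 0;
    private int regressions = 0;

    /** Records one received payload and returns its sequence number. */
    public synchronized int record(String payload) {
        // The sequence number is everything after the last comma.
        int seq = Integer.parseInt(payload.substring(payload.lastIndexOf(',') + 1));
        if (!seen.add(seq)) {
            duplicates++; // this sequence number was delivered before
        }
        if (seq < highest) {
            regressions++; // arrived after a higher sequence number
        }
        highest = Math.max(highest, seq);
        return seq;
    }

    public synchronized int distinct() { return seen.size(); }
    public synchronized int duplicates() { return duplicates; }
    public synchronized int regressions() { return regressions; }

    public static void main(String[] args) {
        DeliveryAudit audit = new DeliveryAudit();
        for (String p : List.of("prefix,t,0", "prefix,t,1", "prefix,t,2", "prefix,t,1", "prefix,t,2", "prefix,t,3")) {
            audit.record(p);
        }
        // Prints: distinct=4 duplicates=2 regressions=1
        System.out.println("distinct=" + audit.distinct()
                + " duplicates=" + audit.duplicates()
                + " regressions=" + audit.regressions());
    }
}
```

A clean run of the repro would end with distinct equal to the number of published messages and both other counters at zero.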

As a workaround, we set "maxOutstandingElementCount" to 1, which appears to effectively disable any batching.
Our application can tolerate some instability from Pub/Sub, but the volume of these errors (hundreds per day) is excessive.

Code example

Note that this seems to be a race condition. The following code reproduces the issue fairly consistently,
but not every time: on some executions only a couple of errors are logged, while on others there are hundreds.

In our live applications, we see this when sending acks or nacks. It causes significant delays, because we have to
wait for the default timeouts to expire before the client tries again.

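import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// In the versions reported here, the Subscriber.Builder Duration setters take threeten-bp Durations.
import org.threeten.bp.Duration;

public class OrderingRepro {
    public static void main(String[] args) throws Exception {
        // Placeholder: the project ID was omitted from the original report.
        var project = "your-project-id";
        var subscriptionName = ProjectSubscriptionName.of(project, "andrew-test-sub-order2");
        var topicName = TopicName.of(project, "andrew_test");
        var publisher = Publisher.newBuilder(topicName).setEnableMessageOrdering(true).build();

        var subscriber = Subscriber.newBuilder(subscriptionName, (PubsubMessage message, AckReplyConsumer consumer) -> {
            try {
                // Simulate 0-199 ms of processing before acking.
                long sleepFor = ThreadLocalRandom.current().nextLong(200);
                System.err.println(Thread.currentThread().getName() + ";Received;" + message.getData().toStringUtf8() + ";" + sleepFor);
                Thread.sleep(sleepFor);
                consumer.ack();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("!!!!error - " + message.getData().toStringUtf8());
                // Hand the message back for redelivery instead of leaving it outstanding.
                consumer.nack();
            }
        })
                .setFlowControlSettings(FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(1000L)
                        .setMaxOutstandingRequestBytes(104857600L) // 100 MiB
                        .build())
                .setMaxAckExtensionPeriod(Duration.ofMinutes(60L))
                .setMaxDurationPerAckExtension(Duration.ofMillis(0L))
                .setParallelPullCount(2)
                .build();

        System.err.println("Starting subscriber");
        subscriber.startAsync().awaitRunning();

        try {
            var now = LocalDateTime.now();
            // Publish 500 messages (i = 0..499) on a single ordering key, with random gaps.
            for (int i = 0; i < 500; i++) {
                var data = "prefix," + now + "," + i;
                long sleepFor = ThreadLocalRandom.current().nextLong(200);
                System.err.println(Thread.currentThread().getName() + ";Sending;" + data + ";" + sleepFor);
                Thread.sleep(sleepFor);

                var message = PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8(data))
                        .setOrderingKey("defabc")
                        .build();
                publisher.publish(message);
            }
        } finally {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }

        try {
            System.err.println("Awaiting termination");
            // Keep receiving for up to 30 s, then shut the subscriber down.
            subscriber.awaitTerminated(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            subscriber.stopAsync().awaitTerminated();
        }
    }
}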

Stack trace

com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Some acknowledgement ids in the request were sent out of order.
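The error text says acknowledgement IDs reached the server out of order. The toy, deterministic model here (a hypothetical class, InFlightOrderModel, stdlib only and not the client's actual internals) illustrates the general race: if several ack requests are in flight at once, a later request with lower latency can arrive before an earlier one; if each request is sent only after the previous one has completed, arrival order matches send order, at the cost of a longer total time. The latencies and send interval are made-up numbers, and each latency is treated as the full time until the request has been handled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Toy, deterministic model of when ack requests arrive, given per-request latency. */
public class InFlightOrderModel {
    /** Arrival time of each request (ms). */
    static long[] arrivalTimes(long[] latencyMs, long sendIntervalMs, boolean gated) {
        long[] arrival = new long[latencyMs.length];
        for (int i = 0; i < latencyMs.length; i++) {
            long sendAt = gated
                    ? (i == 0 ? 0 : arrival[i - 1]) // wait until the previous request completes
                    : i * sendIntervalMs;           // send on a timer, regardless of earlier requests
            arrival[i] = sendAt + latencyMs[i];
        }
        return arrival;
    }

    /** Request indices sorted by arrival time; ties keep send order (List.sort is stable). */
    static List<Integer> arrivalOrder(long[] latencyMs, long sendIntervalMs, boolean gated) {
        long[] arrival = arrivalTimes(latencyMs, sendIntervalMs, gated);
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < arrival.length; i++) {
            order.add(i);
        }
        order.sort(Comparator.comparingLong(i -> arrival[i]));
        return order;
    }

    public static void main(String[] args) {
        long[] latency = {30, 5, 10};
        System.out.println("concurrent: " + arrivalOrder(latency, 1, false)); // concurrent: [1, 2, 0]
        System.out.println("gated:      " + arrivalOrder(latency, 1, true));  // gated:      [0, 1, 2]
    }
}
```

With these latencies, the last gated request lands at 45 ms versus 30 ms in the concurrent case, which illustrates why serializing acks can slow processing down.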

External references such as API reference guides

  • ?

Any additional information below

This is similar to #1889


Thanks!

product-auto-label bot added the "api: pubsub" label (Issues related to the googleapis/java-pubsub API.) on Apr 3, 2024
turneand (Author) commented Apr 3, 2024

Unfortunately, my workaround is not correct: I have observed these errors, and therefore the associated duplicate messages, even when maxOutstandingElementCount is null or 0.

michaelpri10 (Contributor)

I was able to reproduce the "INVALID_ARGUMENT: Some acknowledgement ids in the request were sent out of order" errors, which seem to occur when there are multiple outstanding acknowledgement requests. We are still investigating, but as a workaround you can use the acknowledgement-with-response interface when receiving messages. Following the SubscribeWithExactlyOnceConsumerWithResponse sample lets you check the response of the acknowledgement call for each message. This prevents multiple outstanding acknowledgement requests, because the next message is processed only after the previous one has been successfully acknowledged, although it may slow down message processing.
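The suggested workaround can be modeled without the Pub/Sub client. In this stdlib-only sketch (hypothetical names AckGateDemo, SimulatedAcker and AckResult), each ack returns a CompletableFuture that completes on a background pool, and the loop blocks on that future before handling the next message, so at most one ack request is ever in flight. In the real client, the SubscribeWithExactlyOnceConsumerWithResponse sample does the equivalent by waiting on the Future<AckResponse> returned by ack() on an AckReplyConsumerWithResponse; the simulated 5 ms latency and the failure handling here are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class AckGateDemo {
    /** Hypothetical ack outcome; stands in for the client's per-message ack response. */
    enum AckResult { SUCCESSFUL, FAILED }

    /** Simulated acker whose acks complete asynchronously and which tracks peak in-flight acks. */
    static final class SimulatedAcker {
        private final ExecutorService io;
        private final AtomicInteger outstanding = new AtomicInteger();
        private final AtomicInteger maxOutstanding = new AtomicInteger();

        SimulatedAcker(ExecutorService io) {
            this.io = io;
        }

        CompletableFuture<AckResult> ack(String ackId) {
            // Count this request as in flight and remember the peak.
            maxOutstanding.accumulateAndGet(outstanding.incrementAndGet(), Math::max);
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5); // pretend network round trip
                    return AckResult.SUCCESSFUL;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return AckResult.FAILED;
                } finally {
                    outstanding.decrementAndGet(); // runs before the future completes
                }
            }, io);
        }

        int maxOutstanding() {
            return maxOutstanding.get();
        }
    }

    /** Handles messages one at a time, blocking on each ack before moving on. */
    static List<String> processSequentially(List<String> ackIds, SimulatedAcker acker) throws Exception {
        List<String> acked = new ArrayList<>();
        for (String id : ackIds) {
            // Message processing would happen here.
            AckResult result = acker.ack(id).get(); // wait for the ack outcome
            if (result != AckResult.SUCCESSFUL) {
                // A real receiver would decide here whether to retry or give up.
                throw new IllegalStateException("ack failed for " + id);
            }
            acked.add(id);
        }
        return acked;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService io = Executors.newFixedThreadPool(4);
        try {
            SimulatedAcker acker = new SimulatedAcker(io);
            List<String> acked = processSequentially(List.of("m0", "m1", "m2", "m3"), acker);
            // Prints: acked=[m0, m1, m2, m3] maxOutstanding=1
            System.out.println("acked=" + acked + " maxOutstanding=" + acker.maxOutstanding());
        } finally {
            io.shutdown();
        }
    }
}
```

Blocking on each ack trades throughput for ordering: every message now costs at least one ack round trip before the next one is handled.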
