
Fixed Rate/Scheduled polling of messages #372

Open
jeroenvandevelde opened this issue Sep 25, 2021 · 7 comments

@jeroenvandevelde

Hi, first thanks for the great library.

We are currently using your library and now want to use it to retrieve messages at a fixed rate/schedule.
As in: poll for 5 messages every 10 seconds, even if processing of those 5 messages finishes earlier.

We can't use the BatchingPeriod in the BatchingMessageRetrieverProperties, as that is only the maximum period it waits.

I didn't find a way to do this, so I took a look at the code.

I found that it could be possible with a new implementation of the MessageRetriever that uses a ScheduledExecutor, so that it can run at a fixed rate.

Would you welcome such an addition to your library, or did I miss some way it could work with the current code?
Or would you prefer a different approach?
I am willing to make a pull request.

@JaidenAshmore
Owner

Hmmm so let me just confirm the requirements:

  1. If all threads have processed previous messages, don't request more until you hit your time period, e.g. 10 seconds
  2. If you haven't finished processing the messages, e.g. 3 of 5 are done, would you want to request 2 more messages when it hits 10 seconds? Or would you want to request another 5 and start polling messages? Or would you only want to request more messages once all messages in the first batch are done?

If you are fine with requesting 2 messages at the interval, one way we could do it is to make the concurrency smaller than the retriever's batch size, e.g. a concurrency of 5 with a batch size of 6. In this case the batching message retriever would never accumulate 6 requests for messages, so it would just keep requesting the messages it needs at that 10 second period, e.g. 0-5 messages.

If you are using the Spring annotation you might be able to achieve this by doing:

@QueueListener(concurrency = 5, batchSize = 6...)

The negative is that it would also buffer the resolving of the messages and only ever do it at the batching period. If that is a problem, you could implement your own MessageListenerContainer, like the BatchingMessageListenerContainer, with your specific config. See Spring - how to add your own queue listener for more steps on how to do that.

Let me know if that isn't what you are thinking!

@jeroenvandevelde
Author

jeroenvandevelde commented Sep 26, 2021

Regarding the requirements:

  1. If all threads have processed previous messages, don't request more until you hit your time period, e.g. 10 seconds
    Yes, indeed that is the main requirement.
  2. If you haven't finished processing the messages, e.g. 3 of 5 are done, would you want to request 2 more messages when it hits 10 seconds, another 5, or only more once the whole first batch is done?
    This is a less important requirement for us, as the main reason we want this is to have some priority between the queues: queue A (high priority) gets polled every 2 seconds and queue B (low priority) every 5 seconds.
    So what you are suggesting, requesting 2 messages at the end of the interval, would be good.

Aha, indeed this seems like a reasonable approach.
So for example we configure:

@QueueListener(concurrency = 5, batchSize = 6, batchingPeriodInMs = 10000)

This would make sure that we always wait 10 seconds before starting to receive messages again.
I hadn't thought of it like that, thank you.

I am not entirely sure I follow the remark about the resolving of messages.
Are you saying that we will only resolve messages at the end of each batching period?
Like: 5 seconds go by -> resolve messages, 5 seconds go by -> resolve messages.

The only negative I can think of is that this delays telling SQS that we processed the message correctly.
So theoretically we might hit the (visibility) timeout even though we have processed the message correctly.
Am I correct, or am I missing something?

@JaidenAshmore
Owner

Yeah, so we are using that batching period both to determine when to obtain more messages and how long we batch the resolving of messages. E.g. with a batching period of 5 seconds, it will wait up to that period for all the threads to finish before it sends the delete message request. If the batch size is a larger value than the concurrency, I believe it will always wait that full time.

The best way would be to not share those properties, e.g. update @QueueListener to have new fields, and we would set it here:

to use those new fields.

@jeroen-vandevelde-kbc-be

jeroen-vandevelde-kbc-be commented Sep 30, 2021

Yes, thanks for the feedback - this is definitely an option.

But we have been looking through the code a bit and found a way in which we think we can add the polling functionality by creating a new MessageListenerContainer.

For this, some changes would be necessary in the library, which I am willing to make myself.
I would first like your feedback on these changes, to see whether you consider them a worthwhile addition.

We would like to start from the BatchingMessage functionality and implement our own (Scheduled/Polling/Fixed)MessageRetriever, which we could also add to the library if you want.

We would like to separate the constant-looping functionality of the BatchingMessageRetriever from the actual retrieving, which would put the (while) loop code and the actual retrieval code in different classes.
This way, we can call the actual retrieval from the new polling mechanism by delegation/composition without code duplication.

We would like to create a ScheduledExecutor started with scheduleAtFixedRate, which runs the actual retrieval code of the BatchingMessageRetriever every x ms.

To also be able to close the ScheduledExecutor when we close the application, we suggest incorporating a stop hook in the MessageRetriever where we can shut down the executor.
The stop hook (method) would then be called when the CoreMessageListenerContainer's stop method is called.

So to summarize:

  1. Do you agree with having the looping code and the actual retrieval code in different classes in the BatchingMessageRetriever?
  2. Do you agree with having a stop hook/method in the MessageRetriever*, called by the CoreMessageListenerContainer, so we can clean up our resources?

* For consistency reasons, we could also add the stop hook to the resolver, ...
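To make the proposal concrete, here is a rough sketch of the delegation/composition idea. All names here (RetrievalStep, ScheduledMessageRetriever) are illustrative only, not types from the library; the extracted retrieval method in the real BatchingMessageRetriever would have its own name and signature.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical interface representing the extracted "actual retrieval" half of
// the BatchingMessageRetriever.
interface RetrievalStep {
    void retrieveBatch();
}

// Hypothetical retriever that reuses the retrieval step by composition,
// running it at a fixed rate instead of in a tight while loop.
class ScheduledMessageRetriever {
    private final RetrievalStep retrievalStep;
    private final long periodMs;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ScheduledMessageRetriever(final RetrievalStep retrievalStep, final long periodMs) {
        this.retrievalStep = retrievalStep;
        this.periodMs = periodMs;
    }

    // Triggers the shared retrieval code every periodMs milliseconds.
    void start() {
        scheduler.scheduleAtFixedRate(retrievalStep::retrieveBatch, 0, periodMs, TimeUnit.MILLISECONDS);
    }

    // The stop hook discussed above: called when the container shuts down.
    void stop() {
        scheduler.shutdownNow();
    }
}
```

The point of the composition is that the fixed-rate scheduling logic and the SQS retrieval logic never need to know about each other's internals.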

@JaidenAshmore
Owner

Do you agree with having the looping code and the actual retrieval code in different classes in the BatchingMessageRetriever?

The composition of whether we delegate to a separate class or not isn't a big concern for me, as long as the MessageRetriever's contract is maintained :)

Do you agree with having a stop hook/method in the MessageRetriever* which is called in the CoreMessageListenerContainer? So we can clean up our resources.

I believe an older version actually had a stop method, but it ended up being easier to use interruptions as the mechanism for stopping the retriever and the other background tasks. That also allows the retriever to clean up and return any messages that were batched. E.g. what you could do without needing to update the API is:

@Override
public List<Message> run() {
    myBackgroundScheduler.start(); // whatever this other class is

    // whatever your blocking logic would be here
    while (!Thread.currentThread().isInterrupted()) {
        // do your logic here for collecting messages so that they can be returned in any `retrieveMessage` call

        // do some sort of thread sleep/blocking, and make sure we listen for interrupted exceptions so that we always reach the stop below
    }

    // your clean up code to stop the background task
    myBackgroundScheduler.stop();

    // some way for your retriever to return any previously batched messages so they aren't lost,
    // e.g. in the case that the consumer wants all messages processed before the whole container is shut down
    return myBackgroundScheduler.batchedMessages();
}

You could probably even just do something like this without needing to pull out the logic:

@Override
public List<Message> run() {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final CountDownLatch blocker = new CountDownLatch(1);

    final ScheduledFuture<?> retrievalHandle = scheduler.scheduleAtFixedRate(() -> {
        // retrieve your messages here somehow and make sure that it is thread safe
        this.batchedMessages.addAll(newMessages);
    }, 10, 10, TimeUnit.SECONDS);

    try {
        blocker.await(); // never counted down, so this blocks until the thread is interrupted
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }

    // shutdown with whatever logic you want to make sure this is graceful
    scheduler.shutdown();

    // return any previously batched messages so they aren't lost
    final List<Message> leftoverMessages = new ArrayList<>(this.batchedMessages);
    this.batchedMessages.clear();
    return leftoverMessages;
}

So given the above, I don't think we need a new stop function, and therefore no changes to the core library API should be needed?

In this case you could always build this scheduled version, with a corresponding container, in your own code base, but if you want to contribute it back to this repository that is fine too (though probably more effort on your end).
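The interruption-based shutdown pattern above can be exercised end to end with plain JDK types. This is only an illustrative sketch: Message is stubbed as a String, and FixedRateRetriever and the 50 ms period are made up for the demo, not library names.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal stand-in for a retriever's run() method: it schedules retrieval at a
// fixed rate, blocks until the container interrupts the thread, then shuts the
// scheduler down and returns any messages that were batched but not handed out.
class FixedRateRetriever {
    final Queue<String> batchedMessages = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    List<String> run() {
        scheduler.scheduleAtFixedRate(
            () -> batchedMessages.add("message"), // stand-in for the SQS receive call
            0, 50, TimeUnit.MILLISECONDS);

        final CountDownLatch blocker = new CountDownLatch(1);
        try {
            blocker.await(); // never counted down: only an interrupt wakes us
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        scheduler.shutdownNow();
        final List<String> leftover = new ArrayList<>(batchedMessages);
        batchedMessages.clear();
        return leftover;
    }
}
```

When the thread running run() is interrupted, the method returns the leftover batched messages instead of losing them, which is exactly why no separate stop hook is needed.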

@jeroenvandevelde
Author

jeroenvandevelde commented Oct 3, 2021

You are correct that with the CountDownLatch trick we don't need the stop function anymore.

But pulling the logic out of the BatchingMessageRetriever would still be handy for us.

High-level, we would like to do something like the below, based completely on your BatchingMessageListenerContainer; we would only need to override the BatchingMessageRetriever.

@Override
public List<Message> run() {
    final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    // The 10 would come as a parameter from a new QueueListener annotation
    executorService.scheduleAtFixedRate(this::run2, 0, 10, TimeUnit.SECONDS);
    final CountDownLatch blocker = new CountDownLatch(1);
    try {
        blocker.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    futuresWaitingForMessages.forEach(future -> future.cancel(true));
    executorService.shutdownNow();
    return Collections.emptyList();
}

run2() would then be this; note the duplication of code from your original BatchingMessageRetriever:

    public void run2() {
        final Queue<CompletableFuture<Message>> messagesToObtain;
        try {
            messagesToObtain = obtainRequestForMessagesBatch();
        } catch (final InterruptedException interruptedException) {
            log.debug("Thread interrupted waiting for batch");
            return;
        }

        log.debug("Requesting {} messages", messagesToObtain.size());

        if (messagesToObtain.isEmpty()) {
            return;
        }

        final List<Message> messages;
        try {
            messages =
                    CompletableFuture
                            .supplyAsync(messagesToObtain::size)
                            .thenApply(this::buildReceiveMessageRequest)
                            .thenComposeAsync(sqsAsyncClient::receiveMessage)
                            .thenApply(ReceiveMessageResponse::messages)
                            .get();
        } catch (final RuntimeException | ExecutionException exception) {
            // Supposedly the SqsAsyncClient can get interrupted, which will remove the interrupted status from the thread and then wrap it
            // in its own version of the interrupted exception... If this happens while the retriever is being shut down, it will keep on processing
            // because it does not realise it is being shut down, therefore we have to check for this and quit if necessary
            if (exception instanceof ExecutionException) {
                final Throwable executionExceptionCause = exception.getCause();
                if (executionExceptionCause instanceof SdkClientException) {
                    if (executionExceptionCause.getCause() instanceof SdkInterruptedException) {
                        log.debug("Thread interrupted while receiving messages");
                        return;
                    }
                }
            }
            log.error("Error requesting messages", exception);
            // If there was an exception receiving messages we need to put these back into the queue
            futuresWaitingForMessages.addAll(messagesToObtain);
            performBackoff();
        } catch (final InterruptedException interruptedException) {
            log.debug("Thread interrupted while waiting for batch of messages");
            return;
        }

        log.debug("Downloaded {} messages", messages.size());
        if (messages.size() > messagesToObtain.size()) {
            log.error("More messages were downloaded than requested, this shouldn't happen");
        }

        for (final Message message : messages) {
            final CompletableFuture<Message> completableFuture = messagesToObtain.poll();
            if (completableFuture != null) {
                completableFuture.complete(message);
            }
        }
        // Any threads that weren't completed send back for processing again
        futuresWaitingForMessages.addAll(messagesToObtain);
    }

As you can see, the logic in run2() is the same as in the BatchingMessageRetriever.
Therefore we would like to be able to call this part of the code from our retriever as well.

If the code in run2() could live in the library, shared by both retrievers, there would be no duplication.
That way we could also contribute the ScheduledExecutorService approach as an addition to your library.

What is your opinion on extracting part of the logic of the BatchingMessageRetriever and using it in a new retriever?

@JaidenAshmore
Owner

Yeah, if a lot of the logic can be shared, it makes sense to me to pull it out!
