
Question regarding handling of HTTP 429 with reactive stream #2104

Open
Muenze opened this issue Feb 25, 2022 · 5 comments
Labels
type: enhancement A general enhancement

Comments

Muenze commented Feb 25, 2022

Hi,

we are using spring-data-elasticsearch version 4.3.0. Our backend is OpenSearch v1. Whenever we send too many requests, OpenSearch responds with HTTP 429 to throttle the client and tell us that it was too much. Our current approach is to retry, so we have something like this:

    repository.mySearchFunctionWhichReturnsFluxOfEntity()
        .retryExponentialBackoff(20, Duration.ofSeconds(1), Duration.ofSeconds(20), false) { log.debug("I retried") }
        .doOnNext {
            // do something with the entity, in our case: add them to a report class to be written to the file system
        }

Now, my expectation was that the retry would only repeat the failed scroll call, but it doesn't; it restarts the whole Flux.
So instead of having 5M rows in our report we ended up with 25M rows, because it restarted 18 times, and the creation of the report took 7h instead of 45min.

I really don't want to use the Elasticsearch client library directly just to have full control over my search scroll and handle all the retriable cases before adding the results to the Flux, so I would like to know how this can be solved with the tools that spring-data-elasticsearch provides, or with some configuration of the underlying code.
I thought that this must occur very often, so there should be a way to not restart the stream, or at least to restart it where it broke.
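
For illustration, here is a minimal, self-contained Reactor sketch (Kotlin, names invented) of the behaviour described above: retry resubscribes to the source, so everything that was already emitted is produced again instead of resuming after the failed call.

    import reactor.core.publisher.Flux
    import java.util.concurrent.atomic.AtomicInteger

    fun main() {
        val subscriptions = AtomicInteger()

        Flux.defer {
            // every retry triggers a completely new subscription to this source
            println("subscription #${subscriptions.incrementAndGet()}")
            Flux.just("row 1", "row 2", "row 3")
                .concatWith(Flux.error<String>(RuntimeException("simulated HTTP 429")))
        }
            .retry(2) // resubscribes to the whole Flux; it does not resume after "row 3"
            .subscribe(
                { println("got $it") },
                { println("failed after retries: ${it.message}") }
            )
    }

Running this prints "row 1" to "row 3" three times, which is the same duplication described in the report above.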

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Feb 25, 2022
sothawo (Collaborator) commented Feb 26, 2022

Where is retryExponentialBackoff defined? Anyway, a retry on a Flux resubscribes to the source sequence and so starts again from the beginning. You could probably work around this by doing paged requests and keeping track of what you have successfully processed.
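
A rough sketch of that workaround, assuming a hypothetical reactive repository with a derived query method that takes a Pageable (ReportRepository, MyEntity and findByCreatedAfter are invented names); the point is that the retry is scoped to a single page request, so pages that were already processed are not fetched again:

    import org.springframework.data.domain.PageRequest
    import org.springframework.data.domain.Pageable
    import org.springframework.data.elasticsearch.RestStatusException
    import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository
    import reactor.core.publisher.Flux
    import reactor.util.retry.Retry
    import java.time.Duration
    import java.time.Instant

    class MyEntity(val id: String? = null, val created: Instant? = null) // placeholder; the real entity is @Document-annotated

    interface ReportRepository : ReactiveElasticsearchRepository<MyEntity, String> {
        // hypothetical derived query; the Pageable limits each call to one page
        fun findByCreatedAfter(from: Instant, pageable: Pageable): Flux<MyEntity>
    }

    fun loadReport(repository: ReportRepository, from: Instant, pageSize: Int = 500): Flux<MyEntity> =
        Flux.range(0, Int.MAX_VALUE)
            .concatMap { page ->
                repository.findByCreatedAfter(from, PageRequest.of(page, pageSize))
                    // the retry only repeats this one page request
                    .retryWhen(Retry.backoff(20, Duration.ofSeconds(1))
                        .filter { it is RestStatusException && it.status == 429 })
                    .collectList()
            }
            .takeWhile { it.isNotEmpty() } // stop when a page comes back empty
            .concatMapIterable { it }

Note that plain from/size paging is capped by the index's max_result_window setting (10,000 by default), so a multi-million row export would need search_after or a point-in-time reader instead; the sketch only illustrates how to scope the retry to one request.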

If you want to implement this in a different way you might want to derive from org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient and reimplement the scroll method with a behaviour that fits your needs. But beware that this client implementation will be replaced in the future by one that uses code from the new Elasticsearch client library, and will then be deprecated and later removed.

Changing the scroll function so that a retry by default continues from where the Flux broke would change the default behaviour/contract of a Flux.

I thought that this must occur very often

Actually, up to now nobody has reported this. By the way, it seems strange to me that the cluster throttles a client for too many requests during a scrolled search.

@sothawo sothawo removed the status: waiting-for-triage An issue we've not yet triaged label Feb 26, 2022
@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Feb 26, 2022
Muenze (Author) commented Feb 27, 2022

Hi,

retryExponentialBackoff is defined in reactor-kotlin-extensions and uses reactor-extra as a dependency.
I don't want to retry the Flux when an HTTP 429 occurs; what I really want is to retry the HTTP call to Elasticsearch, which should return the next batch.
Maybe it is a problem for us because we have really big documents, some even with binary data fields of 2-3 MB, and 5k of those is probably quite heavy.
I have added the typical reasons for search rejections to our Grafana board, but nothing really looks suspicious.
HTTP 429 as a rejection status code is mentioned in the AWS documentation, so it cannot be that rare to encounter.

It was not my intention to retry on every error, because there can be other reasons for a failure, like a date parser exception that happened on some dates with 000 milliseconds (weird). The original code separated those errors and only retried the ones related to HTTP that actually had a good chance of succeeding next time.
But at that time I was under the illusion that a Flux with an error would only retry the element which produced the error, not restart the stream itself.
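
As a side note, that kind of error separation can also be expressed with the Retry spec from reactor-core, which has superseded the reactor-extra retry operators. A sketch, reusing the names from the snippet above and assuming the throttling surfaces as a RestStatusException with status 429; note that this still resubscribes to the whole Flux, it only limits which errors trigger a retry:

    import org.springframework.data.elasticsearch.RestStatusException
    import reactor.util.retry.Retry
    import java.time.Duration

    repository.mySearchFunctionWhichReturnsFluxOfEntity()
        .retryWhen(
            Retry.backoff(20, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(20))
                // only throttling is retried; e.g. date parsing errors fail immediately
                .filter { it is RestStatusException && it.status == 429 }
                .doBeforeRetry { log.debug("I retried") }
        )
        .doOnNext { /* add the entity to the report */ }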

Muenze (Author) commented Feb 27, 2022

Hi,

sadly I was not able to really subclass it, because subclassing the Java code from Kotlin doesn't work that well and I had to copy over nearly everything. In the end I made a new version, branched off 4.3.0, and implemented my logic directly in DefaultReactiveElasticsearchClient. It looks like this:

[screenshot of the modified execute method in DefaultReactiveElasticsearchClient; the code is posted as text in a later comment]

Right now it's more or less quick and dirty and I will review it with my colleagues, but I already tried it in production on a 5 million row report: although we had more than 10 retries, there were no broken streams, and each retry only repeated the one call that was sent to the backend system. I think changing the method in which the WebClient does the call, or a method that evaluates the result, could be even more beneficial, because it could be used by more than the scroll (we also got HTTP 429 for search and bulk).

Here is a link which describes when the 429 happens and what the issue usually is:
https://aws.amazon.com/de/premiumsupport/knowledge-center/opensearch-resolve-429-error/

And here is something from Elastic:
https://discuss.elastic.co/t/in-which-case-elasticsearch-will-return-429/23483/2

I really would have liked the Elasticsearch client to be able to handle that automatically; maybe we even need to use the client more directly in the future.

You said you will use the Elasticsearch client library in the future; is this already part of version 5?

sothawo (Collaborator) commented Feb 27, 2022

Thanks for the info and that code. The sendRequest method does call execute, and there is already some basic connection error handling, which does not include 429 handling. Maybe the execute method would be a good place to integrate a retry on 429. It might be worth having a look at whether this could be added to the imperative code as well.
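
A rough sketch of what such a retry could look like on the imperative side, assuming the throttling surfaces as a RestStatusException with status 429 (retryOn429 and the wrapped call are invented, not existing API):

    import org.springframework.data.elasticsearch.RestStatusException

    // simple blocking retry helper with exponential backoff (no jitter)
    fun <T> retryOn429(maxAttempts: Int = 20, initialBackoffMillis: Long = 2_000, call: () -> T): T {
        var backoff = initialBackoffMillis
        repeat(maxAttempts - 1) {
            try {
                return call()
            } catch (ex: RestStatusException) {
                if (ex.status != 429) throw ex // only throttling is retried
                Thread.sleep(backoff)
                backoff *= 2
            }
        }
        return call() // last attempt; let any exception propagate
    }

    // usage with a made-up search call:
    // val hits = retryOn429 { operations.search(query, MyEntity::class.java) }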

The new reactive implementation will be in version 5, and hopefully already as an optional alternative in 4.4. But it is not yet complete; there are some bugs that need to be fixed in the new client and in Elasticsearch itself.

This next reactive implementation will not be using the WebClient under the hood but the async methods provided by the transport layer of the new Elasticsearch client.

@sothawo sothawo added type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged labels Feb 27, 2022
Muenze (Author) commented Feb 28, 2022

    public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {

        return this.hostProvider.getActive(Verification.LAZY) //
                .flatMap(callback::doWithClient) //
                // retry the single request with exponential backoff when the cluster throttles it (HTTP 429)
                .retryWhen(
                        Retry.backoff(20, Duration.ofSeconds(2))
                                .filter(err -> err instanceof RestStatusException
                                        && ((RestStatusException) err).getStatus() == 429))
                .onErrorResume(throwable -> {

                    // on a connection problem, look up an active host again and repeat the request,
                    // again retrying on 429 with backoff
                    if (isCausedByConnectionException(throwable)) {
                        return hostProvider.getActive(Verification.ACTIVE) //
                                .flatMap(callback::doWithClient)
                                .retryWhen(
                                        Retry.backoff(10, Duration.ofSeconds(2))
                                                .filter(err -> err instanceof RestStatusException
                                                        && ((RestStatusException) err).getStatus() == 429));
                    }

                    // anything else is propagated unchanged
                    return Mono.error(throwable);
                });
    }

This change is sufficient to make the scroll work, and it will probably fix all the other problems as well.

We had a problem in the beginning where we thought the derived findBy methods were broken, because a @Query annotated search returned different results; in the end it was also just this retry problem.
Glad that it could be solved. I will now replace the 4.3.0 versions with this added code. Thanks.
