
Reactor Bulkhead operator blocks if max wait duration is set #1592

Open
tsroka opened this issue Dec 7, 2021 · 2 comments · May be fixed by #1714

Comments

tsroka commented Dec 7, 2021

Resilience4j version: 1.7.0

Java version: 17

The MonoBulkhead and FluxBulkhead operators both call bulkhead.tryAcquirePermission() during subscription. This is not ideal, because when a max wait duration is set, bulkhead.tryAcquirePermission() calls semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS), which blocks the thread executing the subscription. As the Javadoc of tryAcquire puts it:

If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens

So, for example, it might block an I/O thread that is also used to serve other requests.
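To make the blocking behavior concrete, here is a small standalone sketch that uses only java.util.concurrent.Semaphore (not Resilience4j itself; the class and method names are hypothetical). It assumes the bulkhead's only permit is already held and measures how long a timed tryAcquire parks the calling thread, compared with the untimed tryAcquire() that applies when no timeout is configured.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BlockingAcquireDemo {

    // Milliseconds the calling thread spends inside a timed tryAcquire when no permit is free.
    static long timedTryAcquireMillis(long timeoutMs) {
        Semaphore semaphore = new Semaphore(1);
        semaphore.tryAcquire(); // take the only permit, as a concurrent call would
        long start = System.nanoTime();
        try {
            // Parks the current thread until a permit is released or the timeout elapses.
            boolean acquired = semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
            if (acquired) {
                throw new IllegalStateException("no permit should have been available");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // The untimed variant returns false at once instead of parking the thread.
    static long untimedTryAcquireMillis() {
        Semaphore semaphore = new Semaphore(0);
        long start = System.nanoTime();
        if (semaphore.tryAcquire()) {
            throw new IllegalStateException("no permit should have been available");
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("timed tryAcquire parked the thread for " + timedTryAcquireMillis(200) + " ms");
        System.out.println("untimed tryAcquire returned after " + untimedTryAcquireMillis() + " ms");
    }
}
```

On an event-loop thread, every one of those parked milliseconds is time during which no other request on that loop makes progress.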

Neither operator should ever block. Instead, they should:

  • accept or reject immediately, as currently happens when no timeout is set; if that is not possible and a timeout is set, they should instead
  • queue the subscription so that, when a permit is released, it is granted to the first waiting subscription (although I am not sure whether the current semaphore-based bulkhead implementation is a good foundation for this).

It seems the RxJava 2 and RxJava 3 versions also block.

RobWin (Member) commented Dec 7, 2021

Hi,

PRs are welcome

1livv linked a pull request on Jul 5, 2022 that will close this issue
RobWin (Member) commented Jul 18, 2022

Hi, thanks for your PR.
I like the idea of having a Bulkhead that queues permission requests when the concurrency limit is reached. It's somewhat similar to how the ThreadPoolBulkhead works, but the ThreadPoolBulkhead does not allow tasks to be executed in a different thread pool.

What we need is a NonBlockingBulkhead interface with CompletableFuture<Permission> tryAcquirePermission(). (The new non-blocking Bulkhead should be free of any Reactor classes.)
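A minimal sketch of what such an interface could look like, assuming the names from this comment (NonBlockingBulkhead, Permission) plus a local BulkheadFullException stand-in; none of these are existing Resilience4j types. The ImmediateBulkhead implementation is hypothetical and covers only the "accept or reject immediately" case from the original issue, without any queue.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class NonBlockingBulkheadSketch {

    // Hypothetical permit handle; only the first release() returns the permit.
    public interface Permission {
        void release();
    }

    // Hypothetical interface with the proposed signature; depends only on JDK types, not Reactor.
    public interface NonBlockingBulkhead {
        CompletableFuture<Permission> tryAcquirePermission();
    }

    // Local stand-in for Resilience4j's BulkheadFullException (not the library class).
    public static class BulkheadFullException extends RuntimeException {
        public BulkheadFullException(String message) {
            super(message);
        }
    }

    // Simplest implementation: accept or reject at once; the caller's thread is never parked.
    public static final class ImmediateBulkhead implements NonBlockingBulkhead {
        private final Semaphore semaphore;

        public ImmediateBulkhead(int maxConcurrentCalls) {
            this.semaphore = new Semaphore(maxConcurrentCalls);
        }

        @Override
        public CompletableFuture<Permission> tryAcquirePermission() {
            if (!semaphore.tryAcquire()) { // untimed tryAcquire returns false immediately
                return CompletableFuture.failedFuture(new BulkheadFullException("no free permit"));
            }
            AtomicBoolean released = new AtomicBoolean();
            Permission permission = () -> {
                if (released.compareAndSet(false, true)) {
                    semaphore.release();
                }
            };
            return CompletableFuture.completedFuture(permission);
        }
    }
}
```

Because the returned future is already completed, a Reactor operator could adapt it with Mono.fromFuture(...) without parking the subscribing thread.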

The non-blocking Bulkhead could be based on a semaphore and a non-blocking, thread-safe, circular FIFO queue. Instead of using a time-to-live per permission request, we could use a circular array that queues N permission requests. We already have a good implementation of a circular FIFO queue.
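For illustration, a fixed-capacity circular FIFO that evicts its oldest element on overflow might look like the sketch below. The EvictingFifo class is hypothetical and is not the existing circular queue mentioned above. It uses synchronized methods for brevity, so unlike the queue the comment asks for, it is thread-safe but not lock-free.

```java
public class EvictingFifoSketch {

    // Fixed-capacity ring buffer: offering to a full buffer evicts (and returns) the oldest element.
    public static final class EvictingFifo<E> {
        private final Object[] ring;
        private int head; // index of the oldest element
        private int size;

        public EvictingFifo(int capacity) {
            if (capacity <= 0) {
                throw new IllegalArgumentException("capacity must be > 0");
            }
            this.ring = new Object[capacity];
        }

        // Appends e; returns the evicted oldest element if the buffer was full, else null.
        public synchronized E offer(E e) {
            E evicted = null;
            if (size == ring.length) {
                evicted = removeHead();
            }
            ring[(head + size) % ring.length] = e;
            size++;
            return evicted;
        }

        // Removes and returns the oldest element, or null if the buffer is empty.
        public synchronized E poll() {
            return size == 0 ? null : removeHead();
        }

        public synchronized int size() {
            return size;
        }

        @SuppressWarnings("unchecked")
        private E removeHead() {
            E oldest = (E) ring[head];
            ring[head] = null; // drop the reference so it can be garbage collected
            head = (head + 1) % ring.length;
            size--;
            return oldest;
        }
    }
}
```

Because overflow evicts the head, old requests disappear as a side effect of new ones arriving, with no timer involved.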

On every tryAcquirePermission() call, we return a CompletableFuture<Permission> and store the permission request in the circular queue. Then we take the first permission request from the FIFO queue and check whether the Semaphore has a free permit.

  • If yes, we drop the first permission request from the circular queue and execute future.complete(new Permission).
  • If no, we drop the first permission request and execute future.completeExceptionally(new BulkheadFullException).

A permission request stays queued until either

  • the circular queue is full and the permission request is dropped when a new permission request is stored,
  • or a previously granted permission is released with permission.release().

When a permission is released, we take the first permission request from the queue and grant it with future.complete(new Permission).

Since we have a circular queue, new permission requests will drop older permission requests automatically. We don't need any scheduler to remove tasks from the queue.
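Putting these steps together, here is a hedged sketch of a queueing bulkhead. All names (QueueingBulkhead, Permission, the local BulkheadFullException, maxQueuedRequests) are hypothetical. It reads the "If yes / If no" bullets together with the "stays queued until" list: a request that finds no free permit stays in the queue and is completed exceptionally only when a newer request pushes it out of the full queue. Short synchronized blocks guard the queue and the semaphore, so callers never wait for a permit, although the sketch is not lock-free.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class QueueingBulkheadSketch {

    // Hypothetical permit handle; release() may be called more than once but only counts once.
    public interface Permission {
        void release();
    }

    // Local stand-in for Resilience4j's BulkheadFullException (not the library class).
    public static class BulkheadFullException extends RuntimeException {
        public BulkheadFullException(String message) {
            super(message);
        }
    }

    public static final class QueueingBulkhead {
        private final Semaphore permits;
        private final int maxQueuedRequests;
        // Oldest request at the head; bounded to maxQueuedRequests (overflow drops the head).
        private final ArrayDeque<CompletableFuture<Permission>> waiting = new ArrayDeque<>();

        public QueueingBulkhead(int maxConcurrentCalls, int maxQueuedRequests) {
            if (maxConcurrentCalls < 1 || maxQueuedRequests < 1) {
                throw new IllegalArgumentException("limits must be >= 1");
            }
            this.permits = new Semaphore(maxConcurrentCalls);
            this.maxQueuedRequests = maxQueuedRequests;
        }

        // Never waits for a permit: the future is completed now, later, or exceptionally.
        public CompletableFuture<Permission> tryAcquirePermission() {
            CompletableFuture<Permission> request = new CompletableFuture<>();
            CompletableFuture<Permission> dropped = null;
            boolean grantNow = false;
            synchronized (this) {
                if (waiting.isEmpty() && permits.tryAcquire()) {
                    grantNow = true; // free permit and nobody queued ahead of us
                } else {
                    if (waiting.size() == maxQueuedRequests) {
                        dropped = waiting.pollFirst(); // queue full: drop the oldest request
                    }
                    waiting.addLast(request);
                }
            }
            // Futures are completed outside the lock so callbacks never run while holding it.
            if (dropped != null) {
                dropped.completeExceptionally(new BulkheadFullException("permission request dropped"));
            }
            if (grantNow) {
                grant(request);
            }
            return request;
        }

        // Called once per granted permit: hand it to the oldest waiter, or return it to the semaphore.
        private void releaseOne() {
            CompletableFuture<Permission> next;
            synchronized (this) {
                next = waiting.pollFirst();
                if (next == null) {
                    permits.release();
                    return;
                }
            }
            grant(next);
        }

        private void grant(CompletableFuture<Permission> request) {
            AtomicBoolean released = new AtomicBoolean();
            Permission permission = () -> {
                if (released.compareAndSet(false, true)) {
                    releaseOne();
                }
            };
            // If the caller already cancelled the future, pass the permit on instead of leaking it.
            if (!request.complete(permission)) {
                permission.release();
            }
        }
    }
}
```

Handing a released permit directly to the oldest waiter, rather than returning it to the semaphore first, keeps a newly arriving caller from overtaking requests that are already queued; as in the comment, no scheduler is needed to expire old requests.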
