The MonoBulkhead and FluxBulkhead operators both call bulkhead.tryAcquirePermission() during subscription. This is not ideal, because bulkhead.tryAcquirePermission() calls semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS),
which blocks the thread on which the subscription is executing. As the Javadoc of tryAcquire puts it:
If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens
So, for example, it might block an I/O thread that is used to serve other requests.
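The blocking behaviour described above can be reproduced with a plain java.util.concurrent.Semaphore, without any Resilience4j or Reactor classes. The following is a minimal sketch; the class name BlockingAcquireDemo and the method blockedMillis are made up for illustration and are not part of any library.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Resilience4j.
final class BlockingAcquireDemo {

    /**
     * Exhausts a one-permit semaphore, then calls the timed tryAcquire and
     * returns how many milliseconds the calling thread was parked inside it.
     */
    static long blockedMillis(long timeoutMillis) {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly(); // take the only permit

        long start = System.nanoTime();
        boolean acquired;
        try {
            // No permit is free, so the caller waits here until the timeout elapses.
            acquired = semaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        if (acquired) {
            throw new IllegalStateException("permit should not have been available");
        }
        return elapsedMillis;
    }
}
```

If the calling thread is an event-loop or I/O thread, every other request scheduled on it is stalled for the same duration.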
Both operators should never block. Instead, they should:
accept or reject immediately, as already happens when no timeout is set; or, if that is not possible and a timeout is set,
queue the subscription so that when a permit is released it is granted to the first waiting subscription (although I am not sure the current semaphore-based bulkhead implementation is a good foundation for this).
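The first option, accepting or rejecting immediately, corresponds to the no-argument Semaphore.tryAcquire(), which returns at once instead of parking the caller. A minimal sketch, assuming a hypothetical wrapper class ImmediateDecisionSketch that is not part of Resilience4j:

```java
import java.util.concurrent.Semaphore;

// Hypothetical wrapper for illustration only.
final class ImmediateDecisionSketch {
    private final Semaphore permits;

    ImmediateDecisionSketch(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    /** Returns at once: true if a permit was taken, false if none is free. */
    boolean tryAcquireNow() {
        return permits.tryAcquire(); // the no-arg overload never waits
    }

    /** Gives a previously taken permit back. */
    void release() {
        permits.release();
    }
}
```

The second option, queueing, cannot be expressed with the semaphore alone without blocking, which is why it needs a separate structure to hold the waiting subscriptions.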
It seems the rxjava2 and rxjava3 versions are also blocking.
Hi, thanks for your PR.
I like the idea of having a Bulkhead which queues permission requests when the concurrency limit is reached. That is roughly how the ThreadPoolBulkhead works, but the ThreadPoolBulkhead does not allow tasks to be executed in a different thread pool.
What we need is a NonBlockingBulkhead interface with CompletableFuture<Permission> tryAcquirePermission(). (The new non-blocking Bulkhead should be free of any Reactor classes.)
The non-blocking Bulkhead could be based on a semaphore and a non-blocking, thread-safe, circular fifo queue. Instead of using a time to live per permission request, we could use a circular array which queues N permission requests. We do have a good implementation of a circular fifo queue.
On every tryAcquirePermission() we return a CompletableFuture<Permission> and store the permission request in the circular queue. Then we pick the first permission request from the fifo queue and check if the Semaphore has a free permit.
If yes, we drop the first permission request from the circular queue and execute future.complete(new Permission).
If no, we drop the first permission request and execute future.completeExceptionally(new BulkheadFullException).
A permission request stays queued until
the circular queue is full and the permission request is dropped when a new permission request is stored,
or a former permission is released with permission.release().
When a permission is released, we pick the first permission request from the queue and allow the permission with future.complete(new Permission).
Since we have a circular queue, new permission requests will drop older permission requests automatically. We don't need any scheduler to remove tasks from the queue.
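The proposal above could be sketched roughly as follows. This is an illustration under several assumptions, not the Resilience4j implementation: the class name QueueingBulkheadSketch, the nested Permission type, and the local BulkheadFullException are all hypothetical stand-ins. For brevity it uses a short synchronized section around a bounded ArrayDeque and a permit counter rather than a semaphore plus the lock-free circular queue mentioned above, and it follows the queueing reading of the description: a request with no free permit waits, and is rejected only when a newer request evicts it from a full queue.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; names and structure are assumptions for illustration.
final class QueueingBulkheadSketch {

    /** Local stand-in for the library's exception of the same name. */
    static final class BulkheadFullException extends RuntimeException {
        BulkheadFullException(String message) { super(message); }
    }

    /** Handle for one granted permit; release() is idempotent. */
    final class Permission {
        private final AtomicBoolean released = new AtomicBoolean();
        private Permission() { }

        void release() {
            if (released.compareAndSet(false, true)) {
                handOverOrReturnPermit();
            }
        }
    }

    private final Object lock = new Object();
    private final ArrayDeque<CompletableFuture<Permission>> waiting = new ArrayDeque<>();
    private final int maxWaiting;
    private int availablePermits;

    QueueingBulkheadSketch(int maxConcurrentCalls, int maxWaiting) {
        if (maxConcurrentCalls < 1 || maxWaiting < 1) {
            throw new IllegalArgumentException("both limits must be at least 1");
        }
        this.availablePermits = maxConcurrentCalls;
        this.maxWaiting = maxWaiting;
    }

    /** Never waits for a permit: the returned future completes now or later. */
    CompletableFuture<Permission> tryAcquirePermission() {
        CompletableFuture<Permission> request = new CompletableFuture<>();
        CompletableFuture<Permission> evicted = null;
        boolean granted = false;
        synchronized (lock) {
            if (availablePermits > 0) {
                availablePermits--;
                granted = true;
            } else {
                if (waiting.size() == maxWaiting) {
                    evicted = waiting.pollFirst(); // oldest request makes room
                }
                waiting.addLast(request);
            }
        }
        // Futures are completed outside the lock so callbacks cannot run under it.
        if (granted) {
            request.complete(new Permission());
        }
        if (evicted != null) {
            evicted.completeExceptionally(new BulkheadFullException("evicted from full wait queue"));
        }
        return request;
    }

    /** Grants the freed permit to the oldest live request, or returns it to the pool. */
    private void handOverOrReturnPermit() {
        while (true) {
            CompletableFuture<Permission> next;
            synchronized (lock) {
                next = waiting.pollFirst();
                if (next == null) {
                    availablePermits++;
                    return;
                }
            }
            if (next.complete(new Permission())) {
                return; // permit handed over
            }
            // The request was already cancelled or completed; try the next one.
        }
    }
}
```

A reactive operator built on something like this could subscribe to the source when the future completes and call release() on termination or cancellation, so no thread waits for a permit. As in the description above, no scheduler is needed, because a full queue sheds its oldest entry whenever a new request arrives.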
Resilience4j version: 1.7.0
Java version: 17