Skip to content

Commit

Permalink
Fix bug with application of max idle time (#176)
Browse files Browse the repository at this point in the history
* add failing tests for lifo checkout order
* add LifoQueue and use it in MemberSingle
  • Loading branch information
davidmoten committed Mar 6, 2023
1 parent ae50abf commit 5e5a4bc
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 2 deletions.
@@ -0,0 +1,58 @@
package org.davidmoten.rx.internal;

import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;

/**
* Thread-safe Last-In-First-Out queue. Current usage is multi-producer, single
* consumer but LIFO use case doesn't seem to offer opportunity for performance
* enhancements like the MpscLinkedQueue does for FIFO use case.
*
* @param <T> queued item type
*/
public final class LifoQueue<T> {

private final AtomicReference<Node<T>> head = new AtomicReference<>();

public void offer(@NonNull T t) {
while (true) {
Node<T> a = head.get();
Node<T> b = new Node<>(t, a);
if (head.compareAndSet(a, b)) {
return;
}
}
}

public @Nullable T poll() {
Node<T> a = head.get();
if (a == null) {
return null;
} else {
while (true) {
if (head.compareAndSet(a, a.next)) {
return a.value;
} else {
a = head.get();
}
}
}
}

public void clear() {
head.set(null);
}

static final class Node<T> {
final @NonNull T value;
final @Nullable Node<T> next;

Node(T value, Node<T> next) {
this.value = value;
this.next = next;
}
}

}
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import org.davidmoten.rx.internal.LifoQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -38,7 +39,7 @@ final class MemberSingle<T> extends Single<Member<T>> implements Closeable {
// toBeRemoved queue
private final MemberSingleObserver<T> removeAll;

private final SimplePlainQueue<DecoratingMember<T>> initializedAvailable;
private final LifoQueue<DecoratingMember<T>> initializedAvailable;
private final SimplePlainQueue<DecoratingMember<T>> notInitialized;
private final SimplePlainQueue<DecoratingMember<T>> toBeReleased;
private final SimplePlainQueue<DecoratingMember<T>> toBeChecked;
Expand All @@ -63,7 +64,7 @@ final class MemberSingle<T> extends Single<Member<T>> implements Closeable {
MemberSingle(NonBlockingPool<T> pool) {
Preconditions.checkNotNull(pool);
this.notInitialized = new MpscLinkedQueue<>();
this.initializedAvailable = new MpscLinkedQueue<>();
this.initializedAvailable = new LifoQueue<>();
this.toBeReleased = new MpscLinkedQueue<>();
this.toBeChecked = new MpscLinkedQueue<>();
this.toBeAdded = new MpscLinkedQueue<>();
Expand Down
@@ -0,0 +1,29 @@
package org.davidmoten.rx.internal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import org.junit.Test;

public class LifoQueueTest {

@Test
public void testIsLifo() {
LifoQueue<Integer> q = new LifoQueue<>();
q.offer(1);
q.offer(2);
assertEquals(2, (int) q.poll());
assertEquals(1, (int) q.poll());
assertNull(q.poll());
}

@Test
public void testClear() {
LifoQueue<Integer> q = new LifoQueue<>();
q.offer(1);
q.offer(2);
q.clear();
assertNull(q.poll());
}

}
Expand Up @@ -2,6 +2,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -239,6 +240,69 @@ public void testConnectionPoolRecylesMany() throws SQLException {
}
}

@Test
public void testConnectionPoolRecylesLastInFirstOut() throws Exception {
AtomicInteger count = new AtomicInteger();
try (Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.healthCheck(n -> true) //
.maxSize(4) //
.maxIdleTime(1, TimeUnit.MINUTES) //
.build()) {
Member<Integer> m1 = pool.member().blockingGet();
Member<Integer> m2 = pool.member().blockingGet();
m1.checkin();
m2.checkin();
Member<Integer> m3 = pool.member().blockingGet();
assertTrue(m2 == m3);
}
}

@Test
public void testMaxIdleTimeIsAppliedGivenConcurrentWorkThenMultipleSingleThreadedWorkBeforeMaxIdleTime() throws InterruptedException {
TestScheduler s = new TestScheduler();
AtomicInteger count = new AtomicInteger();
AtomicInteger disposed = new AtomicInteger();
Pool<Integer> pool = NonBlockingPool //
.factory(() -> count.incrementAndGet()) //
.healthCheck(n -> true) //
.maxSize(4) //
.maxIdleTime(2, TimeUnit.MINUTES) //
.disposer(n -> disposed.incrementAndGet()) //
.scheduler(s) //
.build();
// checkout two members concurrently
AtomicReference<Member<Integer>> a = new AtomicReference<>();
AtomicReference<Member<Integer>> b = new AtomicReference<>();
pool.member().doOnSuccess(a::set).subscribe();
pool.member().doOnSuccess(b::set).subscribe();
s.triggerActions();
assertNotNull(a.get());
assertFalse(a.get() == b.get());

// check the two in again
a.get().checkin();
b.get().checkin();
s.triggerActions();

// now advance time and do two non-concurrent uses of pool members
// if FIFO queue used then prevents idle timeout. Code should use LIFO
// under the covers
s.advanceTimeBy(1, TimeUnit.MINUTES);
AtomicReference<Member<Integer>> c = new AtomicReference<>();
pool.member().doOnSuccess(c::set).subscribe();
s.triggerActions();
c.get().checkin();
pool.member().doOnSuccess(c::set).subscribe();
s.triggerActions();
c.get().checkin();

// advance to timeout and ensure 1 member times out
s.advanceTimeBy(1, TimeUnit.MINUTES);
s.triggerActions();
assertEquals(1, disposed.get());
}

@Test
public void testHealthCheckWhenFails() throws Exception {
TestScheduler s = new TestScheduler();
Expand Down

0 comments on commit 5e5a4bc

Please sign in to comment.