From 5e5a4bcc2a1801d9145f9fb060c3cf2b11f74038 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Tue, 7 Mar 2023 08:46:51 +1100 Subject: [PATCH] Fix bug with application of max idle time (#176) * add failing tests for lifo checkout order * add LifoQueue and use it in MemberSingle --- .../org/davidmoten/rx/internal/LifoQueue.java | 58 +++++++++++++++++ .../org/davidmoten/rx/pool/MemberSingle.java | 5 +- .../davidmoten/rx/internal/LifoQueueTest.java | 29 +++++++++ .../rx/pool/NonBlockingPoolTest.java | 64 +++++++++++++++++++ 4 files changed, 154 insertions(+), 2 deletions(-) create mode 100644 rxjava2-pool/src/main/java/org/davidmoten/rx/internal/LifoQueue.java create mode 100644 rxjava2-pool/src/test/java/org/davidmoten/rx/internal/LifoQueueTest.java diff --git a/rxjava2-pool/src/main/java/org/davidmoten/rx/internal/LifoQueue.java b/rxjava2-pool/src/main/java/org/davidmoten/rx/internal/LifoQueue.java new file mode 100644 index 00000000..29eabcb2 --- /dev/null +++ b/rxjava2-pool/src/main/java/org/davidmoten/rx/internal/LifoQueue.java @@ -0,0 +1,58 @@ +package org.davidmoten.rx.internal; + +import java.util.concurrent.atomic.AtomicReference; + +import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.Nullable; + +/** + * Thread-safe Last-In-First-Out queue. Current usage is multi-producer, single + * consumer but LIFO use case doesn't seem to offer opportunity for performance + * enhancements like the MpscLinkedQueue does for FIFO use case. + * + * @param queued item type + */ +public final class LifoQueue { + + private final AtomicReference> head = new AtomicReference<>(); + + public void offer(@NonNull T t) { + while (true) { + Node a = head.get(); + Node b = new Node<>(t, a); + if (head.compareAndSet(a, b)) { + return; + } + } + } + + public @Nullable T poll() { + Node a = head.get(); + if (a == null) { + return null; + } else { + while (true) { + if (head.compareAndSet(a, a.next)) { + return a.value; + } else { + a = head.get(); + } + } + } + } + + public void clear() { + head.set(null); + } + + static final class Node { + final @NonNull T value; + final @Nullable Node next; + + Node(T value, Node next) { + this.value = value; + this.next = next; + } + } + +} diff --git a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java index 0a5f1755..7ec149cc 100644 --- a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java +++ b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import org.davidmoten.rx.internal.LifoQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ final class MemberSingle extends Single> implements Closeable { // toBeRemoved queue private final MemberSingleObserver removeAll; - private final SimplePlainQueue> initializedAvailable; + private final LifoQueue> initializedAvailable; private final SimplePlainQueue> notInitialized; private final SimplePlainQueue> toBeReleased; private final SimplePlainQueue> toBeChecked; @@ -63,7 +64,7 @@ final class MemberSingle extends Single> implements Closeable { MemberSingle(NonBlockingPool pool) { Preconditions.checkNotNull(pool); this.notInitialized = new MpscLinkedQueue<>(); - this.initializedAvailable = new MpscLinkedQueue<>(); + this.initializedAvailable = new LifoQueue<>(); this.toBeReleased = new MpscLinkedQueue<>(); this.toBeChecked = new MpscLinkedQueue<>(); this.toBeAdded = new MpscLinkedQueue<>(); diff --git a/rxjava2-pool/src/test/java/org/davidmoten/rx/internal/LifoQueueTest.java b/rxjava2-pool/src/test/java/org/davidmoten/rx/internal/LifoQueueTest.java new file mode 100644 index 00000000..0916cce5 --- /dev/null +++ b/rxjava2-pool/src/test/java/org/davidmoten/rx/internal/LifoQueueTest.java @@ -0,0 +1,29 @@ +package org.davidmoten.rx.internal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.junit.Test; + +public class LifoQueueTest { + + @Test + public void testIsLifo() { + LifoQueue q = new LifoQueue<>(); + q.offer(1); + q.offer(2); + assertEquals(2, (int) q.poll()); + assertEquals(1, (int) q.poll()); + assertNull(q.poll()); + } + + @Test + public void testClear() { + LifoQueue q = new LifoQueue<>(); + q.offer(1); + q.offer(2); + q.clear(); + assertNull(q.poll()); + } + +} diff --git a/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolTest.java b/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolTest.java index 760f22d2..5030b278 100644 --- a/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolTest.java +++ b/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolTest.java @@ -2,6 +2,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -239,6 +240,69 @@ public void testConnectionPoolRecylesMany() throws SQLException { } } + @Test + public void testConnectionPoolRecylesLastInFirstOut() throws Exception { + AtomicInteger count = new AtomicInteger(); + try (Pool pool = NonBlockingPool // + .factory(() -> count.incrementAndGet()) // + .healthCheck(n -> true) // + .maxSize(4) // + .maxIdleTime(1, TimeUnit.MINUTES) // + .build()) { + Member m1 = pool.member().blockingGet(); + Member m2 = pool.member().blockingGet(); + m1.checkin(); + m2.checkin(); + Member m3 = pool.member().blockingGet(); + assertTrue(m2 == m3); + } + } + + @Test + public void testMaxIdleTimeIsAppliedGivenConcurrentWorkThenMultipleSingleThreadedWorkBeforeMaxIdleTime() throws InterruptedException { + TestScheduler s = new TestScheduler(); + AtomicInteger count = new AtomicInteger(); + AtomicInteger disposed = new AtomicInteger(); + Pool pool = NonBlockingPool // + .factory(() -> count.incrementAndGet()) // + .healthCheck(n -> true) // + .maxSize(4) // + .maxIdleTime(2, TimeUnit.MINUTES) // + .disposer(n -> disposed.incrementAndGet()) // + .scheduler(s) // + .build(); + // checkout two members concurrently + AtomicReference> a = new AtomicReference<>(); + AtomicReference> b = new AtomicReference<>(); + pool.member().doOnSuccess(a::set).subscribe(); + pool.member().doOnSuccess(b::set).subscribe(); + s.triggerActions(); + assertNotNull(a.get()); + assertFalse(a.get() == b.get()); + + // check the two in again + a.get().checkin(); + b.get().checkin(); + s.triggerActions(); + + // now advance time and do two non-concurrent uses of pool members + // if FIFO queue used then prevents idle timeout. Code should use LIFO + // under the covers + s.advanceTimeBy(1, TimeUnit.MINUTES); + AtomicReference> c = new AtomicReference<>(); + pool.member().doOnSuccess(c::set).subscribe(); + s.triggerActions(); + c.get().checkin(); + pool.member().doOnSuccess(c::set).subscribe(); + s.triggerActions(); + c.get().checkin(); + + // advance to timeout and ensure 1 member times out + s.advanceTimeBy(1, TimeUnit.MINUTES); + s.triggerActions(); + assertEquals(1, disposed.get()); + } + @Test public void testHealthCheckWhenFails() throws Exception { TestScheduler s = new TestScheduler();