diff --git a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java index 2f66ea6c..0a5f1755 100644 --- a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java +++ b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java @@ -1,6 +1,8 @@ package org.davidmoten.rx.pool; import java.io.Closeable; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -22,21 +24,26 @@ import io.reactivex.disposables.Disposable; import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; +import io.reactivex.internal.util.EmptyComponent; import io.reactivex.plugins.RxJavaPlugins; + final class MemberSingle extends Single> implements Closeable { - final AtomicReference> observers; + final Observers observers; private static final Logger log = LoggerFactory.getLogger(MemberSingle.class); - @SuppressWarnings({ "rawtypes", "unchecked" }) - static final Observers EMPTY = new Observers(new MemberSingleObserver[0], new boolean[0], 0, 0, 0); + // sentinel object representing remove all observers that is added to + // toBeRemoved queue + private final MemberSingleObserver removeAll; private final SimplePlainQueue> initializedAvailable; private final SimplePlainQueue> notInitialized; private final SimplePlainQueue> toBeReleased; private final SimplePlainQueue> toBeChecked; + private final SimplePlainQueue> toBeAdded; + private final SimplePlainQueue> toBeRemoved; private final AtomicInteger wip = new AtomicInteger(); private final DecoratingMember[] members; @@ -53,21 +60,23 @@ final class MemberSingle extends Single> implements Closeable { // mutable private volatile boolean cancelled; - @SuppressWarnings("unchecked") MemberSingle(NonBlockingPool pool) { Preconditions.checkNotNull(pool); - this.initializedAvailable = new MpscLinkedQueue>(); - this.notInitialized = new MpscLinkedQueue>(); - this.toBeReleased = new MpscLinkedQueue>(); - this.toBeChecked = new MpscLinkedQueue>(); + this.notInitialized = new MpscLinkedQueue<>(); + this.initializedAvailable = new MpscLinkedQueue<>(); + this.toBeReleased = new MpscLinkedQueue<>(); + this.toBeChecked = new MpscLinkedQueue<>(); + this.toBeAdded = new MpscLinkedQueue<>(); + this.toBeRemoved = new MpscLinkedQueue<>(); this.members = createMembersArray(pool.maxSize, pool.checkinDecorator); for (DecoratingMember m : members) { notInitialized.offer(m); } this.scheduler = pool.scheduler; this.createRetryIntervalMs = pool.createRetryIntervalMs; - this.observers = new AtomicReference<>(EMPTY); + this.observers = new Observers(); this.pool = pool; + this.removeAll = new MemberSingleObserver(EmptyComponent.INSTANCE, this); } private DecoratingMember[] createMembersArray(int poolMaxSize, @@ -82,27 +91,16 @@ private DecoratingMember[] createMembersArray(int poolMaxSize, @Override protected void subscribeActual(SingleObserver> observer) { + log.debug("subscribeActual"); // the action of checking out a member from the pool is implemented as a // subscription to the singleton MemberSingle - MemberSingleObserver m = new MemberSingleObserver(observer, this); - observer.onSubscribe(m); + MemberSingleObserver o = new MemberSingleObserver(observer, this); + observer.onSubscribe(o); if (pool.isClosed()) { observer.onError(new PoolClosedException()); return; } - add(m); - if (m.isDisposed()) { - remove(m); - } else { - // atomically change requested - while (true) { - Observers a = observers.get(); - if (observers.compareAndSet(a, a.withRequested(a.requested + 1))) { - break; - } - } - } - log.debug("subscribed"); + toBeAdded.offer(o); drain(); } @@ -112,7 +110,7 @@ public void checkin(Member member) { public void checkin(Member member, boolean decrementInitializeScheduled) { log.debug("checking in {}", member); - DecoratingMember d = ((DecoratingMember) member); + DecoratingMember d = (DecoratingMember) member; d.scheduleRelease(); d.markAsChecked(); initializedAvailable.offer((DecoratingMember) member); @@ -139,19 +137,19 @@ private void drain() { log.debug("drain loop starting"); int missed = 1; while (true) { - // we schedule release of members even if no requests exist + // we add observers or schedule release of members even if no requests exist + removeObservers(); + addObservers(); + scheduleReleasesNoDelay(); scheduleChecksNoDelay(); - Observers obs = observers.get(); + Observers obs = observers; log.debug("requested={}", obs.requested); // max we can emit is the number of active (available) resources in pool - long r = Math.min(obs.activeCount, obs.requested); + long r = Math.min(obs.readyCount, obs.requested); long e = 0; // emitted - // record number of attempted emits in case all observers have been cancelled - // while are in the loop and we want to break - long attempts = 0; - while (e != r && attempts != obs.activeCount) { + while (e != r && obs.readyCount > 0) { if (cancelled) { disposeAll(); return; @@ -180,17 +178,21 @@ private void drain() { } else { log.debug("no health check required for {}", m); // this should not block because it just schedules emissions to observers - if (tryEmit(obs, m)) { - e++; - } else { - log.debug("no active observers"); - } - attempts++; + emit(obs, m); + log.debug("emitted"); + e++; } } + // else otherwise leave off the initializedAvailable queue because it is being + // released or checked + + removeObservers(); + addObservers(); + // schedule release immediately of any member // queued for releasing scheduleReleasesNoDelay(); + // schedule check of any member queued for checking scheduleChecksNoDelay(); } @@ -202,6 +204,25 @@ private void drain() { } } + private void addObservers() { + MemberSingleObserver o; + while ((o = toBeAdded.poll()) != null) { + observers.add(o); + } + } + + private void removeObservers() { + MemberSingleObserver o; + while ((o = toBeRemoved.poll()) != null) { + if (o == removeAll) { + observers.removeAll(); + return; + } else { + observers.remove(o); + } + } + } + private boolean trySchedulingInitializationNoDelay(long r, long e, final DecoratingMember m) { // check initializeScheduled using a CAS loop while (true) { @@ -224,7 +245,12 @@ private boolean trySchedulingInitializationNoDelay(long r, long e, final Decorat private boolean shouldPerformHealthCheck(final DecoratingMember m) { long now = scheduler.now(TimeUnit.MILLISECONDS); log.debug("schedule.now={}, lastCheck={}", now, m.lastCheckTime()); - return pool.idleTimeBeforeHealthCheckMs > 0 && now - m.lastCheckTime() >= pool.idleTimeBeforeHealthCheckMs; + return shouldPerformHealthCheck(m, pool.idleTimeBeforeHealthCheckMs, now); + } + + @VisibleForTesting + static boolean shouldPerformHealthCheck(DecoratingMember m, long idleTimeBeforeHealthCheckMs, long now) { + return idleTimeBeforeHealthCheckMs > 0 && now - m.lastCheckTime() >= idleTimeBeforeHealthCheckMs; } private void scheduleChecksNoDelay() { @@ -251,51 +277,29 @@ private void scheduleReleasesNoDelay() { } } - private boolean tryEmit(Observers obs, DecoratingMember m) { - // note that tryEmit is protected by the drain method so will - // not be run concurrently. We do have to be careful with - // concurrent disposal of observers though. - - + private void emit(Observers obs, DecoratingMember m) { + // note that tryEmit is protected by the drain method so will + // not be run concurrently. // advance counter to the next and choose an Observer to emit to (round robin) + // a precondition of this method is that obs.activeCount > 0 (enforced by drain + // method) + int index = obs.index; - // a precondition of this method is that obs.activeCount > 0 (enforced by drain method) - MemberSingleObserver o = obs.observers[index]; - MemberSingleObserver oNext = o; - - // atomically bump up the index to select the next Observer by round-robin - // (if that entry has not been deleted in the meantime by disposal). Need - // to be careful too that ALL observers have not been deleted via a race - // with disposal. - while (true) { - Observers x = observers.get(); - if (x.index == index && x.activeCount > 0 && x.observers[index] == o) { - boolean[] active = new boolean[x.active.length]; - System.arraycopy(x.active, 0, active, 0, active.length); - int nextIndex = (index + 1) % active.length; - while (nextIndex != index && !active[nextIndex]) { - nextIndex = (nextIndex + 1) % active.length; - } - active[nextIndex] = false; - if (observers.compareAndSet(x, - new Observers(x.observers, active, x.activeCount - 1, nextIndex, x.requested - 1))) { - oNext = x.observers[nextIndex]; - break; - } - } else { - // checkin because no active observers - m.checkin(); - return false; - } + int nextIndex = (index + 1) % observers.ready.size(); + while (nextIndex != index && !observers.ready.get(nextIndex)) { + nextIndex = (nextIndex + 1) % observers.ready.size(); } + observers.ready.set(nextIndex, Boolean.FALSE); + observers.readyCount--; + observers.requested--; + MemberSingleObserver oNext = obs.observers.get(nextIndex); // get a fresh worker each time so we jump threads to // break the stack-trace (a long-enough chain of // checkout-checkins could otherwise provoke stack // overflow) Worker worker = scheduler.createWorker(); worker.schedule(new Emitter(worker, oNext, m)); - return true; } @VisibleForTesting @@ -400,102 +404,69 @@ private void disposeValues() { } } - void add(@NonNull MemberSingleObserver inner) { - while (true) { - Observers a = observers.get(); - int n = a.observers.length; - @SuppressWarnings("unchecked") - MemberSingleObserver[] b = new MemberSingleObserver[n + 1]; - System.arraycopy(a.observers, 0, b, 0, n); - b[n] = inner; - boolean[] active = new boolean[n + 1]; - System.arraycopy(a.active, 0, active, 0, n); - active[n] = true; - if (observers.compareAndSet(a, new Observers(b, active, a.activeCount + 1, a.index, a.requested))) { - return; - } - } - } - - @SuppressWarnings("unchecked") private void removeAllObservers() { - while (true) { - Observers a = observers.get(); - if (observers.compareAndSet(a, EMPTY.withRequested(a.requested))) { - return; - } - } + toBeRemoved.offer(removeAll); + drain(); } - @SuppressWarnings("unchecked") void remove(@NonNull MemberSingleObserver inner) { - while (true) { - Observers a = observers.get(); - int n = a.observers.length; - if (n == 0) { - return; - } + toBeRemoved.offer(inner); + drain(); + } - int j = -1; + private static final class Observers { - for (int i = 0; i < n; i++) { - if (a.observers[i] == inner) { - j = i; - break; - } - } + final List> observers; + + // an observer is ready until it is emitted to + final List ready; - if (j < 0) { + // the number of true values in the ready array + // which is the number of observers that can be + // emitted to + int readyCount; + + // used as the starting point of the next check for an + // observer to emit to (for a round-robin). Is 0 when no + // observers + int index; + + int requested; + + Observers() { + observers = new ArrayList<>(); + ready = new ArrayList<>(); + readyCount = 0; + index = 0; + requested = 0; + } + + void add(MemberSingleObserver o) { + observers.add(o); + ready.add(Boolean.TRUE); + readyCount++; + requested++; + } + + void remove(MemberSingleObserver o) { + int i = observers.indexOf(o); + if (i == -1) { + // not present return; } - Observers next; - if (n == 1) { - next = EMPTY.withRequested(a.requested); - } else { - MemberSingleObserver[] b = new MemberSingleObserver[n - 1]; - System.arraycopy(a.observers, 0, b, 0, j); - System.arraycopy(a.observers, j + 1, b, j, n - j - 1); - boolean[] active = new boolean[n - 1]; - System.arraycopy(a.active, 0, active, 0, j); - System.arraycopy(a.active, j + 1, active, j, n - j - 1); - int nextActiveCount = a.active[j] ? a.activeCount - 1 : a.activeCount; - if (a.index >= j && a.index > 0) { - next = new Observers(b, active, nextActiveCount, a.index - 1, a.requested); - } else { - next = new Observers(b, active, nextActiveCount, a.index, a.requested); - } + readyCount = ready.get(i) ? readyCount - 1 : readyCount; + if (index >= i && index > 0) { + index--; } - if (observers.compareAndSet(a, next)) { - break; - } - } - } - - private static final class Observers { - - final MemberSingleObserver[] observers; - - // an observer is active until it is emitted to - final boolean[] active; - - // the number of true values in the active array - final int activeCount; - - final int index; - final int requested; - - Observers(MemberSingleObserver[] observers, boolean[] active, int activeCount, int index, int requested) { - Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be 0 for zero length array"); - Preconditions.checkArgument(observers.length == active.length); - this.observers = observers; - this.index = index; - this.active = active; - this.activeCount = activeCount; - this.requested = requested; + observers.remove(i); + ready.remove(i); } - Observers withRequested(int r) { - return new Observers(observers, active, activeCount, index, r); + void removeAll() { + observers.clear(); + ready.clear(); + readyCount = 0; + index = 0; } } @@ -546,7 +517,6 @@ public void dispose() { MemberSingle parent = getAndSet(null); if (parent != null) { parent.remove(this); - parent.drain(); } } diff --git a/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolConcurrencyTest.java b/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolConcurrencyTest.java index f251dcc0..52a94467 100644 --- a/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolConcurrencyTest.java +++ b/rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolConcurrencyTest.java @@ -35,7 +35,7 @@ public void memberSingleCoverage() throws Exception { .doOnNext(x -> c[0]++) // // have to keep the observeOn buffer small so members don't get buffered // and not checked in - .observeOn(Schedulers.from(Executors.newFixedThreadPool(1)), false, 1) // + .observeOn(Schedulers.from(Executors.newFixedThreadPool(poolSize)), false, 1) // .doOnNext(member -> member.checkin()) // .timeout(10, TimeUnit.SECONDS) // .doOnError(e -> {