diff --git a/rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/SelectTest.java b/rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/SelectTest.java index 63b882c1..0da29d5e 100644 --- a/rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/SelectTest.java +++ b/rxjava2-jdbc/src/test/java/org/davidmoten/rx/jdbc/SelectTest.java @@ -1,6 +1,5 @@ package org.davidmoten.rx.jdbc; -import org.davidmoten.rx.jdbc.Select; import org.junit.Test; import com.github.davidmoten.junit.Asserts; diff --git a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java index 7a008df3..9439f1f2 100644 --- a/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java +++ b/rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java @@ -259,21 +259,25 @@ private void scheduleReleasesNoDelay() { } private boolean tryEmit(Observers obs, DecoratingMember m) { - // get a fresh worker each time so we jump threads to - // break the stack-trace (a long-enough chain of - // checkout-checkins could otherwise provoke stack - // overflow) - + // note that tryEmit is protected by the drain method so will + // not be run concurrently. We do have to be careful with + // concurrent disposal of observers though. + + // advance counter to the next and choose an Observer to emit to (round robin) int index = obs.index; + // a precondition of this method is that obs.activeCount > 0 (enforced by drain method) MemberSingleObserver o = obs.observers[index]; MemberSingleObserver oNext = o; - // atomically bump up the index (if that entry has not been deleted in - // the meantime by disposal) + + // atomically bump up the index to select the next Observer by round-robin + // (if that entry has not been deleted in the meantime by disposal). Need + // to be careful too that ALL observers have not been deleted via a race + // with disposal. while (true) { Observers x = observers.get(); - if (x.index == index && x.observers[index] == o) { + if (x.index == index && x.activeCount > 0 && x.observers[index] == o) { boolean[] active = new boolean[x.active.length]; System.arraycopy(x.active, 0, active, 0, active.length); int nextIndex = (index + 1) % active.length; @@ -292,6 +296,10 @@ private boolean tryEmit(Observers obs, DecoratingMember m) { return false; } } + // get a fresh worker each time so we jump threads to + // break the stack-trace (a long-enough chain of + // checkout-checkins could otherwise provoke stack + // overflow) Worker worker = scheduler.createWorker(); worker.schedule(new Emitter(worker, oNext, m)); return true; @@ -479,7 +487,7 @@ private static final class Observers { final int requested; Observers(MemberSingleObserver[] observers, boolean[] active, int activeCount, int index, int requested) { - Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be -1 for zero length array"); + Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be 0 for zero length array"); Preconditions.checkArgument(observers.length == active.length); this.observers = observers; this.index = index;