Skip to content

Commit

Permalink
Fix race condition with disposal in MemberSingle.tryEmit issue #58, pr
Browse files Browse the repository at this point in the history
…#59

* fix race condition with disposal in MemberSingle.tryEmit #58
* organize imports
  • Loading branch information
davidmoten committed Oct 10, 2021
1 parent 21c91fb commit 8bac6ba
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
@@ -1,6 +1,5 @@
package org.davidmoten.rx.jdbc;

import org.davidmoten.rx.jdbc.Select;
import org.junit.Test;

import com.github.davidmoten.junit.Asserts;
Expand Down
Expand Up @@ -259,21 +259,25 @@ private void scheduleReleasesNoDelay() {
}

private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
// get a fresh worker each time so we jump threads to
// break the stack-trace (a long-enough chain of
// checkout-checkins could otherwise provoke stack
// overflow)

// note that tryEmit is protected by the drain method so will
// not be run concurrently. We do have to be careful with
// concurrent disposal of observers though.

// advance counter to the next and choose an Observer to emit to (round robin)

int index = obs.index;
// a precondition of this method is that obs.activeCount > 0 (enforced by drain method)
MemberSingleObserver<T> o = obs.observers[index];
MemberSingleObserver<T> oNext = o;
// atomically bump up the index (if that entry has not been deleted in
// the meantime by disposal)

// atomically bump up the index to select the next Observer by round-robin
// (if that entry has not been deleted in the meantime by disposal). Need
// to be careful too that ALL observers have not been deleted via a race
// with disposal.
while (true) {
Observers<T> x = observers.get();
if (x.index == index && x.observers[index] == o) {
if (x.index == index && x.activeCount > 0 && x.observers[index] == o) {
boolean[] active = new boolean[x.active.length];
System.arraycopy(x.active, 0, active, 0, active.length);
int nextIndex = (index + 1) % active.length;
Expand All @@ -292,6 +296,10 @@ private boolean tryEmit(Observers<T> obs, DecoratingMember<T> m) {
return false;
}
}
// get a fresh worker each time so we jump threads to
// break the stack-trace (a long-enough chain of
// checkout-checkins could otherwise provoke stack
// overflow)
Worker worker = scheduler.createWorker();
worker.schedule(new Emitter<T>(worker, oNext, m));
return true;
Expand Down Expand Up @@ -479,7 +487,7 @@ private static final class Observers<T> {
final int requested;

Observers(MemberSingleObserver<T>[] observers, boolean[] active, int activeCount, int index, int requested) {
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be -1 for zero length array");
Preconditions.checkArgument(observers.length > 0 || index == 0, "index must be 0 for zero length array");
Preconditions.checkArgument(observers.length == active.length);
this.observers = observers;
this.index = index;
Expand Down

0 comments on commit 8bac6ba

Please sign in to comment.