I was writing automated tests to verify the behaviour of PublishSubject and noticed unexpected behaviour when the subject is subscribed on a Scheduler with subscribeOn. If I do not sleep after subscribing, the subscriber never receives the onNext items, although onCompleted is called on the correct thread. If I sleep after subscribing, the subscriber receives the onNext items, but on the wrong thread: they arrive on the thread that called subject.onNext.
I wrote two unit tests to demonstrate this. Both fail with RxJava 1.1.2. Is this a bug or expected behaviour, and if it is expected, where is it documented?
import org.junit.Test;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

import static org.assertj.core.api.Assertions.assertThat;
import static rx.schedulers.Schedulers.newThread;

public class PublishSubjectTest {

    @Test
    public void subscribeOn_WhenNoSleep_ThenNoOnNextReceived() throws InterruptedException {
        // GIVEN
        PublishSubject<String> subject = PublishSubject.create();
        Thread currentThread = Thread.currentThread();
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        // WHEN
        subject.subscribeOn(newThread()).subscribe(subscriber);
        subject.onNext("one");
        subject.onCompleted();

        // THEN
        subscriber.awaitTerminalEvent();
        assertThat(subscriber.getLastSeenThread()).isNotSameAs(currentThread);
        assertThat(subscriber.getOnNextEvents()).containsOnly("one");
    }

    @Test
    public void subscribeOn_WhenSleep_ThenOnNextReceivedButOnWrongThread() throws InterruptedException {
        // GIVEN
        PublishSubject<String> subject = PublishSubject.create();
        Thread currentThread = Thread.currentThread();
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        // WHEN
        subject.subscribeOn(newThread()).subscribe(subscriber);
        Thread.sleep(2000);
        subject.onNext("one");
        subject.onCompleted();

        // THEN
        subscriber.awaitTerminalEvent();
        assertThat(subscriber.getOnNextEvents()).containsOnly("one");
        assertThat(subscriber.getLastSeenThread()).isNotSameAs(currentThread);
    }
}
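Both outcomes can be reproduced without RxJava in a small toy model, which hints at one possible explanation: the subject is hot (it does not buffer items for subscribers that register later), subscribeOn only moves the act of subscribing to another thread, and items are delivered on whichever thread calls onNext. The sketch below is written under those assumptions and is not RxJava's implementation. `ToySubject`, `Recorder` and `run` are hypothetical names introduced for illustration, and latches stand in for the presence or absence of `Thread.sleep` so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class HotSubjectSketch {

    /** Hypothetical subscriber: records each event together with the delivering thread. */
    static final class Recorder {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch terminated = new CountDownLatch(1);

        private static String where() {
            return "worker".equals(Thread.currentThread().getName()) ? "worker" : "caller";
        }

        void onNext(String value) {
            events.add("next:" + value + "@" + where());
        }

        void onCompleted() {
            events.add("completed@" + where());
            terminated.countDown();
        }
    }

    /** Toy hot subject (assumption): no buffering, delivery on the calling thread. */
    static final class ToySubject {
        private final List<Recorder> subscribers = new ArrayList<>();
        private boolean completed;

        synchronized void subscribe(Recorder r) {
            if (completed) {
                r.onCompleted();      // late subscriber only sees the terminal event
            } else {
                subscribers.add(r);
            }
        }

        synchronized void onNext(String value) {
            for (Recorder r : subscribers) {
                r.onNext(value);      // runs on the thread that called onNext
            }
        }

        synchronized void onCompleted() {
            completed = true;
            for (Recorder r : subscribers) {
                r.onCompleted();
            }
        }
    }

    /**
     * Subscribes on a separate "worker" thread and emits from the calling thread.
     * emitAfterSubscription = true models the test with the sleep, false the one without.
     */
    public static List<String> run(boolean emitAfterSubscription) throws InterruptedException {
        ToySubject subject = new ToySubject();
        Recorder recorder = new Recorder();
        CountDownLatch subscribed = new CountDownLatch(1);
        CountDownLatch emitted = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                if (!emitAfterSubscription) {
                    emitted.await();  // force the subscription to lose the race
                }
                subject.subscribe(recorder);
                subscribed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker");
        worker.start();

        if (emitAfterSubscription) {
            subscribed.await();       // stands in for Thread.sleep(2000)
        }
        subject.onNext("one");
        subject.onCompleted();
        emitted.countDown();

        recorder.terminated.await();
        return recorder.events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(false)); // prints [completed@worker]
        System.out.println(run(true));  // prints [next:one@caller, completed@caller]
    }
}
```

If this model matches what RxJava does, the first test loses "one" because it is emitted before the subscription is registered, and the second test sees "one" on the emitting thread because nothing in the chain moves delivery to another thread. Whether observeOn would be the right operator for the intended behaviour is a separate question that this sketch does not settle.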