Threading in a SerializedSubject #3758

@a-reisberg

With a usual Observable, the Observer is executed on the same thread as the one on which the Observable is created. With SerializedSubject, there is some magic that is unclear to me. Below is the code snippet:

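import rx.Observable
import rx.functions.Action1
import rx.schedulers.Schedulers
import rx.subjects.{PublishSubject, SerializedSubject}

object Launcher extends App {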
  val pubSub = PublishSubject.create[Int]
  val serSub = new SerializedSubject(pubSub)
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  serSub.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  observable.subscribe(serSub)
  Observable.just(6).subscribe(serSub)

  while (true) {}
}

One would expect the numbers 1 and 2 to be printed on some computation thread, and the 6 to be printed on the main thread. Now, if I remove the Observable.just(6) line, everything is printed on the computation thread. Is that the expected behavior? If so, how does the SerializedSubject decide which thread to run on?
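One possible explanation, offered as a sketch rather than a description of RxJava's actual source: serializing wrappers are commonly built around an "emitter loop". The first thread to call onNext becomes the emitter and delivers values downstream; any thread that calls onNext while the emitter is busy only enqueues its value and returns, and the emitter thread then drains the queue. Under that scheme a value can be delivered on a different thread from the one that produced it. The names `SerializingSink` and `SerializingSinkDemo` below are hypothetical and exist only for illustration; the sketch uses no RxJava classes and ignores error handling and terminal events.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable

// Hypothetical, simplified emitter-loop serializer (not an RxJava class).
final class SerializingSink[T](downstream: T => Unit) {
  private val queue = mutable.Queue.empty[T] // guarded by `this`
  private var emitting = false               // guarded by `this`

  def onNext(value: T): Unit = {
    // Either become the emitter, or hand the value to the current emitter.
    val becameEmitter = this.synchronized {
      if (emitting) { queue.enqueue(value); false }
      else { emitting = true; true }
    }
    if (becameEmitter) {
      downstream(value) // delivered on the calling thread
      drain()           // then deliver whatever other threads queued meanwhile
    }
  }

  private def drain(): Unit = {
    var next = poll()
    while (next.isDefined) {
      downstream(next.get)
      next = poll()
    }
  }

  // Takes the next queued value, or gives up the emitter role if none is left.
  private def poll(): Option[T] = this.synchronized {
    if (queue.isEmpty) { emitting = false; None }
    else Some(queue.dequeue())
  }
}

object SerializingSinkDemo {
  // Returns (value, name of the thread that delivered it) in delivery order.
  def run(): List[(Int, String)] = {
    val inside  = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    val seen    = mutable.ArrayBuffer.empty[(Int, String)]

    val sink = new SerializingSink[Int]({ v =>
      seen += ((v, Thread.currentThread().getName))
      // Hold the worker inside the callback so that it stays the emitter.
      if (v == 1) { inside.countDown(); release.await() }
    })

    val worker = new Thread(() => sink.onNext(1), "worker")
    worker.start()
    inside.await()      // the worker is now the emitter
    sink.onNext(6)      // called on the caller's thread, but only enqueued
    release.countDown() // let the worker finish and drain the queue
    worker.join()
    seen.toList
  }
}
```

In this sketch the value 6 is handed to `onNext` on the caller's thread but is delivered by the thread named "worker", because that thread held the emitter role at the time. Whether the same interleaving occurs in the snippet above depends on timing, which would be consistent with the output changing when the Observable.just(6) line is removed.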

Now, I can modify the code a bit (publish to pubSub and subscribe to serSub):

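import rx.Observable
import rx.functions.Action1
import rx.schedulers.Schedulers
import rx.subjects.{PublishSubject, SerializedSubject}

object Launcher extends App {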
  val pubSub = PublishSubject.create[Int]
  val serSub = new SerializedSubject(pubSub)
  val observable = Observable.just(1, 2).subscribeOn(Schedulers.computation())

  serSub.subscribe(new Action1[Int] {
    override def call(t: Int): Unit =
      println(s"Subject subscribeOn $t, ${Thread.currentThread().getName}")
  })

  observable.subscribe(pubSub)
  Observable.just(6, 7, 8, 9, 10).subscribe(pubSub)

  while (true) {}
}

Then things get printed on the expected threads. Is this a legitimate way to do it if I want this behavior?
