[4.x] Scheduler should have a bag of properties #1551

Open
alexandru opened this issue May 6, 2022 · 0 comments

alexandru commented May 6, 2022

Currently, the Scheduler interface has these methods:

trait Scheduler {
  //...
  def features: Features
 
  def executionModel: ExecutionModel
  def withExecutionModel(em: ExecutionModel): Scheduler

  def reportFailure(t: Throwable): Unit
  def withUncaughtExceptionReporter(r: UncaughtExceptionReporter): Scheduler
}

These methods point to a more general concept: a "properties bag" that describes the "environment". Such properties are injected, for example, into Observable, so that whatever run-loop is being described, the implementation knows how to deal with fairness concerns or how to log exceptions.

However, it's not a perfect abstraction. For example, in Monix version 3.x, executing a Task requires Task.Options:

object Task {
  final case class Options(
    autoCancelableRunLoops: Boolean,
    localContextPropagation: Boolean
  )
}

Again, we have run-loop specific options, but this time the options are specific to Task. And so Task then has methods like these:

trait Task[+A] {
  //...
  def runToFutureOpt(implicit s: Scheduler, opts: Task.Options): CancelableFuture[A]
}

Complicating the picture is Observable, which, in its implementation, ends up executing Task references. For example, this method is forced to use a default Task.Options, as there's no way to inject it (Observable has no notion of Task.Options):

trait Observable[+A] {
  // ...
  def mapEval[B](f: A => Task[B]): Observable[B]
}

In other words, Scheduler isn't enough. But it should be; otherwise, the integration with Observable (or similar) suffers, because options such as Task.Options cannot reach the tasks that Observable executes internally.

There's more: Scheduler could be used for all kinds of dependency injection. Consider the current Task.deferAction:

object Task {
  //...
  def deferAction[A](f: Scheduler => Task[A]): Task[A] 
}

Wouldn't it be cool if we could use Scheduler for the dependency injection of things provided by the "environment"? That is, for the utilities and resources that are commonly application-wide and part of the environment, such as logging or metrics.

Case Study: Akka Streams

An example of something similar is the Attributes from Akka Streams. These Attributes in Akka Streams are used to override default behavior. You can, for example, override the buffers that Akka Streams uses on async boundaries:

import Attributes._
val nestedSource =
  Source.single(0).map(_ + 1).named("nestedSource") // Wrap, no inputBuffer set

val nestedFlow =
  Flow[Int]
    .filter(_ != 0)
    .via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
    .named("nestedFlow") // Wrap, no inputBuffer set

val nestedSink =
  nestedFlow
    .to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
    .withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override

Attributes is a data structure like this:

final case class Attributes(attributeList: List[Attributes.Attribute] = Nil)

So it's a list of attributes, where the attributes themselves are defined like:

object Attributes {
  trait Attribute
  sealed trait MandatoryAttribute extends Attribute

  final case class Name(n: String) extends Attribute

  final case class InputBuffer(initial: Int, max: Int) extends MandatoryAttribute

  final case class CancellationStrategy(strategy: CancellationStrategy.Strategy) extends MandatoryAttribute
  //...
}
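
To make the lookup semantics concrete, here is a minimal, self-contained sketch of how a list-based attribute bag can be queried by type. It is a simplified stand-in modeled on the definitions above, not Akka's actual implementation: `SimpleAttributes` and the bodies of `and` and `get` are illustrative assumptions.

```scala
import scala.reflect.ClassTag

object AttributesSketch {
  trait Attribute
  final case class Name(n: String) extends Attribute
  final case class InputBuffer(initial: Int, max: Int) extends Attribute

  // Simplified stand-in for an attribute bag: a plain list of attributes.
  final case class SimpleAttributes(attributeList: List[Attribute] = Nil) {
    // Attributes added later are prepended, so they win on lookup.
    def and(other: Attribute): SimpleAttributes =
      SimpleAttributes(other :: attributeList)

    // Returns the most recently added attribute whose runtime class matches T.
    def get[T <: Attribute](implicit ct: ClassTag[T]): Option[T] =
      attributeList.collectFirst { case ct(attr) => attr }
  }
}
```

Note that the lookup is a linear scan and the element type is only checked at runtime, which is part of why a more type-safe structure is attractive.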

Proposed solution

We need something similar to Akka Streams, but preferably more type-safe.

I'm thinking that we need a new data structure:

import monix.newtypes.TypeInfo

final case class Properties(attributes: Map[TypeInfo[_], Any]) {

  def get[A: TypeInfo]: Option[A]

  def add[A: TypeInfo](value: A): Properties

  def remove[A: TypeInfo]: Properties
}

We're using TypeInfo tags from monix-newtypes because this is conceptually a Map[String, Any] that we need to make type-safe. The keys therefore have to be type descriptors, and they must also support "newtypes".
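
As a rough illustration of how such a structure could be implemented, here is a self-contained sketch that uses the standard library's `ClassTag` as a stand-in for `TypeInfo`. This is an assumption made so the example runs without monix-newtypes: keying by runtime class is weaker than what is proposed, since it cannot tell newtypes or generic types such as `List[Int]` and `List[String]` apart.

```scala
import scala.reflect.ClassTag

// Hypothetical sketch: ClassTag stands in for monix-newtypes' TypeInfo,
// so keys are runtime classes rather than full type descriptors.
final case class Properties(attributes: Map[Class[_], Any] = Map.empty) {

  // The cast is safe as long as values are only stored via `add`,
  // which always files a value under the key of its own type.
  def get[A](implicit tag: ClassTag[A]): Option[A] =
    attributes.get(tag.runtimeClass).map(_.asInstanceOf[A])

  // Adds the value, replacing any previous value of the same type.
  def add[A](value: A)(implicit tag: ClassTag[A]): Properties =
    Properties(attributes.updated(tag.runtimeClass, value))

  def remove[A](implicit tag: ClassTag[A]): Properties =
    Properties(attributes - tag.runtimeClass)
}
```

The type parameter acts as the key, so callers never name a key explicitly: `props.add(value)` infers it, and `props.get[A]` returns an `Option[A]` without a cast at the call site.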

Then Scheduler could have this API:

trait Scheduler {
  //...
  def environment: Properties

  def withProperties(props: Properties): Scheduler
}

And if, for example, we need Task.Options for the execution of Task, we could just do:

scheduler.environment.get[Task.Options]
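
A run-loop would presumably also need a fallback for when nothing was injected. The sketch below shows one way this could look. Everything in it is a stand-in so that it compiles on its own: `Scheduler` is reduced to the two proposed members, `TaskOptions` mimics `Task.Options`, `Properties` is keyed by `ClassTag` instead of `TypeInfo`, and `defaultTaskOptions` and `resolveOptions` are hypothetical names.

```scala
import scala.reflect.ClassTag

object EnvironmentSketch {
  // Compact stand-in for the proposed Properties (ClassTag replaces TypeInfo).
  final case class Properties(attributes: Map[Class[_], Any] = Map.empty) {
    def get[A](implicit tag: ClassTag[A]): Option[A] =
      attributes.get(tag.runtimeClass).map(_.asInstanceOf[A])
    def add[A](value: A)(implicit tag: ClassTag[A]): Properties =
      Properties(attributes.updated(tag.runtimeClass, value))
  }

  // Stand-in for Task.Options.
  final case class TaskOptions(
    autoCancelableRunLoops: Boolean,
    localContextPropagation: Boolean
  )

  // Hypothetical defaults, chosen for illustration only.
  val defaultTaskOptions: TaskOptions = TaskOptions(true, false)

  // Stand-in for the proposed Scheduler API; the real trait has many more members.
  final case class Scheduler(environment: Properties = Properties()) {
    def withProperties(props: Properties): Scheduler = copy(environment = props)
  }

  // What a run-loop could do: read its options from the environment,
  // falling back to the defaults when nothing was injected.
  def resolveOptions(s: Scheduler): TaskOptions =
    s.environment.get[TaskOptions].getOrElse(defaultTaskOptions)
}
```

With this shape, a method like Observable's mapEval would only need the Scheduler it already receives in order to run its tasks with the caller's options.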