Akka-Effect

License: MIT

This project aims to build a bridge between Akka and pure functional code based on cats-effect.

Covered:

Building blocks

akka-effect-actor module

Represents ActorRef.tell

trait Tell[F[_], -A] {

  def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}
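A minimal sketch of how such a Tell could be backed by a plain ActorRef. It assumes akka-actor and cats-effect 3 on the classpath and uses IO as the effect. The object name TellSketch and the constructor fromActorRef are hypothetical, chosen for illustration; the library's own constructor may look different.

```scala
import akka.actor.ActorRef
import cats.effect.IO

object TellSketch {

  // Same shape as the trait above, repeated so the sketch compiles on its own
  trait Tell[F[_], -A] {
    def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
  }

  // Hypothetical constructor: suspends the fire-and-forget send in IO,
  // so nothing is sent until the returned IO is actually run
  def fromActorRef(actorRef: ActorRef): Tell[IO, Any] = new Tell[IO, Any] {
    def apply(a: Any, sender: Option[ActorRef]): IO[Unit] =
      IO.delay(actorRef.tell(a, sender.getOrElse(ActorRef.noSender)))
  }
}
```

Because the send is wrapped in IO.delay, building the value `tell("ping")` has no side effect; the message is only delivered when that IO is executed as part of a larger program.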

Represents ActorRef.ask pattern

trait Ask[F[_], -A, B] {

  def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}

Represents reply pattern: sender() ! reply

trait Reply[F[_], -A] {

  def apply(msg: A): F[Unit]
}

This is what you need to implement instead of the familiar new Actor { ... }

trait Receive[F[_], -A, B] {

  def apply(msg: A): F[B]

  def timeout: F[B]
}
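As an illustration, here is a sketch of a Receive for a small counter. It assumes cats-effect 3, with IO as the effect and a Ref holding the state. The Cmd protocol, the choice of B = Int (the current count) and the object name ReceiveSketch are all hypothetical; this document does not say what B stands for in the library.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

object ReceiveSketch {

  // Same shape as the trait above, repeated so the sketch compiles on its own
  trait Receive[F[_], -A, B] {
    def apply(msg: A): F[B]
    def timeout: F[B]
  }

  // Hypothetical message protocol
  sealed trait Cmd
  case object Inc extends Cmd
  case object Get extends Cmd

  // State lives in a Ref instead of a mutable field inside an Actor
  def counter(ref: Ref[IO, Int]): Receive[IO, Cmd, Int] = new Receive[IO, Cmd, Int] {
    def apply(msg: Cmd): IO[Int] = msg match {
      case Inc => ref.updateAndGet(_ + 1)
      case Get => ref.get
    }

    // On receive timeout this sketch just reports the current count
    def timeout: IO[Int] = ref.get
  }

  // Feeds three messages by hand; in a real actor they would arrive from the mailbox
  def demo: IO[Int] = for {
    ref     <- Ref.of[IO, Int](0)
    receive  = counter(ref)
    _       <- receive(Inc)
    _       <- receive(Inc)
    count   <- receive(Get)
  } yield count

  def main(args: Array[String]): Unit =
    println(demo.unsafeRunSync()) // prints 2
}
```

The handler is an ordinary value, so it can be exercised in a test without starting an ActorSystem.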

Constructs Actor.scala out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]]

Wraps ActorContext

trait ActorCtx[F[_]] {

  def self: ActorRef

  def parent: ActorRef

  def executor: ExecutionContextExecutor

  def setReceiveTimeout(timeout: Duration): F[Unit]

  def child(name: String): F[Option[ActorRef]]

  def children: F[List[ActorRef]]

  def actorRefFactory: ActorRefFactory

  def watch[A](actorRef: ActorRef, msg: A): F[Unit]

  def unwatch(actorRef: ActorRef): F[Unit]

  def stop: F[Unit]
}

akka-effect-persistence module

Constructs PersistentActor.scala out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]

Describes the lifecycle of an entity with regard to event sourcing. The phases are: Started, Recovering, Receiving and Termination.

trait EventSourced[F[_], S, E, C] {

  def eventSourcedId: EventSourcedId

  def recovery: Recovery

  def pluginIds: PluginIds

  def start: Resource[F, RecoveryStarted[F, S, E, C]]
}

Describes the start of the recovery phase

trait RecoveryStarted[F[_], S, E, C] {

  def apply(
    seqNr: SeqNr,
    snapshotOffer: Option[SnapshotOffer[S]]
  ): Resource[F, Recovering[F, S, E, C]]
}

Describes the recovery phase

trait Recovering[F[_], S, E, C] {

  def replay: Resource[F, Replay[F, E]]

  def completed(
    seqNr: SeqNr,
    journaller: Journaller[F, E],
    snapshotter: Snapshotter[F, S]
  ): Resource[F, Receive[F, C]]
}
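Each phase above is returned as a Resource, which pairs entering a phase with a finalizer that runs when the phase is left. The sketch below shows only the general cats-effect behaviour for nested resources (assuming cats-effect 3): finalizers run in reverse order of acquisition. The phase helper, the log and the object name PhasesSketch are hypothetical, and the sketch makes no claim about when exactly the library releases each phase.

```scala
import cats.effect.{IO, Ref, Resource}
import cats.effect.unsafe.implicits.global

object PhasesSketch {

  // Hypothetical helper: records when a phase is entered and when it is left
  def phase(name: String, log: Ref[IO, Vector[String]]): Resource[IO, Unit] =
    Resource.make(log.update(_ :+ s"enter $name"))(_ => log.update(_ :+ s"leave $name"))

  def demo: IO[Vector[String]] = for {
    log       <- Ref.of[IO, Vector[String]](Vector.empty)
    // Nesting via flatMap: later phases are acquired inside earlier ones
    lifecycle  = for {
      _ <- phase("started", log)
      _ <- phase("recovering", log)
      _ <- phase("receiving", log)
    } yield ()
    _         <- lifecycle.use(_ => log.update(_ :+ "handling commands"))
    entries   <- log.get
  } yield entries

  def main(args: Array[String]): Unit =
    demo.unsafeRunSync().foreach(println)
}
```

With plain nesting like this, the "leave" entries appear in the opposite order to the "enter" entries, which is what makes Resource convenient for cleanup that must mirror setup.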

Used during recovery to replay events

trait Replay[F[_], A] {

  def apply(seqNr: SeqNr, event: A): F[Unit]
}
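A sketch of what a Replay might do during recovery: fold each replayed event into state held in a Ref. It assumes cats-effect 3. SeqNr is taken to be an alias for Long, which is an assumption since this document does not define it, and the Event and Account types are hypothetical.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

object ReplaySketch {

  type SeqNr = Long // assumption: not defined in this document

  // Same shape as the trait above, repeated so the sketch compiles on its own
  trait Replay[F[_], A] {
    def apply(seqNr: SeqNr, event: A): F[Unit]
  }

  // Hypothetical domain
  sealed trait Event
  final case class Deposited(amount: Int) extends Event
  final case class Withdrawn(amount: Int) extends Event

  final case class Account(seqNr: SeqNr, balance: Int)

  // Applies each replayed event to the state and remembers the last sequence number
  def replayInto(ref: Ref[IO, Account]): Replay[IO, Event] = new Replay[IO, Event] {
    def apply(seqNr: SeqNr, event: Event): IO[Unit] = ref.update { account =>
      event match {
        case Deposited(amount) => Account(seqNr, account.balance + amount)
        case Withdrawn(amount) => Account(seqNr, account.balance - amount)
      }
    }
  }

  // Replays two events by hand; during real recovery they would come from the journal
  def demo: IO[Account] = for {
    ref     <- Ref.of[IO, Account](Account(0L, 0))
    replay   = replayInto(ref)
    _       <- replay(1L, Deposited(100))
    _       <- replay(2L, Withdrawn(30))
    account <- ref.get
  } yield account
}
```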

Describes communication with the underlying journal

trait Journaller[F[_], -A] {

  def append: Append[F, A]

  def deleteTo: DeleteEventsTo[F]
}

Describes communication with the underlying snapshot storage

/**
  * Describes communication with underlying snapshot storage
  *
  * @tparam A - snapshot
  */
trait Snapshotter[F[_], -A] {

  def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]

  def delete(seqNr: SeqNr): F[F[Unit]]

  def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}

akka-effect-eventsourcing module

This is the main runtime/queue where all actions against your state are processed in the desired event-sourcing sequence:

  1. validate and finalize events
  2. append events to journal
  3. publish changed state
  4. execute side effects

It is optimised for maximum throughput, so different steps of different actions may be executed in parallel, and events may be stored in batches.

trait Engine[F[_], S, E] {

  def state: F[State[S]]

  /**
    * @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
    *         Inner F[_] is about `load` being completed
    */
  def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}
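The nested F[F[A]] in apply can be read as "enqueue now, await later". The sketch below reproduces only that shape with a single-worker queue, assuming cats-effect 3. It is not the library's Engine: the Enqueue trait and the object name EnqueueSketch are hypothetical, and there is no validation, journalling or parallelism here.

```scala
import cats.effect.{Deferred, IO, Ref, Resource}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global

object EnqueueSketch {

  // Hypothetical miniature of the "outer F enqueues, inner F completes" shape
  trait Enqueue {
    def apply[A](task: IO[A]): IO[IO[A]]
  }

  def make: Resource[IO, Enqueue] = for {
    queue <- Resource.eval(Queue.unbounded[IO, IO[Unit]])
    // Single background worker: takes tasks in FIFO order and runs them one at a time
    _     <- queue.take.flatten.foreverM.background
  } yield new Enqueue {
    def apply[A](task: IO[A]): IO[IO[A]] = for {
      done <- Deferred[IO, Either[Throwable, A]]
      // Outer effect ends once the task is in the queue, which fixes its position
      _    <- queue.offer(task.attempt.flatMap(done.complete).void)
    } yield done.get.rethrow // inner effect waits for the task's result
  }

  def demo: IO[Vector[Int]] = make.use { enqueue =>
    for {
      log    <- Ref.of[IO, Vector[Int]](Vector.empty)
      first  <- enqueue(log.update(_ :+ 1))
      second <- enqueue(log.update(_ :+ 2))
      _      <- second // awaiting in the opposite order does not change execution order
      _      <- first
      order  <- log.get
    } yield order
  }

  def main(args: Array[String]): Unit =
    println(demo.unsafeRunSync()) // prints Vector(1, 2)
}
```

Splitting the two effects lets a caller fix the order of several actions quickly by sequencing the outer effects, and then wait for the results independently through the inner ones.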

Setup

In build.sbt (sbt expects the addSbtPlugin line in project/plugins.sbt):

addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")

libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor" % "0.2.1"

libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor-tests" % "0.2.1"

libraryDependencies += "com.evolutiongaming" %% "akka-effect-persistence" % "0.2.1"

libraryDependencies += "com.evolutiongaming" %% "akka-effect-eventsourcing" % "0.2.1"

libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster" % "0.2.1"

libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster-sharding" % "0.2.1"