Skip to content

Commit

Permalink
chore: clean code in ClusterSharding (#1305)
Browse files Browse the repository at this point in the history
* chore: clean code in ClusterSharding

* cleaner ShardCommandActor
  • Loading branch information
Roiocam committed May 19, 2024
1 parent 7eec328 commit 5d23adb
Showing 1 changed file with 14 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import pekko.actor.typed.ActorRef
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.Props
import pekko.actor.typed.TypedActorContext
import pekko.actor.typed.internal.InternalRecipientRef
import pekko.actor.typed.internal.PoisonPill
import pekko.actor.typed.internal.PoisonPillInterceptor
Expand Down Expand Up @@ -194,12 +193,9 @@ import pekko.util.JavaDurationConverters._
typeKey.name,
new java.util.function.Function[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] {
override def apply(t: String): ActorRef[scaladsl.ClusterSharding.ShardCommand] = {
// using classic.systemActorOf to avoid the Future[ActorRef]
system.toClassic
.asInstanceOf[ExtendedActorSystem]
.systemActorOf(
PropsAdapter(ShardCommandActor.behavior(stopMessage.getOrElse(PoisonPill))),
URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator")
system.systemActorOf(
ShardCommandActor.behavior(stopMessage.getOrElse(PoisonPill)),
URLEncoder.encode(typeKey.name, ByteString.UTF_8) + "ShardCommandDelegator")
}
})

Expand Down Expand Up @@ -307,11 +303,8 @@ import pekko.util.JavaDurationConverters._
}
}

override lazy val shardState: ActorRef[ClusterShardingQuery] = {
import pekko.actor.typed.scaladsl.adapter._
val behavior = ShardingState.behavior(classicSharding)
classicSystem.systemActorOf(PropsAdapter(behavior), "typedShardState")
}
override lazy val shardState: ActorRef[ClusterShardingQuery] =
system.systemActorOf(ShardingState.behavior(classicSharding), "typedShardState")

}

Expand Down Expand Up @@ -436,22 +429,26 @@ import pekko.util.JavaDurationConverters._
* INTERNAL API
*/
@InternalApi private[pekko] object ShardCommandActor {
import pekko.actor
import pekko.actor.typed.scaladsl.adapter._
import pekko.cluster.sharding.ShardRegion.{ Passivate => ClassicPassivate }

def behavior(stopMessage: Any): Behavior[scaladsl.ClusterSharding.ShardCommand] = {
def sendClassicPassivate(entity: ActorRef[_], ctx: TypedActorContext[_]): Unit = {
val pathToShard = entity.toClassic.path.elements.take(4).mkString("/")
ctx.asScala.system.toClassic.actorSelection(pathToShard).tell(ClassicPassivate(stopMessage), entity.toClassic)
def sendClassicPassivate(entity: ActorRef[_], classicSystem: actor.ActorSystem): Unit = {
val classicRef = entity.toClassic
val pathToShard = classicRef.path.elements.take(4).mkString("/")
classicSystem.actorSelection(pathToShard).tell(ClassicPassivate(stopMessage), classicRef)
}

Behaviors.receive { (ctx, msg) =>
val classicSystem = ctx.asScala.system.toClassic

msg match {
case scaladsl.ClusterSharding.Passivate(entity) =>
sendClassicPassivate(entity, ctx)
sendClassicPassivate(entity, classicSystem)
Behaviors.same
case javadsl.ClusterSharding.Passivate(entity) =>
sendClassicPassivate(entity, ctx)
sendClassicPassivate(entity, classicSystem)
Behaviors.same
case _ =>
Behaviors.unhandled
Expand Down

0 comments on commit 5d23adb

Please sign in to comment.