diff --git a/build.sbt b/build.sbt index ab37f1a..9eb6184 100644 --- a/build.sbt +++ b/build.sbt @@ -32,6 +32,7 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.destroy"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.reap"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool.put"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.typelevel.keypool.KeyPool.put"), ProblemFilters .exclude[DirectMissingMethodProblem]("org.typelevel.keypool.KeyPool#KeyPoolConcrete.this"), ProblemFilters.exclude[DirectMissingMethodProblem]( diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala index d98245f..0c83f90 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPool.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPool.scala @@ -28,6 +28,7 @@ import cats.effect.std.Semaphore import cats.syntax.all._ import scala.concurrent.duration._ import org.typelevel.keypool.internal._ +import cats.effect.kernel.Resource.ExitCase /** * This pools internal guarantees are that the max number of values are in the pool at any time, not @@ -68,7 +69,7 @@ object KeyPool { private[keypool] val kpMaxIdle: Int, private[keypool] val kpMaxTotal: Int, private[keypool] val kpMaxTotalSem: Semaphore[F], - private[keypool] val kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] + private[keypool] val kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]] ) extends KeyPool[F, A, B] { def take(k: A): Resource[F, Managed[F, B]] = @@ -115,20 +116,22 @@ object KeyPool { * Make a 'KeyPool' inactive and destroy all idle resources. */ private[keypool] def destroy[F[_]: MonadThrow, A, B]( - kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] - ): F[Unit] = for { - m <- kpVar.getAndSet(PoolMap.closed[A, (B, F[Unit])]) - _ <- m match { - case PoolClosed() => Applicative[F].unit - case PoolOpen(_, m2) => - m2.toList.traverse_ { case (_, pl) => - pl.toList - .traverse_ { case (_, r) => - r._2.attempt.void - } - } - } - } yield () + kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]], + exit: ExitCase + ): F[Unit] = + for { + m <- kpVar.getAndSet(PoolMap.closed[A, (B, ExitCase => F[Unit])]) + _ <- m match { + case PoolClosed() => Applicative[F].unit + case PoolOpen(_, m2) => + m2.toList.traverse_ { case (_, pl) => + pl.toList + .traverse_ { case (_, r) => + r._2(exit).attempt.void + } + } + } + } yield () /** * Run a reaper thread, which will destroy old resources. It will stop running once our pool @@ -136,7 +139,7 @@ object KeyPool { */ private[keypool] def reap[F[_], A, B]( idleTimeAllowedInPoolNanos: FiniteDuration, - kpVar: Ref[F, PoolMap[A, (B, F[Unit])]], + kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]], onReaperException: Throwable => F[Unit] )(implicit F: Temporal[F]): F[Unit] = { // We are going to do non-referentially transparent things as we may be waiting for our modification to go through @@ -144,8 +147,8 @@ object KeyPool { def findStale( now: FiniteDuration, idleCount: Int, - m: Map[A, PoolList[(B, F[Unit])]] - ): (PoolMap[A, (B, F[Unit])], List[(A, (B, F[Unit]))]) = { + m: Map[A, PoolList[(B, ExitCase => F[Unit])]] + ): (PoolMap[A, (B, ExitCase => F[Unit])], List[(A, (B, ExitCase => F[Unit]))]) = { val isNotStale: FiniteDuration => Boolean = time => time + idleTimeAllowedInPoolNanos >= now // Time value is alright inside the KeyPool in nanos. @@ -157,19 +160,24 @@ object KeyPool { // (Map key (PoolList resource), [resource]) @annotation.tailrec def findStale_( - toKeep: List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])], - toDestroy: List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))], - l: List[(A, PoolList[(B, F[Unit])])] - ): (Map[A, PoolList[(B, F[Unit])]], List[(A, (B, F[Unit]))]) = { + toKeep: List[(A, PoolList[(B, ExitCase => F[Unit])])] => List[ + (A, PoolList[(B, ExitCase => F[Unit])]) + ], + toDestroy: List[(A, (B, ExitCase => F[Unit]))] => List[(A, (B, ExitCase => F[Unit]))], + l: List[(A, PoolList[(B, ExitCase => F[Unit])])] + ): (Map[A, PoolList[(B, ExitCase => F[Unit])]], List[(A, (B, ExitCase => F[Unit]))]) = { l match { case Nil => (toKeep(List.empty).toMap, toDestroy(List.empty)) case (key, pList) :: rest => // Can use span since we know everything will be ordered as the time is // when it is placed back into the pool. val (notStale, stale) = pList.toList.span(r => isNotStale(r._1)) - val toDestroy_ : List[(A, (B, F[Unit]))] => List[(A, (B, F[Unit]))] = l => - toDestroy(stale.map(t => key -> t._2) ++ l) - val toKeep_ : List[(A, PoolList[(B, F[Unit])])] => List[(A, PoolList[(B, F[Unit])])] = + val toDestroy_ : List[(A, (B, ExitCase => F[Unit]))] => List[ + (A, (B, ExitCase => F[Unit])) + ] = l => toDestroy(stale.map(t => key -> t._2) ++ l) + val toKeep_ : List[(A, PoolList[(B, ExitCase => F[Unit])])] => List[ + (A, PoolList[(B, ExitCase => F[Unit])]) + ] = l => PoolList.fromList(notStale) match { case None => toKeep(l) @@ -187,9 +195,8 @@ object KeyPool { val sleep = Temporal[F].sleep(5.seconds).void // Wait 5 Seconds - def loop: F[Unit] = for { - now <- Temporal[F].monotonic - _ <- { + def loop: F[Unit] = Temporal[F].monotonic + .flatMap { now => kpVar.tryModify { case p @ PoolClosed() => (p, F.unit) case p @ PoolOpen(idleCount, m) => @@ -199,7 +206,8 @@ object KeyPool { val (m_, toDestroy) = findStale(now, idleCount, m) ( m_, - toDestroy.traverse_(_._2._2).attempt.flatMap { + // In this context, we're closing the resource due to it not being used for a while - hence a Succeeded exit case. + toDestroy.traverse_(_._2._2(ExitCase.Succeeded)).attempt.flatMap { case Left(t) => onReaperException(t) // .handleErrorWith(t => F.delay(t.printStackTrace())) // CHEATING? case Right(()) => F.unit @@ -207,17 +215,17 @@ object KeyPool { ) } } - }.flatMap { + } + .flatMap { case Some(act) => act >> sleep >> loop case None => loop } - } yield () loop } private[keypool] def state[F[_]: Functor, A, B]( - kpVar: Ref[F, PoolMap[A, (B, F[Unit])]] + kpVar: Ref[F, PoolMap[A, (B, ExitCase => F[Unit])]] ): F[(Int, Map[A, Int])] = kpVar.get.map { case PoolClosed() => @@ -237,8 +245,8 @@ object KeyPool { kp: KeyPoolConcrete[F, A, B], k: A, r: B, - destroy: F[Unit] - ): F[Unit] = { + destroy: ExitCase => F[Unit] + ): ExitCase => F[Unit] = { def addToList[Z]( now: FiniteDuration, maxCount: Int, @@ -254,7 +262,10 @@ object KeyPool { else (l, Some(x)) } } - def go(now: FiniteDuration, pc: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], F[Unit]) = + def go( + now: FiniteDuration, + pc: PoolMap[A, (B, ExitCase => F[Unit])] + ): (PoolMap[A, (B, ExitCase => F[Unit])], ExitCase => F[Unit]) = pc match { case p @ PoolClosed() => (p, destroy) case p @ PoolOpen(idleCount, m) => @@ -264,25 +275,28 @@ object KeyPool { case None => val cnt_ = idleCount + 1 val m_ = PoolMap.open(cnt_, m + (k -> One((r, destroy), now))) - (m_, Applicative[F].pure(())) + (m_, Function.const[F[Unit], ExitCase](Applicative[F].unit)) case Some(l) => val (l_, mx) = addToList(now, kp.kpMaxPerKey(k), (r, destroy), l) val cnt_ = idleCount + mx.fold(1)(_ => 0) val m_ = PoolMap.open(cnt_, m + (k -> l_)) - (m_, mx.fold(Applicative[F].unit)(_ => destroy)) + (m_, mx.fold((_: ExitCase) => Applicative[F].unit)(_ => destroy)) } } - Clock[F].monotonic.flatMap { now => - kp.kpVar.modify(pm => go(now, pm)).flatten - } + (exit: ExitCase) => + Clock[F].monotonic.flatMap { now => + kp.kpVar.modify(pm => go(now, pm)).flatMap(_(exit)) + } } private[keypool] def take[F[_]: Temporal, A, B]( kp: KeyPoolConcrete[F, A, B], k: A ): Resource[F, Managed[F, B]] = { - def go(pm: PoolMap[A, (B, F[Unit])]): (PoolMap[A, (B, F[Unit])], Option[(B, F[Unit])]) = + def go( + pm: PoolMap[A, (B, ExitCase => F[Unit])] + ): (PoolMap[A, (B, ExitCase => F[Unit])], Option[(B, ExitCase => F[Unit])]) = pm match { case p @ PoolClosed() => (p, None) case pOrig @ PoolOpen(idleCount, m) => @@ -299,14 +313,14 @@ object KeyPool { _ <- kp.kpMaxTotalSem.permit optR <- Resource.eval(kp.kpVar.modify(go)) releasedState <- Resource.eval(Ref[F].of[Reusable](kp.kpDefaultReuseState)) - resource <- Resource.makeFull[F, (B, F[Unit])] { poll => - optR.fold(poll(kp.kpRes(k).allocated))(r => Applicative[F].pure(r)) - } { resource => + resource <- Resource.makeCaseFull[F, (B, ExitCase => F[Unit])] { poll => + optR.fold(poll(kp.kpRes(k).allocatedCase))(r => Applicative[F].pure(r)) + } { (resource, exitCase) => for { reusable <- releasedState.get out <- reusable match { - case Reusable.Reuse => put(kp, k, resource._1, resource._2).attempt.void - case Reusable.DontReuse => resource._2.attempt.void + case Reusable.Reuse => put(kp, k, resource._1, resource._2).apply(exitCase).attempt.void + case Reusable.DontReuse => resource._2(exitCase).attempt.void } } yield out } @@ -370,9 +384,11 @@ object KeyPool { def keepRunning[Z](fa: F[Z]): F[Z] = fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa) for { - kpVar <- Resource.make( - Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) - )(kpVar => KeyPool.destroy(kpVar)) + kpVar <- Resource.makeCase( + Ref[F].of[PoolMap[A, (B, ExitCase => F[Unit])]]( + PoolMap.open(0, Map.empty[A, PoolList[(B, ExitCase => F[Unit])]]) + ) + )(KeyPool.destroy) kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong)) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration => diff --git a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala index 8a71389..5f86784 100644 --- a/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala +++ b/core/src/main/scala/org/typelevel/keypool/KeyPoolBuilder.scala @@ -28,6 +28,7 @@ import cats.effect.kernel._ import cats.effect.kernel.syntax.spawn._ import cats.effect.std.Semaphore import scala.concurrent.duration._ +import cats.effect.kernel.Resource.ExitCase @deprecated("use KeyPool.Builder", "0.4.7") final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( @@ -88,9 +89,11 @@ final class KeyPoolBuilder[F[_]: Temporal, A, B] private ( def keepRunning[Z](fa: F[Z]): F[Z] = fa.onError { case e => onReaperException(e) }.attempt >> keepRunning(fa) for { - kpVar <- Resource.make( - Ref[F].of[PoolMap[A, (B, F[Unit])]](PoolMap.open(0, Map.empty[A, PoolList[(B, F[Unit])]])) - )(kpVar => KeyPool.destroy(kpVar)) + kpVar <- Resource.makeCase( + Ref[F].of[PoolMap[A, (B, ExitCase => F[Unit])]]( + PoolMap.open(0, Map.empty[A, PoolList[(B, ExitCase => F[Unit])]]) + ) + )(KeyPool.destroy) kpMaxTotalSem <- Resource.eval(Semaphore[F](kpMaxTotal.toLong)) _ <- idleTimeAllowedInPool match { case fd: FiniteDuration =>