Skip to content

Commit

Permalink
Merge pull request #262 from polyvariant/ce3
Browse files Browse the repository at this point in the history
Cats Effect 3 migration
  • Loading branch information
kubukoz committed Jun 12, 2021
2 parents 0aed18b + 74a52e9 commit 1bbec11
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 170 deletions.
57 changes: 32 additions & 25 deletions build.sbt
Expand Up @@ -56,6 +56,13 @@ ThisBuild / missinglinkExcludedDependencies += moduleFilter(
name = "slf4j-api"
)

ThisBuild / libraryDependencySchemes ++= Seq(
"io.circe" %% "circe-core" % "early-semver",
"io.circe" %% "circe-generic-extras" % "early-semver",
"io.circe" %% "circe-literal" % "early-semver",
"io.circe" %% "circe-parser" % "early-semver"
)

def crossPlugin(x: sbt.librarymanagement.ModuleID) =
compilerPlugin(x.cross(CrossVersion.full))

Expand All @@ -71,31 +78,31 @@ val commonSettings = List(
scalacOptions += "-Ymacro-annotations",
libraryDependencies ++= List(
"org.typelevel" %% "cats-core" % "2.6.1",
"org.typelevel" %% "cats-effect" % "2.5.1",
"org.typelevel" %% "cats-effect" % "3.1.1",
"org.typelevel" %% "cats-tagless-macros" % "0.14.0",
"co.fs2" %% "fs2-core" % "2.5.6",
"com.github.valskalla" %% "odin-core" % "0.11.0",
"io.circe" %% "circe-core" % "0.13.0",
"co.fs2" %% "fs2-core" % "3.0.4",
"io.github.irevive" %% "odin-core" % "0.12.0-M3",
"io.circe" %% "circe-core" % "0.14.1",
"com.github.julien-truffaut" %% "monocle-macro" % "2.1.0",
"com.disneystreaming" %% "weaver-framework" % "0.5.1" % Test,
"com.disneystreaming" %% "weaver-scalacheck" % "0.5.1" % Test
"com.disneystreaming" %% "weaver-cats" % "0.7.3" % Test,
"com.disneystreaming" %% "weaver-scalacheck" % "0.7.3" % Test
) ++ compilerPlugins,
testFrameworks += new TestFramework("weaver.framework.TestFramework"),
testFrameworks += new TestFramework("weaver.framework.CatsEffect"),
publish / skip := true
)

lazy val gitlab = project
.settings(
commonSettings,
libraryDependencies ++= List(
"is.cir" %% "ciris" % "1.2.1",
"com.kubukoz" %% "caliban-gitlab" % "0.0.14",
"io.circe" %% "circe-generic-extras" % "0.13.0",
"io.circe" %% "circe-parser" % "0.13.0" % Test,
"io.circe" %% "circe-literal" % "0.13.0" % Test,
"com.softwaremill.sttp.tapir" %% "tapir-core" % "0.17.19",
"com.softwaremill.sttp.tapir" %% "tapir-json-circe" % "0.17.19",
"com.softwaremill.sttp.tapir" %% "tapir-sttp-client" % "0.17.19"
"is.cir" %% "ciris" % "2.0.1",
"com.kubukoz" %% "caliban-gitlab" % "0.1.0",
"io.circe" %% "circe-generic-extras" % "0.14.1",
"io.circe" %% "circe-parser" % "0.14.1" % Test,
"io.circe" %% "circe-literal" % "0.14.1" % Test,
"com.softwaremill.sttp.tapir" %% "tapir-core" % "0.18.0-M15",
"com.softwaremill.sttp.tapir" %% "tapir-json-circe" % "0.18.0-M15",
"com.softwaremill.sttp.tapir" %% "tapir-sttp-client" % "0.18.0-M15"
)
)
.dependsOn(core)
Expand Down Expand Up @@ -181,19 +188,19 @@ lazy val pitgull =
buildInfoPackage := "io.pg",
buildInfoKeys := List(version, scalaVersion),
libraryDependencies ++= List(
"com.softwaremill.sttp.client3" %% "http4s-ce2-backend" % "3.3.6",
"org.http4s" %% "http4s-dsl" % "0.21.24",
"org.http4s" %% "http4s-circe" % "0.21.24",
"org.http4s" %% "http4s-blaze-server" % "0.21.24",
"org.http4s" %% "http4s-blaze-client" % "0.21.24",
"is.cir" %% "ciris" % "1.2.1",
"io.circe" %% "circe-generic-extras" % "0.13.0",
"com.softwaremill.sttp.client3" %% "http4s-backend" % "3.3.6",
"org.http4s" %% "http4s-dsl" % "0.23.0-RC1",
"org.http4s" %% "http4s-circe" % "0.23.0-RC1",
"org.http4s" %% "http4s-blaze-server" % "0.23.0-RC1",
"org.http4s" %% "http4s-blaze-client" % "0.23.0-RC1",
"is.cir" %% "ciris" % "2.0.1",
"io.circe" %% "circe-generic-extras" % "0.14.0",
"io.estatico" %% "newtype" % "0.4.4",
"io.scalaland" %% "chimney" % "0.6.1",
"io.chrisdavenport" %% "cats-time" % "0.3.4",
"com.github.valskalla" %% "odin-core" % "0.11.0",
"com.github.valskalla" %% "odin-slf4j" % "0.11.0",
"io.github.vigoo" %% "prox-fs2" % "0.7.1"
"io.github.irevive" %% "odin-core" % "0.12.0-M3",
"io.github.irevive" %% "odin-slf4j" % "0.12.0-M3",
"io.github.vigoo" %% "prox-fs2-3" % "0.7.1"
)
)
.dependsOn(core, gitlab)
Expand Down
16 changes: 0 additions & 16 deletions core/src/main/scala/io/pg/Prelude.scala
@@ -1,22 +1,6 @@
package io.pg

import cats.effect.Resource
import cats.syntax.functor._
import cats.Applicative
import cats.MonadError

object Prelude {
type MonadThrow[F[_]] = MonadError[F, Throwable]

implicit class EffectToResourceLiftSyntax[F[_], A](private val fa: F[A]) extends AnyVal {

def resource(implicit F: Applicative[F]): Resource[F, A] =
Resource.eval(fa)

def resource_(implicit F: Applicative[F]): Resource[F, Unit] =
fa.void.resource

}

implicit class AnythingAnything[A](private val a: A) extends AnyVal {
def ??? : Nothing = ???
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/io/pg/background/background.scala
Expand Up @@ -15,13 +15,13 @@ object BackgroundProcess {

final case class DrainStream[F[_], G[_]](
stream: fs2.Stream[F, Nothing],
C: fs2.Stream.Compiler[F, G]
C: fs2.Compiler[F, G]
) extends BackgroundProcess[G]

def fromStream[F[_], G[_]](
stream: fs2.Stream[F, _]
)(
implicit C: fs2.Stream.Compiler[F, G]
implicit C: fs2.Compiler[F, G]
): BackgroundProcess[G] =
new DrainStream(stream.drain, C)

Expand All @@ -30,7 +30,7 @@ object BackgroundProcess {
)(
processor: Processor[F, A]
)(
implicit C: fs2.Stream.Compiler[F, F]
implicit C: fs2.Compiler[F, F]
): BackgroundProcess[F] =
fromStream(channel.consume.through(processor.process))

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/scala/io/pg/messaging/messaging.scala
@@ -1,11 +1,12 @@
package io.pg.messaging

import fs2.concurrent.Queue
import cats.effect.std.Queue
import scala.reflect.ClassTag
import cats.tagless.autoInvariant
import cats.syntax.all._
import cats.ApplicativeError
import io.odin.Logger
import cats.Functor

trait Publisher[F[_], -A] {
def publish(a: A): F[Unit]
Expand Down Expand Up @@ -41,10 +42,10 @@ trait Channel[F[_], A] extends Publisher[F, A] { self =>

object Channel {

def fromQueue[F[_], A](q: Queue[F, A]): Channel[F, A] =
def fromQueue[F[_]: Functor, A](q: Queue[F, A]): Channel[F, A] =
new Channel[F, A] {
def publish(a: A): F[Unit] = q.enqueue1(a)
val consume: fs2.Stream[F, A] = q.dequeue
def publish(a: A): F[Unit] = q.offer(a)
val consume: fs2.Stream[F, A] = fs2.Stream.fromQueueUnterminated(q)
}

implicit class ChannelOpticsSyntax[F[_], A](val ch: Channel[F, A]) extends AnyVal {
Expand Down
6 changes: 3 additions & 3 deletions gitlab/src/main/scala/io/pg/gitlab/Gitlab.scala
Expand Up @@ -21,7 +21,7 @@ import io.pg.gitlab.graphql.PipelineStatusEnum
import io.pg.gitlab.graphql.Project
import io.pg.gitlab.graphql.ProjectConnection
import io.pg.gitlab.graphql.Query
import io.pg.gitlab.graphql.User
import io.pg.gitlab.graphql.UserCore
import sttp.client3.Request
import sttp.client3.SttpBackend
import sttp.model.Uri
Expand Down Expand Up @@ -80,7 +80,7 @@ object Gitlab {
accessToken: Secret[String]
)(
implicit backend: SttpBackend[F, Any],
SC: fs2.Stream.Compiler[F, F]
SC: fs2.Compiler[F, F]
): Gitlab[F] = {

def runRequest[O](request: Request[O, Any]): F[O] =
Expand Down Expand Up @@ -139,7 +139,7 @@ object Gitlab {
MergeRequest.iid.mapEither(_.toLongOption.toRight(DecodingError("MR IID wasn't a Long"))) ~
MergeRequest.headPipeline(Pipeline.status.map(convertPipelineStatus)) ~
MergeRequest
.author(User.username)
.author(UserCore.username)
.mapEither(_.toRight(DecodingError("MR has no author"))) ~
MergeRequest.description ~
MergeRequest.shouldBeRebased ~
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/slf4j/impl/StaticLoggerBinder.java
Expand Up @@ -8,3 +8,4 @@ public static StaticLoggerBinder getSingleton() {
return _instance;
}
}

99 changes: 47 additions & 52 deletions src/main/scala/io/pg/Application.scala
@@ -1,12 +1,11 @@
package io.pg

import cats.data.NonEmptyList
import cats.effect.Blocker
import cats.effect.ConcurrentEffect
import cats.effect.ContextShift
import cats.effect.Resource
import cats.effect.kernel.Async
import cats.effect.std.Queue
import cats.effect.implicits._
import cats.syntax.all._
import fs2.concurrent.Queue
import io.odin.Logger
import io.pg.background.BackgroundProcess
import io.pg.config.ProjectConfigReader
Expand All @@ -15,16 +14,14 @@ import io.pg.gitlab.webhook.WebhookEvent
import io.pg.messaging._
import io.pg.webhook._
import org.http4s.HttpApp
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.implicits._
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.SttpBackend
import sttp.client3.http4s.Http4sBackend

import scala.concurrent.ExecutionContext

import Prelude._

sealed trait Event extends Product with Serializable

object Event {
Expand All @@ -38,56 +35,54 @@ final class Application[F[_]](

object Application {

def resource[F[_]: ConcurrentEffect: ContextShift: Logger](
def resource[F[_]: Logger: Async](
config: AppConfig
): Resource[F, Application[F]] = {
implicit val projectConfigReader = ProjectConfigReader.test[F]

Blocker[F].flatMap { blocker =>
Queue
.bounded[F, Event](config.queues.maxSize)
.map(Channel.fromQueue)
.resource
.flatMap { eventChannel =>
implicit val webhookChannel: Channel[F, WebhookEvent] =
eventChannel.only[Event.Webhook].imap(_.value)(Event.Webhook)

BlazeClientBuilder[F](ExecutionContext.global)
.resource
.map(
org
.http4s
.client
.middleware
.Logger(logHeaders = true, logBody = false, redactHeadersWhen = config.middleware.sensitiveHeaders.contains)
Queue
.bounded[F, Event](config.queues.maxSize)
.map(Channel.fromQueue(_))
.toResource
.flatMap { eventChannel =>
implicit val webhookChannel: Channel[F, WebhookEvent] =
eventChannel.only[Event.Webhook].imap(_.value)(Event.Webhook)

BlazeClientBuilder[F](ExecutionContext.global)
.resource
.map(
org
.http4s
.client
.middleware
.Logger(logHeaders = true, logBody = false, redactHeadersWhen = config.middleware.sensitiveHeaders.contains)
)
.map { client =>
implicit val backend: SttpBackend[F, Fs2Streams[F]] =
Http4sBackend.usingClient[F](client)

implicit val gitlab: Gitlab[F] =
Gitlab.sttpInstance[F](config.git.apiUrl, config.git.apiToken)

implicit val projectActions: ProjectActions[F] =
ProjectActions.instance[F]

implicit val stateResolver: StateResolver[F] =
StateResolver.instance[F]

implicit val mergeRequests: MergeRequests[F] =
MergeRequests.instance[F]

val webhookProcess = BackgroundProcess.fromProcessor(
webhookChannel
)(Processor.simple(WebhookProcessor.instance[F]))

new Application[F](
routes = WebhookRouter.routes[F].orNotFound,
background = NonEmptyList.one(webhookProcess)
)
.map { client =>
implicit val backend: SttpBackend[F, Fs2Streams[F]] =
Http4sBackend.usingClient[F](client, blocker)

implicit val gitlab: Gitlab[F] =
Gitlab.sttpInstance[F](config.git.apiUrl, config.git.apiToken)

implicit val projectActions: ProjectActions[F] =
ProjectActions.instance[F]

implicit val stateResolver: StateResolver[F] =
StateResolver.instance[F]

implicit val mergeRequests: MergeRequests[F] =
MergeRequests.instance[F]

val webhookProcess = BackgroundProcess.fromProcessor(
webhookChannel
)(Processor.simple(WebhookProcessor.instance[F]))

new Application[F](
routes = WebhookRouter.routes[F].orNotFound,
background = NonEmptyList.one(webhookProcess)
)
}
}
}
}
}
}

}

0 comments on commit 1bbec11

Please sign in to comment.