Skip to content

niqdev/zio-kafka-streams

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

97 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

zio-kafka-streams

Build Status

Write and test Kafka Streams applications using ZIO and expose the internal state store directly via GraphQL

Add the following lines to your build.sbt

libraryDependencies ++= Seq(
  "???" %% "zio-kafka-streams" % "???", // core DONE, incomplete api and tests
  "???" %% "zio-kafka-streams-api" % "???", // TODO graphQL (?)
  "???" %% "zio-kafka-streams-testkit" % "???" % Test, // core DONE, incomplete api and tests
  "???" %% "kafka-streams-serde" % "???", // DONE, missing tests
  "???" %% "kafka-datagen" % "???" // TODO incomplete
)

🚧🚧🚧🚧🚧🚧🚧🚧🚧🚧 ⚠️ Work in Progress ⚠️ 🚧🚧🚧🚧🚧🚧🚧🚧🚧🚧

Examples

Examples of how to write Kafka Streams applications using ZStreamsBuilder, ZKStream and ZKTable

ToUpperCase

Probably the simplest Kafka Streams application you could think of

object ToUpperCaseTopology {
  // build the topology
  lazy val topology: RIO[KafkaStreamsConfig with CustomConfig, Topology] =
    for {
      sourceTopic <- CustomConfig.sourceTopic
      sinkTopic   <- CustomConfig.sinkTopic
      topology    <- ZStreamsBuilder { builder =>
        for {
          // compose the topology using ZKStream and ZKTable
          sourceStream <- builder.stream[String, String](sourceTopic)
          sinkStream   <- sourceStream.mapValue(_.toUpperCase)
          _            <- sinkStream.to(sinkTopic)
        } yield ()
      }
    } yield topology
  // define the topology's layer
  val layer: RLayer[ZEnv, KafkaStreamsTopology with KafkaStreamsConfig] =
    ToUpperCaseConfig.layer >+> KafkaStreamsTopology.make(topology)
}
// setup runtime
object ToUpperCaseApp extends KafkaStreamsApp(ToUpperCaseTopology.layer)

How to run the example

# start kafka
make local-up

# create source topic
make topic-create name=example.source.v1

# start application
LOG_LEVEL="INFO" sbt "examples/runMain com.github.niqdev.ToUpperCaseApp"

# access kafka
docker exec -it local-kafka bash

# publish messages
kafka-console-producer --bootstrap-server kafka:9092 --topic example.source.v1

# consume messages
kafka-console-consumer --bootstrap-server kafka:9092 --topic example.sink.v1

Complete example of ToUpperCaseApp

GitHubApp

Joining Avro streams integrated with Schema Registry has never been so easy ;-)

object GitHubTopology {
  lazy val topology: RIO[KafkaStreamsConfig with CustomConfig with Logging, Topology] =
    for {
      config <- KafkaStreamsConfig.config
      _      <- log.info(s"Running ${config.applicationId}")
      _      <- CustomConfig.prettyPrint.flatMap(values => log.info(values))
      topics <- CustomConfig.topics
      topology <- ZStreamsBuilder { builder =>
        for {
          userStream         <- builder.streamAvro[UserKey, UserValue](topics.userSource)
          repositoryStream   <- builder.streamAvro[RepositoryKey, RepositoryValue](topics.repositorySource)
          ghUserStream       <- userStream.mapKey(GitHubEventKey.fromUser)
          ghRepositoryStream <- repositoryStream.mapKey(GitHubEventKey.fromRepository)
          ghUserTable        <- ghUserStream.toTableAvro
          ghRepositoryTable  <- ghRepositoryStream.toTableAvro
          gitHubTable        <- ghUserTable.joinAvro(ghRepositoryTable)(GitHubEventValue.joinUserRepository)
          gitHubStream       <- gitHubTable.toStream
          _                  <- gitHubStream.toAvro(topics.gitHubSink)
        } yield ()
      }
    } yield topology

  val layer: RLayer[ZEnv, KafkaStreamsTopology with KafkaStreamsConfig] =
    Logging.console() ++ GitHubConfig.envLayer >+> KafkaStreamsTopology.make(topology)
}
object GitHubApp extends KafkaStreamsApp(GitHubTopology.layer)

How to run the example

# start kafka
make local-up

# create source topics
make topic-create-all

# generate avsc and register schema
make schema-register-all

# start application
make local-run

How to publish messages locally

# format example data
make format-data-all

# access schema-registry
docker exec -it local-schema-registry bash

# export producer config (see below)
SCHEMA_KEY_ID=XXX
SCHEMA_VALUE_ID=YYY
TOPIC_NAME=ZZZ

# start avro producer
kafka-avro-console-producer \
  --bootstrap-server kafka:29092 \
  --property schema.registry.url="http://schema-registry:8081" \
  --property parse.key=true \
  --property key.schema="$(curl -s http://schema-registry:8081/schemas/ids/$SCHEMA_KEY_ID | jq -r .schema)" \
  --property value.schema="$(curl -s http://schema-registry:8081/schemas/ids/$SCHEMA_VALUE_ID | jq -r .schema)" \
  --property key.separator=::: \
  --topic $TOPIC_NAME

How to consume messages locally

# access schema-registry
docker exec -it local-schema-registry bash

# export consumer config (see below)
TOPIC_NAME=XYZ

# start avro consumer
kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --property schema.registry.url="http://schema-registry:8081" \
  --property schema.id.separator=: \
  --property print.key=true \
  --property print.schema.ids=true \
  --property key.separator=, \
  --topic $TOPIC_NAME \
  --from-beginning \
  --max-messages 10

Configurations

# produce to "user" topic
SCHEMA_KEY_ID=1
SCHEMA_VALUE_ID=2
TOPIC_NAME=example.user.v1

# produce to "repository" topic
SCHEMA_KEY_ID=3
SCHEMA_VALUE_ID=4
TOPIC_NAME=example.repository.v1

# consume from topics
TOPIC_NAME=example.user.v1
TOPIC_NAME=example.repository.v1
TOPIC_NAME=example.github.v1

Complete example of GitHubApp

TestKit

How to test ToUpperCaseTopology topology with ZTestTopology, ZTestInput and ZTestOutput

// LOG_LEVEL=WARN sbt "test:testOnly *ToUpperCaseSpec"

testM("topology") {
  for {
    sourceTopic <- CustomConfig.sourceTopic
    sinkTopic   <- CustomConfig.sinkTopic
    outputValue <- ZTestTopology.driver.use { driver =>
      for {
        input  <- driver.createInput[String, String](sourceTopic)
        output <- driver.createOutput[String, String](sinkTopic)
        _      <- input.produceValue("myValue")
        value  <- output.consumeValue
      } yield value
    }
  } yield assert(outputValue)(equalTo("MYVALUE"))
}.provideSomeLayerShared(testLayer)

More examples in the tests module

Serdes

kafka-streams-serde is an independent module without ZIO dependencies useful to build Serdes with your favourite effect system

Example of how to autoderive Avro serde for keys and values integrated with Confluent Schema Registry leveraging avro4s

import kafka.streams.serde._

final case class DummyValue(string: String)
object DummyValue {
  final implicit val dummyValueAvroCodec: AvroCodec[DummyValue] =
    AvroCodec.genericValue[DummyValue]
}

For more examples with refined, newtype, enumeratum and custom types see the schema package

Example of how to build a syntax with Cats Effect using Record and AvroRecord

object syntax {
  final implicit def streamsBuilderSyntax[F[_]](builder: StreamsBuilder): StreamsBuilderOps[F] =
    new StreamsBuilderOps(builder)
}

final class StreamsBuilderOps[F[_]](private val builder: StreamsBuilder) extends AnyVal {
  def streamF[K, V](
    topic: String,
    schemaRegistry: String
  )(implicit F: Sync[F], C: AvroRecordConsumed[K, V]): F[KStream[K, V]] =
    F.delay(builder.stream(topic)(C.consumed(schemaRegistry)))
}

Complete example of KafkaStreamsCatsApp

DataGen

TODO

LOG_LEVEL=INFO sbt "examples/runMain com.github.niqdev.SimpleKafkaGenApp"

Development

# start containers in background
# zookeeper|kafka|kafka-rest|kafka-ui|schema-registry|schema-registry-ui|kowl
make local-up

# stop all containers
make local-down

# cli
make topic-list
make topic-describe name=<TOPIC_NAME>
make topic-create name=<TOPIC_NAME>
make topic-delete name=<TOPIC_NAME>
make topic-offset name=<TOPIC_NAME>
make group-list
make group-offset name=<GROUP_NAME>

# [mac|linux] kafka ui
[open|xdg-open] http://localhost:8000
# [mac|linux] schema-registry ui
[open|xdg-open] http://localhost:8001
# [mac|linux] kowl (another ui)
[open|xdg-open] http://localhost:8002

How to build an event manually

# example
EVENT_NAME=<my-event-name>

# stringify json event
cat local/data/$EVENT_NAME.json | jq -c | jq -R

# format message "<json_key>:::<json_value>"
echo "$(cat local/data/$EVENT_NAME-key.json | jq -c):::$(cat local/data/$EVENT_NAME-value.json | jq -c)" > local/data/$EVENT_NAME-event.txt

# produce sample message
make produce-avro \
  schema-key-id=1 \
  schema-value-id=2 \
  topic-name=<TOPIC_NAME> \
  event-name=$EVENT_NAME

How to interact with schema-registry apis

TOPIC_NAME=<???>

# verify registered schema
curl -s -X GET localhost:8081/subjects/$TOPIC_NAME-value/versions
curl -s -X GET localhost:8081/subjects/$TOPIC_NAME-value/versions/1 | jq
# extract avsc
curl -s -X GET localhost:8081/subjects/$TOPIC_NAME-value/versions/1 | jq -r ".schema" | jq

TODO

  • zio-prelude e.g. newtype or refined ?
  • kafkacat docker
    • docker run --rm --name kafkacat edenhill/kafkacat:1.6.0
  • json serde with circe/zio-json + xml serde (?)
  • interop-cats
  • api with Caliban (pagination + subscriptions)
  • metrics with Prometheus
  • helm chart StatefulSet
  • convert GenerateSchema into sbt plugin
  • replace kafka-streams-scala with plain Java or zio-kafka?
  • topology description with eisner
  • Scala 3 compatibility