ps-eda-kafka-streams

Content

This repository contains example source code from the PluralSight course Designing Event-driven Applications Using Apache Kafka Ecosystem by Bogdan Sucaciu:

  • Code example from Module 5: Building Your First Streaming Application
  • Code example from Module 6: Building a Streaming Application with KSQL

I enjoyed this course very much and I would recommend it to anyone who needs an introduction to event-driven design and would like to use Apache Kafka to implement it.

Purpose

In the course the example was presented running against locally installed Kafka, ZooKeeper, and Confluent Schema Registry components. Setting that up requires some work, which may be instructive if you want to learn about some basic principles of Kafka and how to configure it. However, it is much more convenient to run all the necessary software components in separate Docker containers, especially when you are already familiar with Docker and Docker Compose.

The software components

The example code consists of a producer and a Streams API client (which consumes from one topic and produces to another), both using AVRO schemas for (de)serializing the key and value parts of the messages written to and read from the topics. This example code is still built and run as small Java applications executing their respective main methods on your local computer, presumably with help from your favorite IDE (so this part isn't deployed to Docker containers yet).

The Kafka cluster the example code communicates with, however, is entirely deployed as Docker containers (a sketch of such a compose file follows this list):

  • one container with a single Apache Kafka broker, listening on port 9092,
  • one container with a single ZooKeeper instance, listening on port 2181,
  • one container with the Confluent Schema Registry server, listening on port 8081,
  • one container with the Confluent ksqldb-server, listening on port 8088,
  • one container with the ksqldb-cli.
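
To give an impression, a minimal sketch of such a compose file is shown below. The image tags, listener setup, and environment variables are assumptions modeled on Confluent's published examples; the authoritative version is docker/docker-compose.yml in this repository.

    # Hypothetical sketch, modeled on Confluent's cp-all-in-one-community example;
    # see docker/docker-compose.yml for the actual file.
    version: "3"
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.2.1
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
        ports:
          - "2181:2181"
      broker:
        image: confluentinc/cp-kafka:7.2.1
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          # two listeners: one for containers on the compose network,
          # one advertised to the Java apps running on the host
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      schema-registry:
        image: confluentinc/cp-schema-registry:7.2.1
        depends_on:
          - broker
        ports:
          - "8081:8081"
        environment:
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092
      ksqldb-server:
        image: confluentinc/cp-ksqldb-server:7.2.1
        container_name: ksqldb-server
        depends_on:
          - broker
          - schema-registry
        ports:
          - "8088:8088"
        environment:
          KSQL_BOOTSTRAP_SERVERS: broker:29092
          KSQL_LISTENERS: http://0.0.0.0:8088
          KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      ksqldb-cli:
        image: confluentinc/cp-ksqldb-cli:7.2.1
        container_name: ksqldb-cli
        depends_on:
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true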

Note

Take a look at the Docker Prerequisites of the Quickstart documentation. As more containers are spun up in this example, you may have to increase the RAM allocated to Docker in your Windows or macOS environment, because the respective Docker VM solutions ship with a limited default. When you have installed the Docker Engine on a Linux distro, you are probably already good to go.
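
You can check how much memory the Docker daemon has available with docker info; the MemTotal field is part of its standard output and is reported in bytes:

    $ docker info --format '{{.MemTotal}}'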


Making good use of the Confluent Platform Community Edition components

To get this setup to work quickly, I created a docker/docker-compose.yml file based on the one found in the confluentinc cp-all-in-one-community GitHub repo (version 7.2.1-post).

From this all-in-one docker-compose.yml, which defines all the components that are part of the Confluent Platform Community Edition, we only took the five services that are needed to make the example code work and copied them into our own Docker Compose YAML file. So, under the hood, we are using Docker images made available by Confluent (for which we are grateful).

Changes made to the original example source code

Introduction of the fraud-detection-interface maven module

The most important change I made was to create a separate fraud-detection-interface maven module. This module contains Java classes that are generated from an AVRO schema, avro/order_schema.avsc using the org.apache.avro:avro-maven-plugin.
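
For reference, the relevant plugin configuration looks roughly like the snippet below; the exact source and output directories are assumptions based on the paths mentioned elsewhere in this README, so check the module's pom.xml for the real values.

    <!-- Hypothetical sketch of the avro-maven-plugin configuration -->
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.11.1</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals>
            <goal>schema</goal>
          </goals>
          <configuration>
            <!-- directory containing order_schema.avsc (assumed location) -->
            <sourceDirectory>${project.basedir}/avro</sourceDirectory>
            <!-- matches the Generated Sources Root mentioned in the note below -->
            <outputDirectory>${project.basedir}/src/main/generated</outputDirectory>
          </configuration>
        </execution>
      </executions>
    </plugin>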

These Java classes form the payload of the messages that are put together, serialized to an AVRO byte stream, and sent to the topic by the producers, and then read from the topic by the consumers. So the transactions-producer, background-producer, and kafka-streams modules all depend on the fraud-detection-interface module.
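
As an illustration of how the producers might use these classes, here is a minimal, hypothetical producer sketch. The field names (userId, totalAmount, nbOfItems) are taken from the message values printed later in this README, but the builder calls, field types, and configuration values are assumptions, not the repository's actual code.

    // Hypothetical sketch: producing an AVRO-serialized Order to the payments topic.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class OrderProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // both key and value go through the schema registry as AVRO
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            props.put("schema.registry.url", "http://localhost:8081");

            try (KafkaProducer<String, Order> producer = new KafkaProducer<>(props)) {
                // builder methods are generated from avro/order_schema.avsc
                Order order = Order.newBuilder()
                        .setUserId("1234")
                        .setTotalAmount(100.0f) // numeric types assumed
                        .setNbOfItems(1001)
                        .build();
                producer.send(new ProducerRecord<>("payments", "1", order));
            }
        }
    }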

In the original example the Java classes were generated manually on the command line and copied as model packages into the other maven modules.

My introduction of the separate fraud-detection-interface maven module

  • makes running the example less complex, as the generated Java classes are created automatically as part of a maven build of the project;
  • reduces code duplication.

I think both are great benefits that didn't take much effort to accomplish.

Note

For IntelliJ to notice the content of the fraud-detection-interface/src/main/generated directory, you need to mark the directory as Generated Sources Root by right-clicking on it in the Project view window and choosing Mark Directory as > Generated Sources Root from the context menu.


Updating all maven dependencies

I made an effort to update all maven dependencies to the versions available now (August 2022).

Prerequisites

  • A JDK should be installed; version 8 is the minimum requirement, but I tested this example with version 17.
  • Maven; I tested the example with version 3.8.1.
  • Docker, including Docker Compose. I used the docker-compose-plugin (v2.6.0), whose commands start with docker compose rather than docker-compose; the latter is the deprecated standalone version 1.29.2.
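
You can quickly verify what you have installed:

    $ java -version
    $ mvn -version
    $ docker compose version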

Usage

  • Open a terminal in the project/repository root dir
    $ cd docker
    $ docker compose up -d
    $ docker compose ps
  • When the last command shows you that all five services are up and running, you can proceed to create the payments and validated-payments topics in the same terminal (a sketch of what such a script might run follows this list) with
    $ ./create-topic.sh
  • Build the example code with maven (from the project/repository root dir)
    $ mvn clean compile -e
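
I won't reproduce create-topic.sh here, but a script like it would presumably wrap the kafka-topics tool shipped inside the broker container, roughly along these lines (topic settings are assumptions, inferred from the partition and replica counts shown by show topics below):

    # hypothetical sketch of what a topic-creation script could run
    docker compose exec broker kafka-topics --create \
      --bootstrap-server localhost:9092 \
      --topic payments --partitions 1 --replication-factor 1
    docker compose exec broker kafka-topics --create \
      --bootstrap-server localhost:9092 \
      --topic validated-payments --partitions 1 --replication-factor 1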

The Streams API code example of module 5

The KSQL CLI example of module 6

  • We assume that the first part, bringing up the containers with docker compose and creating the topics as described above, has already been executed.
  • On the command line or within your IDE
    • Run the Main class com.pluralsight.kafka.streams.Main of the background-producer module.
      • This application will exit after publishing five events to the same payments topic; it can do so because its key and value types are the same as those of the events produced by the transactions-producer.
      • If you didn't run the transactions-producer from the previous module in this session, make sure to run the background-producer at least once before starting the KSQL CLI, so that the AVRO format of the event value (Order) gets registered with the schema registry.
  • In a terminal we can now start the KSQL CLI with
    $ docker exec --interactive --tty ksqldb-cli /bin/ksql http://ksqldb-server:8088
    • We can check for streams, tables and topics present
      ksql> show topics;
      
      Kafka Topic                 | Partitions | Partition Replicas
      ---------------------------------------------------------------
      default_ksql_processing_log | 1          | 1
      payments                    | 1          | 1
      validated-payments          | 1          | 1
      ---------------------------------------------------------------
      ksql> show streams;
      
      Stream Name         | Kafka Topic                 | Key Format | Value Format | Windowed
      ------------------------------------------------------------------------------------------
      KSQL_PROCESSING_LOG | default_ksql_processing_log | KAFKA      | JSON         | false
      ------------------------------------------------------------------------------------------
      
      ksql> show tables;
      
      Table Name | Kafka Topic | Key Format | Value Format | Windowed
      -----------------------------------------------------------------
      -----------------------------------------------------------------
    • Create a stream from which we then create a table (a Kafka Streams DSL analogue of the table is sketched after this walkthrough)
      ksql> create stream ksql_payments
      >with ( kafka_topic='payments', value_format='AVRO' );
      
      Message
      ----------------
      Stream created
      ----------------
      ksql> show streams;
      
      Stream Name         | Kafka Topic                 | Format
      ------------------------------------------------------------
      KSQL_PAYMENTS       | payments                    | AVRO   
      KSQL_PROCESSING_LOG | default_ksql_processing_log | JSON
      ------------------------------------------------------------
      
      ksql> create table WARNINGS
      >as select userId, count(*)
      >from ksql_payments
      >window hopping (size 10 minutes, advance by 1 minute )
      >group by userId
      >having count(*) > 4;
    • We can run the background-producer a couple more times and then query the table. Note that with 10-minute windows advancing every minute, each event falls into up to ten overlapping windows, which is why the same userId shows up in so many rows below.
      ksql> select * from WARNINGS;
      
      +--------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------+
      |USERID                                                  |WINDOWSTART                                             |WINDOWEND                                               |KSQL_COL_0                                              |
      +--------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------+--------------------------------------------------------+
      |1234                                                    |1661348100000                                           |1661348700000                                           |5                                                       |
      |1234                                                    |1661348160000                                           |1661348760000                                           |5                                                       |
      |1234                                                    |1661348220000                                           |1661348820000                                           |5                                                       |
      |1234                                                    |1661348280000                                           |1661348880000                                           |5                                                       |
      |1234                                                    |1661348340000                                           |1661348940000                                           |5                                                       |
      |1234                                                    |1661348400000                                           |1661349000000                                           |5                                                       |
      |1234                                                    |1661348460000                                           |1661349060000                                           |17                                                      |
      |1234                                                    |1661348520000                                           |1661349120000                                           |25                                                      |
      |1234                                                    |1661348580000                                           |1661349180000                                           |25                                                      |
      |1234                                                    |1661348640000                                           |1661349240000                                           |25                                                      |
      |1234                                                    |1661348700000                                           |1661349300000                                           |20                                                      |
      |1234                                                    |1661348760000                                           |1661349360000                                           |20                                                      |
      |1234                                                    |1661348820000                                           |1661349420000                                           |20                                                      |
      |1234                                                    |1661348880000                                           |1661349480000                                           |20                                                      |
      |1234                                                    |1661348940000                                           |1661349540000                                           |20                                                      |
      |1234                                                    |1661349000000                                           |1661349600000                                           |20                                                      |
      |1234                                                    |1661349060000                                           |1661349660000                                           |8                                                       |
      Query terminated
      ksql>
    • We can also print the content of a topic as soon as new events are written into it:
      ksql> print 'payments';
      Press CTRL-C to interrupt
      
      You won't see any messages already written to the topic, but if you run both the background-producer and the transactions-producer once, you will see something like:
      Key format: JSON or KAFKA_STRING
      Value format: AVRO
      rowtime: 2022/08/24 19:29:27.398 Z, key: 1, value: {"userId": "1234", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:29:28.467 Z, key: 2, value: {"userId": "1234", "totalAmount": 200.0, "nbOfItems": 2002}, partition: 0
      rowtime: 2022/08/24 19:29:29.469 Z, key: 3, value: {"userId": "1234", "totalAmount": 300.0, "nbOfItems": 3003}, partition: 0
      rowtime: 2022/08/24 19:29:30.471 Z, key: 4, value: {"userId": "1234", "totalAmount": 400.0, "nbOfItems": 4004}, partition: 0
      rowtime: 2022/08/24 19:29:31.472 Z, key: 5, value: {"userId": "1234", "totalAmount": 500.0, "nbOfItems": 5005}, partition: 0
      rowtime: 2022/08/24 19:30:01.523 Z, key: 1, value: {"userId": "", "totalAmount": 5.0, "nbOfItems": 5}, partition: 0
      rowtime: 2022/08/24 19:30:03.591 Z, key: 2, value: {"userId": "123", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:30:05.592 Z, key: 2, value: {"userId": "123", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:30:07.593 Z, key: 2, value: {"userId": "123", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:30:09.594 Z, key: 2, value: {"userId": "123", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:30:11.595 Z, key: 2, value: {"userId": "123", "totalAmount": 100.0, "nbOfItems": 1001}, partition: 0
      rowtime: 2022/08/24 19:30:13.596 Z, key: 3, value: {"userId": "ghi", "totalAmount": 10001.0, "nbOfItems": 1}, partition: 0
      rowtime: 2022/08/24 19:30:15.597 Z, key: 4, value: {"userId": "abc", "totalAmount": 100.0, "nbOfItems": 10}, partition: 0
      rowtime: 2022/08/24 19:30:17.598 Z, key: 5, value: {"userId": "JKL", "totalAmount": 1.0, "nbOfItems": 1}, partition: 0
      
      ^CTopic printing ceased
      ksql> exit
      Exiting ksqlDB.
      $
      After pressing Ctrl-C the ksql> prompt returns, and with exit you leave the KSQL CLI.
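
For comparison with the module 5 Streams API code, the KSQL WARNINGS table above could be expressed with the Kafka Streams DSL roughly as in the sketch below. The serde configuration is omitted, and the topology is an illustrative assumption, not the repository's actual code.

    // Hypothetical sketch: a Streams DSL analogue of the KSQL WARNINGS table.
    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;

    public class WarningsTopologySketch {
        static void build(StreamsBuilder builder) {
            KStream<String, Order> payments = builder.stream("payments");
            KTable<Windowed<String>, Long> warnings = payments
                    // re-key by the userId field, as the KSQL GROUP BY does
                    .groupBy((key, order) -> order.getUserId().toString())
                    // 10-minute windows advancing every minute, like WINDOW HOPPING above
                    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
                            .advanceBy(Duration.ofMinutes(1)))
                    .count()
                    // HAVING count(*) > 4
                    .filter((windowedUserId, count) -> count > 4);
        }
    }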

The Schema registration process

The inner workings of schema registration are already explained in the README.md file of the GitHub repository for the previous module of the course.
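
If you want to see what actually got registered, the Schema Registry's REST API can be queried directly from the host. The subject name below assumes the default TopicNameStrategy, under which the value schema of the payments topic is registered as payments-value:

    # list all registered subjects
    $ curl http://localhost:8081/subjects
    # fetch the latest registered version of the payments value schema
    $ curl http://localhost:8081/subjects/payments-value/versions/latest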