kafka-topics --list --bootstrap-server=localhost:9092

kafka-console-consumer --topic sampletopic --from-beginning --bootstrap-server=localhost:9092

kafka-topics --delete --topic sampletopic --bootstrap-server=localhost:9092

Kafka Core (producer, consumer, producer-consumer)

docker-compose up

Producer & Consumer example

1.HelloKafkaConsumer
kafka-topics --create --topic=t-hello --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

2.FixedRateConsumer
kafka-topics --create --topic=t-fixedrate --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

3.FixedRateConsumer2
kafka-topics --create --topic=t-fixedrate-2 --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

4.KafkaKeyProducer
kafka-topics --create --topic=t-multi-partitions --partitions=3 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-multi-partitions --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-multi-partitions --offset=earliest --partition=0 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-multi-partitions --offset=earliest --partition=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-multi-partitions --offset=earliest --partition=2 --bootstrap-server=localhost:9092
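Messages with the same key always land in the same partition, which is why each of the three console consumers above sees a stable subset of keys. A minimal sketch of the idea, assuming a simplified hash: Kafka's default partitioner hashes the serialized key bytes with murmur2, so real partition numbers will differ from this stand-in. `KeyPartitionSketch` and `partitionFor` are hypothetical names, not part of the repository.

```java
public class KeyPartitionSketch {
    // Simplified stand-in for key-based partitioning (assumption: String.hashCode
    // instead of the murmur2 hash that Kafka's default partitioner uses).
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"key-1", "key-2", "key-3", "key-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The point is only that the mapping is deterministic: repeating a key repeats the partition.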

5.KafkaKeyConsumer (multiple consumers for each topic)

6.EmployeeJsonProducer & EmployeeJsonConsumer
kafka-topics --create --topic=t-employee --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-employee --offset=earliest --partition=0 --bootstrap-server=localhost:9092

7.CommodityProducer & CommodityDashboardConsumer & CommodityDashboardNotification & CommodityApi
kafka-topics --create --topic=t-commodity --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity --offset=earliest --partition=0 --bootstrap-server=localhost:9092
kafka-consumer-groups --group cg-dashboard --describe --bootstrap-server=localhost:9092
kafka-consumer-groups --group cg-dashboard --execute --reset-offsets --to-offset 10 --topic=t-commodity --bootstrap-server=localhost:9092

8.a.RebalanceProducer & RebalanceConsumer
kafka-topics --create --topic=t-rebalance --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-rebalance --offset=earliest --partition=0 --bootstrap-server=localhost:9092
While leaving the producer and consumer running, add a second partition to trigger a rebalance:
kafka-topics --alter --topic=t-rebalance --partitions=2 --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-rebalance --bootstrap-server=localhost:9092
After a couple of minutes, the producer will start sending to partition 1 as well.

8.b.Override producer/consumer factory with custom configuration (ProducerCustomConfig / ConsumerCustomConfig) -> after configuring to 3 partitions, the producer/consumers will rebalance according to the configured METADATA_MAX_AGE_CONFIG
kafka-topics --alter --topic=t-rebalance --partitions=3 --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-rebalance --bootstrap-server=localhost:9092

9.CarLocationConsumer & CarLocationProducer, CarLocationScheduler, CarLocation (consumer message filtering)
kafka-topics --create --topic=t-location --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-location --offset=earliest --partition=0 --bootstrap-server=localhost:9092

10.PurchaseRequestProducer & PurchaseRequestConsumer, CaffeineCacheConfig (idempotent consumer with cache)
kafka-topics --create --topic=t-purchase-request --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-purchase-request --offset=earliest --partition=0 --bootstrap-server=localhost:9092
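An idempotent consumer remembers which messages it has already handled and skips duplicates. The sketch below illustrates the pattern only; it assumes a small bounded `LinkedHashMap` as a stand-in for the Caffeine cache configured in CaffeineCacheConfig, and `IdempotentConsumerSketch`, `consume` and the `PR-…` ids are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class IdempotentConsumerSketch {
    // Bounded "seen" cache; the oldest entry is evicted once maxEntries is exceeded.
    private final Map<String, Boolean> processed;
    int handledCount = 0;

    IdempotentConsumerSketch(final int maxEntries) {
        this.processed = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true if the message was processed, false if it was a duplicate.
    boolean consume(String purchaseRequestId) {
        if (processed.putIfAbsent(purchaseRequestId, Boolean.TRUE) != null) {
            return false; // already seen: skip the business logic
        }
        handledCount++;   // placeholder for the real processing
        return true;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch(100);
        System.out.println(consumer.consume("PR-1"));
        System.out.println(consumer.consume("PR-1")); // duplicate delivery
    }
}
```

A bounded cache trades memory for a limited deduplication horizon: a duplicate that arrives after its entry was evicted is processed again.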

11.PaymentRequestProducer & PaymentRequestConsumer (custom cache key)
kafka-topics --create --topic=t-payment-request --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-payment-request --offset=earliest --partition=0 --bootstrap-server=localhost:9092

12.FoodOrderProducer & FoodOrderConsumer, FoodOrderErrorHandler (KafkaListener error handler)
kafka-topics --create --topic=t-food-order --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

13.SimpleNumberProducer & SimpleNumberConsumer, ConsumerCustomConfig (Global error handler)
kafka-topics --create --topic=t-simple-number --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

14.ImageProducer & ImageConsumer (blocking retrying consumer -> the consumer keeps retrying and further messages are not processed on a particular topic until success or retry failure -> can cause bottleneck -> create a retry policy not too short and not too long)
kafka-topics --create --topic=t-image --partitions=2 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-image --bootstrap-server=localhost:9092
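The blocking behaviour described in item 14 can be pictured as a plain retry loop: nothing else is consumed while the loop sleeps between attempts. This is a sketch of the concept only, not the Spring Kafka error handler used by ImageConsumer; `BlockingRetrySketch`, `processWithRetry` and the attempt and backoff values are hypothetical.

```java
import java.util.function.Predicate;

public class BlockingRetrySketch {
    // Tries the handler up to maxAttempts times, sleeping between attempts.
    // Returns the attempt number that succeeded, or -1 if every attempt failed.
    static <T> int processWithRetry(T message, Predicate<T> handler, int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(message)) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // later messages wait while we back off
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical handler that only succeeds on its third call.
        int used = processWithRetry("image-1", m -> ++calls[0] >= 3, 5, 10L);
        System.out.println("attempts used: " + used);
    }
}
```

The worst-case stall per message is roughly `(maxAttempts - 1) * backoffMillis`, which is the number to weigh when choosing a policy that is neither too short nor too long.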

15.InvoiceProducer & InvoiceConsumer (dead letter topic -> is like a combination of retry + producer publishing to another topic called "dead letter topic")
kafka-topics --create --topic=t-invoice --partitions=2 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-topics --create --topic=t-invoice-dead-letter --partitions=2 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-invoice --bootstrap-server=localhost:9092
kafka-topics --describe --topic=t-invoice-dead-letter --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-invoice --offset=earliest --partition=0 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-invoice --offset=earliest --partition=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-invoice-dead-letter --offset=earliest --partition=0 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-invoice-dead-letter --offset=earliest --partition=1 --bootstrap-server=localhost:9092
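The "retry + publish to another topic" combination can be sketched as follows. An in-memory list stands in for t-invoice-dead-letter; a real consumer would publish through a Kafka producer instead. `DeadLetterSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class DeadLetterSketch {
    // Stand-in for the dead letter topic (assumption: in-memory list, not a real topic).
    final List<String> deadLetters = new ArrayList<>();

    // Returns true if the message was handled, false if it was routed to the dead letters.
    boolean consume(String message, Predicate<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(message)) {
                return true;
            }
        }
        deadLetters.add(message); // retries exhausted: park the message and move on
        return false;
    }

    public static void main(String[] args) {
        DeadLetterSketch consumer = new DeadLetterSketch();
        consumer.consume("invoice-ok", m -> true, 3);
        consumer.consume("invoice-bad", m -> false, 3);
        System.out.println("dead letters: " + consumer.deadLetters);
    }
}
```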

16.Image2Producer & Image2Consumer (non-blocking retrying consumer -> when consumer encounters error, it proceeds by consuming next message(s), while the previous message is retried in the background / non-blocking)
kafka-topics --create --topic=t-image-2 --partitions=2 --replication-factor=1 --bootstrap-server=localhost:9092

17.GeneralLedgerProducer & GeneralLedgerConsumer, GeneralLedgerScheduler (pause/resume consumers)
kafka-topics --create --topic=t-general-ledger --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092

Kafka microservice (kafka-ms-order -> api + kafka producer, kafka-ms-pattern -> kafka consumer, kafka-ms-reward, kafka-ms-storage)

(t-commodity-order is created automatically by KafkaConfig)
kafka-topics --describe --topic=t-commodity-order --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-order --from-beginning --bootstrap-server=localhost:9092

2.DiscountProducer and PromotionProducer publish to t-commodity-promotion; PromotionConsumer consumes from t-commodity-promotion
kafka-topics --create --topic=t-commodity-promotion --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-promotion --from-beginning --bootstrap-server=localhost:9092

3.OrderConsumer (kafka-ms-reward) and OrderConsumer (kafka-ms-pattern) both listen to t-commodity-order
OrderConsumer (kafka-ms-reward) extracts the bonus percentage from the headers, if it exists
OrderConsumer (kafka-ms-pattern) simply consumes the message, without looking at the headers

4.OrderConsumer (kafka-ms-reward) will also act as a publisher (OrderReplyConsumer), publishing messages to t-commodity-order-reply
OrderProducer will also act as a consumer (OrderReplyConsumer), listening to t-commodity-order-reply
This flow is commonly known as asynchronous request/reply
t-commodity-order-reply is created automatically by KafkaConfig

Kafka Streams

1.Run kafka-ms-order, kafka-stream-sample and kafka-stream-storage (and run Postman -> Promotion -> 1000 iterations, 500 interval)
t-commodity-promotion is the source; t-commodity-promotion-uppercase is the sink; see PromotionUppercaseStream, KafkaStreamConfig; kafka-stream-storage has 2 consumers: one for t-commodity-promotion and one for t-commodity-promotion-uppercase
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-order
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-order-masked
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-promotion
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-promotion-uppercase

note that PromotionUppercaseConsumer (kafka-stream-storage) displays "promotionCode=null" in the output console -> PromotionUppercaseJsonStream fixes the problem of PromotionUppercaseStream

furthermore, Spring provides a default Json Serde -> see PromotionUppercaseSpringJsonStream

lastly, custom JSON serde can be used for custom formats see CustomJsonSerializer, CustomJsonDeserializer, CustomJsonSerde, PromotionSerde, PromotionUppercaseCustomJsonStream

Kafka Stream - Commodity

Topology - see kafka-stream-commodity-topology.jpg

1.MaskOrder
See MaskOrderStream (kafka-stream-sample), OrderMessage (kafka-stream-sample), CommodityStreamUtil (kafka-stream-sample)
Run kafka-stream-sample, kafka-ms-order
kafka-console-consumer --topic t-commodity-order-masked --from-beginning --bootstrap-server=localhost:9092
Run Postman -> Commodity Order -> Order 2 Random Items -> see in the kafka-console-consumer that the creditCardNumber is masked
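For orientation, a value-masking step of this kind could look like the sketch below. The exact masking rule used by CommodityStreamUtil is not shown here, so this assumes (hypothetically) that every character except the last four is replaced with `*`; `MaskCreditCardSketch` and `mask` are illustrative names.

```java
public class MaskCreditCardSketch {
    // Assumed rule: keep the last four characters, replace the rest with '*'.
    static String mask(String creditCardNumber) {
        int visible = Math.min(4, creditCardNumber.length());
        int maskedLength = creditCardNumber.length() - visible;
        return "*".repeat(maskedLength) + creditCardNumber.substring(maskedLength);
    }

    public static void main(String[] args) {
        System.out.println(mask("4111111111111111"));
    }
}
```

In a Kafka Streams topology a function like this would typically be applied with `mapValues`, since only the value changes and the key stays intact.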

2.Sink processors (see kafka-stream-commodity-topology-sink-processors-high-level.jpg and kafka-stream-commodity-topology-sink-processors-detail.jpg)
See OrderPatternMessage (kafka-stream-sample), OrderRewardMessage (kafka-stream-sample), CommodityOneStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-one
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-one
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-one

kafka-console-consumer --topic t-commodity-pattern-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-reward-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-storage-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run kafka-ms-order, kafka-stream-sample
Run Postman -> Commodity Order collection (Order 1 Random Item, Order 2 Random Item, Order 3 Random Item) -> 1000 times, 1000 delay

3.a.Topology changes (see kafka-stream-commodity-topology-3.jpg): split t-commodity-pattern stream into two categories: plastic and non-plastic items; t-commodity-reward -> give reward only for item that is not cheap; t-commodity-order -> key is base64

See CommodityTwoStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-two-plastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-two-notplastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-two
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-two

kafka-console-consumer --topic t-commodity-pattern-two-plastic --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-pattern-two-notplastic --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-reward-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-storage-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run kafka-ms-order, kafka-stream-sample
Run Postman -> Commodity Order collection (Create Plastic & Non Plastic Order)
For an alternative to stream branching, see CommodityThreeStream

See CommodityThreeStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-three-plastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-three-notplastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-three
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-three

kafka-console-consumer --topic t-commodity-pattern-three-plastic --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-pattern-three-notplastic --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-reward-three --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-storage-three --from-beginning --property print.key=true --bootstrap-server=localhost:9092
As an alternative to the deprecated "branch" method used in CommodityTwoStream, use CommodityTwoSplitStream

See CommodityTwoSplitStream

4.Topology change: the key of the OrderReward needs to be changed and become the location

See CommodityFourStream (rewardStream uses map instead of mapValues)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-four-plastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-four-notplastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-four
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-four

5.Process stream result without passing it to sink stream (simulate fraud notification)

See CommodityFiveStream

Run Postman -> Commodity Order -> run "Order 1 Random items" and modify orderLocation from random to something that starts with "C"

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-five-plastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-five-notplastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-five
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-five
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-fraud-five

6.Topology change (see kafka-stream-commodity-topology-6.jpg)

See CommoditySixStream

Run Postman -> Commodity Order -> run "Order 1 Random items" and modify orderLocation from random to something that starts with "C"

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-six-plastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-pattern-six-notplastic
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-reward-six
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-storage-six
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-fraud-six

kafka-console-consumer --topic t-commodity-fraud-six --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --bootstrap-server=localhost:9092

Kafka Stream - Feedback

1.New topology (see kafka-stream-commodity-topology-feedback.jpg)

See FeedbackApi, FeedbackRequest, FeedbackMessage, FeedbackProducer, FeedbackAction, FeedbackService (kafka-ms-order)
See FeedbackOneStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-one-good

kafka-console-consumer --topic t-commodity-feedback --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-feedback-one-good --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Feedback -> Create Good Feedback

Note that the key is null in the kafka-console-consumer output

2.Topology change (see kafka-stream-commodity-feedback-2.jpg): set key to branch location

See FeedbackTwoStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-two-good

kafka-console-consumer --topic t-commodity-feedback-two-good --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run again Postman -> Feedback -> Create Good Feedback

3.Topology change (see kafka-stream-commodity-feedback-3.jpg): split stream to good feedback and bad feedback

See FeedbackThreeStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-three-good
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-three-bad

kafka-console-consumer --topic t-commodity-feedback-three-good --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-feedback-three-bad --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run again Postman -> Feedback -> Create Good Feedback and Create Bad Feedback

4.Topology change (see kafka-stream-commodity-feedback-4.jpg): count good and bad feedback

See FeedbackFourStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-four-good
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-four-bad
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-four-good-count
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-four-bad-count

kafka-console-consumer --topic t-commodity-feedback-four-good --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-feedback-four-bad --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-feedback-four-good-count --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-feedback-four-bad-count --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run again Postman -> Feedback -> Create Random Feedback

Note that t-commodity-feedback-four-good-count and t-commodity-feedback-four-bad-count consumers have --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

The default delay used to flush data from the KTable to the stream is 30 seconds. It can be controlled via COMMIT_INTERVAL_MS_CONFIG (see KafkaStreamConfig)

5.Avoid processing the input stream more than once (avoid using .to more than once)

See FeedbackFiveStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-five-good
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-five-bad
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-five-good-count
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-five-bad-count

Note that the t-commodity-feedback-five-good consumer does not output anything, because when using "repartition", the output topic is for internal Kafka use and the name configured in the code is not the actual Kafka topic name

6.Topology change (see kafka-stream-commodity-feedback-6.jpg)

See FeedbackSixStream (kafka-stream-sample)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-good
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-bad
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-good-count
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-bad-count
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-good-count-word
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-six-bad-count-word

Kafka Stream - Customer

1.New topology for Customer Purchase (see kafka-stream-commodity-customer.jpg)

See CustomerPurchaseMobileRequest, CustomerPurchaseWebRequest, PurchaseResponse, CustomerPurchaseApi, CustomerPurchaseMobileMessage, CustomerPurchaseWebMessage, CustomerPurchaseProducer, CustomerPurchaseAction, CustomerPurchaseService

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-purchase-web
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-purchase-mobile
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-purchase-all

Run Postman -> Customer Purchase collection

kafka-console-consumer --topic t-commodity-customer-purchase-web --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-customer-purchase-mobile --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-customer-purchase-all --from-beginning --property print.key=true --bootstrap-server=localhost:9092

2.New topology for Customer Preference (see kafka-stream-commodity-customer-preference.jpg and kafka-stream-commodity-customer-preference-timeline.jpg)

See CustomerPreferenceShoppingCartRequest, CustomerPreferenceWishlistRequest, CustomerPreferenceApi, CustomerPreferenceShoppingCartMessage, CustomerPreferenceWishlistMessage, CustomerPreferenceProducer, CustomerPreferenceAction, CustomerPreferenceService
See CustomerPreferenceShoppingCartAggregator, CustomerPreferenceWishlistAggregator, CustomerPreferenceOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-preference-shopping-cart
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-preference-wishlist
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-customer-preference-all

kafka-console-consumer --topic t-commodity-customer-preference-shopping-cart --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-customer-preference-wishlist --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-customer-preference-all --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Customer Preference collection -> Simulation

1.Kafka Stream - flash sale vote (see kafka-stream-flash-sale-1.jpg)

See FlashSaleVoteOneStream

The Kafka message is converted to "key: customerId, value: itemName" so entries could be treated as upsert in the Kafka stream table (groupBy itemName)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-flashsale-vote
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-flashsale-vote-user-item --config "cleanup.policy=compact" --config "" --config "" --config "min.cleanable.dirty.ratio=0.01" --config ""
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-flashsale-vote-one-result
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-flashsale-vote-two-result

kafka-console-consumer --topic t-commodity-flashsale-vote --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-flashsale-vote-user-item --from-beginning --property print.key=true --bootstrap-server=localhost:9092
kafka-console-consumer --topic t-commodity-flashsale-vote-one-result --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Flash sale -> Simulation
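The upsert-then-group idea behind the flash sale vote can be illustrated without Kafka: the latest vote per customerId replaces the earlier one, and only then are votes counted per itemName. This is a plain-collections sketch of the semantics, not the KTable code in FlashSaleVoteOneStream; `FlashSaleVoteSketch`, `countVotes` and the sample data are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class FlashSaleVoteSketch {
    // Each vote is {customerId, itemName}; later votes by the same customer overwrite earlier ones.
    static Map<String, Long> countVotes(String[][] votes) {
        Map<String, String> latestVoteByCustomer = new HashMap<>();
        for (String[] vote : votes) {
            latestVoteByCustomer.put(vote[0], vote[1]); // upsert keyed by customerId
        }
        Map<String, Long> countByItem = new HashMap<>();
        for (String item : latestVoteByCustomer.values()) {
            countByItem.merge(item, 1L, Long::sum);     // group by itemName and count
        }
        return countByItem;
    }

    public static void main(String[] args) {
        String[][] votes = {{"c1", "apple"}, {"c2", "apple"}, {"c1", "banana"}};
        System.out.println(countVotes(votes));
    }
}
```

Because customer c1 changes their vote, apple ends up with one vote rather than two, which is the behaviour the upsert is meant to guarantee.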

2.Kafka stream state store (stateful operations)

The state store is on the same machine as the processing node (no network overhead).
The state store is not shared between processes or threads in processors.
The state store is fault-tolerant: it can be quickly recovered in case of failure (it uses a changelog topic).

A value transformer needs to implement either the ValueTransformer or the ValueTransformerSupplier interface

See FlashSaleVoteTwoStream

Run Postman -> Flash sale -> Create Random Flash Sale Vote

kafka-console-consumer --topic t-commodity-flashsale-vote-two-result --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Adjust voteStart / voteEnd in FlashSaleVoteTwoValueTransformer, and the computer's clock, to see the ValueTransformer in action.

1.Kafka stream - feedback rating: calculate average rating, based on sum and count of ratings; the average rating will be stored to Kafka stream state store

See FeedbackRatingOneMessage, FeedbackRatingOneStoreValue, FeedbackRatingOneValueTransformer, FeedbackRatingOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-rating-one

kafka-console-consumer --topic t-commodity-feedback-rating-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Feedback -> Simulation
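Keeping the sum and the count, rather than the average itself, lets each new rating update the average in constant time. A minimal sketch, assuming a `HashMap` as a stand-in for the Kafka Streams state store and a hypothetical grouping key (`location`); `AverageRatingSketch` and `addRating` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;

public class AverageRatingSketch {
    // Stand-in for the state store: key -> {sum of ratings, number of ratings}.
    private final Map<String, long[]> store = new HashMap<>();

    // Records one rating and returns the updated average for that key.
    double addRating(String location, int rating) {
        long[] sumAndCount = store.computeIfAbsent(location, k -> new long[2]);
        sumAndCount[0] += rating;
        sumAndCount[1]++;
        return (double) sumAndCount[0] / sumAndCount[1];
    }

    public static void main(String[] args) {
        AverageRatingSketch ratings = new AverageRatingSketch();
        ratings.addRating("Jakarta", 4);
        System.out.println(ratings.addRating("Jakarta", 5));
    }
}
```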

2.Detailed rating (Map of ratings)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-feedback-rating-two

See FeedbackRatingTwoMessage, FeedbackRatingTwoStoreValue, FeedbackRatingTwoValueTransformer, FeedbackRatingTwoStream

kafka-console-consumer --topic t-commodity-feedback-rating-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Feedback -> Create random feedback

1.Kafka stream - summing records

See InventoryRequest, InventoryApi, InventoryMessage, InventoryProducer, InventoryAction, InventoryService
See InventoryOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory
kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-total-one

kafka-console-consumer --topic t-commodity-inventory-total-one --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Sum record simulation

2.Subtract value

See InventoryTwoStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-total-two

kafka-console-consumer --topic t-commodity-inventory-total-two --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Subtract Record Simulation


See InventoryThreeStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-total-three

kafka-console-consumer --topic t-commodity-inventory-total-three --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Subtract Record Simulation

4.Timestamp extractor (extract transaction time from payload and use it as record timestamp)

Built-in timestamp extractors: FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, UsePreviousTimeOnInvalidTimestamp, WallclockTimestampExtractor

See InventoryFourStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-total-four

kafka-console-consumer --topic t-commodity-inventory-total-four --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Subtract Record Simulation (also adjust local Operating System time)
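The core of a payload-based timestamp extractor is a parse with a fallback. The sketch below shows only that logic; it does not implement Kafka's TimestampExtractor interface, and it assumes (hypothetically) that the transaction time is an ISO-8601 local date-time interpreted as UTC. `TransactionTimeSketch` and `extractTimestamp` are illustrative names.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeParseException;

public class TransactionTimeSketch {
    // Returns the payload's transaction time as epoch milliseconds,
    // or fallbackMs when the field is missing or cannot be parsed.
    static long extractTimestamp(String transactionTime, long fallbackMs) {
        if (transactionTime == null) {
            return fallbackMs;
        }
        try {
            return LocalDateTime.parse(transactionTime).toInstant(ZoneOffset.UTC).toEpochMilli();
        } catch (DateTimeParseException e) {
            return fallbackMs;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractTimestamp("2022-06-28T20:45:46", System.currentTimeMillis()));
    }
}
```

Using the payload time means windowed aggregations follow when the transaction happened rather than when the record reached the broker.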

5.Tumbling Time Window (aggregate transactions on hourly basis)

See InventoryFiveStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-total-five

kafka-console-consumer --topic t-commodity-inventory-total-five --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Window Simulation

Notice the format of the log messages in the IDE output: Something@1656449146/1656449146 -> Something is the original key, while the two epoch times represent the window start and window end
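With a tumbling window, every record falls into exactly one fixed-size, non-overlapping window whose start is the record timestamp rounded down to a multiple of the window size. A small sketch of that arithmetic (`TumblingWindowSketch` and `windowStart` are hypothetical names, and the sample timestamp is illustrative):

```java
import java.time.Duration;
import java.time.Instant;

public class TumblingWindowSketch {
    // Rounds the timestamp down to the start of its window (windows are aligned to the epoch).
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long ts = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    public static void main(String[] args) {
        Instant ts = Instant.parse("2022-06-28T20:45:46Z");
        Instant start = windowStart(ts, Duration.ofHours(1));
        System.out.println(start + " .. " + start.plus(Duration.ofHours(1)));
    }
}
```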

6.Hopping Time Window (aggregate transactions on hourly basis, with a hop interval of 20 minutes)

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-inventory-six

kafka-console-consumer --topic t-commodity-inventory-six --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --bootstrap-server=localhost:9092

Run Postman -> Inventory -> Window Simulation (first change the inventorySimulationItem variable)
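Hopping windows overlap: with a one-hour size and a 20-minute hop, each record belongs to three windows. The sketch below lists the start times of every window that contains a given timestamp; it mirrors the usual epoch-aligned arithmetic but is not taken from the repository, and `HoppingWindowSketch` and `windowStartsFor` are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowSketch {
    // Returns the start (epoch ms) of every window [start, start + size) that contains the timestamp.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest epoch-aligned window that can still contain the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = Duration.ofHours(1).toMillis();
        long hop = Duration.ofMinutes(20).toMillis();
        long ts = Duration.ofMinutes(65).toMillis(); // 65 minutes after the epoch
        System.out.println(windowStartsFor(ts, size, hop));
    }
}
```

The same record therefore contributes to several aggregates, which is why the output topic shows multiple updates per input.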

Kafka stream/stream joining

1.Inner join Stream / Stream (left is online order, right is online payment; the key is online order number)

See OnlineOrder* classes See OrderPaymentOneStream class

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-online-order

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-online-payment

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-join-order-payment-one

kafka-console-consumer --topic t-commodity-online-order --from-beginning --property print.key=true --bootstrap-server=localhost:9092

kafka-console-consumer --topic t-commodity-online-payment --from-beginning --property print.key=true --bootstrap-server=localhost:9092

kafka-console-consumer --topic t-commodity-join-order-payment-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Online Order / Payment -> Inner Join Simulation

2.Left join Stream / Stream

See OrderPaymentTwoStream class

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-join-order-payment-two

kafka-console-consumer --topic t-commodity-join-order-payment-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Online Order / Payment -> Left Join Simulation

3.Outer join Stream / Stream

See OrderPaymentThreeStream class

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-join-order-payment-three

kafka-console-consumer --topic t-commodity-join-order-payment-three --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Online Order / Payment -> Outer Join Simulation
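The difference between the three join types comes down to which keys produce output. The sketch below compares them on two in-memory maps. It is a deliberate simplification with hypothetical names: real stream/stream joins are windowed and emit results record by record, which this ignores.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of inner / left / outer join key semantics (JDK only, no windowing).
final class JoinSemanticsSketch {

    enum JoinType { INNER, LEFT, OUTER }

    // Joins orders (left) with payments (right) on the order number.
    static Map<String, String> join(Map<String, String> orders,
                                    Map<String, String> payments,
                                    JoinType type) {
        Set<String> keys = new TreeSet<>(orders.keySet());
        if (type == JoinType.INNER) {
            keys.retainAll(payments.keySet()); // only keys present on both sides
        } else if (type == JoinType.OUTER) {
            keys.addAll(payments.keySet());    // keys from either side
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : keys) {
            // A missing side shows up as the string "null".
            result.put(key, orders.get(key) + "|" + payments.get(key));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new TreeMap<>();
        orders.put("A", "o1");
        orders.put("B", "o2");
        Map<String, String> payments = new TreeMap<>();
        payments.put("A", "p1");
        payments.put("C", "p3");
        for (JoinType type : JoinType.values()) {
            System.out.println(type + " " + join(orders, payments, type));
        }
    }
}
```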

Kafka table/table joining

1.Inner join (left is color vote, right is layout vote; the key is username)

Inner joins the two tables, resulting in a new table; counts the number of colors and votes in the new table

See WebColorVote* and WebLayoutVote* classes See WebDesignVoteOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-color

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-layout

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-one-username-color --config "cleanup.policy=compact" --config "" --config "" --config "min.cleanable.dirty.ratio=0.01" --config ""

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-one-username-layout --config "cleanup.policy=compact" --config "" --config "" --config "min.cleanable.dirty.ratio=0.01" --config ""

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-one-result

kafka-console-consumer --topic t-commodity-web-vote-one-result --from-beginning --property print.key=true --property print.timestamp=true --bootstrap-server=localhost:9092

Run Postman -> Web Design Vote -> Inner Join Simulation

2.Left join

See WebDesignVoteTwoStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-two-result

kafka-console-consumer --topic t-commodity-web-vote-two-result --from-beginning --property print.key=true --property print.timestamp=true --bootstrap-server=localhost:9092

Run Postman -> Web Design Vote -> Left Join Simulation

3.Outer join

See WebDesignVoteThreeStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-web-vote-three-result

kafka-console-consumer --topic t-commodity-web-vote-three-result --from-beginning --property print.key=true --property print.timestamp=true --bootstrap-server=localhost:9092

Run Postman -> Web Design Vote -> Outer Join Simulation

4.Outer join -> creates table directly from stream, without intermediary topic

Kafka stream/table joining (left is premium purchase, right is premium user; keys are purchase number and username; the join is done in a premium offer table)

1.Inner join

See Premium* classes See PremiumOfferOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-purchase

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-user --config "cleanup.policy=compact" --config "" --config "" --config "min.cleanable.dirty.ratio=0.01" --config ""

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-offer-one

kafka-console-consumer --topic t-commodity-premium-offer-one --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Premium Purchase & User -> Premium Offer - Inner Join Stream / Table

2.Left join

See PremiumOfferTwoStream

Note the special treatment in the joiner method when the user is null.
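In a left join the right side may be missing, so the joiner has to tolerate a null user. A minimal sketch of that null handling follows. The class name, the string-based parameters and the "unknown" placeholder are hypothetical and are not taken from PremiumOfferTwoStream.

```java
// Hypothetical null-safe joiner for a stream/table left join (JDK only).
final class PremiumOfferJoinerSketch {

    // purchaseItem comes from the stream (left side); userLevel comes from the
    // table (right side) and is null when no matching user exists.
    static String joinOffer(String purchaseItem, String userLevel) {
        String level = (userLevel == null) ? "unknown" : userLevel;
        return purchaseItem + " -> offer for level " + level;
    }

    public static void main(String[] args) {
        System.out.println(joinOffer("P-1", "gold"));
        System.out.println(joinOffer("P-2", null));
    }
}
```

An inner join never needs this branch, because it only calls the joiner when both sides are present.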

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-offer-two

kafka-console-consumer --topic t-commodity-premium-offer-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Premium Purchase & User -> Premium Offer - Left Join Stream / Table

Kafka stream/globaltable joining

1.Inner join

See PremiumOfferThreeStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-user-filtered

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-premium-offer-three

kafka-console-consumer --topic t-commodity-premium-offer-three --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Premium Purchase & User -> Premium Offer - Inner Join Stream / Global Table

Kafka stream/table co-partition

See Subscription* classes See SubscriptionOfferOneStream

kafka-topics --bootstrap-server localhost:9092 --create --partitions 5 --replication-factor 1 --topic t-commodity-subscription-purchase

kafka-topics --bootstrap-server localhost:9092 --create --partitions 2 --replication-factor 1 --topic t-commodity-subscription-user --config "cleanup.policy=compact" --config "" --config "" --config "min.cleanable.dirty.ratio=0.01" --config ""

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-subscription-offer-one

Running SubscriptionOfferOneStream makes the Spring application fail because the input topics are not co-partitioned (notice that t-commodity-subscription-purchase has 5 partitions, while t-commodity-subscription-user has 2 partitions).

The fix is included in SubscriptionOfferTwoStream, which uses a GlobalKTable instead of a KTable.
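Why differing partition counts break a KStream/KTable join can be seen from how a key is mapped to a partition. The sketch below uses `hashCode` modulo the partition count as a stand-in. That is an assumption made for illustration; Kafka's default partitioner hashes the serialized key bytes differently, but the consequence is the same.

```java
// Hypothetical sketch: the same key lands on different partition numbers
// when two topics have different partition counts (JDK only).
final class CoPartitionSketch {

    // Stand-in for a partitioner: hash of the key modulo the partition count.
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // True when both topics would place the key on the same partition number.
    static boolean coLocated(Object key, int leftPartitions, int rightPartitions) {
        return partitionFor(key, leftPartitions) == partitionFor(key, rightPartitions);
    }

    public static void main(String[] args) {
        int purchasePartitions = 5; // t-commodity-subscription-purchase
        int userPartitions = 2;     // t-commodity-subscription-user
        System.out.println(partitionFor(7, purchasePartitions));
        System.out.println(partitionFor(7, userPartitions));
        System.out.println(coLocated(7, purchasePartitions, userPartitions));
    }
}
```

A GlobalKTable avoids the problem because every application instance holds a full copy of the table, so the lookup does not depend on partition alignment.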

kafka-topics --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-commodity-subscription-offer-two

kafka-console-consumer --topic t-commodity-subscription-offer-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092

Run Postman -> Subscription Offer -> Diana Case

Kafka Connect

docker-compose -f docker-compose-connect.yml up

docker-compose -f docker-compose-connect-sample.yml up

Source connector -> reads data from a non-Kafka system and writes it to Kafka

Sink connector -> reads data from Kafka and writes it to a non-Kafka system (the target)

Connectors:

After docker-compose is up, run Postman -> Kafka Connect -> Connectors -> List connector plugins

1.Basic connector (file source)

Download -> unzip to ./data/kafka-connect-data/connectors and restart docker-compose-connect (docker-compose -f docker-compose-connect.yml restart kafka-connect) -> run Postman -> Kafka Connect -> Connectors -> List connector plugins

Run Postman -> Kafka Connect -> Setup source connectors -> Spooldir - CSV --topic t-spooldir-csv-demo --from-beginning --bootstrap-server=localhost:9092 --bootstrap-server localhost:9092 --list --bootstrap-server localhost:9092 --group console-consumer-71849 --describe

2.Basic connector (database sink)

Download and install (see above)

Change Postman -> local-ip variable to point to the IP of the local machine (run "ip addr show docker0" in a terminal)

Run Postman -> Kafka Connect -> Setup sink connectors -> PostgreSQL from CSV

List connectors (Postman -> Connectors -> List connectors (name only)) and get connector status (Postman -> Connectors -> Get specific connector status)

copy .csv to ./data/kafka-connect-data/inputs

Check Postgresql -> kafka_employees table -> all records were imported

3.Basic connector (sftp sink)

Download and install

Run Postman -> Kafka Connect -> Setup sink connectors -> SFTP (as JSON output)

Check Filezilla (connect to "ip addr show docker0")

Kafka Connect - CDC

1.CDC Postgresql source connector

See ./data/postgresql/postgresql.conf for CDC specific settings See ./data/postgresql/docker-entrypoint-initdb.d/01-postgresql-publication.sql and 02-postgresql-schema.sql for initialization files

Download and install

Topics are created automatically by the CDC connector

Run Postman -> Kafka Connect -> Setup source connectors -> PostgreSQL CDC - Finance Run Postman -> Kafka Connect -> Setup source connectors -> PostgreSQL CDC - Marketing (note that it has tombstone.on.delete=false)

insert sample data to postgresql (see spring-kafka-scripts/kafka-connect-samples/cdc-legacy-modernization) --bootstrap-server localhost:9092 --list --bootstrap-server localhost:9092 --topic t-cdc-finance.public.fin_invoices --from-beginning --bootstrap-server localhost:9092 --topic t-cdc-marketing.public.mkt_promotions --from-beginning --bootstrap-server localhost:9092 --topic t-cdc-marketing.public.mkt_sales --from-beginning

See the json in the console-consumer.

Update some data in the database; see again the json in the console-consumer -> the "before" field is null.

Delete some data in the database; see again the json in the console-consumer -> the "before" field only shows the primary key column (remaining columns are null).

To have the "before" field behave correctly, run the following commands in postgresql:

ALTER TABLE public.fin_invoices REPLICA IDENTITY FULL;

ALTER TABLE public.mkt_promotions REPLICA IDENTITY FULL;

ALTER TABLE public.mkt_sales REPLICA IDENTITY FULL;

2.CDC Postgresql sink connector

DROP TABLE IF EXISTS kafka_fin_invoices;

CREATE TABLE IF NOT EXISTS kafka_fin_invoices ( invoice_id INT PRIMARY KEY, invoice_amount INT, invoice_currency VARCHAR(3), invoice_number VARCHAR(50), invoice_date DATE );

Run Postman -> Kafka Connect -> Setup sink connectors -> PostgreSQL from finance (invoices) (note that auto.create and auto.evolve is set to false)

Add, update and delete some data in fin_invoices, and see that the changes are propagated to kafka_fin_invoices

3.CDC Postgresql sink connector

See CdcMessage, CdcPayloadMessage, CdcSourceMessage, MarketingPromotionMessage, MarketingSalesMessage See CdcMarketingListener

Start kafka-connect spring project and see console output

Insert / Update / Delete some data in the mkt_sales and mkt_promotions

See spring project console output

4.CDC Postgresql source connector

Run SQL scripts in ./spring-kafka-scripts/kafka-connect-samples/data-engineering

Run Postman -> Kafka Connect -> Setup source connectors -> PostgreSQL - Person Address --bootstrap-server localhost:9092 --from-beginning --property print.key=true --topic t-person-address-postgresql

Run Postman -> Kafka Connect -> Setup sink connectors -> PostgreSQL person address from target topic

5.CDC HTTP source connector

Download and install

Create mockaroo API endpoint based on Spring Kafka - Person Address (JSON).schema.json file

Run Postman -> Kafka Connect -> Setup source connectors -> HTTP - Person Address --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-http

6.Custom source

See AddressMessage, PersonMessage, PersonProducer, PersonAddressScheduler --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic t-person-address-custom --bootstrap-server localhost:9092 --topic t-person-address-custom

Run kafka-connect spring application

7.Elasticsearch sink

Create Elasticsearch deployment on

Download and install

Change Postman elasticsearch* variables

Run Postman -> Kafka Connect -> Setup sink connectors -> Elasticsearch 01 - setup privileges, Elasticsearch 02 - setup roles, Elasticsearch 03 - create sink

Run Postman -> Kafka Connect -> Elasticsearch

Kafka Connect & Kafka Stream

Flow: Kafka Connect sources (postgresql, http, custom) publish to Kafka topics (t-person-address-postgresql, t-person-address-http, t-person-address-custom); Kafka Stream converts them to a single format and publishes to t-person-address-target; Kafka Connect sinks to Elasticsearch

1.Postgresql to target

See KafkaStreamConfig, KafkaConnectMessage, KafkaConnectSchema, PersonAddressFromPostgresqlStream

To test postgresql to target topic: --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-postgresql --property print.key=true --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-target --property print.key=true

Run kafka-connect spring application

Before proceeding to the next step, pause the source connector for postgresql (the connector name is defined in Postman -> Kafka Connect -> Setup source connectors -> PostgreSQL - Person Address)

2.HTTP to target

See KafkaConnectPersonTargetMessage, KafkaConnectPersonAddressTargetKeySchema, KafkaConnectPersonAddressTargetValueSchema, KafkaConnectPersonAddressFromHttpMessage, KafkaConnectPersonMessageSnakeCase, KafkaConnectAddressMessageSnakeCase, PersonAddressFromHttpStream

To test http to target topic: --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-http --property print.key=true --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-target --property print.key=true

Run kafka-connect spring application

Note: the http messages inside t-person-address-target don't have exactly the same structure as the postgresql messages (e.g. for http, the key type is int32 and optional=false, whereas for postgresql the type is string and optional=true)

Run kafka-connect spring application

Before proceeding to the next step, pause the source connector for http (the connector name is defined in Postman -> Kafka Connect -> Setup source connectors -> HTTP - Person Address)

3.Custom to target

See PersonAddressFromCustomStream

To test custom to target topic: --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-custom --property print.key=true --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-target --property print.key=true

Run kafka-connect spring application


To change the default converter (org.apache.kafka.connect.json.JsonConverter, as defined in docker-compose-connect.yml), change Postman -> Kafka Connect -> Setup source connectors -> HTTP - Person Address and add (do the same for value.converter). --bootstrap-server localhost:9092 --from-beginning --topic t-person-address-http --property print.key=true The messages are in plain string, and not in json format

Binary data

docker-compose -f docker-compose-full.yml up docker-compose -f docker-compose-full-sample.yml up --bootstrap-server localhost:9092 --from-beginning --topic binary-topic

Run kafka-binary-data spring application


1.Simple Avro

See Hello.avsc

Run Maven -> Lifecycle -> Compile (it will generate src/main/java/

Run HelloAvro java class -> it will produce the helloAvro.avro binary file Run HelloAvroReader java class to convert the binary file to java object

2.Generic Avro

With generic Avro, we set everything manually (not recommended)

Run HelloAvroGeneric java class -> it will produce the helloAvroGeneric.avro binary file Run HelloAvroGenericReader java class to convert the binary file to java object

3.Specific Avro - basic

Don't mix specific Avro with generate Avro classes.

See Avro01 Run Avro01App

4.Specific Avro - logical type

See Avro02 Run Avro02App

Note that myDate, myTimeMillis, myTimestampMillis in Avro02 have java.time.* types Note that avro maven plugin uses java.nio.ByteBuffer instead of java.math.BigDecimal for the myDecimal field, which causes compiler error (

5.Specific Avro - optional field

See Avro03 Run Avro03App

Note that myWeirdButPossibleValue can be either boolean or int

5.Specific Avro - enum

See Avro04 and Avro05 Run Avro04App and Avro05App

Note that Avro enum is converted to Java enum

6.Specific Avro - array

See Avro06 Run Avro06App

Note that the "quotes" field is optional

7.Specific Avro - map

See Avro07 Run Avro07App

8.Specific Avro - fixed data type

See Avro08 Run Avro08App

"Fixed" means a fixed number of bytes (bytes[])

9.Avro reflection (generate Avro schema from existing java classes)

See SimpleEntity Run Avro09App -> copy output to Avro09.avsc -> Run Maven Compile -> It will produce a new class LocalDate, which clashes with Java's LocalDate -> generated Avro09 needs to be refactored in order to solve the compiler errors

See Company, Branch Run Avro10App -> copy output to Avro10.avsc -> change the namespace in Avro10.avsc -> Run Maven Compile -> It will produce new generated classes (Company and Branch)

10.Generate Avro schema from JSON (see online generators)

11.Nested Avro record

See Avro11Person.avsc

Run Avro11App

12.Convert JSON string to Avro java object, and Avro java object to JSON string

See BookAvro.avsc and BookJson

Run BookJsonAvroConversion: *converts JSON string to Avro java object, based on existing .avsc file; serialize the Avro java object as .avro file; deserialize the .avro file as JSON string *converts Java POJO to JSON string via Jackson; converts JSON string to Avro java object, based on existing .avsc file; serialize the Avro java object as .avro file; deserialize the .avro file as JSON string; deserialize the JSON string to Java POJO via Jackson

13.Avro Tools

Avro Schema Evolution

1.Backward compatible

EmployeeBackwardV2.avsc is backward compatible with EmployeeBackwardV1.avsc Run EmployeeBackwardApp: it writes .avro file using V1 schema, and reads the same .avro file using V2 schema

EmployeeNotBackwardV1.avsc and EmployeeNotBackwardV2.avsc are not backward compatible Run EmployeeNotBackwardApp: it will throw exception

2.Forward compatible

EmployeeForwardV2.avsc is forward compatible with EmployeeForwardV1.avsc Run EmployeeForwardApp: it writes .avro file using V2 schema, and reads the same .avro file using V1 schema

EmployeeNotForwardV1.avsc and EmployeeNotForwardV2.avsc are not forward compatible. Run EmployeeNotForwardApp: it will throw an exception

3.Full compatible

EmployeeFullV1.avsc is fully compatible with EmployeeFullV2.avsc

Run EmployeeFullApp
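The three compatibility modes can be reduced to one question: can a reader schema decode data written with another schema? The sketch below models a record schema as a map from field name to "has a default value". It is a simplified model with hypothetical names: it ignores type promotion, aliases, unions and nested records, and it does not use the Avro library.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of Avro record compatibility (JDK only).
// A schema is a map: field name -> true when the field declares a default.
final class CompatibilitySketch {

    // A reader can decode the writer's data when every reader field is either
    // present in the writer schema or has a default. Extra writer fields are ignored.
    static boolean canRead(Map<String, Boolean> readerFields, Map<String, Boolean> writerFields) {
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean written = writerFields.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (!written && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // Backward: the new schema reads data written with the old schema.
    static boolean isBackward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // Forward: the old schema reads data written with the new schema.
    static boolean isForward(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return canRead(oldSchema, newSchema);
    }

    // Full: both directions hold.
    static boolean isFull(Map<String, Boolean> newSchema, Map<String, Boolean> oldSchema) {
        return isBackward(newSchema, oldSchema) && isForward(newSchema, oldSchema);
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = new HashMap<>();
        v1.put("firstName", false);
        v1.put("lastName", false);
        Map<String, Boolean> v2 = new HashMap<>(v1);
        v2.put("email", true); // new field with a default
        System.out.println(isBackward(v2, v1));
        System.out.println(isForward(v2, v1));
        System.out.println(isFull(v2, v1));
    }
}
```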

Confluent Schema Registry

See Kafka Schema Registry Postman collection See

Exercise: create topic with single partition Run Postman -> Kafka Schema Registry -> Subject -> Create new subject with schema of Avro01.avsc Run Postman -> Kafka Schema Registry -> Compatibility -> Update schema compatibility with "FULL"

Avro and Spring

1.Avro Kafka Producer and Consumer (automatically generated Avro schema)

See Avro01Producer Run Maven compile to generate Avro01 java class Run kafka-avro-producer spring application --bootstrap-server localhost:9092 --from-beginning --topic sc-avro01 --property print.key=true

Run Postman -> Kafka Schema Registry -> Schemas -> List schemas -> it will display sc-avro01-value schema; the schema is automatically created when sending first message, if not already created The producer will publish the message with sc-avro01-value schema

See Avro01Consumer Run kafka-avro-consumer

2.Avro Kafka Producer and Consumer (manually generated Avro schema)

Create schema: Run Postman -> Subject -> Create new subject (the schema needs to be on a single line, with the double quotes escaped: find and replace " with \") -> use Avro02.avsc, subject is sc-avro02-value
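The single-line, escaped form can also be produced programmatically instead of by find and replace. The sketch below is a JDK-only helper with hypothetical names. It assumes the registry expects a JSON body of the form {"schema": "..."} and that the schema contains no string literals whose internal whitespace matters, because all whitespace runs are collapsed.

```java
// Hypothetical helper: turn a multi-line .avsc into a one-line, escaped JSON body (JDK only).
final class SchemaPayloadSketch {

    static String toRegistryPayload(String avsc) {
        // Collapse newlines and indentation into single spaces.
        String singleLine = avsc.replaceAll("\\s+", " ").trim();
        // Escape backslashes first, then double quotes.
        String escaped = singleLine.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"schema\": \"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        String avsc = "{\n  \"type\": \"string\"\n}";
        System.out.println(toRegistryPayload(avsc));
    }
}
```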

See Avro02Producer Run kafka-avro-producer spring application --bootstrap-server localhost:9092 --from-beginning --topic sc-avro02 --property print.key=true

3.Kafka Avro and Kafka Stream --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic sc-hello Create schema for Hello.avsc

See KafkaStreamConfig, HelloProducer, HelloPositiveUppercase, HelloStream --bootstrap-server localhost:9092 --from-beginning --topic sc-hello --property print.key=true Run kafka-avro-producer and kafka-avro-consumer spring applications

4.Backward compatibility (occurs when consumer is updated first - e.g. v2, but the producer is not updated - e.g. v1)

In real life there is not V1, or V2 of the .avsc file (e.g. instead of EmployeeBackwardV1.avsc and EmployeeBackwardV2.avsc, there's only one EmployeeBackward.avsc) --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic sc-employee-backward Create schema for EmployeeBackward.avsc (Postman) Check compatibility in Conduktor UI (set Compatibility to "Backward") See EmployeeBackwardProducer, EmployeeBackwardScheduler, EmployeeBackwardConsumer Run kafka-avro-producer and kafka-avro-consumer spring applications

Now, in the kafka-avro-consumer project, copy the content of EmployeeBackwardV2.avsc into EmployeeBackward.avsc Note: the consumer will automatically increment the version of the schema in the Registry (or you can manually update the version of the schema via Conduktor UI) Run Maven compile to regenerate EmployeeBackward java class and run kafka-avro-producer and kafka-avro-consumer spring applications

5.Forward compatibility (occurs when producer is updated first - e.g. v2, but the consumer is not updated - e.g. v1) --bootstrap-server localhost:9092 --create --partitions 1 --replication-factor 1 --topic sc-employee-forward Create schema for EmployeeForward.avsc (Postman) Check compatibility in Conduktor UI (set Compatibility to "Forward") See EmployeeForwardProducer, EmployeeForwardScheduler, EmployeeForwardConsumer Run kafka-avro-producer and kafka-avro-consumer spring applications

Now, in the kafka-avro-producer project, copy the content of EmployeeForwardV2.avsc into EmployeeForward.avsc Note: the producer will automatically increment the version of the schema in the Registry (or you can manually update the version of the schema via Conduktor UI) Run Maven compile to regenerate EmployeeForward java class Modify EmployeeForwardScheduler to send dummy email data (the consumer will ignore this field) Run kafka-avro-producer and kafka-avro-consumer spring applications

6.Full compatibility (neither producer, nor consumer breaks when schema evolves)

Avro and Kafka Connect

Note that docker-compose-full.yml has CONNECT_VALUE_CONVERTER and CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL for the kafka-connect service (the schema will not be embedded in the message, as it is with JSON messages)

Note: you can override each connector configuration and set the default converter

Note: the source connector automatically generates the schema, which is saved in the Schema Registry; the sink connector will read the schema from the Registry

1.Postgresql source connector + Kafka consumer

See PersonAddressPostgresql.avsc Create source connector by running Postman -> Kafka Schema Registry -> Kafka Connect -> Source PostgreSQL - Person Address Check Conduktor -> Schema Registry -> note that a new schema "sc-person-address-postgresql-value" was automatically created Copy schema content from Conduktor to kafka-avro-consumer/src/main/avro/PersonAddressPostgresql.avsc

See PersonAddressPostgresqlConsumer

2.Kafka producer

See PersonAddressPostgresqlProducer and PersonAddressScheduler

3.Postgresql sink connector

Create postgresql table kafka_employee_forward (firstName varchar(200), lastName varchar(200), email varchar(200))

Enable EmployeeForwardScheduler

Create sink connector by running Postman -> Kafka Schema Registry -> Kafka Connect -> Sink PostgreSQL - Employee Forward

Kafka Confluent REST Proxy

1.Various operations via REST Proxy List cluster ids: Postman -> Kafka Rest Proxy -> v3 -> Cluster -> List clusters Create topic: Postman -> Kafka Rest Proxy -> v3 -> Topic -> Create topic (my-topic-from-api-binary, my-topic-from-api-avro, my-topic-from-api-json)

2.Produce and consume binary data via REST Proxy The following step needs to be executed in order to produce binary data: Produce binary data: Postman -> Kafka Rest Proxy -> v2 -> Producer -> Produce binary (the value is base64 encoded) --bootstrap-server localhost:9092 --from-beginning --topic my-topic-from-api-binary --property print.key=true

The following steps need to be executed in order to consume binary data: Consume binary data: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume binary -> Create consumer Subscribe to topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume binary -> Subscribe to topic Consume from topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume binary -> Consume from topic
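Because the binary value travels base64 encoded, it helps to check what a value looks like before and after encoding. A minimal JDK-only sketch follows; the class name is hypothetical and UTF-8 text is assumed as the payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for preparing and reading base64 values (JDK only).
final class BinaryValueSketch {

    // Encodes UTF-8 text the way a binary value is sent in the request body.
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a base64 value from a response back into UTF-8 text.
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = encode("hello");
        System.out.println(encoded);         // prints "aGVsbG8="
        System.out.println(decode(encoded)); // prints "hello"
    }
}
```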

3.Produce and consume JSON data via REST Proxy

Same as binary, only the content-type header is changed

The following step needs to be executed in order to produce json data: Postman -> Kafka Rest Proxy -> v2 -> Producer -> Produce json

The following steps need to be executed in order to consume json data: Consume json data: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume json -> Create consumer Subscribe to topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume json -> Subscribe to topic Consume from topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume json -> Consume from topic

4.Produce and consume Avro data via REST Proxy

Similar to json

The following step needs to be executed in order to produce avro data: Postman -> Kafka Rest Proxy -> v2 -> Producer -> Produce avro 1 (has both schema and records; the schema will be automatically created) or Postman -> Kafka Rest Proxy -> v2 -> Producer -> Produce avro 1 (references previously created schema id)

The following steps need to be executed in order to consume avro data: Consume avro data: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume avro -> Create consumer Subscribe to topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume avro -> Subscribe to topic Consume from topic: Postman -> Kafka Rest Proxy -> v2 -> Consumer -> Consume avro -> Consume from topic


docker exec -it kafka-ksqldb ksql Run kafka-ms-order spring application

1.Hello ksqlDB stream

kafka-topics --create --topic=t-commodity-promotion --partitions=1 --replication-factor=1 --bootstrap-server=localhost:9092 Run Postman -> Microservices & Kafka Stream -> Promotion -> Create Promotion

Console consumer: Run in kafka-ksqldb: print 't-commodity-promotion'; (it will start listening for new data) or Run in kafka-ksqldb: SET 'auto.offset.reset'='earliest'; print 't-commodity-promotion'; or Run in kafka-ksqldb: print 't-commodity-promotion' from beginning;

Create stream: CREATE STREAM `s-commodity-promotion` ( promotionCode VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-promotion', VALUE_FORMAT = 'JSON' );

Read from stream: SELECT * FROM `s-commodity-promotion` EMIT CHANGES;

Read from stream with transformation (see SELECT UCASE(promotionCode) AS uppercasePromotionCode FROM `s-commodity-promotion` EMIT CHANGES;

Create stream with transformation (it will create a new topic 't-ksql-commodity-promotion-uppercase'; by running SHOW TOPICS or --list --bootstrap-server=localhost:9092, it will list the 't-ksql-commodity-promotion-uppercase' topic, which is just a normal topic) CREATE STREAM `s-commodity-promotion-uppercase` WITH ( kafka_topic = 't-ksql-commodity-promotion-uppercase' ) AS SELECT UCASE(promotionCode) AS uppercasePromotionCode FROM `s-commodity-promotion` EMIT CHANGES;


Select from stream: SELECT * FROM `s-commodity-promotion-uppercase` EMIT CHANGES;

2.Basic ksqlDB stream commands (see

PRINT 't-commodity-promotion'; SELECT * FROM `s-commodity-promotion` EMIT CHANGES; CREATE STREAM IF NOT EXISTS `s-commodity-promotion2`; CREATE OR REPLACE STREAM IF NOT EXISTS `s-commodity-promotion2`; DROP STREAM `s-commodity-promotion2`; DROP STREAM IF EXISTS `s-commodity-promotion2`;

3.Primitive data types

See BasicDataOne* Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data 1 multiple times

Show data in topic: PRINT 't-ksql-basic-data-one' FROM BEGINNING;

Create stream: CREATE STREAM `s-basic-data-one` ( myString STRING, myFloat DOUBLE, myBoolean BOOLEAN, myInteger INT, myDouble DOUBLE, myBigDecimal DECIMAL(30,18), myLong BIGINT, myAnotherString VARCHAR ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-one', VALUE_FORMAT = 'JSON' );

Try to update stream with different column order (it will fail): CREATE OR REPLACE STREAM `s-basic-data-one` ( myBoolean BOOLEAN, myFloat DOUBLE, myDouble DOUBLE, myInteger INT, myLong BIGINT, myString STRING, myAnotherString VARCHAR, myBigDecimal DECIMAL(30,18) ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-one', VALUE_FORMAT = 'JSON' ); The workaround is to delete and recreate the stream (DROP STREAM IF EXISTS `s-basic-data-one`;)

Show data with limit: SELECT * FROM `s-basic-data-one` EMIT CHANGES LIMIT 15;

3.Date and time data types

See BasicDataTwo* Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data 2 multiple times (see Postman Pre-request Script)

Show data in topic: PRINT 't-ksql-basic-data-two' FROM BEGINNING;

Create stream: CREATE OR REPLACE STREAM `s-basic-data-two` ( myEpochDay DATE, myMillisOfDay TIME, myEpochMillis TIMESTAMP ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-two', VALUE_FORMAT = 'JSON' );

Read data from stream: SELECT * FROM `s-basic-data-two` EMIT CHANGES;

Date/time ksqlDB functions: SELECT myEpochDay, DATEADD(DAYS, 7, myEpochDay) AS aWeekAfterMyEpochDay, myMillisOfDay, TIMESUB(HOURS, 2, myMillisOfDay) AS twoHoursBeforeMyMillisOfDay, myEpochMillis, FORMAT_TIMESTAMP(myEpochMillis, 'dd-MMM-yyyy, HH:mm:ss Z', 'Asia/Jakarta') as epochMillisAtJakartaTimezone FROM `s-basic-data-two` EMIT CHANGES;
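The column names suggest that the producer sends plain numbers: days since the epoch for DATE, milliseconds since midnight for TIME, and milliseconds since the epoch for TIMESTAMP. The sketch below shows how such values could be derived with java.time. That numeric encoding is an assumption inferred from the column names, UTC is assumed for the timestamp, and the class name is hypothetical.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

// Hypothetical sketch: numeric values for DATE / TIME / TIMESTAMP columns (JDK only).
final class KsqlDateValuesSketch {

    // Days since 1970-01-01.
    static long epochDay(LocalDate date) {
        return date.toEpochDay();
    }

    // Milliseconds since midnight.
    static int millisOfDay(LocalTime time) {
        return (int) (time.toNanoOfDay() / 1_000_000L);
    }

    // Milliseconds since 1970-01-01T00:00:00Z, treating the value as UTC.
    static long epochMillis(LocalDateTime dateTime) {
        return dateTime.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(epochDay(LocalDate.of(1970, 1, 8)));                  // prints 7
        System.out.println(millisOfDay(LocalTime.of(1, 0)));                     // prints 3600000
        System.out.println(epochMillis(LocalDateTime.of(1970, 1, 1, 0, 0, 1)));  // prints 1000
    }
}
```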

4.Date and time data types (ISO 8601 format)

See BasicDataThree* Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data 3 multiple times

Show data in topic: PRINT 't-ksql-basic-data-three';

Create stream: CREATE OR REPLACE STREAM `s-basic-data-three` ( myLocalDate VARCHAR, myLocalDateCustomFormat VARCHAR, myLocalTime VARCHAR, myLocalTimeCustomFormat VARCHAR, myLocalDateTime VARCHAR, myLocalDateTimeCustomFormat VARCHAR ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-three', VALUE_FORMAT = 'JSON' );

Read data from stream: SELECT * FROM `s-basic-data-three` EMIT CHANGES;

Read data (LocalDate) SELECT myLocalDate, DATEADD(DAYS, 7, myLocalDate) AS aWeekAfterMyLocalDate, CONCAT('Prefix string- ', myLocalDate, ' -suffix String') AS myLocalDateConcatString, myLocalDateCustomFormat, DATEADD(DAYS, 7, myLocalDateCustomFormat) AS aWeekAfterMyLocalDateCustomFormat, CONCAT('Prefix string- ', myLocalDateCustomFormat, ' -suffix String') AS myLocalDateCustomFormatConcatString FROM `s-basic-data-three` EMIT CHANGES;

Read data (LocalTime) SELECT myLocalTime, TIMEADD(HOURS, 3, myLocalTime) AS 3HoursAfterMyLocalTime, CONCAT('Prefix string- ', myLocalTime, ' -suffix String') AS myLocalTimeConcatString, myLocalTimeCustomFormat, TIMEADD(HOURS, 3, myLocalTimeCustomFormat) AS 3HoursAfterMyLocalDateCustomFormat, CONCAT('Prefix string- ', myLocalTimeCustomFormat, ' -suffix String') AS myLocalTimeCustomFormatConcatString FROM `s-basic-data-three` EMIT CHANGES;

Read data (LocalDateTime) SELECT myLocalDateTime, DATEADD(DAYS, 2, myLocalDateTime) AS 2DaysAfterMyLocalDateTime, CONCAT('Prefix string- ', myLocalDateTime, ' -suffix String') AS myLocalDateTimeConcatString, myLocalDateTimeCustomFormat, DATEADD(DAYS, 2, myLocalDateTimeCustomFormat) AS 2DaysAfterMyLocalDateTimeCustomFormat, CONCAT('Prefix string- ', myLocalDateTimeCustomFormat, ' -suffix String') AS myLocalDateTimeCustomFormatConcatString FROM `s-basic-data-three` EMIT CHANGES;

Parse date/time from string: SELECT PARSE_DATE(myLocalDate, 'yyyy-MM-dd') AS parsedLocalDate, PARSE_DATE(myLocalDateCustomFormat, 'dd MMM yyyy') AS parsedLocalDateCustomFormat, PARSE_TIME(myLocalTime, 'HH:mm:ss') AS parsedLocalTime, PARSE_TIME(myLocalTimeCustomFormat, 'hh:mm:ss a') AS parsedLocalTimeCustomFormat, PARSE_TIMESTAMP(myLocalDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS parsedLocalDateTime, PARSE_TIMESTAMP(myLocalDateTimeCustomFormat, 'dd-MMM-yyyy hh:mm:ss a') AS parsedLocalDateTimeCustomFormat FROM s-basic-data-three EMIT CHANGES;

Create stream with parsed date/time from string: CREATE STREAM s-basic-data-three-parsed AS SELECT PARSE_DATE(myLocalDate, 'yyyy-MM-dd') AS parsedLocalDate, PARSE_DATE(myLocalDateCustomFormat, 'dd MMM yyyy') AS parsedLocalDateCustomFormat, PARSE_TIME(myLocalTime, 'HH:mm:ss') AS parsedLocalTime, PARSE_TIME(myLocalTimeCustomFormat, 'hh:mm:ss a') AS parsedLocalTimeCustomFormat, PARSE_TIMESTAMP(myLocalDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS parsedLocalDateTime, PARSE_TIMESTAMP(myLocalDateTimeCustomFormat, 'dd-MMM-yyyy hh:mm:ss a') AS parsedLocalDateTimeCustomFormat FROM s-basic-data-three EMIT CHANGES;

Describe stream: DESCRIBE s-basic-data-three; DESCRIBE s-basic-data-three-parsed; DESCRIBE s-basic-data-three EXTENDED; DESCRIBE s-basic-data-three-parsed EXTENDED;

Correctly read data for LocalDate from parsedLocalDate column: SELECT parsedLocalDate, DATEADD(DAYS, 7, parsedLocalDate) AS aWeekAfterParsedLocalDate, parsedLocalDateCustomFormat, DATEADD(DAYS, 7, parsedLocalDateCustomFormat) AS aWeekAfterParsedLocalDateCustomFormat FROM s-basic-data-three-parsed EMIT CHANGES;

Correctly read data for LocalTime from parsedLocalTime column: SELECT parsedLocalTime, TIMEADD(HOURS, 3, parsedLocalTime) AS 3HoursAfterParsedLocalTime, parsedLocalTimeCustomFormat, TIMEADD(HOURS, 3, parsedLocalTimeCustomFormat) AS 3HoursAfterParsedLocalTimeCustomFormat FROM s-basic-data-three-parsed EMIT CHANGES;

Correctly read data for LocalDateTime from parsedLocalDateTime column: SELECT parsedLocalDateTime, TIMESTAMPADD(DAYS, 2, parsedLocalDateTime) AS 2DaysAfterParsedLocalDateTime, parsedLocalDateTimeCustomFormat, TIMESTAMPADD(DAYS, 2, parsedLocalDateTimeCustomFormat) AS 2DaysAfterParsedLocalDateTimeCustomFormat FROM s-basic-data-three-parsed EMIT CHANGES;
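
As a rough illustration of why the parsed stream is needed, the following Python sketch (not part of this repository) parses the same kinds of strings into typed values before doing date arithmetic. The sample values, the function name parse_record, and the mapping from the ksqlDB patterns to strptime directives are assumptions made for illustration only.

```python
from datetime import datetime, timedelta

# Assumed sample values, shaped like a BasicDataThree message.
record = {
    "myLocalDate": "2024-03-07",
    "myLocalDateCustomFormat": "27 Aug 2024",
    "myLocalTimeCustomFormat": "02:55:17 PM",
    "myLocalDateTime": "2028-11-26T19:44:16",
}


def parse_record(rec):
    """Turn each string into a typed value, similar in spirit to
    PARSE_DATE / PARSE_TIME / PARSE_TIMESTAMP (hypothetical helper)."""
    return {
        "parsedLocalDate": datetime.strptime(rec["myLocalDate"], "%Y-%m-%d").date(),
        "parsedLocalDateCustomFormat": datetime.strptime(
            rec["myLocalDateCustomFormat"], "%d %b %Y").date(),
        "parsedLocalTimeCustomFormat": datetime.strptime(
            rec["myLocalTimeCustomFormat"], "%I:%M:%S %p").time(),
        "parsedLocalDateTime": datetime.strptime(
            rec["myLocalDateTime"], "%Y-%m-%dT%H:%M:%S"),
    }


parsed = parse_record(record)

# Arithmetic only makes sense once the values are typed, not plain strings.
week_after = parsed["parsedLocalDate"] + timedelta(days=7)
two_days_after = parsed["parsedLocalDateTime"] + timedelta(days=2)
```

The same idea applies to the stream above: the arithmetic functions operate on the parsed columns, not on the original VARCHAR columns.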

5.Array, List and Set data types

See BasicDataFour* Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data 4 multiple times

Show data in topic: PRINT t-ksql-basic-data-four;

Create stream: CREATE STREAM s-basic-data-four ( myStringArray ARRAY<VARCHAR>, myIntegerList ARRAY<INT>, myDoubleSet ARRAY<DOUBLE> ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-four', VALUE_FORMAT = 'JSON' );

Read data from stream: SELECT * FROM s-basic-data-four EMIT CHANGES;

Describe stream DESCRIBE s-basic-data-four;

Array functions: SELECT ARRAY_LENGTH(myStringArray) as lengthMyStringArray, ARRAY_CONCAT(myIntegerList, ARRAY[999, 998, 997]) as concatMyIntegerList, ARRAY_SORT(myDoubleSet, 'DESC') as sortedDescMyDoubleSet FROM s-basic-data-four EMIT CHANGES;

5.Map data type

See BasicDataFive* Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data 5 multiple times

Show data in topic: PRINT t-ksql-basic-data-five;

Create stream: CREATE STREAM s-basic-data-five ( myMapAlpha MAP<VARCHAR, VARCHAR>, myMapBeta MAP<VARCHAR, VARCHAR> ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-five', VALUE_FORMAT = 'JSON' );

Read data from stream: SELECT * FROM s-basic-data-five EMIT CHANGES;

Describe stream: DESCRIBE s-basic-data-five;

Map functions: SELECT MAP_VALUES(myMapAlpha) as valuesAtMyMapAlpha, MAP_KEYS(myMapBeta) as keysAtMyMapBeta FROM s-basic-data-five EMIT CHANGES;

6.Complex data types

See BasicDataPersonRequest, BasicDataPersonMessage, BasicDataPassportMessage, BasicDataAddressMessage, BasicDataLocationMessage Run Postman -> Kafka ksqlDB -> Basic Data -> Basic Data Person multiple times

Show data in topic: PRINT t-ksql-basic-data-person FROM BEGINNING;

Create stream: CREATE STREAM s-basic-data-person ( firstName VARCHAR, lastName VARCHAR, birthDate VARCHAR, contacts MAP<VARCHAR, VARCHAR>, passport STRUCT< number VARCHAR, expiryDate VARCHAR >, addresses ARRAY< STRUCT< streetAddress VARCHAR, country VARCHAR, location STRUCT< latitude DOUBLE, longitude DOUBLE > > > ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-person', VALUE_FORMAT = 'JSON' );

Read data from stream: SELECT * FROM s-basic-data-person EMIT CHANGES;

Access map data: SELECT contacts['email'] AS emailFromContactsMap, contacts['phoneHome'] AS phoneHomeFromContactsMap, contacts['phoneWork'] AS phoneWorkFromContactsMap FROM s-basic-data-person EMIT CHANGES;

Access struct data: SELECT passport->number AS passportNumber, passport->expiryDate AS passportExpiryDate FROM s-basic-data-person EMIT CHANGES;

Convert each address in the list into one record (explode function): SELECT firstName, lastName, EXPLODE(addresses) as addressSingle FROM s-basic-data-person EMIT CHANGES;

Convert each address in the list into one record, then access each field in the address: SELECT firstName, lastName, EXPLODE(addresses)->streetAddress, EXPLODE(addresses)->country, EXPLODE(addresses)->location FROM s-basic-data-person EMIT CHANGES;

Convert each address in the list into one record, then access each field in the address, including fields from structs within structs: SELECT firstName, lastName, EXPLODE(addresses)->streetAddress, EXPLODE(addresses)->country, EXPLODE(addresses)->location->latitude AS latitude, EXPLODE(addresses)->location->longitude AS longitude FROM s-basic-data-person EMIT CHANGES;

Convert each address in the list into one record, then access each field in the address, including fields from structs within structs, and date conversion functions: CREATE STREAM s-basic-data-person-complete AS SELECT firstName, lastName, PARSE_DATE(birthDate, 'yyyy-MM-dd') AS birthDate, contacts, passport->number AS passportNumber, PARSE_DATE(passport->expiryDate,'yyyy-MM-dd') AS passportExpiryDate, EXPLODE(addresses)->streetAddress, EXPLODE(addresses)->country, EXPLODE(addresses)->location->latitude AS latitude, EXPLODE(addresses)->location->longitude AS longitude FROM s-basic-data-person EMIT CHANGES;

Describe stream: DESCRIBE s-basic-data-person-complete;

Get data from stream: SELECT * FROM s-basic-data-person-complete EMIT CHANGES;
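
To picture what the explode-and-flatten queries produce, here is a small Python sketch (not repository code) that turns one person record with a list of addresses into one flat record per address. The function name explode_addresses and the sample record are assumptions for illustration; the field names follow the stream definition.

```python
def explode_addresses(person):
    """Return one flat record per entry in person['addresses'] (hypothetical helper)."""
    rows = []
    for address in person["addresses"]:
        rows.append({
            "firstName": person["firstName"],
            "lastName": person["lastName"],
            "streetAddress": address["streetAddress"],
            "country": address["country"],
            # Fields of the nested struct are pulled up to the top level.
            "latitude": address["location"]["latitude"],
            "longitude": address["location"]["longitude"],
        })
    return rows


# Assumed sample record with two addresses.
person = {
    "firstName": "Kate",
    "lastName": "Bishop",
    "addresses": [
        {"streetAddress": "Somewhere in New York", "country": "USA",
         "location": {"latitude": 40.83, "longitude": -74.15}},
        {"streetAddress": "Tokyo, just there", "country": "Japan",
         "location": {"latitude": 35.73, "longitude": 139.63}},
    ],
}

rows = explode_addresses(person)
```

One input record with two addresses yields two output records, each repeating the person-level fields.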

ksqlDB Stream and Table key (KSQL Stream and Table are similar to Kafka Stream and Table)

See BasicDataCountryMessage Run Postman -> Kafka ksqlDB -> Stream & Table Key -> Basic Data Country multiple times

Show data in topic: PRINT t-ksql-basic-data-country FROM BEGINNING;

Create stream: CREATE STREAM s-basic-data-country ( countryName VARCHAR, currencyCode VARCHAR, population INT ) WITH ( KAFKA_TOPIC = 't-ksql-basic-data-country', VALUE_FORMAT = 'JSON' );

Describe stream: DESCRIBE s-basic-data-country;

Get stream data: SELECT * FROM s-basic-data-country EMIT CHANGES;

Re-key by country name (from existing s-basic-data-country stream): DROP STREAM IF EXISTS s-basic-data-country-rekeyed;

CREATE STREAM s-basic-data-country-rekeyed AS SELECT countryName, currencyCode, population FROM s-basic-data-country PARTITION BY countryName EMIT CHANGES;

DESCRIBE s-basic-data-country-rekeyed;

Include key in the stream: DROP STREAM IF EXISTS s-basic-data-country-rekeyed;

CREATE STREAM s-basic-data-country-rekeyed AS SELECT countryName AS rowkey, AS_VALUE(countryName) AS countryName, currencyCode, population FROM s-basic-data-country PARTITION BY countryName EMIT CHANGES;

Get data from stream: SET 'auto.offset.reset'='earliest';

SELECT * FROM s-basic-data-country-rekeyed EMIT CHANGES;

Re-key by country name and currency code: DROP STREAM IF EXISTS s-basic-data-country-rekeyed-json;

CREATE STREAM s-basic-data-country-rekeyed-json WITH ( KEY_FORMAT = 'JSON' ) AS SELECT STRUCT(countryName := countryName, currencyCode := currencyCode) AS jsonKey, AS_VALUE(countryName) AS countryName, AS_VALUE(currencyCode) AS currencyCode, population FROM s-basic-data-country PARTITION BY STRUCT(countryName := countryName, currencyCode := currencyCode) EMIT CHANGES;

Create table with key = country name and sum(population) (a table is an aggregation of a stream with GROUP BY): DROP TABLE IF EXISTS tbl-basic-data-country;

CREATE TABLE tbl-basic-data-country AS SELECT countryName, SUM(population) AS totalPopulation FROM s-basic-data-country GROUP BY countryName EMIT CHANGES;

Run Postman -> Kafka ksqlDB -> Stream & Table Key -> Table Simulation requests

Note: 07 - Null key (no country name) is still published to the underlying topic; however, it is not published to the table.

Note: 09 - Japan Delete sends a tombstone record (nothing happens on the topic, while in the table a tombstone value is created); re-running "SELECT * FROM tbl-basic-data-country" will only display "Indonesia" records.

Get data from table: SET 'auto.offset.reset'='earliest';

SELECT * FROM tbl-basic-data-country EMIT CHANGES;
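
The table behaviour described above can be pictured with a short Python sketch (not repository code): records are grouped by country name, populations are summed, and records without a country name never reach the aggregate. The function name aggregate_population and the sample numbers are assumptions for illustration, and tombstone handling is deliberately left out.

```python
def aggregate_population(records):
    """Sum population per countryName, skipping records without a key (hypothetical helper)."""
    totals = {}
    for rec in records:
        name = rec.get("countryName")
        if name is None:
            # Still present in the source list (the "topic"), but not in the result (the "table").
            continue
        totals[name] = totals.get(name, 0) + rec["population"]
    return totals


# Assumed sample messages, in arrival order.
messages = [
    {"countryName": "Indonesia", "population": 100},
    {"countryName": "Japan", "population": 50},
    {"countryName": None, "population": 999},
    {"countryName": "Indonesia", "population": 25},
]

table = aggregate_population(messages)
```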

ksqlDB Commodity

1.Commodity stream + KSQL rowkey Create stream: CREATE STREAM s-commodity-order ( rowkey VARCHAR KEY, creditCardNumber VARCHAR, itemName VARCHAR, orderDateTime VARCHAR, orderLocation VARCHAR, orderNumber VARCHAR, price INT, quantity INT ) WITH ( KAFKA_TOPIC = 't-commodity-order', VALUE_FORMAT = 'JSON' );

Describe stream DESCRIBE s-commodity-order;

Run Postman -> Microservices & Kafka Stream -> Commodity Order -> Order 1 Random Item

Get data from topic: PRINT t-commodity-order FROM BEGINNING; Note: the key format reported by PRINT is not reliable, because PRINT guesses it from the data (e.g. "Key format: KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING")

Mask credit card number CREATE STREAM s-commodity-order-masked AS SELECT rowkey, MASK_LEFT(creditCardNumber, 12, '*', '*', '*', '*') AS maskedCreditCardNumber, itemName, orderDateTime, orderLocation, orderNumber, price, quantity FROM s-commodity-order EMIT CHANGES;

Get data from stream: SELECT * FROM s-commodity-order-masked EMIT CHANGES;
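
For intuition about the masking step, this Python sketch (not repository code) replaces the first characters of a card number with a mask character. It is a simplification: ksqlDB's MASK_LEFT takes separate mask characters for upper-case, lower-case, digit and other characters, while the hypothetical mask_left below uses a single character, and the '*' default is an assumption.

```python
def mask_left(text, num_chars, mask_char="*"):
    """Replace the first num_chars characters of text with mask_char (hypothetical helper)."""
    masked_length = min(num_chars, len(text))
    return mask_char * masked_length + text[masked_length:]


# Assumed 16-digit sample value; only the last 4 digits stay readable.
masked = mask_left("4111111111111111", 12)
```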

Calculate total item amount to pattern output CREATE STREAM s-commodity-pattern-one AS SELECT rowkey, itemName, orderDateTime, orderLocation, orderNumber, (price * quantity) as totalItemAmount FROM s-commodity-order-masked EMIT CHANGES;

Get data from stream: SELECT * FROM s-commodity-pattern-one EMIT CHANGES;

Filter stream based on quantity: CREATE STREAM s-commodity-reward-one AS SELECT rowkey, itemName, orderDateTime, orderLocation, orderNumber, price, quantity FROM s-commodity-order-masked WHERE quantity > 200 EMIT CHANGES;

Storage sink: CREATE STREAM s-commodity-storage-one AS SELECT * FROM s-commodity-order-masked EMIT CHANGES;

Select from stream: SELECT * FROM s-commodity-reward-one EMIT CHANGES;

2.Custom rowkey from value

CREATE STREAM s-commodity-order-key-from-value ( creditCardNumber VARCHAR, itemName VARCHAR, orderDateTime VARCHAR, orderLocation VARCHAR, orderNumber VARCHAR KEY, price INT, quantity INT ) WITH ( KAFKA_TOPIC = 't-commodity-order', VALUE_FORMAT = 'JSON' );

Describe stream: DESCRIBE s-commodity-order-key-from-value;

3.Commodity plastic/non-plastic (see Kafka Stream - Commodity -> 3.a.)

Create stream for plastic items CREATE OR REPLACE STREAM s-commodity-pattern-two-plastic AS SELECT rowkey, itemName, orderDateTime, orderLocation, orderNumber, (price * quantity) as totalItemAmount FROM s-commodity-order-masked WHERE LCASE(itemName) LIKE 'plastic%' EMIT CHANGES;

SELECT * FROM s-commodity-pattern-two-plastic EMIT CHANGES;

CREATE OR REPLACE STREAM s-commodity-pattern-two-notplastic AS SELECT rowkey, itemName, orderDateTime, orderLocation, orderNumber, (price * quantity) as totalItemAmount FROM s-commodity-order-masked WHERE LCASE(itemName) NOT LIKE 'plastic%' EMIT CHANGES;

SELECT * FROM s-commodity-pattern-two-notplastic EMIT CHANGES;

4.Commodity reward (see Kafka Stream - Commodity -> t-commodity-reward-two)

Create stream for large & not cheap items CREATE OR REPLACE STREAM s-commodity-reward-two AS SELECT rowkey, itemName, orderDateTime, orderLocation, orderNumber, price, quantity FROM s-commodity-order-masked WHERE quantity > 200 AND price > 100 EMIT CHANGES;

5.Commodity storage (see Kafka Stream - Commodity -> t-commodity-storage-two)

Replace key for storage CREATE OR REPLACE STREAM s-commodity-storage-two AS SELECT FROM_BYTES( TO_BYTES(orderNumber, 'utf8'), 'base64' ) AS base64Rowkey, itemName, orderDateTime, orderLocation, orderNumber, price, quantity FROM s-commodity-order-masked PARTITION BY FROM_BYTES( TO_BYTES(orderNumber, 'utf8'), 'base64' ) EMIT CHANGES;

Describe stream: DESCRIBE s-commodity-storage-two; Note that the base64Rowkey column is marked as key.

Alternatively, use the console consumer to see the key: kafka-console-consumer --topic t-commodity-storage-two --from-beginning --property print.key=true --bootstrap-server=localhost:9092
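
The key replacement above encodes the order number as UTF-8 bytes and renders those bytes as Base64 text. A minimal Python sketch of the same transformation follows (not repository code); the function name base64_rowkey and the sample order number are assumptions.

```python
import base64


def base64_rowkey(order_number):
    """UTF-8 encode the order number, then express the bytes as Base64 text (hypothetical helper)."""
    return base64.b64encode(order_number.encode("utf-8")).decode("ascii")


# Assumed sample order number.
new_key = base64_rowkey("ORD-1")

# Decoding the key gives back the original order number.
original = base64.b64decode(new_key).decode("utf-8")
```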

6.Commodity reward for each location (see Kafka Stream - Commodity -> 4. -> the key of the OrderReward needs to be changed and become the location)

CREATE OR REPLACE STREAM s-commodity-reward-four AS SELECT itemName, orderDateTime, orderLocation, orderNumber, price, quantity FROM s-commodity-order-masked PARTITION BY orderLocation EMIT CHANGES;

7.KSQL scripts

copy spring-kafka-scripts/ksqldb-samples/scripts/commodity-sample.ksql to ./data/kafka-ksqldb-data/scripts

execute script by running: RUN SCRIPT /data/scripts/commodity-sample.ksql;


8.Filter locations starting with C (see Kafka Stream - Commodity -> 6.)

CREATE OR REPLACE STREAM s-commodity-fraud-six AS SELECT CONCAT( SUBSTRING(orderLocation, 1, 1), '' ) as key, (price * quantity) as totalValue FROM s-commodity-order-masked WHERE LCASE(orderLocation) LIKE 'c%' PARTITION BY CONCAT( SUBSTRING(orderLocation, 1, 1), '' ) EMIT CHANGES;

kafka-console-consumer --topic s-commodity-fraud-six --from-beginning --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer --bootstrap-server=localhost:9092

ksqlDB REST API - Feedback

1.Basic operations (create stream, describe stream, get data from stream) Run Postman -> Kafka ksqlDB -> Feedback -> Are We Good Enough?

Note: running 07 - Push query word stream in Postman will return an empty response because Postman does not support streaming; the alternative is to run the ksql "SELECT * FROM s-commodity-feedback-word EMIT CHANGES;" in a terminal, or to run the curl command in a terminal

Run Postman -> Microservices & Kafka Stream -> Feedback -> Create Random Feedback

2.Who owns the feedback (change key to location)

Run Postman -> Kafka ksqlDB -> Feedback -> Who Owns This Feedback? Run Postman -> Microservices & Kafka Stream -> Feedback -> Create good feedback

3.Good feedback/bad feedback

Run Postman -> Kafka ksqlDB -> Feedback -> Good Feedback or Bad Feedback? Run Postman -> Microservices & Kafka Stream -> Feedback -> Create bad feedback

4.Group using Table

Run Postman -> Kafka ksqlDB -> Feedback -> Group Using Table Run Postman -> Microservices & Kafka Stream -> Feedback -> Create good feedback Run Postman -> Microservices & Kafka Stream -> Feedback -> Create bad feedback

5.The Kafka Stream "send and continue" and streams.through patterns have ksqlDB equivalents: "CREATE STREAM" and "CREATE TABLE"

6.Overall good or bad count

Run Postman -> Kafka ksqlDB -> Feedback -> Overall Good or Bad Word Run Postman -> Microservices & Kafka Stream -> Feedback -> Create good feedback Run Postman -> Microservices & Kafka Stream -> Feedback -> Create bad feedback Run Postman -> Microservices & Kafka Stream -> Feedback -> Create random feedback

ksqlDB - insert data

1.Insert simple data Insert basic data one INSERT INTO s-basic-data-one ( myBoolean, myString, myAnotherString, myFloat, myDouble, myBigDecimal, myInteger, myLong ) VALUES ( false, 'This is a string', 'And this is another string', 52.918, 58290.581047, 4421672.5001855, 1057, 2900175 );

Insert basic data two INSERT INTO s-basic-data-two ( myEpochDay, myMillisOfDay, myEpochMillis ) VALUES ( FROM_DAYS(20967), PARSE_TIME('18:47:15', 'HH:mm:ss'), FROM_UNIXTIME(1678610274295) );

Insert basic data three INSERT INTO s-basic-data-three ( myLocalDate, myLocalTime, myLocalDateTime, myLocalDateCustomFormat, myLocalTimeCustomFormat, myLocalDateTimeCustomFormat ) VALUES ( '2024-03-07', '16:52:09', '2028-11-26T19:44:16', '27 Aug 2024', '02:55:17 PM', '19-Dec-2026 05:42:53 AM' );

Insert basic data four (array of string) INSERT INTO s-basic-data-four ( myStringArray ) VALUES ( ARRAY[ 'Hello', 'from', 'ksqldb', 'I hope you like it', 'and enjoy the course' ] );

Insert basic data four (list of integer) INSERT INTO s-basic-data-four ( myIntegerList ) VALUES ( ARRAY[ 1001, 1002, 1003, 1004, 1005, 1006 ] );

Insert basic data four (set of double) INSERT INTO s-basic-data-four ( myDoubleSet ) VALUES ( ARRAY[ 582.59, 1964.094, 287.296, 7933.04, 332.694 ] );

Insert basic data five (1) INSERT INTO s-basic-data-five ( myMapAlpha ) VALUES ( MAP( '973' := 'nine seven three', '628' := 'six two eight', '510' := 'five one zero' ) );

Insert basic data five (2) INSERT INTO s-basic-data-five ( myMapAlpha, myMapBeta ) VALUES ( MAP( '409' := 'four zero nine', '152' := 'one five two', '736' := 'seven three six', '827' := 'eight two seven' ), MAP( 'd2c1b963-c18c-4c6e-b85f-3ebc44b93cec' := 'The first element', '4edf4394-fd33-4643-9ed8-f3354fe96c28' := 'The second element', '720ecc9e-c81f-4fac-a4d5-752c1d3f3f4f' := 'The third element' ) );

Insert person INSERT INTO s-basic-data-person ( firstName, lastName, birthDate, contacts, passport, addresses ) VALUES ( 'Kate', 'Bishop', '2002-11-25', MAP( 'email' := '', 'phone' := '999888777' ), STRUCT( number := 'MCU-PASS-957287759', expiryDate := '2029-08-18' ), ARRAY[ STRUCT( streetAddress := 'Somewhere in New York', country := 'USA', location := STRUCT( latitude := 40.830063426849705, longitude := -74.14751581646931 ) ), STRUCT( streetAddress := 'Tokyo, just there', country := 'Japan', location := STRUCT( latitude := 35.734078460795104, longitude := 139.62821562631277 ) ) ] );

Note: update is not available in Kafka, nor in ksqlDB.

Note: deleting a single record is not available in Kafka, nor in ksqlDB; the only way to delete data is to delete the topic (e.g. DROP STREAM my-stream DELETE TOPIC).

2.Insert stream to other stream / merge multiple streams into one stream (see ksqldb-merge-streams.jpg and Kafka Stream - Customer -> 1.)

Create stream from topic mobile CREATE STREAM s-commodity-customer-purchase-mobile( purchaseNumber VARCHAR, purchaseAmount INT, mobileAppVersion VARCHAR, operatingSystem VARCHAR, location STRUCT< latitude DOUBLE, longitude DOUBLE > ) WITH ( KAFKA_TOPIC = 't-commodity-customer-purchase-mobile', VALUE_FORMAT = 'JSON' );

Create stream from topic web CREATE STREAM s-commodity-customer-purchase-web( purchaseNumber VARCHAR, purchaseAmount INT, browser VARCHAR, operatingSystem VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-customer-purchase-web', VALUE_FORMAT = 'JSON' );

Create merged stream from topic mobile + web CREATE STREAM s-commodity-customer-purchase-all ( purchaseNumber VARCHAR, purchaseAmount INT, mobileAppVersion VARCHAR, operatingSystem VARCHAR, location STRUCT< latitude DOUBLE, longitude DOUBLE >, browser VARCHAR, source VARCHAR ) WITH ( KAFKA_TOPIC = 't-ksql-commodity-customer-purchase-all', PARTITIONS = 2, VALUE_FORMAT = 'JSON' );

Insert into merged from stream mobile INSERT INTO s-commodity-customer-purchase-all SELECT purchaseNumber, purchaseAmount, mobileAppVersion, operatingSystem, location, CAST(null AS VARCHAR) AS browser, 'mobile' AS source FROM s-commodity-customer-purchase-mobile EMIT CHANGES;

Insert into merged from stream web INSERT INTO s-commodity-customer-purchase-all SELECT purchaseNumber, purchaseAmount, CAST(null AS VARCHAR) AS mobileAppVersion, operatingSystem, CAST(null AS STRUCT<latitude DOUBLE, longitude DOUBLE>) AS location, browser, 'web' AS source FROM s-commodity-customer-purchase-web EMIT CHANGES;

SELECT * FROM s-commodity-customer-purchase-all EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream - Customer Purchase
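
The merge above maps both sources onto one common shape, fills the columns a source does not have with null, and tags each record with its origin. A Python sketch of that mapping follows (not repository code); the function names and sample values are assumptions, and a real stream interleaves records by arrival instead of concatenating two lists.

```python
def from_mobile(rec):
    """Map a mobile purchase onto the merged shape; browser is unknown for mobile."""
    return {
        "purchaseNumber": rec["purchaseNumber"],
        "purchaseAmount": rec["purchaseAmount"],
        "mobileAppVersion": rec["mobileAppVersion"],
        "operatingSystem": rec["operatingSystem"],
        "location": rec["location"],
        "browser": None,
        "source": "mobile",
    }


def from_web(rec):
    """Map a web purchase onto the merged shape; app version and location are unknown for web."""
    return {
        "purchaseNumber": rec["purchaseNumber"],
        "purchaseAmount": rec["purchaseAmount"],
        "mobileAppVersion": None,
        "operatingSystem": rec["operatingSystem"],
        "location": None,
        "browser": rec["browser"],
        "source": "web",
    }


def merge(mobile_records, web_records):
    """Combine both sources into one list with a single, shared set of fields."""
    return [from_mobile(r) for r in mobile_records] + [from_web(r) for r in web_records]


# Assumed sample purchases.
mobile = [{"purchaseNumber": "M-1", "purchaseAmount": 120, "mobileAppVersion": "2.1",
           "operatingSystem": "Android", "location": {"latitude": 1.0, "longitude": 2.0}}]
web = [{"purchaseNumber": "W-1", "purchaseAmount": 80, "browser": "Firefox",
        "operatingSystem": "Linux"}]

merged = merge(mobile, web)
```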

3.Table and cogroup (see Kafka Stream - Customer -> 2.)

Create stream from topic shopping cart CREATE STREAM s-commodity-customer-preference-shopping-cart( customerId VARCHAR, itemName VARCHAR, cartAmount INT, cartDatetime VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-customer-preference-shopping-cart', VALUE_FORMAT = 'JSON' );

Create stream from topic wishlist CREATE STREAM s-commodity-customer-preference-wishlist( customerId VARCHAR, itemName VARCHAR, wishlistDatetime VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-customer-preference-wishlist', VALUE_FORMAT = 'JSON' );

Create cogroup stream, taking latest cart date time for each item CREATE TABLE tbl-commodity-customer-cogroup-shopping-cart WITH ( KEY_FORMAT = 'JSON' ) AS SELECT customerId, itemName, ARRAY_MAX( COLLECT_LIST(cartDatetime) ) AS latestCartDatetime FROM s-commodity-customer-preference-shopping-cart GROUP BY customerId, itemName EMIT CHANGES;

Note: the key contains 2 fields (customerId and itemName), thus the KEY_FORMAT needs to be 'JSON'

Run Postman -> Kafka ksqlDB -> Customer Preference -> Cart & Wishlist Simulation

Create map of <item name, latest add to cart datetime> CREATE TABLE tbl-commodity-customer-preference-shopping-cart AS SELECT customerId, AS_MAP ( COLLECT_LIST(itemName), COLLECT_LIST(latestCartDatetime)) AS cartItems FROM tbl-commodity-customer-cogroup-shopping-cart GROUP BY customerId EMIT CHANGES;

Run Postman -> Kafka ksqlDB -> Customer Preference -> Cart & Wishlist Simulation (starting with 01)

SELECT * FROM tbl-commodity-customer-preference-shopping-cart EMIT CHANGES;
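
The two tables above work in two steps: first keep the latest cart datetime per (customerId, itemName), then collect those pairs into one map per customer. The Python sketch below (not repository code) mirrors both steps; the function names and sample events are assumptions, and it relies on the datetime strings sorting correctly as text, which holds for the ISO-like format used here.

```python
def latest_per_item(events):
    """Step 1: latest cartDatetime for each (customerId, itemName) pair."""
    latest = {}
    for event in events:
        key = (event["customerId"], event["itemName"])
        if key not in latest or event["cartDatetime"] > latest[key]:
            latest[key] = event["cartDatetime"]
    return latest


def cart_items_by_customer(latest):
    """Step 2: one map of itemName -> latest datetime per customer."""
    result = {}
    for (customer_id, item_name), cart_datetime in latest.items():
        result.setdefault(customer_id, {})[item_name] = cart_datetime
    return result


# Assumed sample events.
events = [
    {"customerId": "Linda", "itemName": "Pen", "cartDatetime": "2022-07-06T10:00:00"},
    {"customerId": "Linda", "itemName": "Pen", "cartDatetime": "2022-07-06T12:00:00"},
    {"customerId": "Linda", "itemName": "Book", "cartDatetime": "2022-07-06T09:00:00"},
]

cart_items = cart_items_by_customer(latest_per_item(events))
```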

Create cogroup stream, taking latest wishlist date time for each item CREATE TABLE tbl-commodity-customer-cogroup-wishlist WITH ( KEY_FORMAT = 'JSON' ) AS SELECT customerId, itemName, ARRAY_MAX( COLLECT_LIST(wishlistDatetime) ) AS latestWishlistDatetime FROM s-commodity-customer-preference-wishlist GROUP BY customerId, itemName EMIT CHANGES;

Create map of <item name, latest wishlist datetime> CREATE TABLE tbl-commodity-customer-preference-wishlist AS SELECT customerId, AS_MAP ( COLLECT_LIST(itemName), COLLECT_LIST(latestWishlistDatetime)) AS wishlistItems FROM tbl-commodity-customer-cogroup-wishlist GROUP BY customerId EMIT CHANGES;

SELECT * FROM tbl-commodity-customer-preference-wishlist EMIT CHANGES;

Run Postman -> Kafka ksqlDB -> Customer Preference -> Cart & Wishlist Simulation (start with 06)

Create merged preference from shopping cart + wishlist CREATE TABLE tbl-commodity-customer-preference-all AS SELECT tbl-commodity-customer-preference-shopping-cart.customerId AS customerId, cartItems, wishlistItems FROM tbl-commodity-customer-preference-shopping-cart JOIN tbl-commodity-customer-preference-wishlist ON tbl-commodity-customer-preference-shopping-cart.customerId = tbl-commodity-customer-preference-wishlist.customerId EMIT CHANGES;

SELECT * FROM tbl-commodity-customer-preference-all EMIT CHANGES;

ksqlDB pull query

For pull queries, just remove the EMIT CHANGES clause

Pull query to stream (1) SELECT myBoolean, myDouble, myString FROM s-basic-data-one;

Pull query to stream (2) SELECT * FROM s-basic-data-person;

Pull query to table SELECT * FROM tbl-commodity-customer-preference-all WHERE customerId = 'Linda';

ksqlDB - Flash Sale

1.Latest user vote (see 1.Kafka Stream - flash sale vote)

Create stream from underlying topic CREATE STREAM s-commodity-flashsale-vote ( customerId VARCHAR, itemName VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-flashsale-vote', VALUE_FORMAT = 'JSON' );

Create table to know latest user vote CREATE TABLE tbl-commodity-flashsale-vote-user-item AS SELECT customerId, LATEST_BY_OFFSET(itemName) AS itemName FROM s-commodity-flashsale-vote GROUP BY customerId;

Create table for item and vote count, based on latest user vote CREATE TABLE tbl-commodity-flashsale-vote-one-result AS SELECT itemName, COUNT(customerId) AS votesCount FROM tbl-commodity-flashsale-vote-user-item GROUP BY itemName EMIT CHANGES;

SELECT * FROM tbl-commodity-flashsale-vote-one-result EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Flash Sale -> Simulation
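
The two tables in this step can be read as: remember only the most recent vote of each customer, then count customers per item. A Python sketch of that logic follows (not repository code); the function names and sample votes are assumptions, and the list order stands in for offset order.

```python
from collections import Counter


def latest_vote_per_customer(votes):
    """Keep only the last vote seen for each customer (later entries overwrite earlier ones)."""
    latest = {}
    for vote in votes:
        latest[vote["customerId"]] = vote["itemName"]
    return latest


def count_votes(latest):
    """Count how many customers currently vote for each item."""
    return Counter(latest.values())


# Assumed sample votes; customer "a" changes their mind.
votes = [
    {"customerId": "a", "itemName": "Cake"},
    {"customerId": "b", "itemName": "Cake"},
    {"customerId": "a", "itemName": "Cookies"},
]

result = count_votes(latest_vote_per_customer(votes))
```

Because only the latest vote per customer is counted, a customer who votes twice moves their vote instead of adding a second one.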

2.Latest user vote within time range (see 2.Kafka stream state store)

Create table to know latest user vote, on certain time range (note: the lower and upper bounds below are identical, so no record matches until the upper bound is set to a later time) CREATE TABLE tbl-commodity-flashsale-vote-user-item-timestamp AS SELECT customerId, LATEST_BY_OFFSET(itemName) AS itemName FROM s-commodity-flashsale-vote WHERE rowtime >= '2022-07-06T10:00:00' AND rowtime < '2022-07-06T10:00:00' GROUP BY customerId;

Create table for item and vote count, based on latest user vote, on certain time range CREATE TABLE tbl-commodity-flashsale-vote-two-result AS SELECT itemName, COUNT(customerId) AS votesCount FROM tbl-commodity-flashsale-vote-user-item-timestamp GROUP BY itemName EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Flash Sale -> Create Random Flash Sale Vote

3.Average rating

Create table for average rating by country CREATE TABLE tbl-commodity-feedback-rating-one AS SELECT location, AVG(rating) as averageRating FROM s-commodity-feedback GROUP BY location EMIT CHANGES;

Filter aggregated values SELECT location, AVG(rating) as averageRating FROM s-commodity-feedback GROUP BY location HAVING AVG(rating) <= 3.5 EMIT CHANGES;

4.Detailed rating (histogram) Create table for average rating and histogram CREATE TABLE tbl-commodity-feedback-rating-two AS SELECT location, AVG(rating) as averageRating, HISTOGRAM( CAST(rating AS VARCHAR) ) as histogramRating FROM s-commodity-feedback GROUP BY location EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Feedback -> Create Random Feedback

ksqlDB - Inventory

1.Summing records (see 1.Kafka stream - summing records)

Create stream from underlying topic CREATE STREAM s-commodity-inventory ( item VARCHAR, location VARCHAR, quantity INT, transactionTime VARCHAR, type VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-inventory', VALUE_FORMAT = 'JSON' );

CREATE STREAM s-commodity-inventory-movement AS SELECT item, CASE WHEN type = 'ADD' THEN quantity WHEN type = 'REMOVE' THEN (-1 * quantity) ELSE 0 END AS quantity FROM s-commodity-inventory EMIT CHANGES;

CREATE TABLE tbl-commodity-inventory-total-two AS SELECT item, SUM(quantity) AS totalQuantity FROM s-commodity-inventory-movement GROUP BY item EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Inventory -> Sum Record Simulation

2.Custom rowtime (see 4.Timestamp extractor)

SELECT item, location, quantity, type, transactionTime, FORMAT_TIMESTAMP( FROM_UNIXTIME(rowtime), 'yyyy-MM-dd''T''HH:mm:ss') AS defaultRowtime FROM s-commodity-inventory EMIT CHANGES;

CREATE STREAM s-commodity-inventory-four WITH ( TIMESTAMP = 'transactionTime', TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss' ) AS SELECT item, location, quantity, transactionTime, type FROM s-commodity-inventory EMIT CHANGES;

SELECT item, location, quantity, type, transactionTime, FORMAT_TIMESTAMP( FROM_UNIXTIME(rowtime), 'yyyy-MM-dd''T''HH:mm:ss') AS extractedTime FROM s-commodity-inventory-four EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Inventory -> Inventory - Add

DESCRIBE s-commodity-inventory-four EXTENDED; -> note that the "Timestamp field" is associated with the "transactionTime" field

DESCRIBE s-commodity-inventory EXTENDED; -> note that the "Timestamp field" is "Not set - using <ROWTIME>"

3.Tumbling time window (see 5.Tumbling Time Window)

Create stream with custom timestamp and quantity movement CREATE STREAM s-commodity-inventory-five-movement WITH ( TIMESTAMP = 'transactionTime', TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss' ) AS SELECT item, CASE WHEN type = 'ADD' THEN quantity WHEN type = 'REMOVE' THEN (-1 * quantity) ELSE 0 END AS quantity, transactionTime FROM s-commodity-inventory EMIT CHANGES;

Tumbling window CREATE TABLE tbl-commodity-inventory-total-five AS SELECT FORMAT_TIMESTAMP( FROM_UNIXTIME(windowstart), 'yyyy-MM-dd''T''HH:mm:ss') AS windowStartTime, FORMAT_TIMESTAMP( FROM_UNIXTIME(windowend), 'yyyy-MM-dd''T''HH:mm:ss') AS windowEndTime, item, SUM(quantity) totalQuantity FROM s-commodity-inventory-five-movement WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY item EMIT CHANGES;

Notice the built-in windowstart and windowend variables

Run Postman -> Microservices & Kafka Stream -> Inventory -> Window Simulation
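
A tumbling window assigns every record to exactly one fixed-size, non-overlapping bucket. The Python sketch below (not repository code) applies the same signed-quantity rule as the movement stream and sums per item and per one-hour bucket. The function names and sample records are assumptions, and windows are assumed to start on the hour.

```python
from collections import defaultdict
from datetime import datetime


def signed_quantity(record):
    """ADD counts as positive, REMOVE as negative, anything else as zero."""
    if record["type"] == "ADD":
        return record["quantity"]
    if record["type"] == "REMOVE":
        return -record["quantity"]
    return 0


def tumbling_totals(records):
    """Sum signed quantities per (item, window start) using one-hour windows."""
    totals = defaultdict(int)
    for record in records:
        timestamp = datetime.strptime(record["transactionTime"], "%Y-%m-%dT%H:%M:%S")
        # Truncate to the start of the hour: each record lands in exactly one window.
        window_start = timestamp.replace(minute=0, second=0)
        totals[(record["item"], window_start.isoformat())] += signed_quantity(record)
    return dict(totals)


# Assumed sample records.
records = [
    {"item": "pen", "type": "ADD", "quantity": 10, "transactionTime": "2022-07-06T10:05:00"},
    {"item": "pen", "type": "REMOVE", "quantity": 3, "transactionTime": "2022-07-06T10:50:00"},
    {"item": "pen", "type": "ADD", "quantity": 5, "transactionTime": "2022-07-06T11:10:00"},
]

totals = tumbling_totals(records)
```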

4.Hopping time window (see 6.Hopping Time Window)

Create stream with custom timestamp and quantity movement CREATE STREAM s-commodity-inventory-six-movement WITH ( TIMESTAMP = 'transactionTime', TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss' ) AS SELECT item, CASE WHEN type = 'ADD' THEN quantity WHEN type = 'REMOVE' THEN (-1 * quantity) ELSE 0 END AS quantity, transactionTime FROM s-commodity-inventory EMIT CHANGES;

Hopping window CREATE TABLE tbl-commodity-inventory-total-six AS SELECT FORMAT_TIMESTAMP( FROM_UNIXTIME(windowstart), 'yyyy-MM-dd''T''HH:mm:ss') AS windowStartTime, FORMAT_TIMESTAMP( FROM_UNIXTIME(windowend), 'yyyy-MM-dd''T''HH:mm:ss') AS windowEndTime, item, SUM(quantity) totalQuantity FROM s-commodity-inventory-six-movement WINDOW HOPPING (SIZE 1 HOUR, ADVANCE BY 20 MINUTES) GROUP BY item EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Inventory -> Window Simulation
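
Unlike tumbling windows, hopping windows overlap, so one record can belong to several windows. With a size of 1 hour and an advance of 20 minutes, each timestamp falls into three windows. The Python sketch below (not repository code) lists the start times of the windows that contain a given timestamp; the function name hopping_windows and the epoch alignment are assumptions for illustration.

```python
from datetime import datetime, timedelta

SIZE = timedelta(hours=1)
ADVANCE = timedelta(minutes=20)
EPOCH = datetime(1970, 1, 1)  # assumed alignment point for window starts


def hopping_windows(timestamp):
    """Return the start time of every window [start, start + SIZE) that contains timestamp."""
    # Latest window start at or before the timestamp, aligned to the advance interval.
    start = timestamp - ((timestamp - EPOCH) % ADVANCE)
    starts = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp - SIZE:
        starts.append(start)
        start -= ADVANCE
    return sorted(starts)


windows = hopping_windows(datetime(2022, 7, 6, 10, 25))
```

A record at 10:25 is therefore counted in the windows starting at 09:40, 10:00 and 10:20, which is why the same quantity shows up in more than one row of the hopping-window table.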

ksqlDB - Stream/Stream join (see Kafka stream/stream joining)

1.Inner join

Create stream online order CREATE STREAM s-commodity-online-order ( orderDateTime VARCHAR, onlineOrderNumber VARCHAR KEY, totalAmount INT, username VARCHAR ) WITH ( TIMESTAMP = 'orderDateTime', TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss', KAFKA_TOPIC = 't-commodity-online-order', VALUE_FORMAT = 'JSON' );

Create stream online payment CREATE STREAM s-commodity-online-payment ( paymentDateTime VARCHAR, onlineOrderNumber VARCHAR KEY, paymentMethod VARCHAR, paymentNumber VARCHAR ) WITH ( TIMESTAMP = 'paymentDateTime', TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss', KAFKA_TOPIC = 't-commodity-online-payment', VALUE_FORMAT = 'JSON' );

Inner join with no grace period CREATE STREAM s-commodity-join-order-payment-one AS SELECT s-commodity-online-order.onlineOrderNumber AS onlineOrderNumber, PARSE_TIMESTAMP(s-commodity-online-order.orderDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS orderDateTime, s-commodity-online-order.totalAmount AS totalAmount, s-commodity-online-order.username AS username, PARSE_TIMESTAMP(s-commodity-online-payment.paymentDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS paymentDateTime, s-commodity-online-payment.paymentMethod AS paymentMethod, s-commodity-online-payment.paymentNumber AS paymentNumber FROM s-commodity-online-order INNER JOIN s-commodity-online-payment WITHIN 1 HOUR GRACE PERIOD 0 MILLISECOND ON s-commodity-online-order.onlineOrderNumber = s-commodity-online-payment.onlineOrderNumber EMIT CHANGES;

2.Left join

CREATE STREAM s-commodity-join-order-payment-two AS SELECT s-commodity-online-order.onlineOrderNumber AS onlineOrderNumber, PARSE_TIMESTAMP(s-commodity-online-order.orderDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS orderDateTime, s-commodity-online-order.totalAmount AS totalAmount, s-commodity-online-order.username AS username, PARSE_TIMESTAMP(s-commodity-online-payment.paymentDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS paymentDateTime, s-commodity-online-payment.paymentMethod AS paymentMethod, s-commodity-online-payment.paymentNumber AS paymentNumber FROM s-commodity-online-order LEFT JOIN s-commodity-online-payment WITHIN 1 HOUR ON s-commodity-online-order.onlineOrderNumber = s-commodity-online-payment.onlineOrderNumber EMIT CHANGES;

3.Outer join Full outer join CREATE STREAM s-commodity-join-order-payment-three AS SELECT ROWKEY as syntheticKey, s-commodity-online-order.onlineOrderNumber AS onlineOrderNumber, PARSE_TIMESTAMP(s-commodity-online-order.orderDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS orderDateTime, s-commodity-online-order.totalAmount AS totalAmount, s-commodity-online-order.username AS username, PARSE_TIMESTAMP(s-commodity-online-payment.paymentDateTime, 'yyyy-MM-dd''T''HH:mm:ss') AS paymentDateTime, s-commodity-online-payment.paymentMethod AS paymentMethod, s-commodity-online-payment.paymentNumber AS paymentNumber FROM s-commodity-online-order FULL JOIN s-commodity-online-payment WITHIN 1 HOUR ON s-commodity-online-order.onlineOrderNumber = s-commodity-online-payment.onlineOrderNumber EMIT CHANGES;

Note: the result has one auto-generated column (syntheticKey), which takes the value of the first non-null join key from either side
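The three stream/stream joins above differ only in which unmatched records survive. The sketch below is a plain Python model, not ksqlDB code: the record layout `(key, timestamp, value)` and the function name `windowed_join` are hypothetical, the grace period is ignored, and results are computed in one batch instead of being emitted incrementally.

```python
from datetime import datetime, timedelta


def windowed_join(orders, payments, within, how="inner"):
    """Join (key, timestamp, value) records whose timestamps differ by at most `within`.

    Returns rows of (join_key, order_value, payment_value). For rows that exist
    on one side only, join_key is the first non-null key, which is the role
    syntheticKey plays in the full outer join.
    """
    rows = []
    matched_payments = set()
    for o_key, o_ts, o_val in orders:
        hit = False
        for i, (p_key, p_ts, p_val) in enumerate(payments):
            if o_key == p_key and abs(o_ts - p_ts) <= within:
                rows.append((o_key, o_val, p_val))
                matched_payments.add(i)
                hit = True
        # LEFT and FULL keep orders that found no payment inside the window
        if not hit and how in ("left", "full"):
            rows.append((o_key, o_val, None))
    if how == "full":
        # FULL also keeps payments that found no order inside the window
        for i, (p_key, _p_ts, p_val) in enumerate(payments):
            if i not in matched_payments:
                rows.append((p_key, None, p_val))
    return rows


t0 = datetime(2023, 1, 1, 10, 0, 0)
orders = [("ORD-1", t0, 100), ("ORD-2", t0, 200)]
payments = [
    ("ORD-1", t0 + timedelta(minutes=30), "card"),  # inside the 1 hour window
    ("ORD-3", t0 + timedelta(minutes=10), "cash"),  # no matching order
    ("ORD-2", t0 + timedelta(hours=2), "bank"),     # outside the window
]
one_hour = timedelta(hours=1)

inner_rows = windowed_join(orders, payments, one_hour, "inner")
left_rows = windowed_join(orders, payments, one_hour, "left")
full_rows = windowed_join(orders, payments, one_hour, "full")
```

In this toy data the late payment for ORD-2 falls outside the window, so the full join reports ORD-2 twice: once as an order without payment and once as a payment without order.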

ksqlDB - Table/Table join (Kafka table/table joining)

1.Inner join Create stream from underlying topic (color) CREATE STREAM s-commodity-web-vote-color ( username VARCHAR, color VARCHAR, voteDateTime VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-web-vote-color', VALUE_FORMAT = 'JSON' );

Create stream from underlying topic (layout) CREATE STREAM s-commodity-web-vote-layout ( username VARCHAR, layout VARCHAR, voteDateTime VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-web-vote-layout', VALUE_FORMAT = 'JSON' );

Create table to know latest user vote (color) CREATE TABLE tbl-commodity-web-vote-username-color AS SELECT username, LATEST_BY_OFFSET(color) AS color FROM s-commodity-web-vote-color GROUP BY username;

Create table to know latest user vote (layout) CREATE TABLE tbl-commodity-web-vote-username-layout AS SELECT username, LATEST_BY_OFFSET(layout) AS layout FROM s-commodity-web-vote-layout GROUP BY username;

Create table for item and vote count, based on latest user vote (color only) CREATE TABLE t-commodity-web-vote-one-result-color AS SELECT color, COUNT(tbl-commodity-web-vote-username-color.username) AS votesCount FROM tbl-commodity-web-vote-username-color INNER JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY color EMIT CHANGES;

Create table for item and vote count, based on latest user vote (layout only) CREATE TABLE t-commodity-web-vote-one-result-layout AS SELECT layout, COUNT(tbl-commodity-web-vote-username-layout.username) AS votesCount FROM tbl-commodity-web-vote-username-color INNER JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY layout EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Web Design Vote -> Inner Join Simulation

2.Left join

Create table for item and vote count, based on latest user vote (color only) CREATE TABLE t-commodity-web-vote-two-result-color AS SELECT color, COUNT(tbl-commodity-web-vote-username-color.username) AS votesCount FROM tbl-commodity-web-vote-username-color LEFT JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY color EMIT CHANGES;

Create table for item and vote count, based on latest user vote (layout only) CREATE TABLE t-commodity-web-vote-two-result-layout AS SELECT layout, COUNT(tbl-commodity-web-vote-username-layout.username) AS votesCount FROM tbl-commodity-web-vote-username-color LEFT JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY layout EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Web Design Vote -> Left Join Simulation

3.Outer join

Create table for item and vote count, based on latest user vote (color only) CREATE TABLE t-commodity-web-vote-three-result-color AS SELECT color, COUNT(tbl-commodity-web-vote-username-color.username) AS votesCount FROM tbl-commodity-web-vote-username-color FULL JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY color EMIT CHANGES;

Create table for item and vote count, based on latest user vote (layout only) CREATE TABLE t-commodity-web-vote-three-result-layout AS SELECT layout, COUNT(tbl-commodity-web-vote-username-layout.username) AS votesCount FROM tbl-commodity-web-vote-username-color FULL JOIN tbl-commodity-web-vote-username-layout ON tbl-commodity-web-vote-username-color.username = tbl-commodity-web-vote-username-layout.username GROUP BY layout EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Web Design Vote -> Outer Join Simulation
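To see how the join type changes the vote counts in the table/table examples above, here is a small Python model. It is a sketch under assumptions, not ksqlDB code: `latest_by_key` stands in for `LATEST_BY_OFFSET ... GROUP BY username`, `count_votes` is a hypothetical helper, and rows whose counted side is null are assumed not to contribute to the count.

```python
from collections import Counter


def latest_by_key(events):
    """Keep the last value seen per username (stand-in for LATEST_BY_OFFSET)."""
    table = {}
    for username, value in events:
        table[username] = value
    return table


def count_votes(color_tbl, layout_tbl, how, side):
    """Count votes per item on `side` ("color" or "layout") after joining on username."""
    if how == "inner":
        users = color_tbl.keys() & layout_tbl.keys()
    elif how == "left":
        users = set(color_tbl.keys())
    else:  # full outer join
        users = color_tbl.keys() | layout_tbl.keys()
    counted = color_tbl if side == "color" else layout_tbl
    # Users with no vote on the counted side are skipped (null is not counted)
    return Counter(counted[u] for u in users if u in counted)


color_tbl = latest_by_key(
    [("ann", "red"), ("bob", "blue"), ("ann", "green"), ("cat", "green")]
)
layout_tbl = latest_by_key([("ann", "grid"), ("dan", "list")])

inner_color = count_votes(color_tbl, layout_tbl, "inner", "color")
left_color = count_votes(color_tbl, layout_tbl, "left", "color")
left_layout = count_votes(color_tbl, layout_tbl, "left", "layout")
full_layout = count_votes(color_tbl, layout_tbl, "full", "layout")
```

Only `ann` voted on both topics, so the inner join counts a single color vote, while the left join counts every user who voted for a color and the full join also picks up `dan`'s layout-only vote.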

ksqlDB - Stream/Table join (Kafka stream/table joining)

1.Inner join

Create stream from underlying topic (purchase) CREATE STREAM s-commodity-premium-purchase ( username VARCHAR, purchaseNumber VARCHAR, item VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-premium-purchase', VALUE_FORMAT = 'JSON' );

Create stream from underlying topic (user) CREATE STREAM s-commodity-premium-user ( username VARCHAR, level VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-premium-user', VALUE_FORMAT = 'JSON' );

Create table for latest user level CREATE TABLE tbl-commodity-premium-user AS SELECT username, LATEST_BY_OFFSET(level) AS level FROM s-commodity-premium-user GROUP BY username EMIT CHANGES;

Join stream / table, filter only 'gold' and 'diamond' users CREATE STREAM s-commodity-premium-offer-one AS SELECT s-commodity-premium-purchase.username AS username, level, purchaseNumber FROM s-commodity-premium-purchase INNER JOIN tbl-commodity-premium-user ON s-commodity-premium-purchase.username = tbl-commodity-premium-user.username WHERE LCASE(level) IN ('gold', 'diamond') EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Premium Purchase & User -> Premium Offer - Inner Join Stream / Table

2.Left join

Join stream / table, filter only 'gold' and 'diamond' users CREATE STREAM s-commodity-premium-offer-two AS SELECT s-commodity-premium-purchase.username AS username, level, purchaseNumber FROM s-commodity-premium-purchase LEFT JOIN tbl-commodity-premium-user ON s-commodity-premium-purchase.username = tbl-commodity-premium-user.username WHERE level IS NULL OR LCASE(level) IN ('gold', 'diamond') EMIT CHANGES;

Run Postman -> Microservices & Kafka Stream -> Premium Purchase & User -> Premium Offer - Left Join Stream / Table
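In a stream/table join, each purchase is enriched with whatever the table holds for that user at the moment the purchase arrives; later table updates do not re-emit older purchases. The following Python sketch models the two queries above under that assumption. The event tuples and the function name `premium_offers` are hypothetical.

```python
def premium_offers(events, how="inner"):
    """Replay interleaved events in order and return (username, level, purchaseNumber) rows.

    events: ("user", username, level) updates the table,
            ("purchase", username, purchase_number) triggers a lookup.
    """
    levels = {}   # table side: latest level per username
    offers = []
    for kind, username, payload in events:
        if kind == "user":
            levels[username] = payload
            continue
        level = levels.get(username)
        if level is None:
            # INNER drops purchases from unknown users; LEFT keeps them (level IS NULL)
            if how == "left":
                offers.append((username, None, payload))
            continue
        if level.lower() in ("gold", "diamond"):
            offers.append((username, level, payload))
    return offers


events = [
    ("user", "ann", "Gold"),
    ("purchase", "ann", "P-1"),
    ("purchase", "bob", "P-2"),   # bob has no level yet
    ("user", "bob", "diamond"),
    ("purchase", "bob", "P-3"),
    ("user", "cat", "silver"),
    ("purchase", "cat", "P-4"),   # filtered out by the level condition
]

inner_offers = premium_offers(events, "inner")
left_offers = premium_offers(events, "left")
```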


When joining, both sides of the join must be co-partitioned: the left side and the right side must have the same number of partitions, and they must be keyed on the join column with the same partitioning strategy
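The reason for this requirement can be illustrated with a few lines of Python. This is only a sketch: `zlib.crc32` is used as a deterministic stand-in for Kafka's real default partitioner hash, and the function names and key values are hypothetical.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a key to a partition index (crc32 stands in for Kafka's own hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def misaligned_keys(keys, left_partitions, right_partitions):
    """Return keys that land on different partition indexes on the two sides."""
    return [
        k for k in keys
        if partition_for(k, left_partitions) != partition_for(k, right_partitions)
    ]


usernames = [f"user-{i}" for i in range(100)]

# 5 partitions on the stream, 2 on the table: many keys end up misaligned,
# so the task reading partition N of both sides would miss matching rows.
broken = misaligned_keys(usernames, 5, 2)

# 5 partitions on both sides: every key is on the same index left and right.
aligned = misaligned_keys(usernames, 5, 5)
```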

Create stream from underlying topic (user) CREATE STREAM s-commodity-subscription-user ( username VARCHAR KEY, duration VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-subscription-user', VALUE_FORMAT = 'JSON' );

Create stream from underlying topic (purchase) CREATE STREAM s-commodity-subscription-purchase ( username VARCHAR KEY, subscriptionNumber VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-subscription-purchase', VALUE_FORMAT = 'JSON' );

Create table for latest user subscription CREATE TABLE tbl-commodity-subscription-user AS SELECT username, LATEST_BY_OFFSET(duration) AS duration FROM s-commodity-subscription-user GROUP BY username EMIT CHANGES;

See partition number (5 on stream) DESCRIBE s-commodity-subscription-purchase EXTENDED;

See partition number (2 on table) DESCRIBE tbl-commodity-subscription-user EXTENDED;

Join stream / table (different partition number, will fail) CREATE STREAM s-commodity-subscription-offer-one AS SELECT s-commodity-subscription-purchase.username AS username, subscriptionNumber, duration FROM s-commodity-subscription-purchase INNER JOIN tbl-commodity-subscription-user ON s-commodity-subscription-purchase.username = tbl-commodity-subscription-user.username EMIT CHANGES;

Re-partition table for latest user duration (5 partitions) CREATE TABLE tbl-commodity-subscription-user-repartition WITH ( PARTITIONS = 5 ) AS SELECT username, LATEST_BY_OFFSET(duration) AS duration FROM s-commodity-subscription-user GROUP BY username EMIT CHANGES;

See partition number (5 on table) DESCRIBE tbl-commodity-subscription-user-repartition EXTENDED;

Join stream / re-partitioned table (same partition number) CREATE STREAM s-commodity-subscription-offer-two AS SELECT s-commodity-subscription-purchase.username AS username, subscriptionNumber,duration FROM s-commodity-subscription-purchase INNER JOIN tbl-commodity-subscription-user-repartition ON s-commodity-subscription-purchase.username = tbl-commodity-subscription-user-repartition.username EMIT CHANGES;

ksqlDB - exactly once semantic - see


1.UDF (User Defined Function)

See LoanUdf

copy ./target/kafka-ksqldb-udf-1.0.0-SNAPSHOT.jar to ./data/kafka-ksqldb-data/udfs

Run: docker restart kafka-ksqldb

Inside ksqldb terminal, run:

Show functions SHOW FUNCTIONS;


Create new stream and new topic CREATE STREAM s-commodity-loan-request ( username VARCHAR, principalLoanAmount DOUBLE, annualInterestRate DOUBLE, loanPeriodMonth INT ) WITH ( KAFKA_TOPIC = 't-commodity-loan-request', PARTITIONS = 2, VALUE_FORMAT = 'JSON' );

Insert data INSERT INTO s-commodity-loan-request ( username, principalLoanAmount, annualInterestRate, loanPeriodMonth ) VALUES ( 'danny', 1000, 12, 12 );

INSERT INTO s-commodity-loan-request ( username, principalLoanAmount, annualInterestRate, loanPeriodMonth ) VALUES ( 'melvin', 1500, 10.5, 24 );

INSERT INTO s-commodity-loan-request ( username, principalLoanAmount, annualInterestRate, loanPeriodMonth ) VALUES ( 'thomas', 3500, 11.2, 36 );

Use the UDF SELECT username, principalLoanAmount, annualInterestRate, loanPeriodMonth, LOAN_INSTALLMENT(principalLoanAmount, annualInterestRate, loanPeriodMonth) AS monthlyLoanInstallment FROM s-commodity-loan-request EMIT CHANGES;
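The body of LoanUdf is not shown here, so the following is only a guess at the kind of arithmetic a `LOAN_INSTALLMENT` function might perform. It assumes the standard fixed-payment amortization formula with the annual rate given in percent; the Python function name and that assumption are mine, not taken from the project.

```python
def loan_installment(principal, annual_rate_percent, months):
    """Fixed monthly payment for an amortized loan (assumed formula, not LoanUdf itself)."""
    monthly_rate = annual_rate_percent / 100 / 12
    if monthly_rate == 0:
        # Interest-free loan: repay the principal in equal parts
        return principal / months
    return principal * monthly_rate / (1 - (1 + monthly_rate) ** -months)


# Same arguments as the first INSERT above: 1000 at 12% per year over 12 months
danny = loan_installment(1000, 12, 12)
```

Under this assumption the first inserted row would yield a monthly installment of roughly 88.85.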

2.UDTF (User Defined Tabular Function)

A UDTF receives one input and produces zero or more outputs (similar to Kafka Streams flatMap); it can use STRUCT as well

See LoanUdtf -> accepts a loanSubmission STRUCT as an input, and outputs a List of monthlyInstallment STRUCT

copy ./target/kafka-ksqldb-udf-1.0.0-SNAPSHOT.jar to ./data/kafka-ksqldb-data/udfs

Run: docker restart kafka-ksqldb

Create stream with struct CREATE STREAM s-commodity-loan-submission ( loanSubmission STRUCT< principalLoanAmount DOUBLE, annualInterestRate DOUBLE, loanPeriodMonth INT, loanApprovedDate VARCHAR > ) WITH ( KAFKA_TOPIC = 't-commodity-loan-submission', PARTITIONS = 2, VALUE_FORMAT = 'JSON' );

Insert data INSERT INTO s-commodity-loan-submission ( loanSubmission ) VALUES ( STRUCT( principalLoanAmount := 6000, annualInterestRate := 11.5, loanPeriodMonth := 24, loanApprovedDate := '2022-11-21' ) );

Run query SELECT LOAN_INSTALLMENT_SCHEDULE(loanSubmission) FROM s-commodity-loan-submission;
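The one-row-in, many-rows-out behaviour of a table function can be sketched in Python as a function that takes the struct and returns a list. LoanUdtf itself is not shown here, so everything below is an assumption made for illustration: the output field names, the amortization formula, and the rule that the first installment is due one month after the approval date.

```python
import calendar
from datetime import date


def add_months(d, months):
    """Shift a date by whole months, clamping the day to the end of the target month."""
    index = d.year * 12 + (d.month - 1) + months
    year, month0 = divmod(index, 12)
    day = min(d.day, calendar.monthrange(year, month0 + 1)[1])
    return date(year, month0 + 1, day)


def loan_installment_schedule(submission):
    """Expand one loan submission into one row per monthly installment."""
    approved = date.fromisoformat(submission["loanApprovedDate"])
    months = submission["loanPeriodMonth"]
    principal = submission["principalLoanAmount"]
    rate = submission["annualInterestRate"] / 100 / 12
    if rate == 0:
        amount = principal / months
    else:
        amount = principal * rate / (1 - (1 + rate) ** -months)
    return [
        {
            "installmentNumber": n,
            "dueDate": add_months(approved, n).isoformat(),
            "amount": round(amount, 2),
        }
        for n in range(1, months + 1)
    ]


# Same values as the INSERT above
schedule = loan_installment_schedule({
    "principalLoanAmount": 6000,
    "annualInterestRate": 11.5,
    "loanPeriodMonth": 24,
    "loanApprovedDate": "2022-11-21",
})
```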

3.UDAF (User Defined Aggregate Function)

A UDAF is an aggregation function that consumes one row at a time, maintaining a stateful representation of historical data.

See LoanUdaf

copy ./target/kafka-ksqldb-udf-1.0.0-SNAPSHOT.jar to ./data/kafka-ksqldb-data/udfs

Run: docker restart kafka-ksqldb

Create stream for payment CREATE STREAM s-commodity-loan-payment ( loanNumber VARCHAR, installmentDueDate VARCHAR, installmentPaidDate VARCHAR ) WITH ( KAFKA_TOPIC = 't-commodity-loan-payment', PARTITIONS = 2, VALUE_FORMAT = 'JSON' );

Create stream with payment latency. Positive latency means late payment (bad). CREATE STREAM s-commodity-loan-payment-latency AS SELECT loanNumber, installmentDueDate, installmentPaidDate, UNIX_DATE(PARSE_DATE(installmentPaidDate, 'yyyy-MM-dd')) - UNIX_DATE(PARSE_DATE(installmentDueDate, 'yyyy-MM-dd')) AS paymentLatency FROM s-commodity-loan-payment;

Insert data INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-111', '2023-04-17', '2023-04-15' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-111', '2023-05-17', '2023-05-05' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-111', '2023-06-17', '2023-06-09' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-111', '2023-07-17', '2023-07-17' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-111', '2023-08-17', '2023-08-15' );

Insert dummy data 2 INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-222', '2023-04-14', '2023-04-15' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-222', '2023-05-14', '2023-05-05' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-222', '2023-06-14', '2023-06-19' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-222', '2023-07-14', '2023-07-22' );

INSERT INTO s-commodity-loan-payment ( loanNumber, installmentDueDate, installmentPaidDate ) VALUES ( 'LOAN-222', '2023-08-14', '2023-08-15' );

SELECT loanNumber, LOAN_RATING(paymentLatency) AS loanRating FROM s-commodity-loan-payment-latency GROUP BY loanNumber EMIT CHANGES;
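The sketch below replays the inserted payments in Python to show the row-at-a-time, stateful shape of an aggregate function. The method names mirror the initialize / aggregate / merge / map steps of a ksqlDB UDAF, but the rating rule (average latency of zero or less is "GOOD", otherwise "BAD") is purely hypothetical, since LoanUdaf is not shown here.

```python
from datetime import date


def payment_latency(due, paid):
    """Days between due date and paid date; positive means late."""
    return (date.fromisoformat(paid) - date.fromisoformat(due)).days


class LoanRatingAggregator:
    def initialize(self):
        return {"count": 0, "total": 0}

    def aggregate(self, latency, state):
        # Fold one more row into the running state
        return {"count": state["count"] + 1, "total": state["total"] + latency}

    def merge(self, a, b):
        return {"count": a["count"] + b["count"], "total": a["total"] + b["total"]}

    def map(self, state):
        # Hypothetical rule, for illustration only
        if state["count"] == 0:
            return None
        return "GOOD" if state["total"] / state["count"] <= 0 else "BAD"


payments = [
    ("LOAN-111", "2023-04-17", "2023-04-15"), ("LOAN-111", "2023-05-17", "2023-05-05"),
    ("LOAN-111", "2023-06-17", "2023-06-09"), ("LOAN-111", "2023-07-17", "2023-07-17"),
    ("LOAN-111", "2023-08-17", "2023-08-15"),
    ("LOAN-222", "2023-04-14", "2023-04-15"), ("LOAN-222", "2023-05-14", "2023-05-05"),
    ("LOAN-222", "2023-06-14", "2023-06-19"), ("LOAN-222", "2023-07-14", "2023-07-22"),
    ("LOAN-222", "2023-08-14", "2023-08-15"),
]

agg = LoanRatingAggregator()
states = {}
for loan_number, due, paid in payments:
    state = states.get(loan_number, agg.initialize())
    states[loan_number] = agg.aggregate(payment_latency(due, paid), state)

ratings = {loan: agg.map(state) for loan, state in states.items()}
```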

ksqlDB - Schema Registry

See docker-compose-full.yml -> kafka-ksqldb -> KSQL_KSQL_SCHEMA_REGISTRY_URL

1.Create stream from topic which has associated Avro schema

Run Postman -> Kafka ksqlDB -> Schema Registry -> Avro01 -> Create topic sc-avro01

Run Postman -> Kafka ksqlDB -> Schema Registry -> Avro01 -> Create subject sc-avro01

CREATE STREAM s-avro01 WITH ( KAFKA_TOPIC = 'sc-avro01', VALUE_FORMAT = 'AVRO' );

Note that for s-avro01, there are no columns defined and the value format is AVRO

DESCRIBE s-avro01;

Note that the columns are all uppercase

Run kafka-avro-producer spring project and check the stream data: SELECT * FROM s-avro01 EMIT CHANGES;

2.Automatically created schema

CREATE STREAM s-avro-member ( email VARCHAR, username VARCHAR, birthDate VARCHAR, membership VARCHAR ) WITH ( KAFKA_TOPIC = 'sc-avro-member', PARTITIONS = 1, VALUE_FORMAT = 'AVRO' );

When a stream is created with VALUE_FORMAT = 'AVRO', a Schema Registry subject is also created automatically.

Run to see the automatically created schema: Postman -> Kafka Schema Registry -> Schema -> List schemas

Insert data INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'god_of_thunder', '1900-05-19', 'black' );

INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'iamloki', '1914-11-05', 'black' );

INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'kang.the.conqueror', '1912-10-05', 'white' );

INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'therealgodofthunder', '1852-01-05', 'white' );

INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'prettybutdeadly', '1922-08-25', 'blue' );

When creating a stream from stream, the schema is also automatically generated when using VALUE_FORMAT = 'AVRO' CREATE STREAM s-avro-member-black WITH ( VALUE_FORMAT = 'AVRO' ) AS SELECT * FROM s-avro-member WHERE LCASE(membership) = 'black';

DESCRIBE s-avro-member-black;

When creating a table from stream, the schema is also automatically generated when using VALUE_FORMAT = 'AVRO' CREATE TABLE tbl-avro-member-count WITH ( VALUE_FORMAT = 'AVRO' ) AS SELECT membership, COUNT(email) AS countMember FROM s-avro-member GROUP BY membership EMIT CHANGES;

DESCRIBE tbl-avro-member-count;

3.Avro-JSON conversion

Running console-consumer on s-avro-member will display base64 encoded characters. Creating a stream with VALUE_FORMAT = 'JSON' from the existing Avro stream solves the problem.

CREATE STREAM s-avro-member-json WITH ( VALUE_FORMAT = 'JSON' ) AS SELECT * FROM s-avro-member EMIT CHANGES;

INSERT INTO s-avro-member ( email, username, birthDate, membership ) VALUES ( '', 'supergirl', '1993-11-05', 'black' );

Run the console-consumer again, this time on the s-avro-member-json stream; it will output JSON values
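Part of the reason the Avro topic is unreadable in a plain consumer is that Schema Registry serializers do not write text: each value starts with a magic byte 0 and a 4-byte big-endian schema ID, followed by the binary Avro payload. The Python sketch below only splits that frame. The function name, the schema ID 7, and the payload bytes are made up for illustration.

```python
import json
import struct


def split_confluent_frame(raw):
    """Split a Schema Registry framed value into (schema_id, payload_bytes)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not a Schema Registry framed message")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


# Hypothetical framed value: magic byte, schema ID 7, then opaque binary payload
avro_value = b"\x00" + struct.pack(">I", 7) + b"\x0ckang.t"
schema_id, payload = split_confluent_frame(avro_value)

# A JSON value is plain UTF-8 text, so a console consumer can print it directly
json_value = json.dumps({"USERNAME": "kang.the.conqueror"}).encode("utf-8")
decoded = json.loads(json_value)
```

Decoding the payload itself requires the schema registered under that ID, which is why converting the stream to JSON is the simpler route for quick inspection.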

4.JSON-Avro conversion

CREATE STREAM s-power-json ( power VARCHAR, level INT ) WITH ( VALUE_FORMAT = 'JSON', KAFKA_TOPIC = 't-power-json', PARTITIONS = 1 );

Insert data INSERT INTO s-power-json ( power, level ) VALUES ( 'healing', 6 );

INSERT INTO s-power-json ( power, level ) VALUES ( 'energy projection', 8 );

INSERT INTO s-power-json ( power, level ) VALUES ( 'mind control', 7 );

Run console-consumer on topic t-power-json and see that the data is in json format

Create avro stream CREATE STREAM s-power-avro WITH ( VALUE_FORMAT = 'AVRO' ) AS SELECT * FROM s-power-json EMIT CHANGES;

ksqlDB and Kafka Connect

See docker-compose-full.yml -> kafka-ksqldb -> KSQL_KSQL_CONNECT_URL

Run in ksqlDB console: SHOW CONNECTORS;

DESCRIBE CONNECTOR source-spooldir-csv;

Create source connector CREATE SOURCE CONNECTOR source-spooldir-dummy-csv WITH ( 'connector.class'='com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector', 'topic'='t-spooldir-csv-demo', 'input.file.pattern'='dummy-.*.csv', 'input.path'='/data/inputs', 'error.path'='/data/errors', 'finished.path'='/data/processed', 'schema.generation.enabled'='true', ''='true', ''='10000' );

Create sink connector CREATE SINK CONNECTOR sink-postgresql-dummy-csv WITH ( 'connector.class'='io.confluent.connect.jdbc.JdbcSinkConnector', 'topics'='t-spooldir-csv-demo', 'confluent.topic.bootstrap.servers'='', 'connection.url'='jdbc:postgresql://', 'connection.user'='postgres', 'connection.password'='postgres', ''='kafka_employees', 'auto.create'=true, 'auto.evolve'=true, 'pk.mode'='record_value', 'pk.fields'='employee_id', 'insert.mode'='upsert' );

Drop connectors DROP CONNECTOR IF EXISTS source-spooldir-dummy-csv;

DROP CONNECTOR IF EXISTS sink-postgresql-dummy-csv;

ksqlDB Java Client - see BasicJavaClient

Credits to Udemy/Java Spring & Apache Kafka Bootcamp - Basic to Complete


