Skip to content

MichaelHussey/cp-spring_cloud_streams-simple_consumer

Repository files navigation

cp-spring_cloud_streams-simple_consumer

Very simple example to demonstrate Spring Cloud Streams (SCS) exchanging data with Apache Kafka topics

The easiest way to get started is to use the Spring Initializr site. Set a Group and Artifact name and add Kafka and Cloud Stream as dependencies.

Spring Initalizr ScreenShot

This generates a zip file (in my case scs_demo.zip). Unzip this and then use maven to turn it into an Eclipse project

mvn eclipse:eclipse

Enable the Kafka Binder

SCS abstracts away the mechanics of connecting to the underlying messaging system by using the concept of a Binder. If only a single binder is found on the class path then it will be used, but it's also possible to explicitly set which one to use by adding an @Import statement to the Main Application

Set up a Listener

Now lets add the simplest possible topic listener to the code.

Create a Channel definition interface which annotates a single method as an @Input. See ListenerDefinition.java. This interface defines the name input_topic which is used in binding to the properties provided to the application. So in this example the properties spring.cloud.stream.bindings.input_topic.* are used by the Kafka binder to configure the actual Kafka consumer. Note that we can define multiple separate @Input methods if we want to have multiple bindings (for example to handle different message types).

And of course we need some code to be executed which is provided in ListenerImpl.java. The @StreamListener(ListenerDefinition.INPUT) annotation informs the Kafka binder that the processRawMessage() method should be invoked when a message is received on the input_topic. The SCS framework is smart enough to unpack the message payload depending on the method signature as can be seen in processDeserialised().

As you can see the application code doesn't need to know any details of the actual Kafka messaging system and all configuration is isolated to the application.properties file. See the SCS documentation for details.

Start ZooKeeper and Kafka

If you have docker installed then you can use the provided docker-compose.yml file.

docker-compose up -d

Run the SCS application and send some data

Then run the demo application

mvn spring-boot:run

Then send it some data using either KafkaCat

echo "{test:\"hi\"}" | kafkacat -b localhost:9092 -t my_in_topic

Or the kafka-console-producer which is found packaged inside the kafka docker image.

docker-compose exec kafka kafka-console-producer --broker-list kafka:29092 --topic my_in_topic

You should see the message sent output in the console of your SCS application like this (of course in real life you'd always output to a logger, not standard out)

Received message in processDeserialised(): {test:"hi"}

Received message in processRawMessage(): GenericMessage [payload=byte[11], headers={kafka_offset=3, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@40cc03da, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=my_in_topic, kafka_receivedTimestamp=1539124572413, contentType=application/json}]
Payload: {test:"hi"}

Shut down and clean up

Kill the SCS application by using Ctrl-C in the console where it was started.

Stop the Kafka and ZooKeeper brokers as follows

docker-compose down

You can reclaim any space used in cycling the docker images multiple times via

docker volume prune -f

About

Simplest possible Spring Cloud Streams Kafka consumer

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages