Spring Cloud Messaging using Kafka

“What is this Kafka I’ve been hearing about?”

In short, Kafka is a horizontally scalable streaming platform. In other words, Kafka is a message broker that can be run on multiple servers as a cluster. Distinct data streams are called topics. Producers place messages on a topic, while consumers subscribe to topics. Topics can be configured for single and multiple delivery of messages. Consumers can be grouped into so-called consumer groups, which makes it possible for multiple consumers to act as one when it comes to single delivery.

But don’t take my word for it. There’s a lot more to Kafka than I can get into in this post, and the official documentation is much clearer, so check it out at https://kafka.apache.org/.

“How do I use Kafka in my Spring applications?”

Among all the abstractions the Spring ecosystem delivers there is also an abstraction layer for messaging platforms such as Kafka, called Spring Cloud Stream. This messaging API makes it very easy to produce messages to Kafka and to consume them.

Producing Strings

The following code (and a running Kafka server) is all that is needed to produce a String to a Kafka topic.
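
A minimal sketch of such a producer looks like this (the prose below names the sayHello method; the greeting text itself is my own, and the original code lives in the repository linked at the end of this post):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.annotation.InboundChannelAdapter;

    // Binds the application to the output channel declared by the Source interface.
    @EnableBinding(Source.class)
    @SpringBootApplication
    public class ProducerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }

        // Polled by Spring (once per second by default); each returned String
        // is placed on Kafka through the Source.OUTPUT channel.
        @InboundChannelAdapter(Source.OUTPUT)
        public String sayHello() {
            return "Hello, Kafka!";
        }
    }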

The @EnableBinding annotation tells Spring you want to use Spring Cloud Stream with the Source interface. Spring periodically calls the method annotated with @InboundChannelAdapter and places the String it returns on Kafka through the Source.OUTPUT channel. By default the sayHello method will be called once per second.

To control the Kafka topic the Source.OUTPUT channel points to, you only have to set the spring.cloud.stream.bindings.output.destination property in your application.yaml file to the name of your topic.
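
For example, assuming a topic named greetings (the topic name here is just an illustration):

    spring:
      cloud:
        stream:
          bindings:
            output:
              destination: greetings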

Consuming Strings

Consuming from Kafka with Spring Cloud Stream is just as simple as producing:
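
Again a sketch along these lines (the printMessage method is named in the prose below; the output format is my own):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.integration.annotation.ServiceActivator;

    // Binds the application to the input channel declared by the Sink interface.
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public class ConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        // Called for every message that arrives on the Sink.INPUT channel.
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void printMessage(String message) {
            System.out.println("Received: " + message);
        }
    }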

Here we use the Sink interface and the @ServiceActivator annotation, which makes Spring trigger the printMessage method each time a message is received on the Kafka topic set in the spring.cloud.stream.bindings.input.destination property in your application.yaml.
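
The input binding is configured just like the output binding; again, the topic name greetings is only an example:

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: greetings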

“What about objects other than Strings?”

Most of the time our data is a bit more complex than plain String data, and manual (un)marshalling is not very Spring-like. The @InboundChannelAdapter handles automatic marshalling just fine, but the @ServiceActivator does not. Don’t fret, there is a solution! Enter the @StreamListener annotation, which will automatically unmarshal the data based on the content type provided through the spring.cloud.stream.bindings.input.content-type property.
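
A consumer sketch using @StreamListener could look like this (the ChatMessage class is spelled out a bit further down; the printed format is my own):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;

    @EnableBinding(Sink.class)
    @SpringBootApplication
    public class ChatConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ChatConsumerApplication.class, args);
        }

        // Spring unmarshals the payload into a ChatMessage based on the
        // content-type configured for the input binding.
        @StreamListener(Sink.INPUT)
        public void printMessage(ChatMessage chatMessage) {
            System.out.println(chatMessage.getSender() + ": " + chatMessage.getMessage());
        }
    }

with a corresponding binding configuration, for example (topic name chat is illustrative):

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: chat
              content-type: application/json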

Given a simple ChatMessage class, this consumer will work together with the following producer.
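
The ChatMessage fields are not spelled out in this post, so here is a minimal guess at the class (sender and message are illustrative field names), followed by a matching producer sketch:

    // ChatMessage.java - illustrative POJO; the fields in the repository may differ.
    public class ChatMessage {

        private String sender;
        private String message;

        // Default constructor needed for JSON unmarshalling.
        public ChatMessage() {
        }

        public ChatMessage(String sender, String message) {
            this.sender = sender;
            this.message = message;
        }

        public String getSender() { return sender; }
        public void setSender(String sender) { this.sender = sender; }
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
    }

The producer side works exactly like the String producer shown earlier, only now returning a ChatMessage:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.annotation.InboundChannelAdapter;

    @EnableBinding(Source.class)
    @SpringBootApplication
    public class ChatProducerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ChatProducerApplication.class, args);
        }

        // The returned ChatMessage is marshalled automatically before it is
        // placed on Kafka through the Source.OUTPUT channel.
        @InboundChannelAdapter(Source.OUTPUT)
        public ChatMessage sendMessage() {
            return new ChatMessage("alice", "Hello from Spring Cloud Stream!");
        }
    }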

“Isn’t processing also a thing with Kafka?!”

There is another annotation, @SendTo, which allows a method to filter Kafka messages or edit them before sending them on to another topic. This concept is shown in the following code.
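
A sketch of such a processor, using the Processor interface that combines Source and Sink (the upper-casing transformation here is my own invention):

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.messaging.handler.annotation.SendTo;

    // The Processor interface gives the application both an input
    // and an output channel.
    @EnableBinding(Processor.class)
    @SpringBootApplication
    public class ChatProcessorApplication {

        public static void main(String[] args) {
            SpringApplication.run(ChatProcessorApplication.class, args);
        }

        // Reads a ChatMessage from the input topic, modifies it and
        // publishes the result on the output topic.
        @StreamListener(Processor.INPUT)
        @SendTo(Processor.OUTPUT)
        public ChatMessage transform(ChatMessage chatMessage) {
            chatMessage.setMessage(chatMessage.getMessage().toUpperCase());
            return chatMessage;
        }
    }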

Here we use both the @StreamListener and the @SendTo annotations to read a ChatMessage from Kafka, fiddle with its contents and send the result to another topic, ready for consumption.

A full example of the code in this post can be found at: https://gitlab.com/Dasyel/kafka_blogpost

Author: Dasyel Willems

Java Developer at JCore with a background in Artificial Intelligence who has a special interest in procedural generation. Also loves challenging puzzles and movies.
