“What is this Kafka I’ve been hearing about?”

In short, Kafka is a horizontally scalable streaming platform. In other words, Kafka is a message broker that can be run on multiple servers as a cluster. The different data streams are called topics: producers place messages on a topic, while consumers subscribe to topics to read them. Topics can be configured for single or multiple delivery of messages, and consumers can be grouped into so-called consumer groups, which makes it possible for multiple consumers to act as one when it comes to single delivery: each message is then handled by only one consumer in the group.

But don’t take my word for it. There’s a lot more to Kafka than I can cover in this post, and the official documentation is much clearer, so check it out at https://kafka.apache.org/.

“How do I use Kafka in my Spring applications?”

Among the many abstractions the Spring ecosystem delivers there is also one for messaging systems such as Kafka, called Spring Cloud Stream. This messaging abstraction makes it very easy to produce messages to Kafka and to consume them.
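To follow along, you need the Spring Cloud Stream Kafka binder on your classpath. Assuming a Maven build, that is a single starter dependency (version management through the Spring Cloud BOM is assumed here):

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>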

Producing Strings

The following code (and a running Kafka server) is all that is needed to produce a String to a Kafka topic.

Listing 1. Spring Cloud Stream Kafka Producer
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

@EnableBinding(Source.class)
public class HelloWorldSource {

    // Polled by Spring (once per second by default); the return value
    // is sent to the Source.OUTPUT channel.
    @InboundChannelAdapter(channel = Source.OUTPUT)
    public String sayHello() {
        return "Hello, World!";
    }
}

The @EnableBinding annotation tells Spring you want to use Spring Cloud Stream with the Source interface. Spring calls the method annotated with @InboundChannelAdapter to create a String, which it then places on Kafka through the Source.OUTPUT channel. By default, the sayHello method is called once per second.

To control which Kafka topic the Source.OUTPUT channel points to, you only have to set the spring.cloud.stream.bindings.output.destination property in your application.yaml file to the name of your topic.
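For example, assuming a hypothetical topic named greetings:

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: greetings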

Consuming Strings

Consuming from Kafka with Spring Cloud Stream is just as simple as producing:

Listing 2. Spring Cloud Stream Kafka Consumer
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class HelloWorldSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(HelloWorldSink.class);

    // Invoked for every message arriving on the Sink.INPUT channel.
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void printMessage(final String message) {
        LOGGER.info("{}", message);
    }
}

Here we use the Sink interface and the @ServiceActivator annotation, which makes Spring trigger the printMessage method each time a message is received on the Kafka topic configured in the spring.cloud.stream.bindings.input.destination property of your application.yaml.
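The consumer configuration mirrors the producer’s; again assuming the hypothetical greetings topic:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: greetings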

“What about objects other than Strings?”

Most of the time our data is a bit more complex than plain Strings, and manual (un)marshalling is not very Spring-like. The @InboundChannelAdapter handles automatic marshalling just fine, but the @ServiceActivator does not. Don’t fret, there is a solution! Enter the @StreamListener annotation, which will automatically unmarshal the data based on the content type provided through the spring.cloud.stream.bindings.input.content-type property.
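For JSON payloads, for instance, that property would be set like this (combine it with the destination shown earlier):

spring:
  cloud:
    stream:
      bindings:
        input:
          content-type: application/json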

Listing 3. Consuming POJOs
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Configuration;

@EnableBinding(Sink.class)
@Configuration
public class MessageHandler {

  private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);

  // Spring unmarshals the payload into a ChatMessage before invoking this method.
  @StreamListener(Sink.INPUT)
  public void handle(final ChatMessage message) {
      final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM).withZone(ZoneId.systemDefault());
      final String time = df.format(Instant.ofEpochMilli(message.getTime()));
      LOGGER.info("[{}]: {}", time, message.getContents());
  }
}

Given a simple ChatMessage class (a sketch follows after the listing), this consumer will work together with the following producer.

Listing 4. Producing POJOs
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

@EnableBinding(Source.class)
public class TimerSource {

  // Polled once per second; the returned ChatMessage is marshalled automatically.
  @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
  public ChatMessage timerMessageSource() {
      return new ChatMessage("hello world", System.currentTimeMillis());
  }
}
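The ChatMessage class itself is not shown in this post. A minimal sketch that works with the listings above (the actual class in the linked repository may differ) could look like this:

public class ChatMessage {

    private String contents;
    private long time;

    // Default constructor, used by Jackson when unmarshalling JSON.
    ChatMessage() {
    }

    public ChatMessage(final String contents, final long time) {
        this.contents = contents;
        this.time = time;
    }

    public String getContents() {
        return contents;
    }

    public void setContents(final String contents) {
        this.contents = contents;
    }

    public long getTime() {
        return time;
    }

    public void setTime(final long time) {
        this.time = time;
    }
}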

“Isn’t processing also a thing with Kafka?!”

There is another annotation, @SendTo, which allows a method to filter Kafka messages or transform them before sending the result to another topic. This concept is shown in the following code.

Listing 5. Processing/filtering POJOs
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
@Configuration
public class MessageFilter {

  // Reads from Processor.INPUT, transforms the message and sends the
  // result to Processor.OUTPUT.
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public ChatMessage transform(final ChatMessage chatmessage) {
      final String contents = chatmessage.getContents().toUpperCase() + "!";
      return new ChatMessage(contents, chatmessage.getTime());
  }
}

Here we use both the @StreamListener and @SendTo annotations to read a ChatMessage from Kafka, fiddle with its contents, and send the result to another topic, ready for consumption.
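Because a processor binds both channels, its application.yaml needs both destinations; a sketch with hypothetical topic names:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: greetings
          content-type: application/json
        output:
          destination: greetings-uppercased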

A full example of the code in this post can be found at: https://gitlab.com/Dasyel/kafka_blogpost
