As more teams and companies adopt Apache Kafka, you may find yourself wanting to share data by replicating one or more topics from one cluster to another. Replicating an entire cluster with all of its topics, for example as a means of failover, can be achieved with tools such as Mirror Maker and Confluent Replicator. For replicating a single topic there are fewer examples, even more so when the source topic is serialized with Avro and the schema is stored in Confluent Schema Registry.

Here we present a minimal consumer that replicates a single Avro serialized Kafka topic from one cluster to another, while ensuring (only) the necessary Avro schema is registered in the target cluster Schema Registry.

Approach

We want to guarantee all records are reliably copied from the topic in the source cluster to the topic in the target cluster. As always, failures can happen, so we want to be mindful of how we handle offset commits on the consumer side and transactions on the producer side.

We choose to use Spring for Apache Kafka as it provides us with a convenient API to handle offset commits, as well as an easy means to configure consumers and producers separately for the two clusters involved.

  • On the producer side, we produce records within a transaction, meaning they only become available to read_committed consumers after we successfully commit the transaction.

  • On the consumer side, we disable auto offset commit, and only manually commit our offsets when producing the records has succeeded.

This way we ensure any failures are handled gracefully: a failed batch is neither committed on the producer side nor acknowledged on the consumer side, so it can be retried.
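
As a rough illustration of the two bullets above, the following sketch lists the standard Kafka client properties involved, written as plain string keys so it runs without any Kafka dependency. The class and method names, the group ids and the transactional id are hypothetical, and the bootstrap addresses are assumed to be the ones used by the two-cluster test setup described later. With Spring for Apache Kafka you would more typically set a transaction id prefix on the producer factory than a fixed transactional.id, so treat this as a sketch of the underlying settings, not as the sample application's actual configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the client settings behind the two bullets (plain keys, no Kafka dependency). */
final class ReplicationClientProps {

  /** Consumer on the source cluster: offsets are only committed manually. */
  static Map<String, Object> consumerProps(String sourceBootstrapServers, String groupId) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", sourceBootstrapServers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "false");
    return props;
  }

  /** Producer on the target cluster: a transactional.id enables transactions. */
  static Map<String, Object> producerProps(String targetBootstrapServers, String transactionalId) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", targetBootstrapServers);
    props.put("transactional.id", transactionalId);
    // Transactions require idempotence and acknowledgement by all in-sync replicas
    props.put("enable.idempotence", "true");
    props.put("acks", "all");
    return props;
  }

  /** Reader of the target topic that only sees records from committed transactions. */
  static Map<String, Object> downstreamConsumerProps(String targetBootstrapServers, String groupId) {
    Map<String, Object> props = consumerProps(targetBootstrapServers, groupId);
    props.put("isolation.level", "read_committed");
    return props;
  }

  public static void main(String[] args) {
    // Hypothetical ids; addresses assumed from the two-cluster test setup
    System.out.println(consumerProps("localhost:9093", "topic-replicator"));
    System.out.println(producerProps("localhost:9094", "topic-replicator-tx"));
    System.out.println(downstreamConsumerProps("localhost:9094", "downstream-reader"));
  }
}
```

Note that isolation.level is a setting for whoever reads the target topic; the replicating application itself does not need it.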

Schema replication is automatically handled for us, provided we configure the consumer deserializers and producer serializers with the respective schema registry URLs as needed.
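
To make this concrete, here is a minimal sketch of the serializer-related properties, assuming the Confluent Avro (de)serializers are used. As we understand them, the deserializer looks up the writer schema in the source registry by the id embedded in each message, and the serializer registers the record's schema in the target registry before writing (auto.register.schemas defaults to true). The class and method names are hypothetical, and the registry addresses are assumed from the two-cluster test setup described later.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: each side points its Avro (de)serializer at its own cluster's Schema Registry. */
final class SchemaRegistryProps {

  /** Consumer side: schemas are fetched from the source cluster's registry. */
  static Map<String, Object> consumerSerdeProps(String sourceSchemaRegistryUrl) {
    Map<String, Object> props = new HashMap<>();
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", sourceSchemaRegistryUrl);
    return props;
  }

  /** Producer side: schemas are registered in the target cluster's registry. */
  static Map<String, Object> producerSerdeProps(String targetSchemaRegistryUrl) {
    Map<String, Object> props = new HashMap<>();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", targetSchemaRegistryUrl);
    // Stated explicitly for clarity; this is already the serializer's default
    props.put("auto.register.schemas", "true");
    return props;
  }

  public static void main(String[] args) {
    // Assumed addresses of the source and target Schema Registry
    System.out.println(consumerSerdeProps("http://localhost:8083"));
    System.out.println(producerSerdeProps("http://localhost:8084"));
  }
}
```

Because only schemas of records that actually pass through the producer get registered, the target registry ends up with just the schema needed for the replicated topic.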

Implementation details

Our consumer uses the @KafkaListener annotation, with manual offset commits and batch-wise record handling. By default up to 500 records are consumed at a time (the consumer's max.poll.records default), and we commit the offsets for the whole batch at once at the end.

To produce our records we use the callback passed to KafkaTemplate.executeInTransaction(OperationsCallback<K, V, T>). Within the callback we call KafkaOperations.send(String, K, V) for each record individually. Finally, we verify that each record has been produced successfully by calling ListenableFuture<SendResult<K, V>>.get(). Should any record fail, we throw an exception, which prevents the transaction commit and fails the entire consumer batch.

The full sample application is available on GitHub, with the relevant parts copied below.

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaManualAckListenerContainerFactory(
    ConsumerFactory<? super String, ? super GenericRecord> consumerFactory) {
  var factory = new ConcurrentKafkaListenerContainerFactory<String, GenericRecord>();
  factory.setBatchListener(true);
  factory.setConsumerFactory(consumerFactory);
  factory.getContainerProperties().setAckMode(AckMode.MANUAL);
  factory.setMissingTopicsFatal(true);
  return factory;
}

private final KafkaTemplate<String, GenericRecord> kafkaTemplate;

@Value("${replication.target-topic}")
private String targetTopic;

@KafkaListener(
  topics = "${replication.source-topic}",
  containerFactory = "kafkaManualAckListenerContainerFactory")
public void handle(List<GenericRecord> records, Acknowledgment ack) {
  log.info("Received {} records", records.size());
  // Publish records within a transaction, which can be rolled back on failure
  kafkaTemplate.executeInTransaction(operations -> {
    // Publish each individual record
    var futureSendResults = records.stream()
      .map(record -> {
        var sendResult = operations
          .send(targetTopic, record.get("id").toString(), record);
        // Report on failed records
        sendResult.addCallback(
          success -> log.info("Produced {}", success),
          failure -> log.warn("Failed to produce {}", record, failure));
        return sendResult;
      })
      .collect(toList());
    // Verify each record was published successfully
    for (var future : futureSendResults) {
      try {
        future.get();
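      } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
          // Restore the interrupt flag so the listener container can still shut down
          Thread.currentThread().interrupt();
        }
        // When any record fails, throw an exception to prevent transaction commit
        log.warn("Failed to produce a record", e);
        throw new RuntimeException("Failed to produce a record", e);
      }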
    }
    // No need for a return value
    return null;
  });
  // Commit offsets for entire batch on consumer
  ack.acknowledge();
  log.info("Replicated {} records", records.size());
}
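
The all-or-nothing behaviour of the listener above can be illustrated without Kafka. The following is a simplified stand-in, not the sample application's code: CompletableFuture replaces the ListenableFuture returned by send, a Runnable replaces Acknowledgment.acknowledge(), and the class and method names are hypothetical. It shows that the acknowledge step is only reached when every send has succeeded.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Kafka-free stand-in for "produce the batch, then acknowledge only on success". */
final class BatchOutcomeSketch {

  static <R> void replicate(
      List<R> records, Function<R, CompletableFuture<Void>> send, Runnable acknowledge) {
    // Start all sends before waiting on any of them
    List<CompletableFuture<Void>> results =
        records.stream().map(send).collect(Collectors.toList());
    for (CompletableFuture<Void> result : results) {
      try {
        result.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while producing", e);
      } catch (ExecutionException e) {
        // One failed record fails the whole batch
        throw new IllegalStateException("Failed to produce a record", e.getCause());
      }
    }
    // Only reached when every record was produced successfully
    acknowledge.run();
  }

  public static void main(String[] args) {
    Function<String, CompletableFuture<Void>> alwaysOk =
        record -> CompletableFuture.<Void>completedFuture(null);
    Function<String, CompletableFuture<Void>> failOnB =
        record -> "b".equals(record)
            ? CompletableFuture.<Void>failedFuture(new IllegalStateException("send failed"))
            : CompletableFuture.<Void>completedFuture(null);

    replicate(List.of("a", "b", "c"), alwaysOk, () -> System.out.println("acknowledged"));
    try {
      replicate(List.of("a", "b", "c"), failOnB, () -> System.out.println("acknowledged"));
    } catch (IllegalStateException e) {
      System.out.println("batch failed, nothing acknowledged: " + e.getMessage());
    }
  }
}
```

In the real listener the same ordering holds: the exception thrown inside executeInTransaction propagates before ack.acknowledge() is called, so the consumer offsets stay where they were.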

Unit testing

To unit test this application we use a single embedded Kafka cluster with two separate topics, and separate mock schema registries. A few details of running a transactional producer against a single broker require tweaks to the broker properties.
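
The exact tweaks are in the sample application; as a hedged sketch, the overrides typically needed are the ones below. Kafka's transaction state log defaults to a replication factor of 3 and a minimum of 2 in-sync replicas, which a single broker cannot satisfy. The class name is hypothetical, and the "key=value" form is assumed to be passed on to the embedded broker, for example via the brokerProperties attribute of @EmbeddedKafka.

```java
import java.util.Properties;

/** Sketch of broker overrides that let a single broker host the transaction state log. */
final class SingleBrokerTransactionProps {

  /** Overrides in "key=value" form. */
  static final String[] BROKER_PROPERTIES = {
    "transaction.state.log.replication.factor=1",
    "transaction.state.log.min.isr=1"
  };

  /** The same overrides as java.util.Properties, for brokers configured that way. */
  static Properties asProperties() {
    Properties props = new Properties();
    for (String entry : BROKER_PROPERTIES) {
      String[] keyValue = entry.split("=", 2);
      props.setProperty(keyValue[0], keyValue[1]);
    }
    return props;
  }

  public static void main(String[] args) {
    System.out.println(asProperties());
  }
}
```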

We create an Avro schema on the fly, for which we produce a record onto the input topic. Our application then processes this record, to produce a similar record onto the target topic. Finally, we assert the record produced onto the target topic matches the key and value we expect.

Testing with two clusters

To fully test that our application works when connected to two separate clusters, we use Docker Compose to launch two clusters each with their own Schema Registry. We then use the kafka-avro-console-producer and kafka-avro-console-consumer bundled with the Schema Registry to produce a record and verify it is replicated successfully.

  1. Download clients to produce/consume Avro messages

    1. ./scripts/confluent-avro-clients.sh

  2. Launch two Kafka clusters

    1. ./scripts/docker-compose-up.sh

  3. Produce data onto topic in source cluster

    1. ./scripts/produce-records.sh localhost:9093 localhost:8083 source-topic-a

  4. Verify data present in source cluster

    1. ./scripts/verify-records.sh localhost:9093 localhost:8083 source-topic-a

  5. Replicate topic from source cluster to target cluster

    1. ./mvnw spring-boot:run -Dspring-boot.run.profiles=local

  6. Verify data present in target cluster

    1. ./scripts/verify-records.sh localhost:9094 localhost:8084 target-topic-b

When run, steps 4, 5 and 6 will each start a consumer that continues listening until stopped. Each reports on the messages it receives, so you can follow along in the logs. You can even keep all of the consumers running as you produce more records onto the source topic.

Conclusion

The above serves as an example of how to replicate a single Avro serialized topic from one Kafka cluster to another. This allows for all the flexibility you might need in terms of secure connections and consumer/producer configuration properties. If you have a need to replicate a full cluster instead of a single topic, do have a look at Mirror Maker or Confluent Replicator first!
