Spring Integration Kafka Extension 1.0.GA is available

Releases | Artem Bilan | February 09, 2015 | ...

Dear Spring community,

We are pleased to announce that the Spring Integration Kafka 1.0 GA extension is now available, providing familiar Spring Integration endpoints for Apache Kafka. As usual, use the Release Repository with Maven or Gradle

compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"

or download a distribution archive, to give it a spin.

First of all thanks to all who contributed to the project - special thanks to the project's founder Soby Chacko, who implemented the infrastructure, as well as the High Level Consumer-based message source and the producers, and also to Marius Bogoevici, who learned the intricacies of the Simple Consumer API for the message-driven consumer.

Overview

It isn't a surprise that this project is fully based on Apache Kafka (the 0.8.1.1 version) and on the Spring Integration foundation (4.0.5.RELEASE version). We provide several abstractions like Configuration, ConnectionFactory, KafkaMessageListenerContainer, KafkaConsumerContext/KafkaProducerContext, KafkaMessage etc. to follow with Spring's principles of decoupling and ease of use. Basing on those abstraction we provide high-level API like KafkaMessageDrivenChannelAdapter, KafkaHighLevelConsumerMessageSource and KafkaProducerMessageHandler which are adapters in terms of Spring Integration. The XML configuration support is also provided.

KafkaHighLevelConsumerMessageSource

The Kafka High Level Consumer is presented with <int-kafka:inbound-channel-adapter> and <int-kafka:consumer-context> to poll messages from Kafka topics using KafkaStream. Its main advantage is simplicity of use and ability to balance partitions between consumers, if multiple instances of the message source are running in parallel.

The typical configuration may looks like:

<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
                                    channel="inputFromKafka">
       <int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
     <int-kafka:consumer-configurations>
           <int-kafka:consumer-configuration group-id="default"
                                  value-decoder="valueDecoder"
                                  key-decoder="valueDecoder"
                                  max-messages="5000">
                  <int-kafka:topic id="test1" streams="4"/>
                  <int-kafka:topic id="test2" streams="4"/>
           </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

As you see in addition the <int-kafka:consumer-context> requires a reference to the zookeeperConnect. It is a simple bean, which represent a connection to the Zookeeper ensemble:

<int-kafka:zookeeper-connect id="zookeeperConnect" 
                          zk-connect="host1,host2,host3:2182"
                          zk-connection-timeout="6000"
                          zk-session-timeout="6000"
                          zk-sync-time="2000"/>

The KafkaHighLevelConsumerMessageSource produces Message with Map<String, Map<Integer, List<Object>>> payload, where it is like "Kafka messages by partitions for each topic".

KafkaMessageDrivenChannelAdapter

The Kafka Simple Consumer is presented with <int-kafka:message-driven-adapter> and based on the well-know ListenerContainer abstraction - KafkaMessageListenerContainer (similar to Spring AMQP SimpleMessageListenerContainer or Spring JMS DefaultMessageListenerContainer):

@Bean
public Configuration zkConfiguration() {
   return new ZookeeperConfiguration(new ZookeeperConnect());
}

@Bean
public ConnectionFactory kafkaConnectionFactory() {
   return new DefaultConnectionFactory(zkConfiguration());
}

@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
   KafkaMessageDrivenChannelAdapter adapter = 
            new KafkaMessageDrivenChannelAdapter(
   		new KafkaMessageListenerContainer(kafkaConnectionFactory(),    
                                       "topic1", "topic2"));
   adapter.setOutputChannel(inputChannel());
   return adapter;
}

The main advantage of this component is better control of the partitions one component listens to (which are configurable), as well as starting offsets (wherever replaying a topic is, for example, necessary). Also, richer offset management and error handling strategies are available, too.

The result of the listening task is a single Message with payload based on the Kafka message and additional headers with keys from KafkaHeaders. Ordering is preserved inside a partition.

Both adapters can be configured with kafka.serializer.Decoder for the Kafka message as well as for the Kafka message key. The Spring Integration Kafka provides out-of-the-box the Avro Encoder/Decoder implementations.

In addition the Spring Integration Kafka introduces the OffsetManager abstraction to get deal with Kafka topic offset, which isn't available with High Level Consumer. The MetadataStoreOffsetManager and KafkaTopicOffsetManager are presented. The OffsetManager must be injected to the KafkaMessageListenerContainer. By default the MetadataStoreOffsetManager is used, backed by SimpleMetadataStore from Spring Integration Core.

KafkaProducerMessageHandler

The Kafka Producer represents with <int-kafka:outbound-channel-adapter> and <int-kafka:producer-context> pair. The last one utilizes the configurations for the target Kafka Producer, which is selected by the KafkaHeaders.TOPIC from the MessageHeaders or by the topic-expression on the <int-kafka:outbound-channel-adapter> and matched to the configured topic option on the <int-kafka:producer-configuration> sub-element:

<int-kafka:producer-context id="kafkaProducerContext">
	<int-kafka:producer-configurations>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					key-class-type="java.lang.String"
					value-class-type="java.lang.String"
					topic="test1"
					value-encoder="kafkaEncoder"
					key-encoder="kafkaEncoder"
					compression-codec="default"/>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					topic="test2"
					compression-codec="default"/>
	</int-kafka:producer-configurations>
</int-kafka:producer-context>

As you see there are enough options for tuning the target Producer, as well as each Producer can be backed by the specific broker-list. If only the single <int-kafka:producer-configuration> is present you can send messages to any topic, e.g. based on the Message context headers.

The Spring XD utilizes these adapters as Kafka Source and Sink. In addition it provides the KafkaMessageBus. These features will be available in the Spring XD 1.1 RELEASE soon.

Also, while we were finishing this release, Apache Kafka 0.8.2 has received its final release as well. We are happy to congratulate the team, and we will incorporate the new features available in the near future - this is just the beginning of providing premier support for Kafka in Spring applications!

See the project home page for more information.

As always, we look forward to your comments and feedback (StackOverflow (spring-integration tag), Spring JIRA, GitHub) and we very much welcome contributions!

Get the Spring newsletter

Thank you for your interest. Someone will get back to you shortly.

Get ahead

VMware offers training and certification to turbo-charge your progress.

Learn more

Get support

Tanzu Spring Runtime offers support and binaries for OpenJDK™, Spring, and Apache Tomcat® in one simple subscription.

Learn more

Upcoming events

Check out all the upcoming events in the Spring community.

View all