Spring Integration Kafka Support 2.0.0.M1 is now available

I am pleased to announce that the first milestone of spring-integration-kafka (Spring Integration Kafka Support) version 2.0 is now available.

The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka.

Starting with version 2.0, the project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.

The artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1
is available in the Milestone repository.

Key Features

The Kafka Consumer Channel Adapter

Built on the MessageListenerContainer foundation from the spring-kafka project,
the KafkaMessageDrivenChannelAdapter definition is now very simple:

@Bean
public MessageProducer kafkaProducer(
                   AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter = 
                              new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}

With XML configuration, we likewise need to declare just a single component:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />

The Kafka Producer Channel Adapter

With the KafkaTemplate foundation from the spring-kafka project, the KafkaProducerMessageHandler
is simple too:

@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
                            KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler = 
                         new KafkaProducerMessageHandler<>(template);
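    // PARSER is not defined in this snippet; it is assumed to be a shared
    // SpEL ExpressionParser, such as a static SpelExpressionParser field.
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));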
    handler.setPartitionIdExpression(
                            PARSER.parseExpression("headers.myPartition"));
    return handler;
}

The XML configuration has been simplified, too:

<int-kafka:outbound-channel-adapter 
                kafka-template="template" 
                channel="inputToKafka"
                topic="foo"/>

Java DSL Changes

Starting with version 1.2, the Spring Integration Java DSL introduces the Kafka09 factory, which covers the functionality of the aforementioned channel adapters from this new 2.0 version.
For example, the producing part may look like this:

.handle(Kafka09.outboundChannelAdapter(producerFactory())
             .defaultTopic("foo")
             .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
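Both the SpEL expressions in the handler configuration and the lambda in the DSL example select the destination per message from its headers. The following dependency-free sketch illustrates that idea in plain Java. It is not the adapter's implementation: the class and method names are hypothetical, a plain Map stands in for the message headers, and falling back to the "foo" topic when no "myTopic" header is present is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free illustration; none of these types come from
// Spring Integration or Kafka.
class HeaderRoutingSketch {

    // Fallback topic, mirroring the "foo" default used in the DSL example.
    static final String DEFAULT_TOPIC = "foo";

    // Typed header lookup: null when the header is absent, an exception when
    // it is present with an unexpected type.
    static <T> T header(Map<String, Object> headers, String key, Class<T> type) {
        Object value = headers.get(key);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException(
                    "Header '" + key + "' is not of type " + type.getName());
        }
        return type.cast(value);
    }

    // Topic taken from the "myTopic" header, else the default topic.
    static String resolveTopic(Map<String, Object> headers) {
        String topic = header(headers, "myTopic", String.class);
        return topic != null ? topic : DEFAULT_TOPIC;
    }

    // Partition taken from the "myPartition" header; null means no explicit
    // partition was requested for this message.
    static Integer resolvePartition(Map<String, Object> headers) {
        return header(headers, "myPartition", Integer.class);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("myPartition", 1);
        // No "myTopic" header yet, so the default topic applies.
        System.out.println(resolveTopic(headers) + " / " + resolvePartition(headers));

        headers.put("myTopic", "bar");
        // The header now overrides the default topic.
        System.out.println(resolveTopic(headers) + " / " + resolvePartition(headers));
    }
}
```

Running the sketch prints "foo / 1" followed by "bar / 1": the partition always comes from the header, while the topic comes from the header only when one is set.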

And finally, don’t miss the Spring for Apache Kafka announcement, too!

Next Steps

Together with the next Spring for Apache Kafka release, we may consider implementing some adapters for Kafka Streams as well.

Since the code base of the project has become pretty straightforward and it looks like the Apache Kafka API is going to be stable, we intend to absorb this project into Spring Integration Core 5.0 when the time comes.

Meanwhile, we look forward to your feedback and, if all goes well, plan to release 2.0.0.RELEASE in the next few weeks!

Project Page | Help
