Spring Integration Kafka Support 2.0.0.M1 is now available
I am pleased to announce that the first milestone of spring-integration-kafka (Spring Integration Kafka Support) version 2.0 is now available.
The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka.
Starting with version 2.0, the project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x.
The artifact org.springframework.integration:spring-integration-kafka:2.0.0.M1 is available in the Milestone repository.
Key Features
The Kafka Consumer Channel Adapter
Building on the MessageListenerContainer foundation from the spring-kafka project, the KafkaMessageDrivenChannelAdapter definition is now very simple:
@Bean
public MessageProducer kafkaProducer(
        AbstractMessageListenerContainer<Integer, String> container) {
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    adapter.setMessageConverter(new StringJsonMessageConverter());
    adapter.setOutputChannel(fromKafkaChannel());
    adapter.setErrorChannel(myErrorChannel());
    return adapter;
}
With XML configuration, we likewise need to declare just a single component:
<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        message-converter="messageConverter"
        error-channel="errorChannel" />
The Kafka Producer Channel Adapter
With the KafkaTemplate foundation from the spring-kafka project, the KafkaProducerMessageHandler is simple, too:
@Bean
@ServiceActivator(inputChannel = "toKafka")
public MessageHandler kafkaProducerHandler(
        KafkaTemplate<Integer, String> template) {
    KafkaProducerMessageHandler<Integer, String> handler =
            new KafkaProducerMessageHandler<>(template);
    // PARSER is assumed to be a shared SpelExpressionParser instance defined elsewhere
    handler.setTopicExpression(PARSER.parseExpression("headers.myTopic"));
    handler.setPartitionIdExpression(
            PARSER.parseExpression("headers.myPartition"));
    return handler;
}
The XML configuration has been simplified, too:
<int-kafka:outbound-channel-adapter
        kafka-template="template"
        channel="inputToKafka"
        topic="foo"/>
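The Java configuration in this section picks the topic and partition per message through the expressions headers.myTopic and headers.myPartition, while the XML variant uses the fixed topic "foo". To make the difference concrete, here is a minimal plain-Java sketch of that idea. It is not the KafkaProducerMessageHandler implementation and uses no Spring classes: the HeaderRouting class, its two methods, and the fallback to a fixed topic when the header is missing are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not part of Spring Integration: models how a
// per-message topic and partition could be read from message headers.
public class HeaderRouting {

    // Mirrors the idea of "headers.myTopic": use the header value when present,
    // otherwise fall back to a fixed topic (the fallback is an assumption here).
    public static String resolveTopic(Map<String, Object> headers, String defaultTopic) {
        Object topic = headers.get("myTopic");
        return topic instanceof String ? (String) topic : defaultTopic;
    }

    // Mirrors the idea of "headers.myPartition": returns null when the header
    // is absent or not an Integer, leaving the partition choice to the caller.
    public static Integer resolvePartition(Map<String, Object> headers) {
        Object partition = headers.get("myPartition");
        return partition instanceof Integer ? (Integer) partition : null;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("myTopic", "orders");
        headers.put("myPartition", 3);

        // Headers present: both values come from the message.
        System.out.println(resolveTopic(headers, "foo") + "/" + resolvePartition(headers));
        // Headers absent: fixed topic, no explicit partition.
        Map<String, Object> empty = new HashMap<>();
        System.out.println(resolveTopic(empty, "foo") + "/" + resolvePartition(empty));
    }
}
```

Running main prints orders/3 and then foo/null. In a real flow, the sending side would set the myTopic and myPartition headers on each message before it reaches the toKafka channel.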
Java DSL Changes
Starting with version 1.2, the Spring Integration Java DSL introduces the Kafka09 factory to cover the functionality of the aforementioned channel adapters from this new 2.0 version.
For example, the producing part may look like this:
.handle(Kafka09.outboundChannelAdapter(producerFactory())
        .defaultTopic("foo")
        .partitionId(m -> m.getHeaders().get("myPartition", Integer.class)))
And finally, don’t miss the Spring for Apache Kafka announcement, too!
Next Steps
Together with the next Spring for Apache Kafka release, we may consider implementing some adapters for Kafka Streams as well.
Since the project's code base has become fairly straightforward and the Apache Kafka API looks like it is going to be stable, we intend to absorb this project into Spring Integration Core 5.0, when the time comes.
Meanwhile, we look forward to your feedback and, if all goes well, plan to release 2.0.0.RELEASE in the next few weeks!