Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn moreI am pleased to announce the availability of Spring for Apache Kafka 2.1.0.RELEASE.
In addition, maintenance releases 1.3.2.RELEASE and 2.0.2.RELEASE are available, containing important bug fixes. See also below for information about spring-integration-kafka
3.0.0.RELEASE. It is recommended that all users upgrade.
The main purpose of the 2.1 release is to upgrade the kafka-clients
library to 1.0.0, but we have included a few improvements:
Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the ContainerStoppingErrorHandler
for record listeners and ContainerStoppingBatchErrorHandler
for batch listeners.
The KafkaAdmin
now supports increasing partitions when a NewTopic
bean is detected with a larger number of partitions than currently exist on the topic.
StringJsonMessageConverter
and JsonSerializer/JsonDeserializer
now pass and consume type information in Headers
. This allows multiple types to be easily sent/received on the same topic:
@SpringBootApplication public class Kafka21Application {
public static void main(String[] args) {
SpringApplication.run(Kafka21Application.class, args)
.close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
return args -> {
template.send(MessageBuilder.withPayload(42)
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
template.send(MessageBuilder.withPayload("43")
.setHeader(KafkaHeaders.TOPIC, "blog")
.build());
Thread.sleep(5_000);
};
}
@Bean
public StringJsonMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Component
@KafkaListener(id = "multi", topics = "blog")
public static class Listener {
@KafkaHandler
public void intListener(Integer in) {
System.out.println("Got an int: " + in);
}
@KafkaHandler
public void stringListener(String in) {
System.out.println("Got a string: " + in);
}
}
}
Got an int: 42 Got a string: 43
The first time you run this app, you may need this property…
spring.kafka.consumer.auto-offset-reset=earliest
…in case the template sends the messages before the containers completely start.
In addition, the JsonSerializer
and JsonDeserializer
can be configured using kafka properties for the producer/consumer.
Important
In accordance with CVE-2017-4995, only classes in java.util
and java.lang
will be deserialized by default; to deserialize (trust) other packages, use the addTrustedPackages
method on the deserializer or in a customized DefaultJackson2TypeMapper
for the message converter. For the JsonDeserializer
, the packages can be provided in the Kafka consumer config in property JsonDeserializer.TRUSTED_PACKAGES
.
See the What’s New for complete information.
Finally, spring-integration-kafka
3.0.0.RELEASE is also available; it is based on Spring for Apache Kafka 2.1, Spring Integration 5.0, and Spring Framework 5.0; it requires Java 8 and has the following new feature:
spring-messaging
headers to/from Kafka Headers
.See the Project Page for a complete matrix of spring-kafka
, spring-integration-kafka
and kafka-clients
version compatibility.
Project Page | Issues | Contributing | Help | Chat