Spring for Apache Kafka 2.1.0.RELEASE (and 1.3.2, 2.0.2) Available
I am pleased to announce the availability of Spring for Apache Kafka 2.1.0.RELEASE.
In addition, maintenance releases 1.3.2.RELEASE and 2.0.2.RELEASE are available, containing important bug fixes. See also below for information about spring-integration-kafka 3.0.0.RELEASE.
It is recommended that all users upgrade.
The main purpose of the 2.1 release is to upgrade the kafka-clients library to 1.0.0, but we have included a few improvements:
- Sometimes, when a message can’t be processed, you may wish to stop the container so the condition can be corrected and the message re-delivered. The framework now provides the ContainerStoppingErrorHandler for record listeners and ContainerStoppingBatchErrorHandler for batch listeners.
- The KafkaAdmin now supports increasing partitions when a NewTopic bean is detected with a larger number of partitions than currently exist on the topic.
- StringJsonMessageConverter and JsonSerializer/JsonDeserializer now pass and consume type information in Headers. This allows multiple types to be easily sent/received on the same topic:
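import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class Kafka21Application {

    public static void main(String[] args) {
        SpringApplication.run(Kafka21Application.class, args)
                .close();
    }

    // Sends an Integer and a String to the same topic, then waits for the listener.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> {
            template.send(MessageBuilder.withPayload(42)
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            template.send(MessageBuilder.withPayload("43")
                    .setHeader(KafkaHeaders.TOPIC, "blog")
                    .build());
            Thread.sleep(5_000);
        };
    }

    // Converts payloads to/from JSON; the type information travels in the headers.
    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    // Class-level listener: each record goes to the @KafkaHandler matching its payload type.
    @Component
    @KafkaListener(id = "multi", topics = "blog")
    public static class Listener {

        @KafkaHandler
        public void intListener(Integer in) {
            System.out.println("Got an int: " + in);
        }

        @KafkaHandler
        public void stringListener(String in) {
            System.out.println("Got a string: " + in);
        }

    }

}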
Got an int: 42
Got a string: 43
The first time you run this app, you may need this property…
spring.kafka.consumer.auto-offset-reset=earliest
…in case the template sends the messages before the containers completely start.
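As a rough sketch of the first two improvements listed above, the configuration below registers a ContainerStoppingErrorHandler on a listener container factory and declares a NewTopic bean for the blog topic. The class name Kafka21Config, the bean name blogTopic, and the partition and replica counts are assumptions made for illustration. The sketch also assumes the 2.1 API, where the error handler is set through the factory's ContainerProperties, and that a KafkaAdmin bean is present in the application context to act on the NewTopic bean.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerStoppingErrorHandler;

// Hypothetical configuration class; names and numbers are illustrative only.
@Configuration
public class Kafka21Config {

    // Record listeners: stop the container when the listener throws, so the
    // problem can be corrected and the record re-delivered after a restart.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Assumption: 2.1-style configuration through ContainerProperties.
        factory.getContainerProperties().setErrorHandler(new ContainerStoppingErrorHandler());
        return factory;
    }

    // If the "blog" topic already exists with fewer than 10 partitions,
    // the KafkaAdmin is expected to increase the partition count to 10.
    @Bean
    public NewTopic blogTopic() {
        return new NewTopic("blog", 10, (short) 1);
    }

}
```

For batch listeners, the equivalent would presumably be setBatchErrorHandler with a ContainerStoppingBatchErrorHandler. Note that a factory bean defined this way is likely to replace any auto-configured one, so other settings, such as a message converter, would need to be applied to it as well.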
In addition, the JsonSerializer and JsonDeserializer can be configured using Kafka properties for the producer/consumer.
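A minimal sketch of property-based configuration might look like the following. The class name JsonKafkaProps, the method names, and the choice of String keys are assumptions for illustration; the ADD_TYPE_INFO_HEADERS constant is set only to make the default behaviour explicit.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

// Hypothetical helper that builds plain Kafka client property maps.
public final class JsonKafkaProps {

    private JsonKafkaProps() {
    }

    // Producer side: values are written as JSON, with type information in the headers.
    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        // Stated explicitly here; adding type headers is assumed to be the default.
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        return props;
    }

    // Consumer side: values are read from JSON using the type information in the headers.
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

}
```

These maps could then be passed to a DefaultKafkaProducerFactory and a DefaultKafkaConsumerFactory respectively.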
Important: In accordance with CVE-2017-4995, only classes in java.util and java.lang will be deserialized by default; to deserialize (trust) other packages, use the addTrustedPackages method on the deserializer or in a customized DefaultJackson2JavaTypeMapper for the message converter. For the JsonDeserializer, the packages can be provided in the Kafka consumer config in property JsonDeserializer.TRUSTED_PACKAGES.
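The two ways of trusting additional packages on the JsonDeserializer could be sketched as follows. The class name TrustedPackagesExample, the method names, and the package com.example.events are hypothetical placeholders; substitute the packages that actually contain your payload classes.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical helper showing both ways to trust an extra package.
public final class TrustedPackagesExample {

    private TrustedPackagesExample() {
    }

    // Option 1: programmatic, directly on a deserializer instance.
    public static JsonDeserializer<Object> trustingDeserializer() {
        JsonDeserializer<Object> deserializer = new JsonDeserializer<>(Object.class);
        // "com.example.events" is a placeholder package name.
        deserializer.addTrustedPackages("com.example.events");
        return deserializer;
    }

    // Option 2: declarative, as an entry in the Kafka consumer configuration.
    public static Map<String, Object> trustedPackagesProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        return props;
    }

}
```

Keeping the trusted list as narrow as possible preserves the protection that the default restriction is meant to provide.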
See the What’s New for complete information.
Finally, spring-integration-kafka 3.0.0.RELEASE is also available; it is based on Spring for Apache Kafka 2.1, Spring Integration 5.0, and Spring Framework 5.0; it requires Java 8 and has the following new feature:
- Mapping of spring-messaging headers to/from Kafka Headers.
See the Project Page for a complete matrix of spring-kafka, spring-integration-kafka and kafka-clients version compatibility.