Part 1 - Programming Model
Part 2 - Programming Model Continued
Continuing from the previous two blog posts in this series on writing stream processing applications with Spring Cloud Stream and Kafka Streams, we will now look at the details of how these applications handle deserialization on the inbound and serialization on the outbound.
All three major higher-level types in Kafka Streams - KStream&lt;K,V&gt;, KTable&lt;K,V&gt; and GlobalKTable&lt;K,V&gt; - work with a key and a value.
With Spring Cloud Stream’s Kafka Streams support, keys are always deserialized and serialized by using the native Serde mechanism. A Serde is a container object that provides both a deserializer and a serializer for the same type. Values, on the other hand, are marshaled by using either a Serde or the binder-provided message conversion. Starting with version 3.0 of the binder, using Serde is the default approach; the message converters in Spring are an optional feature that you need only on special occasions.
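As a quick illustration of the container idea, Kafka’s Serdes factory class (org.apache.kafka.common.serialization.Serdes) can assemble a Serde from an existing serializer/deserializer pair; the result below is equivalent to the ready-made Serdes.Long():

// A Serde bundles a serializer and a deserializer for the same type,
// so APIs can ask for one object instead of two.
Serde<Long> longSerde = Serdes.serdeFrom(new LongSerializer(), new LongDeserializer());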
Let’s look at this processor:
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
            // Enrich each click event with the user's region ("UNKNOWN" if there is no match)
            .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            // Re-key the stream by region, keeping the click count as the value
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            // Sum the clicks per region
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream();
}
This is the same processor we saw in the previous blog post. It has two inputs and an output. The first input binding is a KStream&lt;String, Long&gt;: the key is of type String and the value is a Long. The second input binding is a KTable&lt;String, String&gt;: here, both the key and the value are of type String. Finally, the output binding is a KStream&lt;String, Long&gt;, with a String key and a Long value.
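For reference, the binder derives binding names from the function bean name and the parameter position, so this processor is bound as process-in-0, process-in-1, and process-out-0 (the same names used in the configuration examples later in this post). You can map those bindings to topics in configuration; the topic names below are made up for illustration:

spring.cloud.stream.bindings.process-in-0.destination=user-clicks
spring.cloud.stream.bindings.process-in-1.destination=user-regions
spring.cloud.stream.bindings.process-out-0.destination=clicks-per-region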
Normally, you have to tell the application the right Serde to use as part of the application’s configuration. However, when you use the Kafka Streams binder, it infers this information for most standard types, and you don’t need to provide any special configuration.
The types inferred by the binder are those for which Kafka Streams provides out-of-the-box Serde implementations in its Serdes factory class: String, Integer, Long, Short, Float, Double, byte[], ByteBuffer, and Bytes.
In other words, if your KStream, KTable, or GlobalKTable uses these as the types for the key and the value, you don’t need to provide any special Serde configuration.
If the types are not among these, you can provide a bean of type Serde&lt;T&gt;, and, if the generic type T matches the actual type, the binder uses that bean as the Serde.
For example, let's say you have the following function signature:
@Bean
public Function<KStream<CustomKey, AvroIn>, KStream<CustomKey, AvroOut>> process() {
}
Then the key and value types don’t match any of the known Serde implementations. In that case, you have two options. The recommended approach is to provide Serde beans, as follows:
@Bean
public Serde<CustomKey> customKeySerde() {
    return new CustomKeySerde();
}

@Bean
public Serde<AvroIn> avroInSerde() {
    final SpecificAvroSerde<AvroIn> avroInSerde = new SpecificAvroSerde<>();
    avroInSerde.configure(...);
    return avroInSerde;
}

@Bean
public Serde<AvroOut> avroOutSerde() {
    final SpecificAvroSerde<AvroOut> avroOutSerde = new SpecificAvroSerde<>();
    avroOutSerde.configure(...);
    return avroOutSerde;
}
If you don’t want to provide these Serde objects as programmatically created Spring beans, you can also define them through configuration, where you pass the fully qualified name of the Serde implementation class, as follows:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
By the way, a Serde set through configuration takes higher precedence even if you have matching Serde beans, since these properties are set on the specific consumer and producer bindings; the binder gives them priority because the user requested them explicitly.
At this point, if the binder still cannot match any Serde, it looks for a default one to match.
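For example, you can set binder-wide defaults through the Kafka Streams configuration that the binder passes to the underlying streams instance; the Serde classes below are just examples:

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde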
If all of these approaches fail to produce a match, the binder falls back to the JsonSerde implementation provided by the Spring for Apache Kafka project. If you don’t use any of the above mechanisms and let the binder fall back to JsonSerde, you have to make sure that the classes involved are JSON-friendly.
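JSON-friendly here essentially means a shape that Jackson can bind without extra configuration: a public class with a no-argument constructor and standard accessors. As a sketch, the RegionWithClicks type used earlier might look like this (the exact fields are only illustrative):

// A plain JSON-friendly value class: public, with a no-arg constructor
// and getters/setters, so JsonSerde can serialize and deserialize it.
public class RegionWithClicks {

    private String region;
    private long clicks;

    public RegionWithClicks() {
    }

    public RegionWithClicks(String region, long clicks) {
        this.region = region;
        this.clicks = clicks;
    }

    public String getRegion() { return region; }
    public void setRegion(String region) { this.region = region; }

    public long getClicks() { return clicks; }
    public void setClicks(long clicks) { this.clicks = clicks; }
}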
Kafka Streams has several API methods that need access to Serde objects directly. For example, look at the Joined.with and Grouped.with arguments passed to leftJoin and groupByKey in the earlier BiFunction example processor. Providing these is the responsibility of the application developer, as the binder cannot help with any inference in those instances. In other words, the binder’s support for Serde inference, matching a Serde with a provided bean, and so on applies only at the edges of your application, at either the input or the output bindings. You might expect the binder to completely hide the complexities of Serde when you develop Kafka Streams applications with it, but that is a false impression: the binder helps you with Serde only on consuming and producing. Any Serde required by your business logic implementation still needs to be provided by the application.
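As an illustration (a hypothetical fragment, not part of the processor above), materializing a count into a state store is one such place where the application must hand the Serde to the operation itself:

// 'input' is assumed to be a KStream<String, Long> inside a processor.
// The binder cannot see this interior operation, so the application
// supplies the store's key and value Serdes explicitly.
KTable<String, Long> clicksPerRegion = input
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("clicks-per-region-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));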
In this blog post, we saw an overview of how the Kafka Streams binder for Spring Cloud Stream helps you with the deserialization and serialization of data. The binder can infer the key and value types used on the input and output bindings. We saw that the default is to always use the native Serde mechanism, but the binder gives you an option to disable it and delegate to Spring’s message converters if need be. We also saw that any Serde required by your business logic implementation still needs to be provided by the application.
In the next blog post, we will look at the various error handling mechanisms that Kafka Streams provides for deserialization and production of messages and how the binder supports them.