Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn morePart 1 - Programming Model Part 2 - Programming Model Continued Part 3 - Data deserialization and serialization Part 4 - Error Handling Part 5 - Application Customizations
In this part (the sixth and final one of this series), we are going to look into the ways Spring Cloud Stream Binder for Kafka Streams supports state stores and interactive queries in Kafka Streams.
When you have the need to maintain state in the application, Kafka Streams lets you materialize that state information into a named state store. There are several operations in Kafka Streams that require it to keep track of the state such as count
, aggregate
, reduce
, various windowing
operations, and others. Kafka Streams uses a special database called RocksDB for maintaining this state store in most cases (unless you explicitly change the store type). By default, the same information in the state store is backed up to a changelog topic as well as within Kafka, for fault-tolerant reasons.
When you explicitly materialize state like this into a named state store, this gives the ability for applications to query that state store at a later stage. This is a very powerful feature, as this gives you the ability to query into a database-like structure from within your Kafka Streams applications.
Kafka Streams binder-based applications can bind to destinations as KTable
or GlobalKTable
. GlobalKTable
is a special table type, where you get data from all partitions of an input topic, regardless of the instance that it is running. By contrast, a KTable
gives you only data from the respective partitions of the topic that the instance is consuming from.
The following is a function signature we saw earlier in this series of blog posts:
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> process() {
As you can see, this function has three input bindings, one KStream
, one KTable
, and another GlobalKTable
. Kafka Streams lets you materialize tables consumed like these into named state stores, given that these tables are based on a primary key. You can use the binding level property to materialize them into named state stores along with consumption. The following examples show how to do so:
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store-1
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.materializedAs: incoming-store-2
There are various methods in the Kafka Streams high-level DSL, which returns table types such as count
, aggregate
, and reduce
. There are other operations that use state stores to keep track of information. For example, the various join method calls in KStream
, although they return a KStream
type, internally use state stores to keep the joined data. In summary, when Kafka Streams lets you materialize data either as a table or stream, it is materialized into a state store, much like data stored in a database table.
When using the processor API of Kafka Streams, which gives you more flexibility on how the stream is processed, you have to declare a state store beforehand and provide that to the StreamsBuilder. Kafka Streams binder can scan the application to detect beans of type StoreBuilder and then use that to create state stores and pass them along with the underlying StreamsBuilder through the StreamsBuilderFactoryBean. Here is a look at such beans:
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
Duration.ofSeconds(3), Duration.ofSeconds(3), false), Serdes.Long(),
Serdes.Long());
}
The two StoreBuilder beans are detected by the binder, and it then attaches them to the streams builder automatically. Later on, you can access them, in your processor API based applications, as follows:
…
KeyValueStore<Long, Long> state1;
WindowStore<Long, Long> state2;
...
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.process((ProcessorSupplier<Object, String>) () -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext context) {
state1 = (KeyValueStore<Long, Long>) context.getStateStore("my-store");
state2 = (WindowStore<Long, Long>) context.getStateStore("other-store");
}
@Override
public void process(Object key, String value) {
// processing code
}
@Override
public void close() {
if (state1 != null) {
state1.close();
}
if (state2 != null) {
state2.close();
}
}
}, "my-store", "other-store");
}
One quick note about the usage of the processor API in Kafka Streams binder-based applications. The only way you can use the low-level processor API when you use the binder is through a usage pattern of higher-level DSL and then combine that with a transform or process call on it, as shown in the preceding example. See here for more details on how the processor API can be used in a binder based application.
Instead of creating StoreBuilder
beans in the application, you can also use the StreamsBuilderFactoryBean
customizer which we saw in the previous blog, to add the state stores programmatically, if that is your preference.
Kafka Streams lets you interactively query the data in the state store in real time as live stream processing is going on. The binder provides abstractions around this feature to make it easier to work with interactive queries. InteractiveQueryService
is a basic API that the binder provides to work with state store querying. You can usually inject this as a bean into your application and then invoke various API methods from it. Here is an example:
@Autowired
private InteractiveQueryService interactiveQueryService;
…
...
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
Then you can invoke various retrieval methods from the store and iterate through the result. There are various methods that you can invoke from these state stores based on your use case and the type of state store that you are using. Please refer to the Kafka Streams documentation for interactive queries for these various iteration methods available.
Oftentimes, you want to expose the state of your system from state stores over an RPC mechanism. You can combine Spring web support for writing powerful REST based applications in this manner. Here is a blueprint:
@RestController
public class Controller {
@RequestMapping("/song/id")
public SongBean song(@RequestParam(value="id") Long id) {
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryService.getQueryableStore(“song-store”, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(id);
if (song == null) {
throw new IllegalArgumentException("...");
}
This REST controller can be accessed from a front end web application for example.
This usage pattern obviously raises concerns. What happens if there are multiple Kafka Streams application instances running? For instance, what if there are 3 instances in which each of them is pulling data from a single source partition? Which controller instance is going to be responsible for providing information for key X? What if key X is only hosted in partition 3 and that happens to be the instance 3, but the request landed on instance 1. This is obviously a problem, but Kafka Streams provides a solution for that.
When you have multiple instances running and you want to use interactive queries, you have to set this property at the binder level:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
Then, in the controller method, you have to write logic that is similar to the following:
@RequestMapping("/charts/top-five")
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
{
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
RestTemplate restTemplate = new RestTemplate();
return restTemplate.postForObject(
String.format("http://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "charts/top-five?genre=Punk"), …);
}
In this blog, we saw the various ways in which Kafka Streams lets you materialize state information into state stores. The binder lets you consume data as KTable
or GlobalKTable
while allowing you to materialize that into a named state store. Kafka Streams has several operations in which state stores can be materialized as named stores. We saw that, when using the processor API in Kafka Streams, the application needs to create state store builder beans that the binder detects which it then passes along to Kafka Streams. Finally, we saw how these state stores can be queried by using interactive queries. We also saw the nuances involving multiple instances of an application and interactive queries against them.
Thank you for reading this blog series!
In this six-part series, we saw many features of Kafka Streams binder in Spring Cloud Stream, such as its programming models, data serialization, error handling, customization, and interactively querying the state stores. There are more features that we haven’t covered as part of this series as we wanted to focus on the general theme of introducing the main features of this binder that was added or enhanced in version 3.0.0
. For those additional features or to engage with the engineering team behind Spring Cloud Stream, please check out the various links provided in the resources section below.
Core Spring Cloud Stream GitHub Spring Cloud Stream Kafka Binder GitHub Spring Cloud Stream Samples