close

Stream Processing with Spring Cloud Stream and Apache Kafka Streams. Part 5 - Application Customizations

Part 1 - Programming Model
Part 2 - Programming Model Continued
Part 3 - Data deserialization and serialization
Part 4 - Error Handling

In this blog post, we continue our discussion on the support for Kafka Streams in Spring Cloud Stream. We are going to elaborate on the ways in which you can customize a Kafka Streams application.

Customizing the StreamsBuilderFactoryBean

Kafka Streams binder uses the StreamsBuilderFactoryBean, provided by the Spring for Apache Kafka project, to build the StreamsBuilder object that is the foundation for a Kafka Streams application. This factory bean is a Spring lifecycle bean. Oftentimes, this factory bean must be customized before it is started, for various reasons. As described in the previous blog post on error handling, you need to customize the StreamsBuilderFactoryBean if you want to register a production exception handler. Let’s say that you have this producer exception handler:

class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}

You can register it directly by using configuration if you choose (using default.production.exception.handler).

However, a more elegant approach, when using the binder, is to register this as part of the StreamsBuilderFactoryBean customizer, as follows:

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            IgnoreRecordTooLargeHandler.class);
    };
}

Note that, if you have multiple processors in the application, you can control which processor gets customization based on the application ID. For example, you can check on it this way:

return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {

Here is another example of setting a state listener:

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

Customizing KafkaStreams Object.

The KafkaStreams object is at the heart of any Kafka Streams application. StreamsBuilderFactoryBean is responsible for creating the topology and then creating the KafkaStreams object. Before starting the KafkaStreams object, StreamsBuilderFactoryBean gives an opportunity to customize this KafkaStreams object. For example, if you want to set an application-wide handler for uncaught exceptions you can do the following:

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

Notice that we start with the customizer for StreamsBuilderFactoryBean. However, inside it, we use a separate KafkaStreamsCustomizer.

Summary

In this blog post, we saw how the Kafka Streams binder in Spring Cloud Stream lets you customize the underlying StreamsBuilderFactoryBean and the KafkaStreams object.

Thank you for reading this far! Next, in the final blog post in this series, we will look at how the binder lets you deal with state stores and enabling interactive queries against them.

comments powered by Disqus