Stream Processing with Spring Cloud Stream and Apache Kafka Streams. Part 4 - Error Handling

Part 1 - Programming Model
Part 2 - Programming Model Continued
Part 3 - Data deserialization and serialization

Continuing the series on the Spring Cloud Stream binder for Kafka Streams, this blog post looks at the error-handling strategies that are available in the binder.

Error handling in Kafka Streams largely centers on errors that occur during deserialization on the inbound side and during production on the outbound side.

Handling Deserialization Exceptions

Kafka Streams lets you register deserialization exception handlers. By default, when a deserialization exception occurs, it logs the error and fails the application (LogAndFailExceptionHandler). Alternatively, it can log the error, skip the record, and continue the application (LogAndContinueExceptionHandler). Normally, you provide the corresponding classes as part of the configuration. With the binder, you can set these exception handlers either at the binder level, which applies to the entire application, or at the binding level, which gives you more fine-grained control.
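For comparison, here is a minimal sketch of what "providing the corresponding class as part of the configuration" could look like in a plain Kafka Streams application that does not use the binder. It only builds a java.util.Properties object, so it runs without any Kafka dependency. The class name PlainStreamsHandlerConfig, the application ID, and the broker address are assumptions made for illustration, and the config key shown is the one assumed to apply to the Kafka Streams version in use.

```java
import java.util.Properties;

public class PlainStreamsHandlerConfig {

    // Kafka Streams config key for the deserialization exception handler
    // (assumed to match the Kafka Streams version in use).
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Builds the properties you would hand to a KafkaStreams instance.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "my-application");     // illustrative value
        props.put("bootstrap.servers", "localhost:9092");  // illustrative value
        // Log the failing record and keep processing instead of failing the application.
        props.put(HANDLER_KEY,
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty(HANDLER_KEY));
    }
}
```

The binder properties described in this post spare you this wiring: you name the strategy (for example, logAndContinue) and the binder selects the handler class.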

Here’s how you can set the deserialization exception handlers at the binder level:

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=logAndContinue

If you have only a single processor with a single input, setting the deserialization exception handler on the binder, as shown above, is the easiest approach. If you have multiple processors or inputs and want to control error handling on each of them separately, set the handler per input binding. Here is an example of doing so:

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler=logAndContinue

Notice that the handler is set on the input binding process-in-0. If you have more input bindings, you have to set the handler explicitly on each of them.

Kafka Streams and the DLQ (Dead Letter Queue)

In addition to the two exception handlers that Kafka Streams provides, the binder provides a third option: a custom handler that sends the record that failed deserialization to a special DLQ topic. To activate it, you have to opt in at either the binder or the binding level, as explained above.

Here’s how to do so:

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq

Keep in mind that setting this on the binder activates the DLQ globally: it applies to all the input topics through their bindings. If that’s not what you want, enable it per input binding instead.

By default, the DLQ topic is named error.<input-topic-name>.<application-id for kafka streams>.

Here, <input-topic-name> stands for the actual topic name. Note that this is not the binding name but the actual topic name.

If the input topic is topic-1 and the Kafka Streams application ID is my-application, the default DLQ name will be error.topic-1.my-application.
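The naming rule above can be sketched as a small helper. This is only an illustration of the pattern described in this post, not the binder's own code; the class DefaultDlqName and the method defaultDlqName are hypothetical names.

```java
public class DefaultDlqName {

    // Mirrors the pattern described above:
    // error.<input-topic-name>.<application-id>
    // Hypothetical helper for illustration; not part of the binder API.
    static String defaultDlqName(String inputTopic, String applicationId) {
        return "error." + inputTopic + "." + applicationId;
    }

    public static void main(String[] args) {
        // Uses the topic and application ID from the example above.
        System.out.println(defaultDlqName("topic-1", "my-application"));
        // prints: error.topic-1.my-application
    }
}
```

A helper like this can be handy when DLQ topics have to be created manually ahead of time, since it makes the expected topic names explicit.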

Changing the default DLQ name generated by the binder:

You can override the default DLQ name, as follows (replace process-in-0 with the actual binding name):

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName=input-1-dlq

If it has the required permissions on the broker, the binder provisioner will create all the necessary DLQ topics. If that’s not the case, these topics have to be created manually before the application starts.

DLQ Topic and Partitions

By default, the binder assumes that the DLQ topic is provisioned with the same number of partitions as the input topic. If that’s not true (that is, if the DLQ topic is provisioned with a different number of partitions), you have to tell the binder which partition to send the records to by using a DlqPartitionFunction implementation, as follows:

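@Bean
public DlqPartitionFunction partitionFunction() {
    // Send every failed record to partition 0 of the DLQ topic,
    // whatever the group, record, or exception.
    return (group, record, ex) -> 0;
}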

There can only be one such bean present in the application. Therefore, if you have multiple processors or inputs with separate DLQ topics, you have to distinguish the records inside that single function by using the group argument (which is the same as the application ID when using the binder).
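As a rough sketch of that group-based selection, the partition decision can live in a plain helper that the single DlqPartitionFunction bean delegates to. The class DlqPartitionRouting, the method partitionFor, the two application IDs, and the partition numbers are all hypothetical; the only assumption taken from this post is that the function receives the group as its first argument.

```java
import java.util.Map;

public class DlqPartitionRouting {

    // Hypothetical application IDs mapped to the DLQ partition each should use.
    private static final Map<String, Integer> PARTITION_BY_GROUP = Map.of(
            "orders-processor", 0,
            "payments-processor", 1);

    // Picks the DLQ partition for a group; unknown or missing groups fall back to 0.
    static int partitionFor(String group) {
        if (group == null) {
            return 0;
        }
        return PARTITION_BY_GROUP.getOrDefault(group, 0);
    }

    // In the application, the single bean could delegate to this helper:
    //   return (group, record, ex) -> DlqPartitionRouting.partitionFor(group);

    public static void main(String[] args) {
        System.out.println(partitionFor("orders-processor"));
        System.out.println(partitionFor("payments-processor"));
        System.out.println(partitionFor("something-else"));
    }
}
```

Each returned value has to be a partition that actually exists on the corresponding DLQ topic, so the mapping should follow how those topics were provisioned.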

Handling Producer Errors

All the exception handlers that we have discussed so far deal only with errors surrounding the deserialization of data. Kafka Streams also provides the ability to handle producer errors on the outbound. As of the 3.0 release, the binder does not provide a first-class mechanism to support this. However, this doesn’t mean that you can’t use the producer exception handlers. You can do so through the various customizers that the binder relies on from the Spring for Apache Kafka project. These customizers are the topic of the next blog post in this series.

Kafka Streams Binder Health Indicator and Metrics

The Kafka Streams binder monitors the health of the underlying streams threads and exposes the health indicator through a Spring Boot Actuator endpoint. In addition to the health indicator, the binder also exposes Kafka Streams metrics through a Micrometer meter registry. All the basic metrics available through the KafkaStreams object are available in this registry. The binder reference documentation has more details on both.

Summary

In this blog post, we saw the strategies that Kafka Streams provides for handling deserialization exceptions. On top of these, the Kafka Streams binder also provides a handler that lets you send records that fail deserialization to a DLQ topic. We saw that the binder provides fine-grained control over these DLQ topics.

Thank you for reading this far! In the next blog post, we are going to see how the binder enables further customizations.
