
Spring Cloud Stream - functional and reactive

In the previous post, I provided the justification for our shift to a functional programming model in Spring Cloud Stream (SCSt). It means less code and less configuration. Most importantly, though, your code is completely decoupled and independent from the internals of SCSt.

In this post, I’ll dig a little deeper and summarize the core features of our functional support, specifically around its reactive features.

IMPORTANT: Anything you can do with @StreamListener/@EnableBinding you can also do without it. In other words, the functional support is now feature-compatible with the annotation-based support.

While all of the features described below are features of Spring Cloud Function (SCF), a dependency of SCSt, there are certain nuances you must be aware of to understand the additional meaning functions take on in the context of SCSt.

Supplier, Function and Consumer

Any bean of type Supplier, Function, or Consumer, or any bean that can be mapped to one of those types (such as a POJO function, a Kotlin lambda, and so on), is treated by SCSt as a message handler (Function or Consumer) or a message source (Supplier). Based on the type of functional strategy used, input and output bindings are automatically generated by using the following naming convention: <function-name>-<in/out>-<index>.

Consider the following example:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

The preceding function is treated as a message handler that consumes from the uppercase-in-0 binding and sends to the uppercase-out-0 binding. The rest of the existing stream properties can be used as before. For example: --spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain.
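To make the generated bindings concrete, they can be mapped to physical destinations with the standard binding properties. The following sketch assumes a middleware binder (such as Kafka or RabbitMQ), and the destination names are hypothetical:

```properties
# Hypothetical destination mapping for the auto-generated bindings
spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase-input
spring.cloud.stream.bindings.uppercase-out-0.destination=uppercase-output
spring.cloud.stream.bindings.uppercase-in-0.content-type=text/plain
```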

See the user guide for more details and configuration options.

Imperative or Reactive

Functions can be imperative or reactive. Imperative functions are triggered once per individual event, while reactive functions are triggered only once, receiving a reference to the entire event stream through abstractions (such as Flux and Mono) provided by Project Reactor.

Consider the following pair of examples:

Imperative:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

Reactive:

@SpringBootApplication
public class SampleApplication  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

Aside from offering a different (monadic) programming style for handling events, something that could easily be viewed as a matter of preference, reactive programming adds value for certain use cases that would otherwise be rather complex to implement. While discussing these use cases and reactive patterns in detail is out of scope for this post, the state management cases (such as windowing, aggregation, and grouping) are worth mentioning, as are the cases of splitting a stream or merging multiple streams, which I discuss in the next section.

The binding and naming rules for reactive functions are the same as those explained in the previous section.

NOTE: While the earlier examples use Function as an example, the same rules apply for Supplier and Consumer. The Spring Cloud Function support section of the user guide as well as the Reactor documentation provides more details.
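To make the mapping concrete across all three functional strategies, here is a minimal plain-Java sketch (outside of Spring, so no @Bean annotations; the names greet, uppercase, and log are hypothetical). In an SCSt application, a Supplier bean named greet would get a greet-out-0 binding, and a Consumer bean named log would get a log-in-0 binding:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalSketch {

    // In a Spring Boot app, each of these would be a @Bean method,
    // and SCSt would generate <name>-in-0 / <name>-out-0 bindings.
    static Supplier<String> greet() {               // message source: greet-out-0
        return () -> "hello";
    }

    static Function<String, String> uppercase() {   // handler: uppercase-in-0 / uppercase-out-0
        return String::toUpperCase;
    }

    static Consumer<String> log() {                 // sink: log-in-0
        return value -> System.out.println("received: " + value);
    }

    public static void main(String[] args) {
        // Wire the three stages together by hand, the way a binder would.
        String produced = greet().get();
        String transformed = uppercase().apply(produced);
        log().accept(transformed); // prints "received: HELLO"
    }
}
```

This is only an illustration of the functional contracts involved; in a real application, the framework (not your code) invokes these functions and handles the messaging middleware.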

Function Arity

There are times when a stream of data needs to be categorized and organized. For example, consider a classic big-data use case of dealing with unorganized data containing, let’s say, ‘orders’ and ‘invoices’, where you want each to go into a separate data store.
This is where support for function arity (functions with multiple inputs and outputs) comes into play.

Let’s look at an example of such a function (full implementation details are available here):

@Bean
public Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> organise() {
    return flux -> ...;
}

Given that Project Reactor is a core dependency of SCF, we use its Tuple library. Tuples give us a unique advantage by communicating both cardinality and type information, and both are extremely important in the context of SCSt. Cardinality tells us how many input and output bindings need to be created and bound to the corresponding inputs and outputs of the function, while the type information ensures proper type conversion.

Also, this is where the ‘index’ part of the naming convention for binding names comes into play, since, in this function, the two output binding names are organise-out-0 and organise-out-1.
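Continuing the ‘orders’ and ‘invoices’ example, the two generated output bindings could then be routed to separate destinations with the standard binding properties (the destination names here are hypothetical):

```properties
# Hypothetical destinations, one per output binding of organise()
spring.cloud.stream.bindings.organise-out-0.destination=orders
spring.cloud.stream.bindings.organise-out-1.destination=invoices
```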

IMPORTANT: At the moment, function arity is supported only for reactive functions (Function<TupleN<Flux<?>...>, TupleN<Flux<?>...>>). This support is centered on complex event processing, where evaluation and computation on a confluence of events typically requires a view into a stream of events rather than into individual events.

For more up-to-date details, please read the Functions with multiple input and output arguments section of the user guide.
