Get ahead
VMware offers training and certification to turbo-charge your progress.
Learn moreSpring Cloud Stream has recently added a feature to compose a function definition into an existing Spring Cloud Stream application. In this blog, we'll see how Spring Cloud Data Flow makes use of this feature to compose functions in Streaming pipelines.
In Spring Cloud Data Flow, streaming data pipelines are comprised of Spring Cloud Stream applications. A developer can pick and choose the out-of-the-box streaming applications, which cover many common use cases. A developer can also extend these out-of-the-box applications or create custom applications by using Spring Cloud Stream framework.
The release of Spring Cloud Stream 2.1.0 GA includes an integration of the Spring Cloud Function-based programming model, which allows the business logic to be represented as a java.util.Function
, a java.util.Consumer
, and a java.util.Supplier
, representing the roles of a Processor
, Sink
, and Source
respectively. Given this flexibility, Spring Cloud Stream framework now supports a simple but powerful approach to function composition. A composition in this context could be a combination of source and processor into a single application: a “new source”. Otherwise, it could be a combination of processor + sink into a single application: “a new sink”.This flexibility opens up interesting new opportunities for stream application developers.
Let’s consider how a pipeline is created to perform a simple transformation by using three applications and then see how it can be implemented as a pipeline by using two applications that use functional composition.
For the first stream,
we will use the out-of-the-box http-source
, transform-processor
and log-sink
applications.
As a first step, start the Spring Cloud Data Flow local
server:
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
Now, start the Spring Cloud Data Flow shell
:
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
Now let’s register the HTTP source, the transformer processor, and the log sink applications that use the RabbitMQ binder:
dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar
Now we can create a simple stream without function composition:
dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"
Then we can deploy the stream:
dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "http://localhost:9000"
POST (text/plain) http://localhost:9000 friend
202 ACCEPTED
You can see the following log message at the log
application:
[sformer.hello-1] log-sink : Hello FRIEND
In this stream, we have the http (source), transformer (processor), and log (sink) applications deployed as standalone applications in the target platform (in this case, it is local
). For certain use-cases, for a simple payload transformation logic, we might want to have the Processor
application combined with either the Source
or Sink
applications. For instance, transformation scenarios like masking some specific user specific fields at the Source output data doesn’t necessarily need to be deployed as a separate standalone application. Instead, it can be composed either at the Source or Sink application.
To compose Processor functions into Source or Sink applications, we use Spring Cloud Stream’s function composition support.
The function composition support in Spring Cloud Stream is based on Spring Cloud Function’s ability to allow the registration of java.util.Supplier
, java.util.Consumer
, and java.util.Function
as Spring @Bean
definitions. These function @Bean
definitions are available for composition at runtime.
Spring Cloud Stream has introduced a new property, called spring.cloud.stream.function.definition
, which corresponds to the function definition DSL in Spring Cloud Function. When this property is set, the desired functional beans are automatically chained at the runtime.
The function composition happens in the following way:
When the Spring Cloud Stream application is of type Source
, the composed function is applied after the source output
.
When the Spring Cloud Stream application is of type Sink
, the composed function is applied before the sink input
.
This gives an ability to compose the function (defined in the Spring Cloud Function DSL) into an existing Spring Cloud Stream application and subsequently have it be orchestrated by Spring Cloud Data Flow in streaming data pipeline.
Let’s create and deploy a stream that composes the previous example’s transformer expression into the Source
application itself. The transformer logic is done by using two java.util.Function
implementations.
We will create a new source application, which we will refer to as the http-transformer
which extends the out of the box http source application. The source for the new source application can be found here.
The http-transformer
application contains the upper
and concat
function beans, as defined below:
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
public static void main(String[] args) {
SpringApplication.run(HttpSourceRabbitApplication.class, args);
}
}
After cloning the github repo, you can build the application using maven:
cd function-composition/http-transformer ./mvnw clean package
Now register http-transformer
application byusing the Data Flow Shell.
NOTE
For the below app register
--uri
option, replace the directory name and path of the artifact with the value appropriate to your system.
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Now let’s create the stream:
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"
When deploying the stream, we pass the spring.cloud.stream.function.definition
property to define the composed function DSL (defined as in Spring Cloud Function). In this case, it is:
dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
The above deployment composes the upper
and concat
function beans into the http
source application.
Then we can send the payload to http
application:
dataflow:>http post --data "friend" --target "http://localhost:9001"
> POST (text/plain) http://localhost:9001 friend
> 202 ACCEPTED
Then you can see the output in the log
application as,
[helloComposed-1] log-sink : Hello FRIEND
NOTE
Please note that function composition support is not applicable for the out-of-the-box Spring Cloud Stream Processor
applications, since there is ambiguity in whether the function needs to be applied before or after the existing processor’s application logic.
However, you can create your own processor applications that use functional composition with standard java.util.Function APIs, as the following example shows:
@Configuration
public static class FunctionProcessorConfiguration {
@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
}
Then you need to deploy with the following property: spring.cloud.stream.function.definition=upperAndConcat
Another interesting feature is that Spring Cloud Function supports functional composition of Kotlin functions. This lets us add any Kotlin function beans into composable functions for Source
or Sink
applications.
To see this working, let’s use the http-transformer-kotlin-processor
application from our sample github repository.
The Kotlin function bean is configured as a processor. Here, the Kotlin function bean is the transform
function as defined below:
@Bean
open fun transform(): (String) -> String {
return { "How are you ".plus(it) }
}
Also, this project has the spring-cloud-function-kotlin
as a dependency to apply function configuration support for Kotlin functions, defined as follows:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
cd function-composition/http-transformer-kotlin ./mvnw clean package
NOTE
For the below app register
--uri
option, replace the directory name and path of the artifact with the value appropriate to your system.
dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar
To create a stream with this application as the Source
:
dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"
As we did in the http-transformer
example, we can use thespring.cloud.stream.function.definition
property to specify any valid composed function DSL to construct the function composition. In this case, let’s combine the function beans registered via Java configuration along with the function bean from Kotlin processor configuration.
dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"
Here, the function name transform
corresponds to Kotlin function.
Note: We can perform the composition between Kotlin functions and Java functions because Kotlin functions are internally converted into java.util.Function
.
dataflow:>http post --data "friend" --target "http://localhost:9002"
> POST (text/plain) http://localhost:9002 friend
> 202 ACCEPTED
and, you can see the output in the log
application as:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
In this example, the http-transformer
also contained the source code for the functions. However, you can make the application more modular by defining the function beans in a separate artifact. Then you can build the applications by adding only a maven dependency to the project and setting the spring.cloud.stream.function.definition
property. In this way, you can have the majority of your business logic coded as a function, and can, if necessary, compose it with a Source or a Sink.