David Turanski

Advisory Architect

Malvern, PA

Blog Posts by David Turanski

Testing Spring Cloud Stream Applications - Part 2

This is Part 2 of Testing Stream Applications. In Part 1 we implemented and tested the core function needed for our sample couchbase-sink application. The tests at the function level covered expected success and error scenarios and relied on Testcontainers to provision a Couchbase cluster. This post assumes you have read Part 1 and continues where it left off.

Couchbase Sink

In Part 1 we verified that the function we wrote for upserting data into Couchbase works as expected. We can now use the function, exposed as a java.util.function.Consumer, to implement a sink for use in a data pipeline built with Spring Cloud Stream. Like most of the pre-packaged stream applications, we simply embed the function configuration into a Spring Boot application. Unlike the pre-packaged applications, which generate identical applications configured for Kafka and Rabbit, we will roll our own, using only the Kafka binder.

Here’s the main application class:

@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {
	public static void main(String... args) {
		new SpringApplication(CouchbaseSinkApplication.class).run(args);
	}
}

We also need to add some dependencies: the function, Spring Cloud Stream, the Spring Boot actuator, and the Kafka binder.

<dependency>
    <groupId>io.spring.example</groupId>
    <artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

And, since we are rolling our own, we can set some required properties in application.properties. Since couchbase-consumer includes two candidate functions, we need to tell Spring Cloud Stream to use the Consumer wrapper. Also, we alias the default consumer input binding name couchbaseConsumer-in-0 to input so that the sink works with Spring Cloud Data Flow.

spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input

That’s literally it! At least we think so. How can we be sure? The kind of tests we need, not surprisingly, are similar to the function-level tests. We don’t need to run every test case, since we already know how the function behaves within a Boot application under various property settings, but we haven’t actually invoked the function via Spring Cloud Stream yet. Also, it doesn’t cost much, since we can reuse much of the test code we wrote for the function. So we only need a "smoke test" that runs the happy path to make sure we didn’t leave out a required dependency, that there are no typos in our configuration properties, and that there are no gotchas now or whenever we upgrade a dependency down the road. Here we configure a Couchbase Testcontainers container, as we did to test the function. But instead of invoking the function directly, we let Spring Cloud Stream do it when we send a message to an input destination configured for the sink. For this test, we use the TestChannelBinder, an in-memory binder provided by the following dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <type>test-jar</type>
    <classifier>test-binder</classifier>
    <scope>test</scope>
</dependency>

We use TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class) to add the TestChannelBinder to our application context for our test. This gives us an InputDestination bean to send messages to the sink. As in the function test, we use the Cluster object to verify the data is present in Couchbase. Since the upsert operation is asynchronous, we need to poll the data store for some time until the data is there. The Awaitility library is great for testing asynchronous systems. In this case, we’ll give it 10 seconds before we assume the operation has failed.

@Testcontainers
public class CouchbaseSinkApplicationTests {
  @Container
  static CouchbaseContainer container =
          new CouchbaseContainer("couchbase/server:6.6.0")
             .withBucket(new BucketDefinition("test"));

  static Map<String, Object> connectProperties = new HashMap<>();

  @BeforeAll
  static void initialize() {
    connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
    connectProperties.put("spring.couchbase.username", container.getUsername());
    connectProperties.put("spring.couchbase.password", container.getPassword());
  }

  @Test
  void test() {
    try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
        TestChannelBinderConfiguration
            .getCompleteConfiguration(CouchbaseSinkApplication.class))
        .web(WebApplicationType.NONE)
        .properties(connectProperties)
        .run("--couchbase.consumer.bucketExpression='test'",
            "--couchbase.consumer.keyExpression=payload.email")) {
      InputDestination inputDestination = context.getBean(InputDestination.class);
      Cluster cluster = context.getBean(Cluster.class);
      inputDestination.send(new GenericMessage<>(
          new User("Bart Simpson", "[email protected]")));

      await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
        User user = cluster.bucket("test")
            .defaultCollection().get("[email protected]")
            .contentAs(User.class);
        assertThat(user).isNotNull();
        assertThat(user.getName()).isEqualTo("Bart Simpson");
      });
    }
  }
}

Integration Testing

At this point, we have good test coverage between the application and function tests. But we have not yet verified that the application binary we want to build and deploy works in a true integration environment. Since the sink application uses the Kafka binder, the integration test environment requires a Kafka broker, a Couchbase cluster, and our deployed application. We could deploy and run the Spring Boot executable jar directly but, more often these days, the deployed artifact is a container image.

Generally, it is not too risky to assume that the sink built as a container will work, but we at least want to make sure that we know how to configure the application to use an external Kafka broker and Couchbase cluster, and that we built our image correctly.

For the pre-built Spring Cloud Stream applications, we have further reason to test the built artifacts. The core applications do not provide any additional code. Instead, we use the spring-cloud-dataflow-apps-generator-plugin to automatically generate identical applications that can run with either Kafka or RabbitMQ. The plugin requires Maven configuration, which we add manually for each application. Just because our function works with the TestChannelBinder, we can’t be sure that the built artifact works until we run it. Misconfiguring the apps generator plugin, or changes to the plugin itself, the base image, or any dependencies, may break something. Testcontainers and JUnit 5 give us a relatively straightforward way to integration test the pre-built applications with both Kafka and RabbitMQ. To help us write integration tests, we provide additional support in stream-applications-test-support. This library is available to the community by adding the dependency:

<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>stream-applications-test-support</artifactId>
    <scope>test</scope>
</dependency>

The sample includes an integration test to test the built image, in this case built with the Spring Boot Maven plugin. Like the application test, we will just plug in Kafka, Couchbase, and our image, turn on the power, and make sure we don’t see or smell any smoke.
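
For reference, the image can be produced by the Spring Boot Maven plugin’s build-image goal. A minimal sketch of the plugin configuration might look like the following; the image name is an assumption chosen to match the tag used in the test, and the sample’s actual build may configure this differently:

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
        <image>
            <!-- assumed name; it must match the tag passed to KafkaStreamAppContainer below -->
            <name>couchbase-sink:0.0.1-SNAPSHOT</name>
        </image>
    </configuration>
</plugin>

Running ./mvnw spring-boot:build-image then produces the couchbase-sink:0.0.1-SNAPSHOT image used below.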

The complete integration test is:

@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {

  static StreamAppContainer sink =
        new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");

  @Container
  static CouchbaseContainer container =
      new CouchbaseContainer("couchbase/server:6.6.0")
          .withNetwork(KafkaConfig.kafka.getNetwork())
          .withNetworkAliases("couchbase-server")
          .withBucket(new BucketDefinition("test"));

  static Cluster cluster;

  @Autowired
  TestTopicSender testTopicSender;

  @BeforeAll
  static void initialize() {
    await().until(() -> container.isRunning());
    String connectionString = "couchbase://couchbase-server";
    sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
          .withLogConsumer(appLog("couchbase-sink"))
          .withCommand(
            "--spring.couchbase.connection-string=couchbase://couchbase-server",
            "--spring.couchbase.username=" + container.getUsername(),
            "--spring.couchbase.password=" + container.getPassword(),
            "--couchbase.consumer.bucket-expression='test'",
            "--couchbase.consumer.key-expression=payload.email")
          .start();

    cluster = Cluster.connect(container.getConnectionString(),
        ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
  }

  @AfterAll
  static void stop() {
    sink.stop();
  }

  @Test
  void test() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    testTopicSender.send(sink.getInputDestination(),
        objectMapper.writeValueAsString(
            new User("Bart Simpson", "[email protected]")));

    await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
      ExistsResult result = cluster.bucket("test")
          .defaultCollection().exists("[email protected]");
      assertThat(result.exists()).isTrue();
    });

    User user = objectMapper.readValue(
        cluster.bucket("test").defaultCollection().get("[email protected]")
            .contentAs(String.class), User.class);

    assertThat(user.getName()).isEqualTo("Bart Simpson");
  }
}

To unpack this, let’s start with the @KafkaStreamAppTest class annotation. This starts a Kafka test container and configures Kafka components, using Spring for Apache Kafka, that we can use to produce and consume messages with Kafka. The Kafka container is started in a static initializer, which makes it a true singleton, allowing every test that runs in a JVM to use it. In addition to Spring configuration, the annotation includes @Testcontainers as a meta annotation. For this test, we do not let Testcontainers manage the lifecycle of the StreamAppContainer, since we want to start it after we know the Couchbase cluster is running. The Couchbase container has some additional configuration. For convenience, it shares a virtual network with the StreamAppContainer (automatically configured to use the same network as the Kafka container). This allows the Stream App Container to connect to the Couchbase server using an alias of our choosing, couchbase-server (remember, localhost inside a container refers to its own IP address).

@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
        .withNetwork(KafkaConfig.kafka.getNetwork())
        .withNetworkAliases("couchbase-server")
        .withBucket(new BucketDefinition("test"));

The StreamAppContainer is a GenericContainer with the required configuration to connect to Kafka and use the Kafka binder. The Spring Configuration also sets up a listener on a known topic to consume any output from the container. This is not used in this case, since we only have an input for the sink. The input destination is randomly generated and accessed via getInputDestination().

static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...

@BeforeAll
static void initialize() {
    await().until(() -> container.isRunning());
    String connectionString = "couchbase://couchbase-server";
    sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
            .withLogConsumer(appLog("couchbase-sink"))
            .withCommand(
                    "--spring.couchbase.connection-string=couchbase://couchbase-server",
                    "--spring.couchbase.username=" + container.getUsername(),
                    "--spring.couchbase.password=" + container.getPassword(),
                    "--couchbase.consumer.bucket-expression='test'",
                    "--couchbase.consumer.key-expression=payload.email")
            .start();

Once the Couchbase container is running, we will start the sink. We wait for the standard Spring Boot start up message to confirm the sink has started. We also add a LogConsumer to output all the log messages in case there is an error. Note the connection string is simply using the Couchbase container’s network alias. This is possible because the sink and Couchbase are using the same virtual network. Here, we pass all properties on the command line, but we could just as well set them as environment variables, via withEnvironment(). Since we control the sink lifecycle, we need to stop it after all the tests are complete.

The test uses an autowired TestTopicSender. This is a middleware-agnostic interface, backed by a KafkaTemplate in this case. The interface is useful for running the same test cases against Kafka and Rabbit. Here, we could just as well autowire the KafkaTemplate. At the time of this writing, only the String serdes are configured for the Kafka template, so we use an ObjectMapper to work with Strings.

  @Test
  void test() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    testTopicSender.send(sink.getInputDestination(),
        objectMapper.writeValueAsString(
            new User("Bart Simpson", "[email protected]")));

    await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
      ExistsResult result = cluster.bucket("test")
          .defaultCollection().exists("[email protected]");
      assertThat(result.exists()).isTrue();
    });

    User user = objectMapper.readValue(
        cluster.bucket("test").defaultCollection().get("[email protected]")
            .contentAs(String.class), User.class);

    assertThat(user.getName()).isEqualTo("Bart Simpson");
  }
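
If you prefer to autowire the KafkaTemplate instead, the equivalent send might look like the following sketch (the String/String generic types reflect the String serdes mentioned above; this is illustrative, not the sample’s code):

  @Autowired
  KafkaTemplate<String, String> kafkaTemplate;

  @Test
  void testWithKafkaTemplate() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    // send the JSON payload straight to the sink's input topic
    kafkaTemplate.send(sink.getInputDestination(),
        objectMapper.writeValueAsString(new User("Bart Simpson", "[email protected]")));
  }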

Since this test requires the sink image, we use the JUnit 5 @Tag annotation to mark it as an integration test. We also configured Maven to exclude it from the normal build and to build the image and run the test only when the integration profile is set. The complete source code is here and requires Java 8+ and Docker.
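
One way to wire this up in Maven (a sketch under the assumption that Surefire runs the tests; the sample’s actual pom may differ) is to exclude the integration tag from the default test run:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <configuration>
        <!-- skip tests tagged @Tag("integration") in the normal build -->
        <excludedGroups>integration</excludedGroups>
    </configuration>
</plugin>

An integration profile can then override this with <groups>integration</groups> and bind spring-boot:build-image to an earlier phase so the image exists before the test runs.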

Read more...

Testing Spring Cloud Stream Applications - Part 1

This post is part of a blog series that explores the newly redesigned Spring Cloud Stream applications based on Java Functions. This episode, presented in two parts, explores strategies for testing functions used to implement stream applications. We will pay special attention to functions that integrate with external resources, which present additional testing challenges. Such is the case with most of the pre-packaged source and sink applications. To illustrate this, we will walk through a sample couchbase-sink application. Here in Part 1, we will focus on the core function on which the sink is based. In Part 2, we will look at writing tests for the application.

Read more...

Case Study: Remote File Ingest with Spring Cloud Data Flow

This article is part of a blog series that explores the newly redesigned Spring Cloud Stream applications based on Java Functions. In this chapter, we explore how to use Spring Cloud Stream Applications and Spring Cloud Data Flow to implement a very common ETL use case: Ingesting files from a remote service. Specifically, we will look at how to ingest files from S3, SFTP, and FTP.

Here is what is included in the blog series to date:

Read more...

Case Study: HTTP Request Function and Processor

Introduction

We began this series by introducing the new stream applications based on Java functions, and function composition. The previous entry presented a tutorial for building a simple stream application and running it in Spring Cloud Data Flow. Today we explore the HTTP Request Function and present examples of how to use it.

In case you missed it, the prior posts in this series are:

The HTTP Request Function

This is an updated implementation of the legacy HTTP Client Processor Stream App Starter, based on the reactive Spring WebClient. The function is an all-purpose web client that submits HTTP requests to a URL and returns the response. Designed primarily for streaming applications, it can extract the URL, HTTP method, request body, desired response type, and contents, using configured SpEL expressions evaluated against each incoming Message. Also, to support efficient stream processing, the function uses reactive streams. Its signature is:

Function<Flux<Message<?>>, Flux<?>>

That is, it accepts a Flux (stream) of Messages and returns a Flux of any type.

Configuration Properties

The HttpRequestFunction is configured through the following configuration properties:

http.request.body-expression
A SpEL expression to derive the request body from the incoming message. (Expression, default: <none>)

http.request.expected-response-type
The type used to interpret the response. (Class<?>, default: String)

http.request.headers-expression
A SpEL expression used to derive the http headers map to use. (Expression, default: <none>)

http.request.http-method-expression
A SpEL expression to derive the request method from the incoming message. (Expression, default: GET)

http.request.maximum-buffer-size
Maximum buffer size in bytes allocated for input stream buffers. Defaults to 256k. Increase, as necessary, for posting or getting large binary content. (Integer, default: 256 * 1024)

http.request.reply-expression
A SpEL expression used to compute the final result, applied against the whole ResponseEntity (org.springframework.http.ResponseEntity). (Expression, default: ResponseEntity::getBody)

http.request.timeout
Request timeout in milliseconds. (Long, default: 30000)

http.request.url-expression
A SpEL expression against the incoming message to determine the URL to use. (Expression, default: <none>)

The SpEL expressions are applied to the incoming Message. So fields like body and headers[name] can be used to evaluate message contents. I say "can be…" because sometimes it is more desirable to use static values. In this case, literal values must be enclosed in single quotes, for example:

http.request.url-expression='https://start.spring.io'
http.request.http-method-expression='POST'

Example 1: Using the HTTP Request Function in a Standalone Application

Let’s look at an example of how to use this function in a simple Spring Boot web application. In this example, we will use it in an app that retrieves an image from a URL and renders a thumbnail of the image. The complete code for this example is here.

We will build the application using Spring Boot and Spring Web Flux, along with our function to retrieve the image, and some code to generate a thumbnail.

The relevant dependencies are:

  • org.springframework.cloud.fn:http-request-function - The HTTP request function transitively includes spring-boot-starter-webflux

  • io.spring.example:image-thumbnail-processor - A simple Java function, included in this example, that creates thumbnails. We won’t get into the details here, just note that it is a separate component which we will reuse in a later example.

We first need to set some configuration properties for our function:

http.request.url-expression=payload
http.request.expected-response-type=byte[]
http.request.maximum-buffer-size=2097152

Thus, the message payload contains the target URL, and the image (the response body) will be returned as a byte array. Since these images might be fairly large, we increase the size of the buffer holding the response body to 2MB (2 * 1024 * 1024 bytes).

Here is the code:

@SpringBootApplication
@Controller
@Import(HttpRequestFunctionConfiguration.class)
public class ThumbnailStandaloneApplication {
  private static Logger logger = LoggerFactory.getLogger(ThumbnailStandaloneApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(ThumbnailStandaloneApplication.class, args);
  }

  private ThumbnailProcessor thumbnailProcessor = new ThumbnailProcessor();

  @Autowired
  private HttpRequestFunction httpRequestFunction;

  @Bean
  RouterFunction<?> routes() {
    return RouterFunctions.route()
        .GET("/thumbnail", this::createThumbnail)
        .build();
  }

  private Mono<ServerResponse> createThumbnail(ServerRequest serverRequest) {
    String url = serverRequest.queryParam("url").orElseThrow(
                           () -> new RuntimeException("URL required"));

    return Mono.from(httpRequestFunction.apply(Flux.just(new GenericMessage<>(url)))
        .flatMap(image -> {
          Map<String, Object> model = new HashMap<>();
          byte[] thumbnail = thumbnailProcessor.apply((byte[]) image);
          logger.info("creating thumbnail for {}", url);
          model.put("url", url);
          model.put("thumb", new String(Base64.getEncoder().encode(thumbnail)));
          Mono<ServerResponse> serverResponse = ServerResponse.ok()
              .render("thumbnail", model);
          return serverResponse;
        }));
  }
}

We apply the HttpRequestFunction to retrieve the image. Then we apply the thumbnailProcessor to the returned byte array and encode it to base 64 so we can render it on the page.

(screenshot: the standalone application rendering a thumbnail)

Example 2: Using the HTTP Request Processor in a Streaming Application

Now that we know how our function works, let’s put together a streaming application, using Spring Cloud Stream, to do something similar. In this case, we will use the pre-packaged HTTP Request Processor and File Source stream applications. This processor wraps the HTTP request function in a Spring Cloud Stream processor application that simply invokes the function, binding the input and output to a message broker destination (a Kafka topic, or a RabbitMQ exchange, for example). Our application, expressed in stream definition DSL, looks like:

file-source | http-request-processor | image-thumbnail-sink

where the | represents I/O using a message broker.

Here, we are using a user-developed sink that uses the file-consumer function to write each thumbnail to a file. The sink uses Spring Cloud Function’s declarative composition to compose the thumbnail-processor, from the previous example, with a header enricher, and finally the standard fileConsumer. So our composed function is defined by:

spring.cloud.function.definition=thumbnailProcessor|filenameEnricher|fileConsumer

Our composite function definition is conceptually and syntactically similar to the above stream definition. But in this case the | represents in-process communication.

We will explore the ins and outs of the File Source in a future post. For now, we will use it to poll a source directory and produce messages whenever a new file is added to the directory. In this case, we want to process a text file with an image URL per line. We will configure the source to produce a message per line, containing the URL in the payload. We already know what the HTTP request processor does. The sink generates a thumbnail and writes it to a file.

The fully configured stream definition is:

file-source --file.consumer.mode=lines --file.supplier.directory=<source-directory> | http-request-processor --http.request.url-expression=payload --http.request.expected-response-type=byte[] --http.request.maximum-buffer-size=2097152 | image-thumbnail-sink --file.consumer.directory=<target-directory>

If we run this and drop a text file into the source directory, we will see the thumbnails written to the target directory:

(screenshot: thumbnail files written to the target directory)

If you want to run this on your local machine, complete instructions are here.

Summary

We just did a deep dive on the HTTP Request Function, demonstrating how to use it in a standalone web application and in a streaming pipeline to process images. We also used function composition, composing user-written and out-of-the-box functions, to great effect.

Stay Tuned…​

In the coming weeks we will present many more case studies for Spring Cloud Stream and Spring Cloud Data Flow, each highlighting different stream applications and capabilities.

Read more...

Case Study: Build and Run a Streaming Application Using an HTTP Source and a JDBC Sink

Introduction

So far in this series we have introduced the new stream applications based on Java functions, and function composition. We have also provided detailed examples of how to build a source from a supplier and a sink from a consumer. Here we continue the journey with the first of several case studies to follow. Each case study demonstrates how to use one or more of the available pre-packaged Spring Boot stream applications, in various scenarios, to build data streaming pipelines.

Today we will showcase two of the most commonly used applications, the HTTP source and the JDBC sink. We will use them to build a simple service that accepts HTTP POST requests and saves the contents to a database table. We will first run these as standalone Spring Cloud Stream applications, and then show how to orchestrate the same pipeline with Spring Cloud Data Flow. This is presented as a step-by-step tutorial and we encourage you to follow the steps as you read.

Read more...

Introducing Java Functions for Spring Cloud Stream Applications - Part 1

Last week we posted Introducing Java Functions for Spring Cloud Stream Applications - Part 0 to announce the release of Spring Cloud Stream applications 2020.0.0-M2. Here, we explore function composition, one of the more powerful features enabled by the function-oriented architecture presented in Part 0. If you haven’t had a chance to read Part 0, now would be a great time!

Function Composition

Function composition has a solid theoretical foundation in mathematics and computer science. In practical terms, it is a way to join a sequence of functions to create a more complex function.

Let’s look at a simple example using Java functions. We have two functions, reverse and upper. Each accepts a String as input and produces a String as output. We can compose them using the built-in andThen method. The composite function is itself a Function<String, String>. If you run this, it will print ESREVER.

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString();
Function<String, String> upper = String::toUpperCase;
Function<String, String> reverseUpper = reverse.andThen(upper);
System.out.println(reverseUpper.apply("reverse"));
Tip
In addition to andThen, java.util.function.Function includes compose, which first applies the argument (b) and then applies a to the result. Thus, a.compose(b).apply(s) is equivalent to a.apply(b.apply(s)).
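
To make the equivalence concrete, here is the same example rewritten with compose; it prints ESREVER, just like the andThen version above:

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString();
Function<String, String> upper = String::toUpperCase;

// compose applies its argument first: reverse runs before upper
Function<String, String> reverseUpper = upper.compose(reverse);
System.out.println(reverseUpper.apply("reverse"));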

Function Composition in Spring Cloud Function

Spring Cloud Function includes some great features to take composing functions to another level.

Declarative Composition

If we define our functions from the above example as Spring beans,

@Bean
Function<String, String> reverse() {
    return s -> new StringBuilder(s).reverse().toString();
}

@Bean
Function<String, String> upper() {
    return String::toUpperCase;
}

we can compose them by setting the spring.cloud.function.definition property:

spring.cloud.function.definition=upper|reverse

Here | is a composition operator which results in an auto-configured bean implementing the composite function, along with related resources to let you seamlessly invoke the composite function.

Composition With Supplier and Consumer

Spring Cloud Function extends native Java Function composition to support composition with Supplier and Consumer.

This follows from concepts which are implicitly true:

  • A Function composed with a Consumer is a Consumer

  • A Supplier composed with a Function is a Supplier

  • A Supplier composed with a Consumer is a valid processing model (with no inputs or outputs, this form of composition does not map to a functional interface, but is analogous to Runnable)

As we shall see, Spring Cloud Stream Applications employ these concepts to great effect.
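
For example, a supplier and a consumer defined as beans can be joined with the same definition property. The bean names below are illustrative only:

@Bean
Supplier<String> timeSupplier() {
    return () -> java.time.LocalTime.now().toString();
}

@Bean
Consumer<String> logConsumer() {
    return s -> System.out.println("received: " + s);
}

Setting spring.cloud.function.definition=timeSupplier|logConsumer composes them into a flow with no external inputs or outputs, analogous to a Runnable as described above.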

Type Conversion

When using function composition, we have to consider compatible argument types. Using native Java composition, we can compose a Function<Integer, String> with a Function<String, Integer> into a Function<Integer, Integer>:

Function<Integer, String> intToStr = String::valueOf;
Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2;
Function<Integer, Integer> composite = intToStr.andThen(doubleit);
composite.apply(10);

When running a Spring application, Spring Cloud Function uses Spring’s standard type conversion support to coerce function arguments as needed. Given the following Function bean definitions, the function definition intToStr|doubleit works as expected, converting the String to an Integer.

@Bean
Function<Integer, Integer> doubleit() {
    return i -> i * 2;
}

@Bean
Function<Integer, String> intToStr() {
    return String::valueOf;
}

In addition to converting primitives, Spring functions can convert between Message and POJO, JSON String and POJO, and more. For example, the following functions can be composed in either order:

@Bean
Function<Integer, Integer> doubleit() {
    return i -> i * 2;
}

@Bean
Function<Integer, Message<String>> convertIntMessage() {
    return i -> MessageBuilder.withPayload(String.valueOf(i)).build();
}

Function Composition in Spring Cloud Stream

Spring Cloud Stream 3.x builds on Spring Cloud Function to fully support a functional programming model. The fundamental premise of Spring Cloud Stream is that it enables a function to execute in a distributed environment. The binder binds the input(s) and output(s) of a function packaged in a Spring Boot application, to configured message broker destinations so that the output produced by one function is consumed as the input of another remotely running function. We can think of a data streaming pipeline as just a distributed composition of functional components.

To illustrate this, a typical Spring Cloud Stream pipeline like

source | processor1 | processor2 | processor3 | sink

is logically equivalent to

supplier | function1 | function2 | function3 | consumer

This idea leads to some interesting architectural choices since we can use function composition to combine some or all of these components into a single application.

For example, we can implement the sequence of three processors as a single application, let’s call it composed-processor, packaging function1, function2, and function3, and composed by spring.cloud.function.definition=function1|function2|function3. Now the pipeline can be deployed as:

source | composed-processor | sink

Even simpler, we can create a composed-source to do all the processing within the source:

composed-source | sink

As always, there is no right answer here. There are always trade-offs to consider:

  • Function composition results in fewer deployments. This reduces cost, latency, operational complexity, and so on.

  • Individual deployments are loosely coupled and can scale independently.

  • The message broker provides guaranteed delivery. When a simple stateless application goes down and is restarted, it can continue where it left off, processing the pending results of the previous processing step.

  • A single application that performs complex processing is harder to reason about and keeps intermediate processing results in memory, or possibly in an interim data store. When a stateful application fails, it can lead to inconsistent state, making recovery harder.

If these trade-offs look familiar, it’s because they are pretty much the same as any microservice vs monolith debate. In the end, do what works best for you.

Function Composition with Prepackaged Source Applications

In some cases, function composition is a no-brainer. From the start, we have provided pre-packaged processors to perform simple transformations, or filtering using SpEL. The legacy architecture required a separate processor when using the prepackaged sources or sinks. A common complaint from users was “why do I need to deploy a separate application just to evaluate a SpEL expression?” To address this, we initially introduced a form of support for function composition in an earlier release. To use this feature with the prepackaged applications required forking them to modify the code or the build dependencies to provide the functions.

The current release provides function composition out of the box for all of the prepackaged sources. Specifically, a source can now be composed with prepackaged functions to perform any of the following locally:

  • execute SpEL transformations

  • enrich message headers

  • filter events

  • produce task launch requests

As an example, we can compose the time source with a header enricher and a filter, using configuration properties, and run it as a standalone Spring Boot application:

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar
  --spring.cloud.stream.bindings.output.destination=even
  --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction
  --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload.substring(payload.length() - 2))
  --filter.function.expression=headers[seconds]%2==0

This will publish the time, such as 07/16/20 16:43:48, to the configured destination even, every other second, whenever the number of seconds is even.

Here we are using a prepackaged time source for RabbitMQ, binding the output to a topic exchange named even. The binder will create the exchange if it does not exist. The function definition extends the supplier to extract the seconds, convert it to an integer and store it in the seconds message header and then filter on the value of the header. Only even values pass the filter.

Task Launch Requests

In 2018, we introduced a reference architecture for running file ingest with Spring Cloud Data Flow and Spring Batch. To do this, we forked the sftp source as sftp-dataflow, specifically to implement a prepackaged source that produces task launch requests. The task launch request is a simple value object, rendered as JSON, and consumed by the tasklauncher-sink. The sink acts as a client to Data Flow to launch a batch application per the request. We initially chose sftp since it is the most commonly used protocol for file processing. However, we realized that the same pattern can be applied to any source. We now can do this with function composition. Along with the standard sftp source, we can trigger a task launch from ftp, file, s3, and so on. Even the time source can be used to launch a task at regular intervals.

This somewhat contrived example produces task launch requests:

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar
  --spring.cloud.stream.bindings.output.destination=time-test
  --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction
  --spel.function.expression=payload.length()
  --header.enricher.headers=task-id=payload*2
  --task.launch.request.task-name-expression="'task-'+headers['task-id']"

The payload, as JSON, is {"args":[],"deploymentProps":{},"name":"task-34"}

Function composition with user written code

In reality, when users develop a Spring Cloud Stream pipeline, they are likely to select a source and sink from our prepackaged Spring Cloud Stream Applications. Processors are typically user-written code, implementing specific business logic. If you are writing a processor, or want to extend a source or sink, any of the functions are available to you. Since we publish the functions as separate artifacts, you can simply include them in your dependencies. You can either use declarative composition, as shown above, or you can inject them into your code and invoke them programmatically. Of course, you can easily integrate your own functions as well.
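
As a small sketch (reusing the reverse function from earlier purely for illustration), a user-written processor can compose an injected function bean with its own business logic programmatically:

@Bean
Function<String, String> myProcessor(Function<String, String> reverse) {
    // apply the injected function, then this application's own logic
    return reverse.andThen(s -> "processed: " + s);
}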

How do I contribute a new function or application?

If you cannot find what you are looking for in the existing catalog of functions and applications, please consider contributing. This way, the entire open source community will benefit. In a subsequent post, we will walk through a real-world example of developing a function and stream application.

We encourage the community to get involved with this project. In addition to code contributions, we really appreciate documentation improvements and issue reports.

Stay tuned…​

This blog is the second in a series that will cover many related topics. Look for more deep dives and focused topics in the coming weeks. We will take you through the entire landscape of components included in this repository and surrounding processes.

Read more...

Introducing Java Functions for Spring Cloud Stream Applications - Part 0

We are happy to announce the release of Spring Cloud Stream applications 2020.0.0-M2. This release is a complete overhaul of the legacy Spring Cloud Stream App Starters. Starting with this release, we are moving away from theme-oriented release train names (famous scientists in alphabetical order) to calendar based versioning. The current GA release is called Einstein, and we are pleased to introduce 2020.0.0-M2. We are also moving away from the app starters. Having reorganized, repackaged, and (in some cases) rewritten the underlying code, we now have a new Git repository: spring-cloud/stream-applications: Functions and Spring Cloud Stream Applications for data driven microservices.

Read more...

Groovy Bean Configuration in Spring Framework 4

This post is intended to introduce the Groovy Bean Builder to Java developers as a powerful alternative or supplement to Java @Configuration and XML configuration. The Spring Framework release 4.0 includes a port of the Grails Bean Builder to the core Spring Framework, providing a Groovy DSL for configuring Spring applications. Groovy and Grails developers are no doubt familiar with configuring Spring applications this way and I expect the rest of you are already thinking “How cool is that?”

Don’t worry if you’re not a Groovy expert. Just as many Java programmers use another popular Groovy DSL, Gradle, to build applications, you only need to know some basic syntax to get started. Sample code is available on github.

Read more...

Spring Data GemFire 1.3.0 Released

I am pleased to announce the GA release of Spring Data GemFire 1.3.0. In addition to many minor bug fixes and enhancements, this release includes some notable new features to make writing Java applications with GemFire even easier:

Annotation Support for Function Execution

Simplified Connection to a GemFire Datasource

GemFire exposes a lot of options for tuning the performance of its connection pool and for configuring how local data is managed and synchronized. The Spring Data GemFire namespace supports all of these options; however, many applications are clients that simply need read/write access to the GemFire data grid. For this class of applications, it is now possible to connect to GemFire as a client without explicitly configuring a pool or client regions:

Read more...

A Groovy DSL For Spring Integration

Spring Integration implements Enterprise Integration Patterns using the Spring programming model to enable messaging in Spring-based applications. Spring Integration also provides integration with external systems using declarative adapters supporting JMS, HTTP, AMQP, TCP, FTP(S), SMTP, and so on. Currently, configuring message flows is primarily done via Spring XML, and Spring Integration supports several namespaces to make this as succinct as possible. Earlier this year, SpringSource released a Scala DSL for Spring Integration. Now, we are pleased to announce the first milestone release (1.0.0.M1) of a Groovy DSL.

Read more...