Testing Spring Cloud Stream Applications - Part 1

Engineering | David Turanski | December 15, 2020 | ...

This post is part of a blog series that explores the newly redesigned Spring Cloud Stream applications based on Java Functions. This episode, presented in two parts, explores strategies for testing functions used to implement stream applications. We will pay special attention to functions that integrate with external resources, which presents additional testing challenges. Such is the case with most of the pre-packaged source and sink applications. To illustrate this, we will walk through a sample couchbase-sink application. Here in Part 1, we will focus on the core function on which the sink is based. In Part 2, we will look at writing tests for for the application.

Here are all the previous entries in this blog series.

Testing Considerations

Functions and Applications

For function-based stream applications, the core functionality is exposed as a function. The core functions for the pre-built Spring Cloud Stream applications are packaged as separate components to allow them to be used by any application, independent of Spring Cloud Stream. Spring Cloud Stream natively supports Java functions and will bind to any bean that implements one of the core java.util.function types: Consumer, Supplier, or Function. Viewed as a separate component, the function need not depend on Spring or anything else. If you register any function as a bean in any application that includes a Spring Cloud Stream binder as a dependency, Spring Cloud Stream will bind it to a configured message destination.

In a data pipeline, a stream of data originates from a Source and flows into a Sink, with zero or more processing steps in between. In practice, the Source acts as a Supplier of data from some external resource, such as a data store, any service supporting a standard protocol, or a message broker. The Sink acts as a consumer of data for some other external resource. Since Spring provides first class support for most commonly used external resources, it should come as no suprise that most of the pre-packaged sources and sinks rely on some combination of Spring Integration, Spring Data, and Spring Boot. Additionally, they are designed to be configured for many environments, domains and use cases, via @ConfigurationProperties. Although these functions themselves are not Spring Boot Applications, they must be imported into a Spring Boot application to run.

Since all the core functionality is implemented by the function, we want to focus most of our testing efforts at this level. To ensure that our function behaves correctly under all expected success and error conditions, we need to write tests to cover these scenarios. These tests need to create an auto-configured application context and provision or mock required external resource.

If the function is configurable via @ConfigurationProperties, then we can treat each properties combination as a different test case. Some properties are required, and some are optional. Since using the function requires the end user to provide these properties, expected scenarios include both valid and invalid configurations, such as required properties missing, invalid values, or an invalid combination (mutually exclusive properties).

Unit vs Integration Tests

There are no widely accepted definitions that will help us here. Especially with sources and sinks, in which the core functionality is integration, it’s hard to know where to draw the line between unit and integration tests. On one hand, a Java function is a unit, in that it is a single interface. However, if its sole purpose is to integrate with a remote system, it is difficult, if not impossible, to test in isolation. However, I think we can agree on some general characteristics:

Unit tests:

  • Run automatically as part of a build in any developer or CI environment without any external configuration

  • Are reasonably fast

  • Are written by the developer and run frequently

Integration tests:

  • Run automatically in an integration environment

  • Require the component under test, along with external dependencies to be deployed

  • May be slow

  • Are run less often

Given this definition of unit tests, Part 1 is about unit testing functions.

Test Containers

Testcontainers is a recent and popular Java library that lets you programmatically spin up and throw away any external resource that can run in a Docker container. It includes dozens of out-of-the-box modules for commonly used resources. You can also use the library to create custom containers programmatically, from Dockerfiles, or docker-compose yaml. While intended primarily for integration tests, it is extremely useful for writing unit tests when mocking takes considerably more effort. Of course we have to sacrifice some speed and relax the "no external dependencies" rule to allow for a Docker daemon installed and running on the host. Since many development and CI environments today are already required to use and build images, this is a reasonable assumption.


Couchbase Consumer Function

To illustrate, we will write a Couchbase consumer function to add some data to a Couchbase key-value store using the upsert operation.

For efficiency, we will implement the function using the Couchbase Java client’s reactive API. This API returns a publisher of MutationResult, so our core interface is Function<Flux<Message<?>>, Flux<MutationResult>>. This function will be configured with Spring, and can be embedded into any Spring Boot Application. To support a couchbase-sink, we will wrap the function in a Consumer<Flux<Message<?>>>.

The upsert operation inserts or updates data in a Bucket, which is the primary data store abstraction for Couchbase. In our case, a ReactiveBucket. A bucket is specified by name and must exist in the Couchbase cluster beforehand. Starting with v6.5, Couchbase supports Collections. So the bucket may be partitioned into many collections, but this is an optional feature that must be enabled in the cluster. The upsert method targets a named collection, or the defaultCollection.

We pass the key and value to our function in a Spring Message, consisting of a payload and headers. The payload can be any object, and the headers are essentially a Map. To make this function generic, we can use a SpEL expression to specify the key. The key expression is evaluated against the Message, and may reference fields or methods in the payload, or a header. The value is the payload. The function also requires the user to specify a bucket and collection name. To maximize flexibility, let’s double down on SpEL and make everything an expression. Now, if we want, the function can extract all its input values from the message at runtime to upsert any data in any collection in any bucket. In the simplest case, the bucket and collection can be defined statically.

So the function needs some configuration properties:

public class CouchbaseConsumerProperties {
    private static final String DEFAULT_VALUE_EXPRESSION = "payload";
    private final SpelExpressionParser parser = new SpelExpressionParser();

    * A SpEL expression to specify the bucket.
    private Expression bucketExpression;

      * A SpEL expression to specify the key.
    private Expression keyExpression;

    * A SpEL expression to specify the collection.
    private Expression collectionExpression;

    * A SpEL expression to specify the value (default is payload).
    private Expression valueExpression =


To statically confiugure some of these values, use a literal expression, enclosing the value in single quotes, e.g. couchbase.consumer.bucketExpression='mybucket'. Normally, you would extract the key and value from the message contents.

We configure the reactive Function and corresponding Consumer with Spring:

public class CouchbaseConsumerConfiguration {

    private static Logger logger =

    public Consumer<Flux<Message<?>>> couchbaseConsumer(Function<Flux<Message<?>>,
                Flux<MutationResult>> couchbaseConsumerFunction) {
        return message -> couchbaseConsumerFunction.apply(message)
               .subscribe(mutationResult -> logger.debug("Processed " + message));

    public Function<Flux<Message<?>>, Flux<MutationResult>> couchbaseConsumerFunction(
          Cluster cluster, CouchbaseConsumerProperties consumerProperties) {
        return flux -> flux.flatMap(message -> {
            logger.debug("Processing message " + message);
             String bucketName = bucket(message,
            String key = key(message, consumerProperties.getKeyExpression());
            ReactiveBucket bucket = cluster.bucket(bucketName).reactive();
             ReactiveCollection collection = collection(message,
				  .map(name -> bucket.collection(name))
            return collection.upsert(key,
                              value(message, consumerProperties.getValueExpression()));

    private String bucket(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);

    private String key(Message<?> message, Expression expression) {
        return expression.getValue(message, String.class);

    private Object value(Message<?> message, Expression expression) {
        return expression.getValue(message);

    private Optional<String> collection(Message<?> message,
                                             @Nullable Expression expression) {
        return expression == null ? Optional.empty() :
                Optional.of(expression.getValue(message, String.class));

These two classes are all we need to implement the function. The required dependencies are:

<!-- Enable configuration properties metadata to be added to the jar -->
<!-- This provides a Spring Converter to convert Strings to Expression, required for CouchbaseConsumerProperties as implemented -->

As mentioned earlier, this is not a Spring Boot application, but a component that must be embedded in a Spring Boot application to run. Spring Boot binds the @ConfigurationPropeties and also provides CouchbaseAutoConfiguration.


This example does not use spring-data-couchbase since it is intended for using Spring Data Repositories and automatically mapping specific domain objects. Since our function is intended to handle any payload type, we use boot to autoconfigure the Cluster along with the Couchbase Java SDK.

So how did we end up with a function that actually works? The sample code above is the result of test driven development, refined over several iterations. Since the function depends on the Couchbase SDK Cluster object which does all the work, we need to create a Cluster instance before we can do anything. The Cluster needs to connect to a Couchbase server. If we happen to have a Couchbase cluster already running on our network, with a bucket we can use for testing, then we might use that initially. But even if we assume Couchbase is accessable from our development and CI environment, what happens if we can’t connect to Couchbase for some reason - the cluster is down, credentials expired, permissions changed, or some other reason? Do we want to let that break our CI/CD pipeline or stop our progress?

Fortunately, we can use the Testcontainers couchbase module to spin up our own Couchbase environment.


Full disclosure: I also tried CouchbaseMock but it appears to be incompatible with the current couchbase Java client.

The required test libraries for Junit 5, are:


To use Testcontainers in our Junit 5 test class, we start with a Couchbase container configured with a bucket named test.

public class CouchbaseConsumerTests {

	static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
			.withBucket(new BucketDefinition("test"));

The @Testcontainers annotation enables lifecycle management for fields annotated with @Container. Here, we declare the CouchbaseContainer as static, so TestContainers will start the container once before the tests run and remove it after. This is a good thing, since it takes several seconds to start the container.


Also take a look at Playtika Testcontainers for Spring Boot. This is an interesting project that abstracts "embedded" services using Spring Boot to autoconfigure a Testcontainer. This requires your preferred version of org.springframework.cloud:spring-cloud-starter. If you are using a Spring Cloud version compatible with Spring Boot 2.4+ you will need to set "spring.cloud.bootstrap.enabled=true". The sample does not use this library because Spring beans cannot be declared static, so we would have to start a new container instance for each test. Anyway, Testcontainers is really easy to use.

As mentioned above, different property configurations represent different test cases. Spring Boot binds properties from its properties sources when the application starts up. So we need to create a new application context for each combination of properties we want to test. We see a few different strategies used in the stream-applications repository:

  • Create an abstract @SpringBootTest to configure a @SpringBootApplication test context and shared configuration properties. Create a sub class, annotated with @TestPropertySource for each test case, as shown here.

  • Use ApplicationContextRunner to create a new ApplicationContext for each test case, as shown here.

  • Use SpringApplicationBuilder to create a new ApplicationContext for each test case, as shown here.

Which one you use depends largely on personal choice. The tests for the sample function use the ApplicationContextRunner, pre-configured with the required boot Couchbase connection properties provided by the test container. A nice feature of Testcontainers is that it exposes standard ports as expected, mapping each exposed port to a random available port. The Couchbase testContainer includes getConnectionString() which is specific to Couchbase. Generally, you can use container.getMappedPort(int originalPort) as required.


Using random TCP ports is essential for automated tests since 1) You do not know what ports may be in use for a given environment 2) Build tools typically run tests in parallel. This frequently results in errors due to an unavailable port when statically defined.

public class CouchbaseConsumerTests {

    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                   .withBucket(new BucketDefinition("test"));

	private ApplicationContextRunner applicationContextRunner;

    void setup() {
        applicationContextRunner = new ApplicationContextRunner()
                 "spring.couchbase.connection-string=" +
                 "spring.couchbase.username=" + container.getUsername(),
                 "spring.couchbase.password=" + container.getPassword());

We use TestConfig.class to start an application context, which we provide as an inner class:

static class TestConfig {
    Cluster cluster;

    public void destroy() {

In many cases, this can be an empty class annotated with @SpringBootApplication to trigger properties binding, and any required auto configuration - CouchbaseAutoConfiguration in this case. Here, we disconnect from the cluster to prevent a superfluous stack trace when the context closes.

For these tests, we will create a simple User type with a name and an email address which we can use for the key:

@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
	private String name;

	private String email;

	public String getName() {
		return name;

	public void setName(String name) {
		this.name = name;

	public String getEmail() {
		return email;

	public void setEmail(String email) {
		this.email = email;

	public User() {

	public User(String name, String email) {
		this.name = name;
		this.email = email;

Now we are ready to test our function. Since the function is reactive, we will use the StepVerifier from the reactor-test library to verify the contents of the returned Flux. We begin with the simplest happy path scenario: upsert a single User providing the minimum required configuration: The bucket name and the key expression. We will construct a Message with a User payload. To store the user into the test bucket’s default collection, using the user’s email as the key, we just need to provide the bucket name as a literal and set the key expression to payload.email. These properties need to use the couchbase.consumer prefix configured in CouchbaseConsumerProperties. At least, that’s the intended behavior. We can’t be sure that all this works until we can verify that ,after calling the function, the data is present in the data store. We use the Couchbase API directly to retrieve the data and assert that the contents are what we expect.

void singleUpsert() {
      .run(context -> {
           CouchbaseConsumerProperties properties =
           String bucketName = properties.getBucketExpression().getValue(String.class);
           Cluster cluster = context.getBean(Cluster.class);
           Function<Flux<Message<?>>, Flux<MutationResult>>
                 couchbaseConsumerFunction =
                       context.getBean("couchbaseConsumerFunction", Function.class);
               Flux.just(new GenericMessage<>(new User("David", "[email protected]")))))
            .expectNextMatches(mutationResult ->

        User saved = cluster.bucket(bucketName).defaultCollection()
                                   .get("[email protected]").contentAs(User.class);

With the function implemented as previously shown, we are ecstatic to see green when we run the test in our IDE. In reality, we need a test like this to write the function in the first place. That is why we put significant thought and effort into this simple test. We also want to test applying multiple objects, and setting a custom expression for the value, and bucket. We may want to also check the Java validation annotations in our properties class.

@NotNull(message = "'keyExpression' is required")
public Expression getKeyExpression() {
    return keyExpression;

I forget, does the annotation go on the getter or the setter?, do we really need the @Validated class annotation? Let’s find out. If we forget to set couchbase.consumer.keyExpression, we should get an exception message 'keyExpression is required' somewhere in the stack trace. If not, then we did something wrong. Fortunately, spring-boot-starter-test gives us everything we need for testing, including Assertj, a fluent DSL for assertions, Mockito, and Junit 5.

void keyExpressionRequired() {
   () -> applicationContextRunner.withPropertyValues(
      "couchbase.consumer.bucket-expression='test'").run(context -> context.start()))
    .withMessageContaining("'keyExpression' is required");

By the time we are done, we will write more than twice the LOC needed to implement the function, and probably spend more than twice the time. But the effort is well worth it since it gives us proof that the function behaves as expected in common scenarios, and protection against introducing regressions when refactoring or adding new functionality. The complete tests are here. I’m happy to say that my IDE reports over 90% coverage.


This concludes Part 1 of the testing topic. In this post we explored strategies for testing functions that integrate with external resources, such as Couchbase. We also showed how useful the TestContainers library is for testing components of distributed systems, especially when using mocks, stubs, or embedded servers is impractical. Part 2 will cover unit and integration testing of function based stream applications.

Stay tuned…​

Thanks for coming! We hope you found this content helpful. We have a couple more posts until we conclude this series.

Get the Spring newsletter

Thank you!

Get ahead

VMware offers training and certification to turbo-charge your progress.

Learn more

Get support

Spring Runtime offers support and binaries for OpenJDK™, Spring, and Apache Tomcat® in one simple subscription.

Learn more

Upcoming events

Check out all the upcoming events in the Spring community.

View all