
This Week in Spring - December 22nd, 2020

Hi, Spring fans! Welcome to another installment of This Week in Spring!

It’s December 22nd, 2020, as I write this and I cannot believe we’re smack dab in the middle of the week with only two shopping days until Christmas! I don’t know if it’s just that time of year and I’m flush with the normal amount of warm-n-fuzzies or if it’s just that, after a year like 2020, I’m very keen on savoring this precious reprieve. Either way, I couldn’t be happier. I’ve got a cold! I’m sick and sneezing and my nose is running, but at least it’s not COVID-19, and at least I’ve got my family and my job, and - all things considered - I feel very, very fortunate.

Read more

Spring Cloud 2020.0.0 (aka Ilford) Is Available

On behalf of the community, I am pleased to announce that the GA release of the Spring Cloud 2020.0 Release Train is available today. The release can be found in Maven Central. You can check out the 2020.0 release notes for more information.

Notable Changes in the 2020.0 Release Train

This release requires Spring Boot 2.4.1. In general, this release focused on fixing bugs ahead of GA.

See this page for a list of Known Issues.

See the wiki for a list of all breaking changes in this release train.

See all of the included issues and pull requests at the GitHub project.

Read more

Announcing Spring Cloud Stream Applications 2020.0.0 GA Release

We are glad to announce the GA release of the newly redesigned Spring Cloud Stream applications - 2020.0.0.

We would like to use this release announcement as an opportunity to wrap up the blog series that we started in the summer. Therefore, consider this as part 15 of the blog series. In this blog, we are going to give a rundown of all the previous episodes in the series, but first, let us go through some release details.

Release Overview

The 2020.0.0 GA release contains the completely revamped functional foundation for the event-streaming applications. The old structure was based on an app starter model, in which the critical logic for each application was provided as part of a starter module. The starters then formed the foundation for the applications. While this worked for the previous five generations of app starters (Avogadro, Bacon, Celsius, Darwin, and Einstein), it was deemed necessary to rewrite the starters as reusable functions so that they could serve a wide array of use cases beyond what the out-of-the-box applications require. Therefore, many of the old app starters were refactored and redesigned as functions, suppliers, and consumers. The out-of-the-box Spring Cloud Stream binder-based applications now use these functional components as their base. Other custom applications, even for non-streaming use cases, can be designed with these functional components as a foundation, and the functions can be composed together to implement many other data integration use cases.
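
To make the new model concrete, here is a minimal sketch, using hypothetical bean names rather than actual starter code, of what these reusable functional components look like:

import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// A minimal sketch with hypothetical names; not code from the actual starters.
@Configuration
public class UppercaseFunctionConfiguration {

    // A reusable function that could back a processor application.
    @Bean
    public Function<String, String> uppercase() {
        return String::toUpperCase;
    }

    // A reusable consumer that could back a sink application.
    @Bean
    public Consumer<String> log() {
        return payload -> System.out.println("received: " + payload);
    }
}

With Spring Cloud Stream on the classpath, beans like these are bound to message destinations, and they can be composed declaratively, for example with spring.cloud.function.definition=uppercase|log.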

All of the stream application components are now hosted in a new single monorepo. The source for all the currently available functions and applications can be found there. These collections comprise components that satisfy a wide spectrum of use cases, such as data ingestion, ETL, machine learning, analytics, and file processing, among many others. Take a look at the README for more information on what is available.

Read more

Spring Tools 4.9.0 released

Dear Spring Community,

I am happy to announce the 4.9.0 release of the Spring Tools 4 for Eclipse, Visual Studio Code, and Theia.

major changes to the Spring Tools 4 for Eclipse distribution

  • updated to Eclipse 2020-12 release (including support for Java 15)

reminder

  • the Eclipse-based distribution of the Spring Tools 4 requires JDK 11 (or newer) to run on
  • it ships with an embedded JDK 15 runtime, so there is no need to install or configure a specific JDK to run the IDE anymore

additional changes

  • (Spring Boot) new: show bean startup performance metrics in live hovers and code lenses (details in the user guide)
  • (Spring Boot) new: show basic request mapping performance metrics in live hovers and code lenses (details in the user guide)
  • (Spring Boot) new: provide content-assist for constructor-arg name in Spring XML config files (#562)
  • (Spring Boot) fixed: language-server-internal exception happening when saving a file that has a space or other special characters in its name or path
  • (Eclipse) improvement: added support for custom scripts to create docker images when deploying a boot app to docker in the boot dashboard (details in the user guide)
  • (Eclipse) fixed: enable live hover action for more docker-related nodes in the boot dashboard
  • (Concourse) fixed: navigation in pipeline files with the VSCode Concourse CI extension doesn’t always work (#483)

Read more

This Week in Spring - December 15th, 2020

Hi, Spring fans! Welcome to another installment of This Week in Spring! Can you believe it’s already December 15th? Me neither! Another few weeks and this soul-annihilating year will be over and we’ll be staring down 2021, filled with new hopes and possibilities! How are you? (Have you dipped into the eggnog yet?) I’m doing alright, thanks! I’ve been busy, as usual.

Most of that was fun stuff. But some of that, I confess, was a mess of my own making. I spent Monday cleaning up a fire I’d set for myself. You see, I got a little sloppy with last week’s podcast episode. Last week, I was editing the audio for an episode with Spring Batch co-lead Mahmoud Ben Hassine and, in the middle of that, was pushing an interview I’d done with Dion Almaer out the door for publication that week. So, two episodes: one with Dion, one with Mahmoud. Dion’s was slated for last week. Mahmoud’s was slated for a future week. On top of that, it feels like I’ve been toiling night and day on the production pipeline for my podcasts, living that kubectl apply -k $WORKING_DIR life, and while I was tired of some of the issues still lingering, I figured I could basically trust the system. So I published the episode with Dion. I got everything right, except the interview audio was Mahmoud’s, not Dion’s. The title said Dion, but I’d accidentally sent out the wrong episode. Awkward. Worse, after I published that episode, figuring I had a week to make a mess before I’d next have to publish an episode, I started making a mess of the codebase. I broke stuff. It would’ve been no big deal. I would’ve sorted it out! But yesterday, Monday, I realized people were complaining that I’d bungled the audio: I’d published the preface and description for Dion but the audio interview with Mahmoud. So I raced to fix it, except production was down! I spent Monday dodging between meetings and calls, trying to get the system restored. I finally got the system on its feet again late last night - both episodes, with the correct titles, audio, links, and photos, went out, and all was good. Thank you so much for your patience, community. I have egg on my face.

Read more

Testing Spring Cloud Stream Applications - Part 2

This is Part 2 of Testing Stream Applications. In Part 1 we implemented and tested the core function needed for our sample couchbase-sink application. The tests at the function level covered expected success and error scenarios and relied on Testcontainers to provision a Couchbase cluster. This post assumes you have read Part 1 and continues where it left off.

Couchbase Sink

In Part 1 we verified that the function we wrote for upserting data into Couchbase works as expected. We can now use the function, exposed as a java.util.function.Consumer, to implement a sink to use in a data pipeline built with Spring Cloud Stream. Like most of the pre-packaged stream applications, we simply embed the function configuration into a Spring Boot application. Unlike the pre-packaged applications, which are generated in identical variants configured for Kafka and RabbitMQ, we will roll our own and use only the Kafka binder.

Here’s the main application class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

// Embeds the function configuration from couchbase-consumer into a Boot app.
@SpringBootApplication
@Import(CouchbaseConsumerConfiguration.class)
public class CouchbaseSinkApplication {

    public static void main(String... args) {
        new SpringApplication(CouchbaseSinkApplication.class).run(args);
    }
}

We also need to add some dependencies: the function itself, Spring Cloud Stream, the Spring Boot actuator, and the Kafka binder.

<dependency>
    <groupId>io.spring.example</groupId>
    <artifactId>couchbase-consumer</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

And, since we are rolling our own, we can set some required properties in application.properties. Since couchbase-consumer includes two candidate functions, we need to tell Spring Cloud Stream to use the Consumer wrapper. Also, we alias the default consumer input binding name, couchbaseConsumer-in-0, to input so that the sink works with Spring Cloud Data Flow.

spring.cloud.function.definition=couchbaseConsumer
spring.cloud.stream.function.bindings.couchbaseConsumer-in-0=input

That’s literally it! At least, we think so. How can we be sure? The kind of tests we need, not surprisingly, are similar to the function-level tests. But we don’t need to run every test case, since we already know how the function behaves within a Boot application under various property settings. What we haven’t done yet is actually invoke the function via Spring Cloud Stream. Also, testing doesn’t cost much here, since we can reuse much of the test code we wrote for the function. So we only need a "smoke test" that runs the happy path to make sure we didn’t leave out some required dependency, that there are no typos in our configuration properties, and that there are no gotchas now or whenever we upgrade some dependency down the road. Here we configure a Couchbase container with Testcontainers, as we did to test the function. But instead of invoking the function directly, we let Spring Cloud Stream do it when we send a message to an input destination configured for the sink. For this test, we use the TestChannelBinder, an in-memory binder provided by the following dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <type>test-jar</type>
    <classifier>test-binder</classifier>
    <scope>test</scope>
</dependency>

We use TestChannelBinderConfiguration.getCompleteConfiguration(CouchbaseSinkApplication.class) to add the TestChannelBinder to our application context for our test. This gives us an InputDestination bean to send messages to the sink. As in the function test, we use the Cluster object to verify the data is present in Couchbase. Since the upsert operation is asynchronous, we need to poll the data store for some time until the data is there. The Awaitility library is great for testing asynchronous systems. In this case, we’ll give it 10 seconds before we assume the operation has failed.

@Testcontainers
public class CouchbaseSinkApplicationTests {

    @Container
    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                    .withBucket(new BucketDefinition("test"));

    static Map<String, Object> connectProperties = new HashMap<>();

    @BeforeAll
    static void initialize() {
        connectProperties.put("spring.couchbase.connection-string", container.getConnectionString());
        connectProperties.put("spring.couchbase.username", container.getUsername());
        connectProperties.put("spring.couchbase.password", container.getPassword());
    }

    @Test
    void test() {
        try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
                TestChannelBinderConfiguration
                        .getCompleteConfiguration(CouchbaseSinkApplication.class))
                .web(WebApplicationType.NONE)
                .properties(connectProperties)
                .run("--couchbase.consumer.bucketExpression='test'",
                        "--couchbase.consumer.keyExpression=payload.email")) {

            InputDestination inputDestination = context.getBean(InputDestination.class);
            Cluster cluster = context.getBean(Cluster.class);
            inputDestination.send(new GenericMessage<>(
                    new User("Bart Simpson", "[email protected]")));

            await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
                User user = cluster.bucket("test")
                        .defaultCollection().get("[email protected]")
                        .contentAs(User.class);
                assertThat(user).isNotNull();
                assertThat(user.getName()).isEqualTo("Bart Simpson");
            });
        }
    }
}

Integration Testing

At this point, we have good test coverage between the application and function tests. But we have not yet verified that the application binary that we want to build and deploy works in a true integration environment. Since the sink application uses the Kafka binder, the integration test environment requires a Kafka broker, a Couchbase cluster, and our deployed application. We can deploy and run the Spring Boot executable jar directly but, more often these days, the deployed artifact is a container image.

Generally, it is not too risky to assume that the sink built as a container will work, but we at least want to make sure that we know how to configure the application to use an external Kafka broker and Couchbase cluster, and that we built our image correctly.

For the pre-built Spring Cloud Stream applications, we have further reason to test the built artifacts. The core applications do not provide any additional code. Instead, we use the spring-cloud-dataflow-apps-generator-plugin to automatically generate identical applications that can run with either Kafka or RabbitMQ. The plugin requires Maven configuration, which we add manually for each application. Even though our function works with the TestChannelBinder, we can’t be sure that the built artifact works until we run it. Misconfiguring the apps generator plugin, changes to the plugin itself, the base image, or any dependency may break something. Testcontainers and JUnit 5 give us a relatively straightforward way to run integration tests against the pre-built applications with both Kafka and RabbitMQ. To help us write integration tests, we provide additional support in stream-applications-test-support. This library is available to the community by adding the dependency:

<dependency>
    <groupId>org.springframework.cloud.stream.app</groupId>
    <artifactId>stream-applications-test-support</artifactId>
    <scope>test</scope>
</dependency>

The sample includes an integration test to test the built image, in this case built with the Spring Boot Maven plugin. Like the application test, we will just plug in Kafka, Couchbase, and our image, turn on the power, and make sure we don’t see or smell any smoke.

The complete integration test is:

@KafkaStreamAppTest
@Tag("integration")
public class CouchbaseSinkIntegrationTests {

    static StreamAppContainer sink =
            new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");

    @Container
    static CouchbaseContainer container =
            new CouchbaseContainer("couchbase/server:6.6.0")
                    .withNetwork(KafkaConfig.kafka.getNetwork())
                    .withNetworkAliases("couchbase-server")
                    .withBucket(new BucketDefinition("test"));

    static Cluster cluster;

    @Autowired
    TestTopicSender testTopicSender;

    @BeforeAll
    static void initialize() {
        await().until(() -> container.isRunning());
        sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
                .withLogConsumer(appLog("couchbase-sink"))
                .withCommand(
                        "--spring.couchbase.connection-string=couchbase://couchbase-server",
                        "--spring.couchbase.username=" + container.getUsername(),
                        "--spring.couchbase.password=" + container.getPassword(),
                        "--couchbase.consumer.bucket-expression='test'",
                        "--couchbase.consumer.key-expression=payload.email")
                .start();

        cluster = Cluster.connect(container.getConnectionString(),
                ClusterOptions.clusterOptions(container.getUsername(), container.getPassword()));
    }

    @AfterAll
    static void stop() {
        sink.stop();
    }

    @Test
    void test() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        testTopicSender.send(sink.getInputDestination(),
                objectMapper.writeValueAsString(
                        new User("Bart Simpson", "[email protected]")));

        await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
            ExistsResult result = cluster.bucket("test")
                    .defaultCollection().exists("[email protected]");
            assertThat(result.exists()).isTrue();
        });

        User user = objectMapper.readValue(
                cluster.bucket("test").defaultCollection().get("[email protected]")
                        .contentAs(String.class), User.class);

        assertThat(user.getName()).isEqualTo("Bart Simpson");
    }
}

To unpack this, let’s start with the @KafkaStreamAppTest class annotation. This starts a Kafka test container and, using Spring for Apache Kafka, configures components that we can use to produce and consume messages with Kafka. The Kafka container is started in a static initializer, which makes it a true singleton, allowing every test that runs in the JVM to use it (the pattern is sketched below). In addition to Spring configuration, the annotation includes @Testcontainers as a meta-annotation. For this test, we do not let Testcontainers manage the lifecycle of the StreamAppContainer, since we want to start it after we know the Couchbase cluster is running. The Couchbase container has some additional configuration. For convenience, it shares a virtual network with the StreamAppContainer (which is automatically configured to use the same network as the Kafka container). This allows the StreamAppContainer to connect to the Couchbase server using an alias of our choosing, couchbase-server (remember, localhost inside a container refers to the container’s own IP address).

@Container
static CouchbaseContainer container = new CouchbaseContainer("couchbase/server:6.6.0")
        .withNetwork(KafkaConfig.kafka.getNetwork())
        .withNetworkAliases("couchbase-server")
        .withBucket(new BucketDefinition("test"));
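
The shared Kafka container behind all of this follows the singleton pattern mentioned above. A rough sketch of that pattern, not the actual implementation backing @KafkaStreamAppTest, looks like this:

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

// A sketch of the singleton-container pattern; hypothetical, not the actual
// class provided by the test support library.
public abstract class KafkaConfig {

    // Created and started once per JVM, so every test class shares the same
    // broker and virtual network; Testcontainers cleans up when the JVM exits.
    public static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"))
                    .withNetwork(Network.newNetwork());

    static {
        kafka.start();
    }
}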

The StreamAppContainer is a GenericContainer with the required configuration to connect to Kafka and use the Kafka binder. The Spring configuration also sets up a listener on a known topic to consume any output from the container. That listener is not used in this case, since we only have an input for the sink. The input destination is randomly generated and accessed via getInputDestination().

static StreamAppContainer sink = new KafkaStreamAppContainer("couchbase-sink:0.0.1-SNAPSHOT");
...

@BeforeAll
static void initialize() {
    await().until(() -> container.isRunning());
    sink.waitingFor(Wait.forLogMessage(".*Started CouchbaseSink.*", 1))
            .withLogConsumer(appLog("couchbase-sink"))
            .withCommand(
                    "--spring.couchbase.connection-string=couchbase://couchbase-server",
                    "--spring.couchbase.username=" + container.getUsername(),
                    "--spring.couchbase.password=" + container.getPassword(),
                    "--couchbase.consumer.bucket-expression='test'",
                    "--couchbase.consumer.key-expression=payload.email")
            .start();

Once the Couchbase container is running, we start the sink. We wait for the standard Spring Boot startup message to confirm that the sink has started. We also add a LogConsumer to output all the log messages in case there is an error. Note that the connection string simply uses the Couchbase container’s network alias. This is possible because the sink and Couchbase share the same virtual network. Here, we pass all properties on the command line, but we could just as well set them as environment variables, via withEnvironment(). Since we control the sink’s lifecycle, we need to stop it after all the tests are complete.
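
For illustration, the same settings expressed as environment variables might look like the following sketch, which assumes the container exposes Testcontainers’ standard withEnv(...) and relies on Spring Boot’s relaxed binding (for example, SPRING_COUCHBASE_CONNECTION_STRING maps to spring.couchbase.connection-string):

// Hypothetical alternative: environment variables instead of command-line arguments.
sink.withEnv("SPRING_COUCHBASE_CONNECTION_STRING", "couchbase://couchbase-server")
        .withEnv("SPRING_COUCHBASE_USERNAME", container.getUsername())
        .withEnv("SPRING_COUCHBASE_PASSWORD", container.getPassword())
        .withEnv("COUCHBASE_CONSUMER_BUCKET_EXPRESSION", "'test'")
        .withEnv("COUCHBASE_CONSUMER_KEY_EXPRESSION", "payload.email");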

The test uses an autowired TestTopicSender. This is a middleware-agnostic interface, backed by a KafkaTemplate in this case. The interface is useful for running the same test cases against Kafka and RabbitMQ. Here, we could just as well autowire the KafkaTemplate. At the time of this writing, only String serdes are configured for the Kafka template, so we use an ObjectMapper to work with Strings.

@Test
void test() throws JsonProcessingException {
    ObjectMapper objectMapper = new ObjectMapper();
    testTopicSender.send(sink.getInputDestination(),
            objectMapper.writeValueAsString(
                    new User("Bart Simpson", "[email protected]")));

    await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
        ExistsResult result = cluster.bucket("test")
                .defaultCollection().exists("[email protected]");
        assertThat(result.exists()).isTrue();
    });

    User user = objectMapper.readValue(
            cluster.bucket("test").defaultCollection().get("[email protected]")
                    .contentAs(String.class), User.class);

    assertThat(user.getName()).isEqualTo("Bart Simpson");
}

Since this test requires the sink image, we use the JUnit 5 @Tag annotation to mark it as an integration test. We also configured Maven to exclude it from the normal build and to build the image and run the test only when the integration profile is set. The complete source code is here and requires Java 8+ and Docker.

Read more

Testing Spring Cloud Stream Applications - Part 1

This post is part of a blog series that explores the newly redesigned Spring Cloud Stream applications based on Java functions. This episode, presented in two parts, explores strategies for testing the functions used to implement stream applications. We pay special attention to functions that integrate with external resources, which present additional testing challenges. Such is the case with most of the pre-packaged source and sink applications. To illustrate this, we walk through a sample couchbase-sink application. Here in Part 1, we focus on the core function on which the sink is based. In Part 2, we will look at writing tests for the application.

Read more

Spring Cloud 2020.0.0-RC1 (aka Ilford) Is Available

On behalf of the community, I am pleased to announce that Release Candidate 1 (RC1) of the Spring Cloud 2020 Release Train is available today. You can find the release in the Spring Milestone repository. See the 2020 release notes for more information.

Notable Changes in the 2020 Release Train

This release requires Spring Boot 2.4.0.

See the wiki for a list of all breaking changes in this release train.

See all of the included issues and pull requests at the GitHub project.

Spring Cloud Contract

The Gradle plugin creates a separate classpath when executing tasks.

Read more