Notes on Reactive Programming Part III: A Simple HTTP Server Application

Engineering | Dave Syer | July 20, 2016

In this article we continue the series on Reactive Programming, with the focus less on learning the basic APIs and more on concrete use cases and writing code that actually does something useful. We will see how Reactive is a useful abstraction for concurrent programming, but also that it has some very low-level features that we should learn to treat with respect and caution. If we start to use these features to their full potential we can take control of layers in our application that previously were invisible, hidden by containers, platforms and frameworks.

Bridging from Blocking to Reactive with Spring MVC

Being Reactive forces you to look at the world differently. Instead of asking for something and getting it (or not getting it), everything is delivered as a sequence (Publisher) and you have to subscribe to it. Instead of waiting for an answer, you have to register a callback. It’s not so hard when you get used to it, but unless the whole world turns on its head and becomes Reactive, you are going to find you need to interact with an old-style blocking API.
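
To make the contrast concrete, here is a minimal sketch (not from the sample application; blockingService and reactiveService are hypothetical):

// Blocking: ask for the value and wait for it
String answer = blockingService.fetchAnswer();
System.out.println(answer);

// Reactive: nothing happens until you subscribe, and the callback
// registered here fires whenever the value eventually arrives
Mono<String> eventualAnswer = reactiveService.fetchAnswer();
eventualAnswer.subscribe(value -> System.out.println(value));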

Suppose we have a blocking method that returns an HttpStatus:

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

and we want to call it repeatedly with different arguments and aggregate the results. It’s a classic "scatter-gather" use case, which you would get, for instance, if you had a paginated back end and needed to summarize the "top N" items across multiple pages. Since the details of the non-reactive (blocking) operation are not relevant to the scatter-gather pattern, we can push them down into a method called block(), and implement it later. Here’s a (bad) example that calls the back end and aggregates into an object of type Result:

Flux.range(1, 10) // (1)
    .log()
    .map(this::block) // (2)
    .collect(Result::new, Result::add) // (3)
    .doOnSuccess(Result::stop) // (4)
  1. make 10 calls

  2. blocking code here

  3. collect results and aggregate into a single object

  4. at the end stop the clock (the result is a Mono<Result>)
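
The Result type is not shown here; a minimal sketch that is consistent with the way it is used above (a thread-safe tally of status codes plus a stopwatch) might look like this, though the sample’s actual class may differ:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.http.HttpStatus;

class Result {

    private final ConcurrentMap<HttpStatus, AtomicLong> counts = new ConcurrentHashMap<>();

    private final long timestamp = System.currentTimeMillis();

    private long duration;

    public long add(HttpStatus status) {
        // computeIfAbsent gives us one thread-safe counter per status code
        return counts.computeIfAbsent(status, key -> new AtomicLong()).incrementAndGet();
    }

    public void stop() {
        // called once via doOnSuccess(Result::stop) when the Mono completes
        this.duration = System.currentTimeMillis() - this.timestamp;
    }

    public long getDuration() {
        return duration;
    }

    public ConcurrentMap<HttpStatus, AtomicLong> getCounts() {
        return counts;
    }
}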

Don’t do this at home. It’s a "bad" example because, while the APIs are technically being used correctly, we know that it is going to block the calling thread; this code is more or less equivalent to a for loop with the call to block() in the body of the loop. A better implementation would push the call to block() onto a background thread. We can do that by wrapping it in a method that returns a Mono<HttpStatus>:

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(() -> block(value)) // (1)
        .subscribeOn(this.scheduler);            // (2)
}
  1. blocking code here inside a Callable to defer execution

  2. subscribe to the slow publisher on a background thread

The scheduler is declared separately as a shared field: Scheduler scheduler = Schedulers.parallel(). Then we can declare that we want to flatMap() the sequence instead of using map():

Flux.range(1, 10)
    .log()
    .flatMap(                             // (1)
        this::fetch, 4)                   // (2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop)
  1. drop down to a new publisher to process in parallel

  2. concurrency hint in flatMap(): at most 4 of the inner publishers are subscribed to at a time

Embedding in a Non-Reactive Application

If we wanted to run the scatter-gather code above in a non-reactive environment, we could use Spring MVC like this:

@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
      ...
      .doOnSuccess(Result::stop)
      .toFuture();
}

If you read the Javadocs for @RequestMapping you will find that a method can return a CompletableFuture "which the application uses to produce a return value in a separate thread of its own choosing". The separate thread in this case is provided by "scheduler", which is a thread pool, so the processing is happening on multiple threads, 4 at a time because of the way that flatMap() is called.

No Such Thing as a Free Lunch

The scatter-gather with a background thread is a useful pattern but it isn’t perfect: it’s not blocking the caller, but it’s blocking something, so it’s just moving the problem around. There are some practical implications. We have an HTTP server with (probably) non-blocking IO handlers, passing work back to a thread pool, one HTTP request per thread, and all of this is happening inside a servlet container (e.g. Tomcat). The request is processed asynchronously, so the worker thread in Tomcat isn’t blocked, and the thread pool that we created in our "scheduler" is processing on up to 4 concurrent threads.

We are processing 10 back end requests (calls to block()), so there is a maximum, theoretical benefit of using the scheduler of 4 times lower latency. In other words, if processing all 10 requests one after the other in a single thread takes 1000ms, we might see a processing time of 250ms for a single incoming request at our HTTP service. We should emphasise the "might" though: it’s only going to go that fast if there is no contention for the processing threads (in both stages, the Tomcat workers and the application scheduler).

If you have a server with a large number of cores and very low concurrency, i.e. a small number of clients connecting to your application and hardly any chance that two will make a request at the same time, then you will probably see close to the theoretical improvement. As soon as there are multiple clients trying to connect, they will all be competing for the same 4 threads, latency will drift up, and it could even be worse than that experienced by a single client with no background processing. We can improve the latency for concurrent clients by creating the scheduler with a larger thread pool, e.g.

    private Scheduler scheduler = Schedulers.newParallel("sub", 16);

(16 threads.) Now we are using more memory for the threads and their stacks, and we can expect to see lower latency for low concurrency, but not necessarily for high concurrency if our hardware has fewer than 16 cores. We also do not expect higher throughput under load: if there is contention for the threads, there is a high cost for managing those resources and that has to be reflected somewhere in a metric that matters. If you are interested in that kind of trade-off, detailed analyses of how performance metrics scale under load can be found in a blog series by Rob Harrop.

Tip

Tomcat allocates 100 threads for processing HTTP requests by default. That is excessive if we know all the processing is going to be on our scheduler thread pool. There is an impedance mismatch: the scheduler thread pool can be a bottleneck because it has fewer threads than the upstream Tomcat thread pool. This highlights the fact that performance tuning can be very hard, and, while you might have control of all the configuration, it’s a delicate balance.

We can do better than a fixed thread pool if we use a scheduler that adjusts its capacity according to demand. Reactor has a convenience for that, so if you try the same code with Schedulers.elastic() (you can call it anywhere; there is only one instance), you will see that under load more threads are created.
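
For instance, the same field as before with a different factory method (a one-line change):

private Scheduler scheduler = Schedulers.elastic(); // grows and shrinks the pool on demand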

Reactive all the Way Down

The bridge from blocking to reactive is a useful pattern, and is easy to implement using available technology in Spring MVC (as shown above). The next stage in the Reactive journey is to break out of blocking in application threads completely, and to do that requires new APIs and new tools. Ultimately we have to be Reactive all the way down the stack, including servers and clients. This is the goal of Spring Reactive, which is a new framework, orthogonal to Spring MVC, but meeting the same needs, and using a similar programming model.

Note

Spring Reactive started as a standalone project, but was folded into the Spring Framework in version 5.0 (first milestone June 2016).

The first step to fully Reactive in our scatter-gather example would be to replace spring-boot-starter-web with spring-boot-starter-webflux on the classpath. In Maven:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>
</dependencies>

or in Gradle:

dependencies {
    compile('org.springframework.boot:spring-boot-starter-webflux')
    ...
}

Then in the controller, we can simply lose the bridge to CompletableFuture and return an object of type Mono:

@RequestMapping("/parallel")
public Mono<Result> parallel() {
    return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
}

Take this code and put it in a Spring Boot application and it will run in Tomcat, Jetty or Netty, depending on what it finds on the classpath. Netty is the default server in that starter, so you have to exclude it and provide a different one if you want to switch. All three have very similar characteristics in terms of startup time, memory usage and runtime resource usage.

We still have the blocking backend call in block(), so we still have to subscribe on a thread pool to avoid blocking the caller. That changes if we have a non-blocking client: instead of using RestTemplate we can use the new WebClient, and then fetch() might look like this:

private WebClient client = WebClient.create();

private Mono<HttpStatus> fetch(int value) {
    return this.client.get()
            .uri("http://example.com/{value}", value)
            .exchange()
            .map(response -> response.statusCode());
}

Note that WebClient.exchange() has a Reactive return type, which we have transformed into a Mono<HttpStatus>, but we have not subscribed to it. We want the framework to do all the subscribing, so now we are Reactive all the way down.

Warning

Methods in Spring Reactive that return a Publisher are non-blocking, but in general a method that returns a Publisher (or Flux, Mono or Observable) is only a hint that it might be non-blocking. If you are writing such methods it is important to analyse (and preferably test) whether they block, and to let callers know explicitly if they might do.
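
As an illustration (a hypothetical method, not from the sample code), here is a signature that hints at non-blocking behaviour but does not deliver it:

// Anti-pattern: looks Reactive, but block() runs on the caller's thread
// before the Mono is even created, because Mono.just() is eager
private Mono<HttpStatus> sneakyFetch(int value) {
    return Mono.just(block(value));
}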

Note

The trick we played just now of using a non-blocking client to simplify the HTTP stack works in regular Spring MVC as well. The result of the fetch() method above can be converted to a CompletableFuture and passed out of a regular @RequestMapping method (in Spring Boot 1.3 for instance).
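
A sketch of that bridge, with a made-up "/status" mapping for illustration:

@RequestMapping("/status")
public CompletableFuture<HttpStatus> status() {
    // fetch() is already non-blocking, so no scheduler is needed;
    // toFuture() converts the Mono to the type Spring MVC understands
    return fetch(1).toFuture();
}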

Inversion of Control

Now we can remove the concurrency hint after the call to fetch() in the HTTP request handler:

@RequestMapping("/netty")
public Mono<Result> netty() {
    return Flux.range(1, 10) // (1)
        .log() //
        .flatMap(this::fetch) // (2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}
  1. make 10 calls

  2. drop down to a new publisher to process in parallel

Since we no longer need the extra Callable and the subscriber thread at all, this code is a lot cleaner than when we had to bridge to the blocking client, which we can attribute to the fact that the code is Reactive all the way down. The Reactive WebClient returns a Mono, and that drives us immediately to select flatMap() in the transformation chain, and the code we need just falls out. It’s a nicer experience to write it, and it’s more readable, so it’s easier to maintain.

Also, since there is no thread pooling and no concurrency hint, there is no magic factor of 4 to plug into our performance expectations. There is a limit somewhere, but it’s no longer imposed by our choices in the application tier, nor by anything in the server "container". It’s not magic, and there are still laws of physics, so the backend calls are all still going to take 100ms or so each, but with low contention we might even see all 10 requests complete in roughly the same time it takes for one. As the load on the server increases, latency and throughput will naturally degrade, but in a way that is governed by buffer contention and kernel networking, not by application thread management. It’s an inversion of control, to lower levels of the stack below the application code.

Remember the same application code runs on Tomcat, Jetty or Netty. Currently, the Tomcat and Jetty support is provided on top of Servlet 3.1 asynchronous processing, so it is no longer limited to one request per thread. It is built on top of a Reactive bridge that adapts the Servlet 3.1 concepts to the reactive paradigm. In the case of Reactor Netty, the backpressure and reactive support is built in. Depending on your choice of HTTP client library, server and client might share the same HTTP resources and optimize things even more. We will come back to that in another article in this series.

Tip

In the sample code the "reactive" sample has Maven profiles "tomcat", "tomcatNext" (for Tomcat 8.5), "jetty" and "netty", so you can easily try out all the different server options without changing a line of code.

Note

The blocking code in many applications is not HTTP backend calls, but database interactions. Very few databases support non-blocking clients at this point in time (MongoDB and Couchbase are notable exceptions, but even those are not as mature as the HTTP clients). Thread pools and the blocking-to-reactive pattern will have a long life until all the database vendors catch up on the client side.

Still No Free Lunch

We have whittled down our basic scatter-gather use case until the code is very clean, and very sympathetic to the hardware it runs on. We wrote some simple code and it was stacked up and orchestrated very nicely into a working HTTP service using Spring. On a sunny day everyone is more than happy with the outcome. But as soon as there are errors, e.g. a badly behaved network connection, or a back end service that suffers from poor latency, we are going to suffer.

The first, most obvious way to suffer is that the code we wrote is declarative, so it’s hard to debug. When errors occur the diagnostics can be very opaque. Using the raw, low-level APIs, like Reactor without Spring, or even going down to the level of Netty without Reactor, would probably make it even worse, because then we would have to build a lot of error handling ourselves, repeating the boilerplate every time we interact with the network. At least with Spring and Reactor in the mix we can expect to see stack traces logged for stray, uncaught exceptions. They might not be easy to understand, though, because they happen on threads that we don’t control, and they sometimes show up as quite low-level concerns, from unfamiliar parts of the stack.

Another source of pain is that if we ever make a mistake and block in one of our Reactive callbacks, we will be holding up all requests on the same thread. With the servlet-based containers every request is isolated to a thread, and blocking doesn’t hold up other requests because they are processed on different threads. Blocking all requests is still a recipe for trouble, but it only shows up as increased latency, with roughly a constant factor per request. In the Reactive world, blocking a single request can lead to increased latency for all requests, and blocking all requests can bring a server to its knees, because the extra layers of buffers and threads are not there to take up the slack.
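
To make that failure mode concrete, this is the shape of the mistake: the "bad" example from the beginning dropped into a fully Reactive handler (the "/trouble" mapping is made up):

@RequestMapping("/trouble")
public Mono<Result> trouble() {
    return Flux.range(1, 10)
        .map(this::block) // blocks a shared event-loop thread, stalling unrelated requests
        .collect(Result::new, Result::add);
}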

Conclusion

It’s nice to be able to control all the moving parts in our asynchronous processing: every layer has a thread pool size and a queue. We can make some of those layers elastic, and try and adjust them according to how much work they do. But at some point it becomes a burden, and we start looking for something simpler, or leaner. Analysis of scalability leads to the conclusion that it is often better to shed the extra threads, and work with the constraints imposed by the physical hardware. This is an example of "mechanical sympathy", as is famously exploited by LMAX to great effect in the Disruptor Pattern.

We have begun to see the power of the Reactive approach, but remember that with power comes responsibility. It’s radical, and it’s fundamental. It’s "rip it up and start again" territory. So you will also hopefully appreciate that Reactive isn’t a solution to all problems. In fact it isn’t a solution to any problem, it merely facilitates the solution of a certain class of problems. The benefits you get from using it might be outweighed by the costs of learning it, modifying your APIs to be Reactive all the way down, and maintaining the code afterwards, so tread carefully.
