Notes on Reactive Programming Part II: Writing Some Code

In this article we continue the series on Reactive Programming, and we concentrate on explaining some concepts through actual code samples. The end result should be that you understand a bit better what makes Reactive different, and what makes it functional. The examples here are quite abstract, but they give you a way to think about the APIs and the programming style, and start to get a feel for how it is different. We will see the elements of Reactive, and learn how to control the flow of data, and process in background threads if necessary.

Setting Up a Project

We will use the Reactor libraries to illustrate the points we need to make. The code could just as easily be written with other tools. If you want to play with the code and see it working without having to copy-paste anything, there are working samples with tests in GitHub.

To get started, grab a blank project from https://start.spring.io and add the Reactor Core dependency. With Maven:

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.0.0.RC2</version>
		</dependency>

With Gradle it’s very similar:

    compile 'io.projectreactor:reactor-core:3.0.0.RC2'

Now let’s write some code.

What Makes it Functional?

The basic building block of Reactive is a sequence of events, and two protagonists, a publisher and a subscriber to those events. It’s also OK to call a sequence a "stream" because that’s what it is. If we need to, we will use the word "stream" with a small "s", but Java 8 has a java.util.stream.Stream which is different, so try not to get confused. We will try to concentrate the narrative on the publisher and subscriber anyway (that’s what Reactive Streams does).

Reactor is the library we are going to use in samples, so we’ll stick to the notation there, and call the publisher a Flux (it implements the interface Publisher from Reactive Streams). The RxJava library is very similar and has a lot of parallel features, so in that case we would be talking about an Observable instead, but the code would be very similar. (Reactor 2.0 called it a Stream which is confusing if we need to talk about Java 8 Streams as well, so we’ll only use the new code in Reactor 3.0.)

Generators

A Flux is a publisher of a sequence of events of a specific POJO type, so it is generic, i.e. Flux<T> is a publisher of T. Flux has some static convenience methods to create instances of itself from a variety of sources. For example, to create a Flux from an array:

Flux<String> flux = Flux.just("red", "white", "blue");

We just generated a Flux, and now we can do stuff with it. There are actually only two things you can do with it: operate on it (transform it, or combine it with other sequences), or subscribe to it (it’s a publisher).

Single Valued Sequences

Often you encounter a sequence that you know has only one or zero elements, for example a repository method that finds an entity by its id. Reactor has a Mono type representing a single valued or empty Flux. Mono has a very similar API to Flux but more focused because not all operators make sense for single-valued sequences. RxJava also has a bolt on (in version 1.x) called Single, and also Completable for an empty sequence. The empty sequence in Reactor is Mono<Void>.

Operators

There are a lot of methods on a Flux and nearly all of them are operators. We aren’t going to look at them all here because there are better places to look for that (like the Javadocs). We only need to get a flavour for what an operator is, and what it can do for you.

For instance, to ask for the internal events inside a Flux to be logged to standard out, you can call the .log() method. Or you can transform it using a map():

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

In this code we transformed the strings in the input by converting them to upper case. So far, so trivial.

What’s interesting about this little sample — mind blowing, even, if you’re not used to it — is that no data have been processed yet. Nothing has even been logged because literally, nothing happened (try it and you will see). Calling operators on a Flux amounts to building a plan of execution for later. It is completely declarative, and it’s why people call it "functional". The logic implemented in the operators is only executed when data starts to flow, and that doesn’t happen until someone subscribes to the Flux (or equivalently to the Publisher).

The same declarative, functional approach to processing a sequence of data exists in all Reactive libraries, and also in Java 8 Streams. Consider this similar-looking code, using a Stream with the same contents as the Flux:

Stream<String> stream = Stream.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

The observation we made about Flux applies here: no data is processed, it’s just a plan of execution. There are, however, some important differences between Flux and Stream, which make Stream an inappropriate API for Reactive use cases. Flux has a lot more operators, many of which are just conveniences, but the real difference comes when you want to consume the data, so that’s what we need to look at next.
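To see the laziness of a Java 8 Stream for yourself, here is a minimal sketch that uses only the JDK. The class name LazyStreamDemo and the "seen" list are hypothetical, introduced just for illustration: the list records every value that reaches the map() function, so you can check that nothing runs until a terminal operation (here collect()) is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the article's sample project.
public class LazyStreamDemo {

    // Records each value that the map() function actually processes.
    static final List<String> seen = new ArrayList<>();

    // Builds the "plan of execution" but does not run it.
    static Stream<String> plan() {
        return Stream.of("red", "white", "blue").map(value -> {
            seen.add(value);
            return value.toUpperCase();
        });
    }

    public static void main(String[] args) {
        Stream<String> upper = plan();
        // Nothing has been processed yet, so "seen" is still empty here.
        System.out.println("Before terminal operation: " + seen);

        // collect() is a terminal operation: only now does data flow through map().
        List<String> result = upper.collect(Collectors.toList());
        System.out.println("After terminal operation: " + seen);
        System.out.println("Result: " + result);
    }
}
```

The terminal operation plays roughly the role that subscribing plays for a Flux, with the difference that it always consumes everything in one go.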

Tip
There is a useful blog by Sebastien Deleuze on Reactive Types, where he describes the differences between the various streaming and reactive APIs by looking at the types they define, and how you would use them. The differences between Flux and Stream are highlighted there in more detail.

Subscribers

To make the data flow you have to subscribe to the Flux using one of the subscribe() methods. Only those methods make the data flow. They reach back through the chain of operators you declared on your sequence (if any) and request the publisher to start creating data. In the simple samples we have been working with, this means the underlying collection of strings is iterated. In a more complicated use case it might trigger a file to be read from the filesystem, or a pull from a database or a call to an HTTP service.

Here’s a call to subscribe() in action:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

The output is:

09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

So we can see from this that the effect of subscribe() without an argument is to request the publisher to send all data: there’s only one request() logged and it’s "unbounded". We can also see callbacks for each item that is published (onNext()), for the end of the sequence (onComplete()), and for the original subscription (onSubscribe()). If you needed to you could listen for those events yourself using the doOn*() methods in Flux, which are themselves operators, not subscribers, so they don’t cause any data to flow on their own.

The subscribe() method is overloaded, and the other variants give you different options to control what happens. One important and convenient form is subscribe() with callbacks as arguments. The first argument is a Consumer, which gives you a callback with each of the items, and you can also optionally add a Consumer for an error if there is one, and a vanilla Runnable to execute when the sequence is complete. For example, just with the per-item callback:

Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

Here’s the output:

09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

We could control the flow of data, and make it "bounded", in a variety of ways. The raw API for control is the Subscription you get from a Subscriber. The equivalent long form of the short call to subscribe() above is:

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

To control the flow, e.g. to consume at most 2 items at a time, you could use the Subscription more intelligently:

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

This Subscriber is "batching" items 2 at a time. It’s a common use case so you might think about extracting the implementation to a convenience class, and that would make the code more readable too. The output looks like this:

09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()
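The convenience class suggested above might look something like the following sketch. To keep it runnable without any dependencies it is written against the JDK’s java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams Publisher, Subscriber and Subscription, rather than the org.reactivestreams types that Reactor uses. The class name BatchingSubscriber, its event log and the demo() helper are all hypothetical, and the JDK’s SubmissionPublisher stands in for the Flux.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical convenience class: requests items from upstream in fixed-size batches.
public class BatchingSubscriber<T> implements Flow.Subscriber<T> {

    private final int batchSize;
    // Records the signals in order, similar in spirit to the log() output in the article.
    private final List<String> events = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private int count;

    public BatchingSubscriber(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        requestBatch();
    }

    @Override
    public void onNext(T item) {
        events.add("onNext(" + item + ")");
        // Once a full batch has arrived, ask for the next one.
        if (++count >= batchSize) {
            count = 0;
            requestBatch();
        }
    }

    @Override
    public void onError(Throwable t) {
        events.add("onError(" + t + ")");
        done.countDown();
    }

    @Override
    public void onComplete() {
        events.add("onComplete()");
        done.countDown();
    }

    private void requestBatch() {
        events.add("request(" + batchSize + ")");
        subscription.request(batchSize);
    }

    // Waits (up to 5 seconds) for the sequence to terminate, then returns the recorded events.
    public List<String> awaitEvents() {
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    // Publishes three items through the JDK's SubmissionPublisher and returns what the subscriber saw.
    public static List<String> demo() {
        BatchingSubscriber<String> subscriber = new BatchingSubscriber<>(2);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("red");
            publisher.submit("white");
            publisher.submit("blue");
        } // close() signals onComplete after the buffered items have been delivered
        return subscriber.awaitEvents();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Because SubmissionPublisher delivers items asynchronously, the sketch waits on a latch before reading the events. With the synchronous Flux.just() used in the article that wait would not be needed.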

In fact the batching subscriber is such a common use case that there are convenience methods already available in Flux. The batching example above can be implemented like this:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(note the call to subscribe() with a request limit). Here’s the output:

10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

Tip
A library that will process sequences for you, like Spring Reactive Web, can handle the subscriptions. It’s good to be able to push these concerns down the stack because it saves you from cluttering your code with non-business logic, making it more readable and easier to test and maintain. So as a rule, it is a good thing if you can avoid subscribing to a sequence, or at least push that code into a processing layer, and out of the business logic.

Threads, Schedulers and Background Processing

An interesting feature of all the logs above is that they are all on the "main" thread, which is the thread of the caller to subscribe(). This highlights an important point: Reactor is extremely frugal with threads, because that gives you the greatest chance of the best possible performance. That might be a surprising statement if you’ve been wrangling threads and thread pools and asynchronous executions for the last 5 years, trying to squeeze more juice out of your services. But it’s true: in the absence of any imperative to switch threads, even if the JVM is optimized to handle threads very efficiently, it is always faster to do computation on a single thread. Reactor has handed you the keys to control all the asynchronous processing, and it assumes you know what you are doing.

Flux provides a few configurer methods that control the thread boundaries. For example, you can configure the subscriptions to be handled in a background thread using Flux.subscribeOn():

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

The result can be seen in the output:

13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

Tip
If you write this code yourself, or copy-paste it, remember to wait for the processing to stop before the JVM exits.

Note that the subscription, and all the processing, takes place on a single background thread "parallel-1-1". This is because we asked for the subscriber to our main Flux to be in the background. This is fine if the item processing is CPU intensive (although then there is little point in using a background thread, since you pay for the context switch but don’t get the results any faster). You might also want to be able to perform item processing that is I/O intensive and possibly blocking. In this case, you would want to get it done as quickly as possible without blocking the caller. A thread pool is still your friend, and that’s what you get from Schedulers.parallel(). To switch the processing of the individual items to separate threads (up to the limit of the pool) we need to break them out into separate publishers, and for each of those publishers ask for the result in a background thread. One way to do this is with an operator called flatMap(), which maps the items to a Publisher (potentially of a different type), and then back to a sequence of the new type:

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
});

Note here the use of flatMap() to push the items down into a "child" publisher, where we can control the subscription per item instead of for the whole sequence. Reactor has built in default behaviour to hang onto a single thread as long as possible, so we need to be explicit if we want it to process specific items or groups of items in a background thread. Actually, this is one of a handful of recognized tricks for forcing parallel processing (see the Reactive Gems issue for more detail).

The output looks like this:

15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

Notice that there are now multiple threads consuming the items, and the concurrency hint in the flatMap() ensures that there are 2 items being processed at any given time, as long as they are available. We see request(1) a lot because the system is trying to keep 2 items in the pipeline, and generally they don’t finish processing at the same time. Reactor tries to be very smart here in fact, and it pre-fetches items from the upstream Publisher to try to eliminate waiting time for the subscriber (we aren’t seeing that here because the numbers are low — we are only processing 3 items).

Tip
Three items ("red", "white", "blue") might be too few to convincingly see more than one background thread, so it might be better to generate more data. You could do that with a random number generator, for instance.

Flux also has a publishOn() method which is the same, but for the listeners (i.e. onNext() or consumer callbacks) instead of for the subscriber itself:

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});

The output looks like this:

15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe([email protected])
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

Notice that the consumer callbacks (logging "Consumed: …​") are on the publisher thread pub-1-1. If you take out the subscribeOn() call, you might see all of the 2nd chunk of data processed on the pub-1-1 thread as well. This, again, is Reactor being frugal with threads — if there’s no explicit request to switch threads it stays on the same one for the next call, whatever that is.

Note
We changed the code in this sample from subscribe(null, 2) to adding a prefetch=2 to the publishOn(). In this case the fetch size hint in subscribe() would have been ignored.

Extractors: The Subscribers from the Dark Side

There is another way to subscribe to a sequence, which is to call Mono.block() or Mono.toFuture() or Flux.toStream() (these are the "extractor" methods: they get you out of the Reactive types into a less flexible, blocking abstraction). Flux also has converters collectList() and collectMap() that convert from Flux to Mono. They don’t actually subscribe to the sequence, but they do throw away any control you might have had over the subscription at the level of the individual items.

Warning
A good rule of thumb is "never call an extractor". There are some exceptions (otherwise the methods would not exist). One notable exception is in tests because it’s useful to be able to block to allow results to accumulate.

These methods are there as an escape hatch to bridge from Reactive to blocking, for instance if you need to adapt to a legacy API such as Spring MVC. When you call Mono.block() you throw away all the benefits of Reactive Streams. This is the key difference between Reactive Streams and Java 8 Streams: the native Java Stream only has the "all or nothing" subscription model, the equivalent of Mono.block(). Of course subscribe() can block the calling thread as well, so it’s just as dangerous as the extractor methods, but you have more control: you can prevent it from blocking by using subscribeOn(), and you can drip the items through by applying back pressure and periodically deciding whether to continue.

Conclusion

In this article we have covered the basics of the Reactive Streams and Reactor APIs. If you need to know more there are plenty of places to look, but there’s no substitute for hands on coding, so use the code in GitHub (for this article in tests in the project called "flux"), or head over to the Lite RX Hands On workshop. So far, really this is just overhead, and we haven’t learned much that we couldn’t have done in a more obvious way using non-Reactive tools. The next article in the series will dig a little deeper into the blocking, dispatching and asynchronous sides of the Reactive model, and show you what opportunities there are to reap the real benefits.
