This blog post is the first in a series of posts that aim at providing a deeper look into Reactor’s more advanced concepts and inner workings.
It is derived from my
Flight of the Flux talk, whose content I found to be better suited to a blog post format.
I’ll update the table below with links when the other posts are published, but here is the planned content:
- Assembly vs Subscription (this post)
- Debugging caveats
- Hopping Threads and Schedulers
- Inner workings: work stealing
- Inner workings: operator fusion
Without further ado, let’s jump in:
When you first learn about Reactive Streams and reactive programming on the JVM, the first thing you learn is the high-level relationship between `Publisher` and `Subscriber`: one produces data, the other consumes it. Simple, right? Furthermore, it appears that the `Publisher` pushes data to the `Subscriber`.
But when working with Reactive Streams libraries like Reactor (or RxJava2), you quickly come across the following mantra:
Nothing Happens Until You Subscribe
Sometimes, you might read that both libraries implement a "push-pull hybrid model". Hang on a minute! Pull?
We’ll get back to it, but to understand that sentence you first need to realize that, by default, Reactor’s reactive types are lazy.
Calling methods on a `Flux` or a `Mono` (the operators) doesn't immediately trigger the behavior. Instead, a new instance of `Flux` (or `Mono`) is returned, on which you can continue composing further operators. You thus create a chain of operators (or an operator acyclic graph), which represents your asynchronous processing pipeline.
This declarative phase is called assembly time.
Let’s take an example where a client side application makes an HTTP request to a server, expecting an HttpResponse:
```java
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
```
This can be simplified using the fluent API:
```java
Mono<String> quote = makeHttpRequest()
        .map(req -> parseJson(req))
        .map(json -> json.getString("quote"));
```
Once you are done declaring your pipeline, there are two situations: either you pass the
`Mono` representing the processing pipeline down to another piece of code, or you trigger the pipeline yourself.
The former means that the code to which you return the
Mono might apply other operators, resulting in a derived new pipeline. Since the operators create new instances (it’s like an onion), your own
Mono is not mutated, so it could be further decorated several times with widely different results:
```java
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
//(the lambda parameter is renamed to `q` to avoid shadowing the `quote` variable)
Flux<String> words = quote.flatMapMany(q -> Flux.fromArray(q.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request
```
Compare that with a
CompletableFuture, which is not lazy in nature: once you have a reference to the
CompletableFuture, it means the processing is already ongoing…
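The eager-vs-lazy contrast can be seen with a few lines of plain Java, no Reactor required. A `Supplier` stands in for a lazy `Mono` here, and `expensiveCall` is a made-up placeholder for something like an HTTP request:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class EagerVsLazy {
    static final AtomicInteger calls = new AtomicInteger();

    // Stand-in for an expensive operation (e.g. an HTTP round-trip)
    static String expensiveCall() {
        calls.incrementAndGet();
        return "data";
    }

    public static void main(String[] args) {
        // CompletableFuture is eager: the work is submitted the moment we create it
        CompletableFuture<String> future = CompletableFuture.supplyAsync(EagerVsLazy::expensiveCall);
        future.join(); // by now expensiveCall() has run once

        // A Supplier mimics a lazy Mono: declaring it runs nothing
        Supplier<String> lazy = EagerVsLazy::expensiveCall;
        System.out.println("calls before get(): " + calls.get()); // still 1
        lazy.get(); // "subscribing": only now does the second call happen
        System.out.println("calls after get(): " + calls.get()); // 2
    }
}
```

The key difference: with the future you already hold a computation in flight, whereas with the deferred form you hold a recipe that can be handed around and decorated before anything runs.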
With that in mind, let’s look into how to trigger the reactive pipeline.
So far, we’ve assembled an asynchronous pipeline. That is, we’ve instantiated
Mono variables through the use of operators, that results other
Mono with behavior layered like an onion.
But the data hasn’t started flowing through each of these declared pipelines yet.
That’s because the trigger for the data to flow is not the declaration of the pipeline, but rather the subscription to it. Remember:
Nothing Happens Until You Subscribe
Subscribing is the act of saying "ok, this pipeline represents a transformation of data, and I'm interested in the final form of that data". The most common way of doing so is by calling one of the `subscribe` methods.
That signalling of interest is propagated backwards through the chain of operators, up until the source operator, the
Publisher that actually produces the initial data:
```java
makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
```
- <1> we subscribe to the words `Flux`, stating that we want to print each word to the console (and print the stack trace of any error)
- <2> that interest is signalled to the `flatMapMany` operator
- <3> …which signals it up the chain to the json `map`
- <4> …then the request-parsing `map`
- <5> …to finally reach `makeHttpRequest()` (which we'll consider our source)
At this point, the source is triggered. It generates the data in the appropriate way: here it would make an HTTP request to a JSON-producing endpoint and then emit the HTTP response.
From there on, we’re in execution time. The data has started flowing through the pipeline (in the more natural top-to-bottom order, or upstream to downstream):
- The `HttpResponse` is emitted to the first `map`
- It extracts the JSON body and emits it to the second `map`
- Which extracts the quote and passes it to the `flatMapMany`
- `flatMapMany` splits the quote into words and emits each word individually
- The value handler in the `subscribe` is notified of each word, printing these to the console, one per line
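The two phases just described can be sketched with a toy, single-value pipeline in plain Java. `TinyMono`, `fromSupplier`, and the `requests` counter are all invented for this sketch (the real Reactor operators are far more involved), but the onion shape and the deferred trigger are the same:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy single-value stand-in for Mono: assembly wraps functions,
// subscribe() walks back to the source and pulls the value through.
class TinyMono<T> {
    private final Supplier<T> source;

    private TinyMono(Supplier<T> source) { this.source = source; }

    // Assembly: capture the deferred source, nothing runs here
    static <T> TinyMono<T> fromSupplier(Supplier<T> source) {
        return new TinyMono<>(source);
    }

    // Assembly: each operator returns a NEW TinyMono decorating the previous one
    <R> TinyMono<R> map(Function<? super T, ? extends R> mapper) {
        return new TinyMono<>(() -> mapper.apply(source.get()));
    }

    // Subscription: only now is the source asked to produce a value
    void subscribe(Consumer<? super T> consumer) {
        consumer.accept(source.get());
    }
}

public class AssemblyDemo {
    static int requests = 0;

    public static void main(String[] args) {
        TinyMono<Integer> length = TinyMono
                .fromSupplier(() -> { requests++; return "hello"; }) // the "source"
                .map(String::length);                                // assembly only
        System.out.println("requests after assembly: " + requests);  // 0
        length.subscribe(len -> System.out.println("length: " + len)); // triggers the source
        System.out.println("requests after subscribe: " + requests);   // 1
    }
}
```

Notice how `map` never touches the data at assembly time: it just wraps the upstream supplier, and the whole onion only unwinds when `subscribe` asks for a value.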
Hopefully that helps you understand the difference between assembly time and subscription/execution time!
Right after explaining the difference and introducing this mantra is probably a good time to introduce an exception :laughing:
Nothing happens until you subscribe… until something does
So far, we’ve been dealing with a flavor of
Mono sources called a Cold
Publisher. As we’ve explained, these
Publishers are lazy and only generate data when there is a
Subscription. Furthermore, they generate the data anew for each individual
In our example of an HTTP response
Mono, the HTTP request would be performed for each subscription:
```java
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(q -> Flux.fromArray(q.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
```
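The "one request per subscription" behavior of a cold source can be mimicked in plain Java with a deferred source that redoes its work every time it is asked. The names `coldSource` and `requestCount` are made up for this sketch:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ColdSourceDemo {
    static final AtomicInteger requestCount = new AtomicInteger();

    // Stand-in for a cold Mono: the "HTTP request" runs anew on every call
    static Supplier<String> coldSource() {
        return () -> {
            requestCount.incrementAndGet(); // pretend this is an HTTP round-trip
            return "a witty quote";
        };
    }

    public static void main(String[] args) {
        Supplier<String> quote = coldSource();
        // Declaring derived "pipelines" costs nothing...
        Supplier<Integer> quoteLength = () -> quote.get().length();
        Supplier<String> upperQuote = () -> quote.get().toUpperCase();
        System.out.println("requests after assembly: " + requestCount.get()); // 0
        // ...but each "subscription" (here, get()) re-triggers the source
        quoteLength.get();
        upperQuote.get();
        System.out.println("requests after two subscriptions: " + requestCount.get()); // 2
    }
}
```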
On a side note, some operators' behavior implies multiple subscriptions. For example, `retry` re-subscribes to its source in case of an error (`onError` signal), while `repeat` does the same in reaction to the `onComplete` signal. So for a cold source like our HTTP request, something like `retry` would re-perform the request, thus allowing recovery from a transient server-side error, for instance.
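Why re-subscription helps against transient errors can be sketched with a plain-Java loop standing in for `retry(n)`: each failed attempt "re-subscribes" to the cold source, which re-performs the request. `flakyRequest` and the `attempts` counter are invented for this sketch:

```java
import java.util.function.Supplier;

public class RetryDemo {
    static int attempts = 0;

    // A cold source that fails on its first two "subscriptions" (a transient error)
    static String flakyRequest() {
        attempts++;
        if (attempts < 3) throw new IllegalStateException("transient server error");
        return "finally, a response";
    }

    // A loop standing in for retry(n): each failure triggers a fresh subscription
    static String retry(Supplier<String> source, int maxRetries) {
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return source.get(); // a fresh "subscription", hence a fresh request
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        System.out.println(retry(RetryDemo::flakyRequest, 3)); // succeeds on the third attempt
    }
}
```

This only works because the source is cold: each re-subscription produces a brand new request rather than replaying a failed one.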
A Hot `Publisher`, on the other hand, isn't as clear-cut: it doesn't necessarily need a `Subscriber` to start pumping data, and it doesn't necessarily re-generate dedicated data for each new `Subscriber`.
To illustrate that, let’s introduce a new cold publisher example, then we’ll show how to turn that cold publisher into a hot one:
```java
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
```
```
clock1 1s
clock1 2s
clock1 3s
	clock2 1s
clock1 4s
	clock2 2s
clock1 5s
	clock2 3s
clock1 6s
	clock2 4s
```
We can turn the `clockTicks` source into a hot one by invoking `share()`:
```java
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s"));

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
```
It yields the following result instead:
```
clock1 1s
clock1 2s
clock1 3s
	clock2 3s
clock1 4s
	clock2 4s
clock1 5s
	clock2 5s
clock1 6s
	clock2 6s
```
You see that the two subscriptions now share the same ticks of the clock.
`share()` converts cold to hot by letting the source multicast elements to new `Subscriber`s, but only the elements that are emitted after these new subscriptions. Since `clock2` subscribed 2 seconds later, it missed the earlier emissions.
So hot publishers can be less lazy, even though they generally require at least an initial
Subscription to trigger data flow.
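The "late subscribers miss earlier values" behavior can be sketched with a toy hot source in plain Java: a listener list that emissions are broadcast to, so whoever registers late simply isn't notified of earlier values. `HotSourceDemo`, `emit`, and `subscribe` are all made up for this sketch and merely mimic the effect of `share()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy hot source: emits to whoever is registered at emission time,
// so late subscribers miss earlier values (like a share()d Flux).
public class HotSourceDemo {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();

    void subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
    }

    void emit(long tick) {
        for (Consumer<Long> s : subscribers) s.accept(tick);
    }

    public static void main(String[] args) {
        HotSourceDemo clock = new HotSourceDemo();
        clock.subscribe(tick -> System.out.println("clock1 " + tick + "s"));
        clock.emit(0); // only clock1 sees ticks 0 and 1
        clock.emit(1);
        clock.subscribe(tick -> System.out.println("\tclock2 " + tick + "s"));
        clock.emit(2); // from here on, both subscribers see the same ticks
        clock.emit(3);
    }
}
```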
In this article, we’ve learned about the difference between instantiating a
Flux / chaining operator (aka Assembly time), triggering it (aka Subscription time) and executing it (aka Execution time).
We’ve thus learned that
Mono are mostly lazy (aka cold
Publisher): nothing happens until you subscribe to them.
Finally, we’ve learned about an alternative flavor of
Mono, dubbed the hot
Publisher, which behaves a little differently and is less lazy.
In the next instalment, we’ll see why these three phases make a major difference in how you as a developer would debug reactor-based code.
In the meantime, happy reactive coding!