Flight of the Flux 1 - Assembly vs Subscription
This blog post is the first in a series of posts that aim at providing a deeper look into Reactor’s more advanced concepts and inner workings.
It is derived from my Flight of the Flux
talk, which content I found to be more adapted to a blog post format.
I’ll update the table below with links when the other posts are published, but here is the planned content:
- Assembly vs Subscription (this post)
- Debugging caveats
- Hopping Threads and Schedulers
- Inner workings: work stealing
- Inner workings: operator fusion
If you’re missing an introduction to Reactive Streams and the basic concepts of Reactor, head out to the site’s learning section and the reference guide.
Without further ado, let’s jump in:
Assembly Time
When you first learn about Reactive Streams and reactive programming on the JVM, the first thing you learn is the high-level relationship between Publisher
and Subscriber
: one produces data, the other consumes it. Simple right? Furthermore, it appears that the Publisher
pushes data to the Subscriber
.
But when working with Reactive Streams libraries like Reactor (or RxJava2), you quickly come across the following mantra:
Nothing Happens Until You Subscribe
Sometimes, you might read that both libraries implement a “push-pull hybrid model”. Hang on a minute! pull?
We’ll get back to it, but to understand that sentence you first need to realize that, by default, Reactor’s reactive types are lazy.
Calling methods on a Flux
or Mono
(the operators) doesn’t immediately trigger the behavior. Instead, a new instance of Flux
(or Mono
) is returned, on which you can continue composing further operators. You thus create a chain of operators (or an operator acyclic graph), which represents your asynchronous processing pipeline.
This declarative phase is called assembly time.
Let’s take an example where a client side application makes an HTTP request to a server, expecting an HttpResponse:
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
This can be simplified using the fluent API:
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
Once you are done declaring your pipeline, there are two situations: either you pass the Flux
/Mono
representing the processing pipeline down to another piece of code or you trigger the pipeline.
The former means that the code to which you return the Mono
might apply other operators, resulting in a derived new pipeline. Since the operators create new instances (it’s like an onion), your own Mono
is not mutated, so it could be further decorated several times with widely different results:
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
Compare that with a CompletableFuture
, which is not lazy in nature: once you have a reference to the CompletableFuture
, it means the processing is already ongoing…
With that in mind, let’s look into how to trigger the reactive pipeline.
Subscription Time
So far, we’ve assembled an asynchronous pipeline. That is, we’ve instantiated Flux
and Mono
variables through the use of operators, that results other Flux
/Mono
with behavior layered like an onion.
But the data hasn’t started flowing through each of these declared pipelines yet.
That’s because the trigger for the data to flow is not the declaration of the pipeline, but rather the subscription to it. Remember:
Nothing Happens Until You Subscribe
Subscribing is the act of saying “ok, this pipeline represent a transformation of data, and I’m interested in the final form of that data”. The most common way of doing so is by calling Flux.subscribe(valueConsumer, errorConsumer)
.
That signalling of interest is propagated backwards through the chain of operators, up until the source operator, the Publisher
that actually produces the initial data:
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
- we subscribe to the words
Flux
, stating that we want to print each word to the console (and print the stack trace of any error) - that interest is signalled to the
flatMapMany
step… - …which signals it up the chain to the json
map
step… - …then the request
map
step… - …to finally reach the
makeHttpRequest()
(which we’ll consider our source)
At this point, the source is triggered. It generates the data in the appropriate way: here it would make an HTTP request to a JSON-producing endpoint and then emit the HTTP response.
From there on, we’re in execution time. The data has started flowing through the pipeline (in the more natural top-to-bottom order, or upstream to downstream):
- The
HttpResponse
is emitted to theparseJson
map
- It extracts the JSON body and emits it to the
getString
map
- Which extracts the quote and passes it to the
flatMapMany
- The
flatMapMany
splits the quote into words and emit each word individually - The value handler in the
subscribe
is notified of each word, printing these to the console, one per line
Hopefully that helps you understand the difference between assembly time and subscription/execution time!
Cold vs Hot
Right after explaining the difference and introducing this mantra is probably a good time to introduce an exception :laughing:
Nothing happens until you subscribe… until something does
Cold
So far, we’ve been dealing with a flavor of Flux
and Mono
sources called a Cold Publisher
. As we’ve explained, these Publishers
are lazy and only generate data when there is a Subscription
. Furthermore, they generate the data anew for each individual Subscription
.
In our example of an HTTP response Mono
, the HTTP request would be performed for each subscription:
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
On a side note, some operators’ behavior imply multiple subscriptions. For example retry
re-subscribe to its source in case of an error (onError
signal), while repeat
does the same for the onComplete
signal.
So for a cold source like the HTTP request, something like retry
would re-perform the request thus allowing to recover from a transient server-side error, for instance.
Hot
A Hot Publisher
on the other hand isn’t as clear-cut: it doesn’t necessarily need a Subscriber
to start pumping data. It doesn’t necessarily re-generate dedicated data per each new Subscriber
either.
To illustrate that, let’s introduce a new cold publisher example, then we’ll show how to turn that cold publisher into a hot one:
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
This prints:
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
We can turn the clockTicks
source into a hot one by invoking share()
:
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
It yields the following result instead:
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
You see that the two subscriptions now share the same ticks of the clock. share()
converts cold to hot by letting the source multicast elements to new Subscribers
, but only the elements that are emitted after these new subscriptions. Since clock2
has subscribed 2 seconds later, it missed early emissions 1s
and 2s
.
So hot publishers can be less lazy, even though they generally require at least an initial Subscription
to trigger data flow.
Conclusion
In this article, we’ve learned about the difference between instantiating a Flux
/ chaining operator (aka Assembly time), triggering it (aka Subscription time) and executing it (aka Execution time).
We’ve thus learned that Flux
and Mono
are mostly lazy (aka cold Publisher
): nothing happens until you subscribe to them.
Finally, we’ve learned about an alternative flavor of Flux
and Mono
, dubbed the hot Publisher
, which behaves a little differently and is less lazy.
In the next instalment, we’ll see why these three phases make a major difference in how you as a developer would debug reactor-based code.
In the meantime, happy reactive coding!