It is my great pleasure to announce the GA release of Reactor Bismuth, which notably encompasses reactor-core 3.1.0.RELEASE and reactor-netty 0.7.0.RELEASE.
With Spring Framework 5.0 having just been released, you can imagine this is a giant step for Project Reactor :)
Like the bismuth crystal this release is intricate
The release contains a lot of changes and API polishing. For reactor-core you can find the exhaustive list in the release notes.
Important
There is a known (minor) issue with the reactor-core and reactor-test sources jars: they contain duplicate Java source entries. See issue #887 for more details.
Let’s have an overview of what is new and noteworthy in this release:
The major new feature in this release: The Context
A number of API changes have been made between the last release in the 3.0.x cycle and 3.1.0. We wanted to get the best API out for the long run with Spring Framework 5, so these breaking changes were necessary. The release notes contain a more exhaustive list of such changes, but let’s have a look at a few of them.
The error handling operators have been made more consistent in both Flux and Mono. All use the onError prefix and the APIs are aligned in both classes:
onErrorReturn to switch to a fall-back value (formerly otherwiseReturn in Mono)
onErrorResume to switch to a fall-back Publisher (formerly onErrorResumeWith in Flux and otherwise in Mono)
onErrorMap to translate an Exception into another one (formerly mapError in both Flux and Mono)
switchIfEmpty to switch to a fall-back sequence if the source is empty (formerly otherwiseIfEmpty in Mono)
Flux#switchOnError has been removed; the same can be achieved with onErrorResume using a lambda that ignores its parameter.
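As a rough illustration of the aligned operators, here is a minimal sketch. The class and method names (ErrorHandlingSketch, failing, and so on) are made up for this example and are not part of Reactor; only the onError* operators themselves come from the library.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ErrorHandlingSketch {

    // Illustrative source that always fails, used to exercise the onError* operators.
    static Mono<String> failing() {
        return Mono.error(new IllegalStateException("boom"));
    }

    // onErrorReturn: replace the error with a static fall-back value.
    static String withReturn() {
        return failing().onErrorReturn("fallback").block();
    }

    // onErrorResume: switch to a fall-back Publisher. Ignoring the lambda
    // parameter gives the behavior Flux#switchOnError used to provide.
    static List<String> withResume() {
        return Flux.<String>error(new IllegalStateException("boom"))
                   .onErrorResume(ignored -> Flux.just("a", "b"))
                   .collectList()
                   .block();
    }

    // onErrorMap: translate the exception into another one.
    static String withMap() {
        try {
            return failing()
                    .onErrorMap(e -> new IllegalArgumentException("wrapped: " + e.getMessage(), e))
                    .block();
        }
        catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withReturn());
        System.out.println(withResume());
        System.out.println(withMap());
    }
}
```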
Flux And Mono Concepts And Operators
The Mono API had a tendency to sometimes diverge from the Flux API despite similar concepts. Where it makes sense, these separations have been fixed.
when, and, zip, and zipWith
For instance, Mono had an and operator and when static methods, which combined elements to produce a Mono<Tuple2>. In essence, they were a specialization of zipWith and zip in Flux, but having different names made it difficult to connect the concepts. That is why these methods have been renamed in Mono (to zipWith and zip, respectively).
On the other hand, you’ll notice that a different flavor of when and and is still present in the Mono API. Contrary to the ones we saw previously, these return a Mono<Void>. Dealing with completion of tasks is an essential use case of Mono, and these methods are now specifically tailored for that sort of application: they will combine and execute several tasks (represented as source Monos), ignore their potential onNext signal and only propagate the combination of onComplete signals, resulting in a Mono<Void> that completes whenever the N tasks complete.
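To make the distinction concrete, the following sketch puts the value-combining operators next to the completion-only ones. The class name and the sample values are only illustrative.

```java
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

class ZipVersusWhenSketch {

    // zip / zipWith: combine the values of both sources into a Tuple2.
    static String zipped() {
        Mono<String> name = Mono.just("Walter");
        Mono<Integer> status = Mono.just(200);

        Mono<Tuple2<String, Integer>> viaStatic = Mono.zip(name, status);
        Mono<Tuple2<String, Integer>> viaInstance = name.zipWith(status);

        Tuple2<String, Integer> a = viaStatic.block();
        Tuple2<String, Integer> b = viaInstance.block();
        return a.getT1() + ":" + a.getT2() + "|" + b.getT1() + ":" + b.getT2();
    }

    // when / and: the values are ignored, only completion is propagated.
    static boolean completesWithoutValue() {
        Mono<String> taskA = Mono.just("ignored");
        Mono<Integer> taskB = Mono.just(42);

        Mono<Void> viaStatic = Mono.when(taskA, taskB);
        Mono<Void> viaInstance = taskA.and(taskB);

        // block() returns null for a Mono that completes empty.
        return viaStatic.block() == null && viaInstance.block() == null;
    }

    public static void main(String[] args) {
        System.out.println(zipped());
        System.out.println(completesWithoutValue());
    }
}
```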
then Prefix
Similarly, the then prefix now consistently indicates that the onNext signals of the source are discarded, with the operator building on the terminal signals instead. This has been made consistent in both Flux and Mono:
then() returns a Mono<Void> that only propagates the onComplete or onError signal from the source.
then(Mono<V>) returns a Mono<V>: it waits for the original onComplete signal before switching to another provided Mono, emitting only the elements from that other Mono.
thenMany(Publisher<V>) is similar, except it continues into a Flux<V>.
thenEmpty(Publisher<Void>) returns a Mono<Void> that completes once the original Mono and then the Publisher have completed. That is, it represents sequential completion, unlike and, which subscribes to both sequences immediately.
Note that variants that were taking a Supplier parameter have been removed altogether (their lazy semantics can be replaced by a Mono.defer). Also, the Mono#thenEmpty described above was renamed from then(Publisher<Void>).
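The then* family can be sketched as follows. The class, method names and sample values are assumptions made for this illustration; the last method shows one way to get the lazy behavior of the removed Supplier variants with Mono.defer.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ThenSketch {

    // then(): the source value is dropped, only the terminal signal remains.
    static boolean thenDropsValue() {
        return Mono.just("source").then().block() == null;
    }

    // then(Mono): wait for completion, then emit the other Mono's value.
    static String thenSwitches() {
        return Mono.just("source").then(Mono.just("next")).block();
    }

    // thenMany(Publisher): same idea, but continuing into a Flux.
    static List<Integer> thenManyContinues() {
        return Mono.just("source").thenMany(Flux.just(1, 2, 3)).collectList().block();
    }

    // thenEmpty(Publisher<Void>): sequential completion of source, then cleanup.
    static List<String> thenEmptyIsSequential() {
        List<String> log = new ArrayList<>();
        Mono<Void> cleanup = Mono.fromRunnable(() -> log.add("cleanup"));
        Mono.just("source")
            .doOnNext(v -> log.add("source emitted"))
            .thenEmpty(cleanup)
            .block();
        return log;
    }

    // Mono.defer stands in for the removed Supplier-based variants:
    // the inner Mono is only created once the source has completed.
    static String lazyThen() {
        return Mono.just("source").then(Mono.defer(() -> Mono.just("lazy"))).block();
    }

    public static void main(String[] args) {
        System.out.println(thenDropsValue());
        System.out.println(thenSwitches());
        System.out.println(thenManyContinues());
        System.out.println(thenEmptyIsSequential());
        System.out.println(lazyThen());
    }
}
```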
then Was Really A flatMap
Mono used to have another interesting then variant:
Mono<V> then(Function<T, Mono<V>> thenFunction);
When looking closely, this didn’t fit with the new semantic of then* focusing on the terminal signals. Rather, it would transform the source into another Mono depending on the source’s onNext. Sounds familiar? This is indeed consistent with what Flux#flatMap does!
There was one problem, though: Mono already had a flatMap operator:
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
After thinking a bit more about it, we recognized that the classic semantic of flatMap is to return a value of the same type as the one flatMap is applied to. So it was more correct to have Mono#flatMap return a Mono…
As a result, we renamed this then variant to flatMap and we used the Many suffix on the variant that would return a Flux:
Mono<V> flatMap(Function<T, Mono<V>> mapper);
Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
Tip
In order to ease the migration, we advise you to search for all usages of the old Mono.flatMap and replace these with flatMapMany first. Then search for then(Function) uses and replace those with flatMap.
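After migration, the two operators might be used as in this small sketch (the class and the lengthOf and lettersOf helpers are invented for the example):

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FlatMapMigrationSketch {

    // 3.1.0 flatMap (formerly then(Function)): one value in, a Mono out.
    static Integer lengthOf(String word) {
        return Mono.just(word)
                   .flatMap(w -> Mono.just(w.length()))
                   .block();
    }

    // 3.1.0 flatMapMany (formerly Mono#flatMap): one value in, a Flux out.
    static List<String> lettersOf(String word) {
        return Mono.just(word)
                   .flatMapMany(w -> Flux.fromArray(w.split("")))
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(lengthOf("abc"));
        System.out.println(lettersOf("abc"));
    }
}
```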
Time: In this release, all the operators dealing with time do so exclusively via the Duration type. Most used to have a variant with the *Millis suffix that was using a long and a TimeUnit to represent a duration. These variants have been removed in 3.1.0.
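If existing code still carries durations around as a long plus a TimeUnit, one possible migration aid is a small conversion helper like the one below. Note that toDuration is a hypothetical helper written for this post, not something Reactor provides; it only relies on the JDK.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

class DurationMigrationSketch {

    // Hypothetical helper (not part of Reactor): converts a legacy
    // (amount, unit) pair into a java.time.Duration.
    static Duration toDuration(long amount, TimeUnit unit) {
        return Duration.ofNanos(unit.toNanos(amount));
    }

    public static void main(String[] args) {
        // The result can then be handed to Duration-based operators,
        // for instance timeout(Duration) or delayElements(Duration).
        System.out.println(toDuration(1500, TimeUnit.MILLISECONDS));
        System.out.println(toDuration(2, TimeUnit.SECONDS));
    }
}
```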
Delaying Errors: Another cross-cutting change dealing with suffixes: some operators have an optional configuration where they can combine multiple errors and emit the onError signal at the very end, allowing for some values to still make it in the resulting sequence. For example, flatMap can either stop immediately if an inner sequence emits onError or continue merging the elements from other inner sequences before propagating that error.
In some instances, that optional behavior was represented in the API as a boolean flag parameter. In other instances, it was a separate variant suffixed with DelayError. In 3.1.0, all these variants have been aligned to consistently use the *DelayError suffix rather than the boolean flag.
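The difference between the two modes can be sketched with flatMap and flatMapDelayError. The inner helper, the sample values and the concurrency and prefetch arguments (4 and 4) are arbitrary choices for this example.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class DelayErrorSketch {

    // Illustrative inner sequence: fails for the value 2, emits i * 10 otherwise.
    static Flux<Integer> inner(int i) {
        return i == 2
                ? Flux.<Integer>error(new IllegalStateException("inner " + i))
                : Flux.just(i * 10);
    }

    // Plain flatMap stops as soon as an inner sequence errors.
    static List<Integer> failFast() {
        return Flux.just(1, 2, 3)
                   .flatMap(DelayErrorSketch::inner)
                   .onErrorReturn(-1)
                   .collectList()
                   .block();
    }

    // flatMapDelayError keeps merging the other inners and emits the error last.
    static List<Integer> delayed() {
        return Flux.just(1, 2, 3)
                   .flatMapDelayError(DelayErrorSketch::inner, 4, 4)
                   .onErrorReturn(-1)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(failFast());
        System.out.println(delayed());
    }
}
```

Here onErrorReturn(-1) is only used to make the position of the error visible in the collected list.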
Simplified Interfaces: Some specialized interfaces have also been removed in favor of simpler or more generic alternatives:
The Cancellation interface has been removed in favor of the more generic Disposable one.
The TimedScheduler interface has been removed. The few Scheduler implementations that are not time-capable will throw a RejectedExecutionException indicating so whenever one attempts to use schedulePeriodically on them. Also, Scheduler#shutdown has been removed in favor of dispose() from Disposable.
Several interfaces used for introspection have been simplified into a single Scannable interface, which exposes information about the current state of an operator (in a best effort fashion) through its scan(Scannable.Attr) method.
QueueSupplier has been renamed to Queues and is purely a utility class now (and not a Supplier anymore).
Both Kotlin Extensions and Reactor Test artifacts have been directly integrated into the main repository of reactor-core:
Kotlin extensions are part of the reactor-core artifact. No need for a dependency on io.projectreactor:reactor-kotlin-extensions anymore.
reactor-test is now under the same groupId as reactor-core. Replace io.projectreactor.addons:reactor-test with io.projectreactor:reactor-test.
During this release cycle, an ongoing effort to better integrate with languages like Kotlin has been made.
This notably translates to some API rework in order to avoid ambiguous signatures with lambdas. Whenever a method had two overloads that differed only by the type of functional interface they took, we extracted one of the variants as a new suffix for the operator.
For example, consider buffer(Publisher, Supplier) and buffer(Publisher, Function). The second variant has been renamed to bufferWhen, as it creates a buffer when the companion Publisher from that Function emits.
As we saw above, the Kotlin extensions have also been integrated directly into the reactor-core repository.
Additionally, support for null-safety analysis has been improved through the introduction of three annotations. These annotations build upon JSR 305 which, despite being dormant, is drawn on by multiple static analysis tools including the IntelliJ IDE. The following annotations are provided in the reactor.util.annotation package:
@NonNull indicates that a specific parameter, return value, or field cannot be null. (It is not needed on parameters and return values where @NonNullApi applies.)
@Nullable indicates that a parameter, return value, or field can be null.
@NonNullApi is a package level annotation that indicates non-null is the default behavior for parameters and return values.
We leverage these annotations to express an explicit and actionable null-safety contract on all public Reactor Core APIs.
The documentation has also received some love: the reference guide is finally complete and the javadoc has been reviewed and reworded in order to describe some methods more clearly.
With the support for Kotlin being integrated directly into reactor-core, a new section has been added to the reference guide and the KotlinDocs are published.
Context
Contextual data can now be attached to a Flux or Mono, per subscription, as a Map-like Context object!
This is an advanced feature that will mostly interest library developers, but we know it will prove invaluable in migrating features that formerly relied on ThreadLocal in imperative code for example.
In the Spring portfolio, we expect both spring-security and spring-cloud-sleuth to benefit greatly.
In order to add information to the Context, use the subscriberContext(Context) operator, like in the following example:
Mono<String> put = doPut("www.example.com", Mono.just("Walter"))
    .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf"))
    .filter(t -> t.getT1() < 300)
    .map(Tuple2::getT2);
Tip
The Context is immutable and is propagated during the subscription phase, which runs from the end of the chain (subscribe() call) toward the beginning of the chain.
As a result, what you put into the Context via subscriberContext results in an enriched copy of the Context visible only by the operators above it.
In order to retrieve and use the information put inside the Context, the upstream chain of operators can make use of Mono.subscriberContext(), which materializes the visible Context (e.g. inside a flatMap).
It could look like the following snippet:
Mono<Tuple2<String, Optional<Object>>> dataAndContext =
    data.zipWith(Mono.subscriberContext()
                     .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID))
    );
Head to the reference guide to learn more about Context.
expand Operator
This operator can be used to recursively expand source elements into nested Publishers, producing a graph of Publisher either depth-first or breadth-first.
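As a small sketch of both traversal orders, consider a tiny hand-written tree. The CHILDREN map and the childrenOf helper are invented for this example; expand walks the tree breadth-first, while expandDeep walks it depth-first.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import reactor.core.publisher.Flux;

class ExpandSketch {

    // Illustrative tree: A has children AA and AB, AA has a single child AAA.
    static final Map<String, List<String>> CHILDREN = new HashMap<>();

    static {
        CHILDREN.put("A", Arrays.asList("AA", "AB"));
        CHILDREN.put("AA", Arrays.asList("AAA"));
    }

    // Expands a node into its children, or an empty Flux for a leaf.
    static Flux<String> childrenOf(String node) {
        return Flux.fromIterable(CHILDREN.getOrDefault(node, Collections.emptyList()));
    }

    // expand: level by level (breadth-first).
    static List<String> breadthFirst() {
        return Flux.just("A").expand(ExpandSketch::childrenOf).collectList().block();
    }

    // expandDeep: each branch is fully explored before the next (depth-first).
    static List<String> depthFirst() {
        return Flux.just("A").expandDeep(ExpandSketch::childrenOf).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(breadthFirst());
        System.out.println(depthFirst());
    }
}
```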
null Provider To The Empty Mono
When transforming a source Callable or Supplier that returns null into a Mono, fromCallable and fromSupplier used to emit an error. They now accommodate the null result and produce an empty Mono instead.
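A quick way to observe the new behavior is to pair these factories with defaultIfEmpty. In this sketch, lookup is a hypothetical method that returns null when nothing is found.

```java
import reactor.core.publisher.Mono;

class NullToEmptySketch {

    // Hypothetical lookup that returns null when the key is unknown.
    static String lookup(String key) {
        return "known".equals(key) ? "value" : null;
    }

    // fromCallable: a null result now yields an empty Mono instead of an error.
    static String viaCallable(String key) {
        return Mono.fromCallable(() -> lookup(key))
                   .defaultIfEmpty("<empty>")
                   .block();
    }

    // fromSupplier behaves the same way.
    static String viaSupplier(String key) {
        return Mono.fromSupplier(() -> lookup(key))
                   .defaultIfEmpty("<empty>")
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(viaCallable("known"));
        System.out.println(viaCallable("missing"));
        System.out.println(viaSupplier("missing"));
    }
}
```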
name And tag A Reactive Sequence
A single name(String) can be given to a Flux. It can be retrieved using Scannable.name(), which walks the chain of operators upstream until it finds a first declared name.
Similarly, several tag(String, String) key-pairs can be associated with a Flux or Mono. These can be retrieved as a Stream<Tuple2<String, String>> by the Scannable#tags() method, which walks the whole chain of operators upstream.
distinctUntilChanged Variant With A BiPredicate
Rather than using Set semantics to evaluate if there is a change, the BiPredicate is applied on the current source element and last emitted element. This makes it possible to skip elements if they are too close to the last emitted element (e.g. Doubles with a difference < 1).
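The "difference < 1" case could look like the sketch below. It assumes the overload that takes a key selector followed by the BiPredicate (the identity selector d -> d is used here), and the sample values are arbitrary.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class DistinctUntilChangedSketch {

    // Skips a value when it is less than 1 away from the last emitted value.
    static List<Double> significantChanges() {
        return Flux.just(1.0, 1.3, 1.9, 2.5, 2.6, 4.0)
                   .distinctUntilChanged(
                           d -> d,
                           (Double last, Double current) -> Math.abs(current - last) < 1)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        System.out.println(significantChanges());
    }
}
```

Note that 1.9 is dropped even though it differs from 1.3 by less than 1 in the other direction: the comparison is made against the last emitted element (1.0), not the previous source element.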
Mono.cache(Duration)
This makes it easy to cache a hard-to-compute single value (or error) for a limited amount of time. The first subscriber to come in after the TTL period will re-trigger a subscription to the source.
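As a minimal sketch, the effect can be observed by counting how often the source is actually computed. The counter, the class name and the five-minute TTL are arbitrary choices for this example.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

class CacheTtlSketch {

    // With cache(Duration), two reads within the TTL compute the value once.
    static int computationsWithCache() {
        AtomicInteger computations = new AtomicInteger();
        Mono<Integer> cached = Mono.fromCallable(computations::incrementAndGet)
                                   .cache(Duration.ofMinutes(5));
        cached.block();
        cached.block();
        return computations.get();
    }

    // Without caching, every subscription recomputes the value.
    static int computationsWithoutCache() {
        AtomicInteger computations = new AtomicInteger();
        Mono<Integer> uncached = Mono.fromCallable(computations::incrementAndGet);
        uncached.block();
        uncached.block();
        return computations.get();
    }

    public static void main(String[] args) {
        System.out.println(computationsWithCache());
        System.out.println(computationsWithoutCache());
    }
}
```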
checkpoint(String)
The checkpoint(String) variant is now light by default, which means there is no stack trace filled on instantiation (making the operator less costly to use). We now assume the String identifier is unique enough that it would be sufficient to find the instantiation point of a sequence that terminates in error.
refCount
When using refCount, one can now provide a Duration. When the number of subscribers to the reference counted sequence falls below the threshold, the operator waits for that duration instead of immediately unsubscribing from the source. No cancellation happens if enough subscribers come back within this grace period.
delayUntil Replaces untilOther
The delayUntil operator delays the emission of a Mono until after a companion Publisher, generated from the source value, completes.
Note that there was an untilOther operator that has been removed. It also used to delay, but triggered on the first onNext of the companion. delayUntil is more flexible, as the same behavior can be achieved by appending a take(1) to the companion.
The reactor-test artifact also has a few new features:
New expectations around verification of errors compatible with using assertions: expectErrorSatisfies and verifyErrorSatisfies.
Added an optional configurable default timeout for StepVerifier#verify(). Set it up by using the static StepVerifier#setDefaultTimeout method.
Added a PublisherProbe to easily check that a complex chain of operators with conditional switches (e.g. switchIfEmpty) does go through a logical branch, while still emitting meaningful data for the test by wrapping any Flux or Mono.
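Two of these additions can be sketched as follows, assuming reactor-test is on the classpath. The class and method names are illustrative, and plain AssertionError checks stand in for whatever assertion library you prefer.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;

class ReactorTestSketch {

    // expectErrorSatisfies: run arbitrary assertions against the error.
    static void verifyErrorWithAssertion() {
        StepVerifier.create(Mono.error(new IllegalStateException("boom")))
                    .expectErrorSatisfies(e -> {
                        if (!(e instanceof IllegalStateException) || !"boom".equals(e.getMessage())) {
                            throw new AssertionError("unexpected error: " + e);
                        }
                    })
                    .verify(Duration.ofSeconds(5));
    }

    // PublisherProbe: check that the switchIfEmpty branch was really taken,
    // while still emitting a meaningful fall-back value.
    static boolean fallbackBranchTaken() {
        PublisherProbe<String> probe = PublisherProbe.of(Mono.just("fallback"));

        String result = Mono.<String>empty()
                            .switchIfEmpty(probe.mono())
                            .block();

        probe.assertWasSubscribed();
        return "fallback".equals(result) && probe.wasSubscribed();
    }

    public static void main(String[] args) {
        verifyErrorWithAssertion();
        System.out.println(fallbackBranchTaken());
    }
}
```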
If you’re new to Reactor, now is an exciting time to start your reactive journey with Spring Framework 5.0. If you’re not, we hope that you’ll enjoy working with Reactor even more now that all these changes have been put in place.
If you encounter any migration difficulty, have a look at the release notes or ask for help on our Gitter.
Happy Reactive Coding!
bismuth crystal photo CC-By-SA David Abercrombie via Flickr