It is my great pleasure to announce the GA release of Reactor Bismuth, which
reactor-core 3.1.0.RELEASE and
With the release of Spring Framework 5.0 now just happening, you can imagine this is a giant step for Project Reactor :)
The release contains a lot of changes and API polishing. For reactor-core you can find the exhaustive list in the release notes.
There is a known (minor) issue with the
Let’s have an overview of what is new and noteworthy in this release:
A number of API changes have been made between the last release in the 3.0.x cycle and 3.1.0. We wanted to get the best API out for the long run with Spring Framework 5, so these breaking changes were necessary. The release notes contain a more exhaustive list of such changes, but let’s have a look at a few of them.
The error handling operators have been made more consistent in both
Mono. All use the
onError prefix and the APIs are aligned in both classes:
onErrorReturn to switch to a fall-back value (formerly
onErrorResume to switch to a fall-back
onErrorResumeWithin Flux and
onErrorMap to translate an
Exceptioninto another one (formerly
mapErrorin both Flux and Mono)
switchIfEmpty to switch to a fall-back sequence if the source is empty (formerly
Flux#switchOnErrorhas been removed, the same can be achieved with
onErrorResumeusing a lambda that ignores its parameter.
Mono API had a tendency to sometimes divert from the
Flux API despite similar
concepts. Where it makes sense, these separations have been fixed.
Mono had a
and operator and
when static methods which used
to combine elements and to produce
Mono<Tuple2>. In essence, they were
essentially a specialization of
Flux, but having
different names made it difficult to attach the concepts. That is why these
methods have been renamed in
On the other hand, you’ll notice that a different flavor of
still present in the
Mono API. Contrary to the ones we saw previously, these
Mono<Void>. Dealing with completion of tasks is an essential use case
Mono, and these methods are now specifically tailored for that sort of
application: they will combine and execute several tasks (represented as source
Monos), ignore their potential
onNext signal and only propagate the combination
onComplete signals, resulting in a
Mono<Void> that completes whenever the
N tasks complete.
then prefix now consistently indicates that the
onNext of the
source are to be discarded, instead building up on the terminal signals. This
has been made consistent in both
Mono<Void>that only propagates the
onErrorsignal from the source.
Mono<V>: it waits for the original
onCompletesignal before switching to another provided
Mono, emitting only the elements from that other
thenMany(Publisher<V>)is similar, except it continues into a
Mono<Void>that completes once the original
Publisherhave completed. That is, it represents sequential completion, unlike
andwhich subscribes to both sequences immediately.
Note that variants that were taking a
Supplier parameter have been removed
altogether (their lazy semantics can be replaced by a
Mono.defer). Also, the
Mono#thenEmpty described above was renamed from
Mono used to have another interesting
Mono<V> then(Function<T, Mono<V>> thenFunction);
When looking closely, this didn’t fit with the new semantic of
on the terminal signals. Rather, it would transform the source into another
Mono depending on the source’s
onNext. Sounds familiar? This is indeed
consistent with what
There was one problem, though:
Mono already had a
Flux<V> flatMap(Function<T, Publisher<V>> mapper);
After thinking a bit more about it, we recognized that the classic semantic of
flatMap is to return a value of the same type as the one
flatMap is applied
to. So it was more correct to have
Mono#flatMap return a
As a result, we renamed this then variant to
flatMap and we used
Many suffix on the variant that would return a
Mono<V> flatMap(Function<T, Mono<V>> mapper); Flux<V> flatMapMany(Function<T, Publisher<V>> mapper);
In order to ease the migration, we advise you to search for all usage of
In this release, all the operators dealing with time do so exclusively via the
Duration type. Most used to have a variant with the
*Millis suffix that was
long and a
TimeUnit to represent a duration. These variants have
been removed in 3.1.0.
Another cross-cutting change dealing with suffixes: some operators have an
optional configuration where they can combine multiple errors and emit the
onError signal at the very end, allowing for some values to still make it in
the resulting sequence. For example,
flatMap can either stop immediately if an
inner sequence emits
onError or continue merging the elements from other inner
sequences before propagating that error.
In some instances, that optional behavior was represented in the API as a
boolean flag parameter. In other instances, it was a separate variant suffixed
DelayError. In 3.1.0, all these variants have been aligned to consistently
*DelayError suffix rather than the boolean flag.
Simplified Interfaces: Some specialized interfaces have also been removed in favor of simpler or more generic alternatives:
Cancellationinterface has been removed in favor of the more generic
TimedSchedulerinterface has been removed. The few
Schedulerthat are not time-capable will throw a
RejectedExecutionExceptionindicating so whenever one attempts to use
schedulePeriodicallyon them. Also,
Scheduler#shutdownhas been removed in favor of
Several interfaces used for introspection have been simplified into a single
Scannableinterface, which exposes information about the current state of an operator (in a best effort fashion) through its
QueueSupplierhas been renamed to
Queuesand is purely a utility class now (and not a
Both Kotlin Extensions and Reactor Test artifacts have been directly integrated
into the main repository of
Kotlin extensions are part of the
reactor-coreartifact. No need for a dependency to
reactor-testis now under the same groupId as
During this release cycle, an ongoing effort to better integrate with languages like Kotlin has been made.
This notably translates to some API rework in order to avoid ambiguous signatures with lambdas. Whenever a method had two overrides that where just differing by the type of functional interface they took, we extracted one of the variants as a new suffix for the operator.
For example, consider
buffer(Publisher, Supplier) and
Function). The second variant has been renamed to
bufferWhen, as it creates a
buffer when the companion
Publisher from that
As we saw above, the Kotlin extensions have also been integrated directly into the reactor-core repository.
Additionally, support for null-safety analysis has been improved through the
introduction of three annotations. These annotations build upon JSR 305 which,
despite being dormant, is drawn on by multiple static analysis tools including
the IntelliJ IDE. The following annotations are provided in the
@NonNullindicates that a specific parameter, return value, or field cannot be null. (It is not needed on parameters and return value where
@Nullableindicates that a parameter, return value, or field can be null.
@NonNullApiis a package level annotation that indicates non-null is the default behavior for parameters and return values.
We leverage these annotations to express an explicit and actionable null-safety contract on all public Reactor Core APIs.
The documentation has also received some love: the reference guide is finally complete and the javadoc have been reviewed and reworded in order to describe some methods more clearly.
Contextual data can now be attached to a
Mono, per subscription, as
This is an advanced feature that will mostly interest library developers, but we
know it will prove invaluable in migrating features that formerly relied on
ThreadLocal in imperative code for example.
In the Spring portfolio, we expect both
to benefit greatly.
In order to add information to the
Context, use the
operator, like in the following example:
Mono<String> put = doPut("www.example.com", Mono.just("Walter")) .subscriberContext(Context.of(HTTP_CORRELATION_ID, "2-j3r9afaf92j-afkaf")) .filter(t -> t.getT1() < 300) .map(Tuple2::getT2);
As a result, what you put into the
In order to retrieve and use the information put inside the
upstream chain of operators can make use of
materializes the visible
Context (e.g. inside a
It could look like the following snippet:
Mono<Tuple2<String, Optional<Object>>> dataAndContext = data.zipWith(Mono.subscriberContext() .map(c -> c.getOrEmpty(HTTP_CORRELATION_ID)) );
Head to the
to learn more about
This operator can be used to recursively expand source elements into nested
Publishers, producing a graph of
Publisher either depth-first or breadth-first.
When transforming a source
Supplier that returns
null into a
fromSupplier used to emit an error. They now
null result and produce an empty
name(String) can be given to a
Flux. It can be retrieved using
Scannable.name(), which walks the chain of operators upstream until it finds
a first declared name.
tag(String, String) key-pairs can be associated with a
Mono. These can be retrieved as a
Scannable#tags() method, which walks the whole chain of operators
Rather than using
Set semantics to evaluate if there is a change, the
BiPredicate is applied on the current source element and last emitted element.
This makes it possible to skip elements if they are too close to the last
emitted element (e.g.
Doubles with a difference < 1).
This allows to easily cache a hard-to-compute single value (or error) for a limited amount of time. First subscriber to come in after the TTL period will re-trigger a subscription to the source.
checkpoint(String) variant is now light by default, which means there is
no stack trace filled on instantiation (making the operator less costly to use).
We now assume the
String identifier is unique enough that it would be sufficient
to find the instantiation point of a sequence that terminates in error.
refCount, one can now provide a
Duration. When the number of
subscribers to the reference counted sequence falls below the threshold, the
operator waits for that duration instead of immediately unsubscribing from the
source. No cancellation happen in case enough subscribers come back within this
delayUntil operator delays the emission of a
Mono until after a
Publisher, generated from the source value, completes.
Note that there was an
untilOther operator that has been removed. It used to
also delay, but trigger on the first
onNext of the companion.
more flexible, as the same behavior can be achieved by appending a
reactor-test artifact also has a few new features:
New expectations around verification of errors compatible with using assertions:
Added an optional configurable default timeout for
StepVerifier#verify(). Set it up by using the static
PublisherProbeto easily check that a complex chain of operators with conditional switches (e.g.
switchIfEmpty) does go through a logical branch, while still emitting meaningful data for the test by wrapping any
If you’re new to Reactor, now is an exciting time to start your reactive journey with Spring Framework 5.0. If you’re not, we hope that’ll you enjoy working with Reactor even more now that all these changes have been put in place.
Happy Reactive Coding!
bismuth crystal photo CC-By-SA David Abercrombie via Flickr