On behalf of the Spring Integration team, I’d like to announce the Second Milestone of Spring Integration 5.0, which is available in the Milestone Repository.
First of all, a big thank you to the community for your contributions!
Here are some highlights of this release since the previous Milestone:
- MongoDbOutboundGateway, for performing queries or any arbitrary operation on the collection
- Initial Java DSL support for MongoDB components
- The MongoDB components can now use the org.springframework.data.mongodb.core.query.Query API in their expressions
@Bean
public IntegrationFlow mongoDbGatewayFlow() {
    return f -> f
            .handle(MongoDb.outboundGateway(this.mongoTemplate)
                    .collectionCallback(MongoCollection::count)
                    .collectionNameFunction(m -> m.getHeaders().get("collection")));
}
The Java DSL IntegrationFlow can now start from an interface marked with @MessagingGateway, and every method call on the target proxy bean sends a Message to the downstream IntegrationFlow. This lets you omit @IntegrationComponentScan and the extra channel configuration. For example, here is a simple gateway for the Control Bus component:
@MessagingGateway
public interface ControlBusGateway {
    void send(String command);
}
...
@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from(ControlBusGateway.class)
            .controlBus()
            .get();
}
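To make the proxy idea more concrete, here is a minimal, dependency-free sketch of how a call on a gateway interface can end up as a payload handed to something downstream. It uses a plain JDK dynamic proxy and is not how Spring Integration implements @MessagingGateway internally. The names GatewayProxySketch and gatewayFor, the Consumer standing in for the flow, and the sample command string are all assumptions made for illustration.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class GatewayProxySketch {

    // Same shape as the gateway interface above, without the Spring annotation.
    public interface ControlBusGateway {
        void send(String command);
    }

    // Hypothetical helper: builds a JDK proxy that forwards the first argument
    // of each interface call to the given downstream consumer.
    // Only void, single-argument style methods are covered in this sketch.
    public static <T> T gatewayFor(Class<T> gatewayInterface, Consumer<Object> downstream) {
        Object fallback = new Object();
        Object proxy = Proxy.newProxyInstance(
                gatewayInterface.getClassLoader(),
                new Class<?>[] { gatewayInterface },
                (self, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // Keep toString/hashCode/equals away from the downstream.
                        return method.invoke(fallback, args);
                    }
                    Object payload = (args == null || args.length == 0) ? null : args[0];
                    downstream.accept(payload);
                    return null;
                });
        return gatewayInterface.cast(proxy);
    }

    // Sends one illustrative command through the proxy and returns what arrived.
    public static List<Object> demo() {
        List<Object> received = new ArrayList<>();
        ControlBusGateway gateway = gatewayFor(ControlBusGateway.class, received::add);
        gateway.send("@someBean.stop()"); // hypothetical command string
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In the real framework the downstream side is a message channel and the payload is wrapped in a Message, but the calling code looks the same: you invoke a method on an interface and the flow receives the data.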
And, of course, there is some news on the Reactive Streams front.
The MessageChannelReactiveUtils can be used to adapt any MessageChannel to an org.reactivestreams.Publisher. This is useful when you want to "flux" upstream data, with loosely coupled integration on one side and reactive back pressure on the other:
@Autowired
private PollableChannel queueChannel;
...
Flux.from(MessageChannelReactiveUtils.<String>toPublisher(this.queueChannel))
        .map(Message::getPayload)
        .map(String::toUpperCase)
        .doOnNext(results::add)
        .subscribe(v -> done.countDown());
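For readers new to back pressure, the following sketch shows the general idea of adapting a pollable queue to a publisher: items leave the queue only when the subscriber asks for them. It is written against the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams ones) to stay dependency-free, and it is not the MessageChannelReactiveUtils implementation. QueuePublisherSketch, toPublisher and demo are hypothetical names, and the sketch assumes a single synchronous subscriber and treats an empty queue as the end of the stream, which a real channel adapter would not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;

public class QueuePublisherSketch {

    // Adapts a queue to a Flow.Publisher that polls only while there is demand.
    public static <T> Flow.Publisher<T> toPublisher(BlockingQueue<T> queue) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items; anything else stays in the queue.
                for (long i = 0; i < n && !done; i++) {
                    T item = queue.poll();
                    if (item == null) {
                        // Simplification: an empty queue ends the stream.
                        done = true;
                        subscriber.onComplete();
                        return;
                    }
                    subscriber.onNext(item);
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes with a fixed demand and collects the upper-cased items.
    public static List<String> demo(BlockingQueue<String> queue, long demand) {
        List<String> results = new ArrayList<>();
        toPublisher(queue).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(demand);
            }

            @Override
            public void onNext(String item) {
                results.add(item.toUpperCase());
            }

            @Override
            public void onError(Throwable error) {
            }

            @Override
            public void onComplete() {
            }
        });
        return results;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c", "d"));
        System.out.println(demo(queue, 2) + " delivered, " + queue.size() + " still queued");
    }
}
```

With a demand of 2 and four queued items, only two are delivered and the other two remain in the queue until someone requests more. That is the property the adapter brings to a PollableChannel.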
This technique is now used in the existing IntegrationFlowDefinition.toReactivePublisher():
@Bean
public Publisher<Message<Integer>> pollableReactiveFlow() {
    return IntegrationFlows
            .from("inputChannel")
            .split(s -> s.delimiters(","))
            .<String, Integer>transform(Integer::parseInt)
            .channel(MessageChannels.queue())
            .toReactivePublisher();
}
...
@Autowired
@Qualifier("pollableReactiveFlow")
private Publisher<Message<Integer>> pollablePublisher;
The ReactiveChannel can now subscribe to an upstream Publisher, alongside its regular (but back-pressure-aware) send(Message<?>) implementation. This allowed us to introduce features such as starting an IntegrationFlow from a Publisher:
Flux<Message<?>> messageFlux = Flux.just("1,2,3,4")
        .map(v -> v.split(","))
        .flatMapIterable(Arrays::asList)
        .map(Integer::parseInt)
        .map(GenericMessage<Integer>::new);
QueueChannel resultChannel = new QueueChannel();
IntegrationFlow integrationFlow =
        IntegrationFlows.from(messageFlux)
                .<Integer, Integer>transform(p -> p * 2)
                .channel(resultChannel)
                .get();
this.integrationFlowContext.registration(integrationFlow)
        .register();
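If you want to experiment with the same publisher-to-transformer-to-queue shape without any Spring or Reactor dependency, a rough JDK-only analogue could look like the sketch below. It uses java.util.concurrent.SubmissionPublisher as the upstream source and a LinkedBlockingQueue as a stand-in for the QueueChannel. PublisherFlowSketch and runFlow are hypothetical names, and nothing here reflects how IntegrationFlows.from(Publisher) works internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;

public class PublisherFlowSketch {

    // Publishes the comma-separated integers, doubles each one in the
    // "transformer" step, and collects the results in a queue.
    public static List<Integer> runFlow(String csv) {
        BlockingQueue<Integer> resultChannel = new LinkedBlockingQueue<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();

        // consume(...) subscribes a consumer; the returned future completes
        // once the publisher has been closed and all items were processed.
        CompletableFuture<Void> done = source.consume(p -> resultChannel.add(p * 2));

        for (String token : csv.split(",")) {
            source.submit(Integer.parseInt(token));
        }
        source.close();
        done.join();

        return new ArrayList<>(resultChannel);
    }

    public static void main(String[] args) {
        System.out.println(runFlow("1,2,3,4"));
    }
}
```

The consumer registered through consume(...) requests an unbounded number of items, so this sketch shows only the flow shape, not demand-driven back pressure.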
This way, and by placing a ReactiveChannel between endpoints (MessageChannels.reactive()), we can get the best of both the integration and reactive worlds!
See What’s New for more information.
We are going to provide more features and improvements in the next Milestones, so stay tuned and don’t hesitate to get back to us with any feedback!
Project Page | GitHub | Help | Documentation | Chat