I’m pleased to announce that the Java DSL for Spring Integration 1.2 M2 is available now!
First of all, I’d like to thank everyone who created issues, raised Pull Requests, provided feedback, or just asked questions on StackOverflow. Special thanks go to the early adopters of the previous Milestone 1. With their help we have improved runtime flow registration and fixed some issues with it.
The artifact `org.springframework.integration:spring-integration-java-dsl:1.2.0.M2` is available in the Milestone repo. So, give it a shot and don’t hesitate to raise a GH issue for any feedback!
Some highlights of the current iteration:
After many Community requests we finally introduced the `Jpa` Factory and corresponding `IntegrationComponentSpec`s to provide a fluent API for the Spring Integration JPA components:

    @Autowired
    private EntityManagerFactory entityManagerFactory;

    @Bean
    public IntegrationFlow pollingAdapterFlow() {
        return IntegrationFlows
                .from(Jpa.inboundAdapter(this.entityManagerFactory)
                                .entityClass(StudentDomain.class)
                                .maxResults(1)
                                .expectSingleResult(true),
                        e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
                .channel(c -> c.queue("pollingResults"))
                .get();
    }

    @Bean
    public IntegrationFlow updatingGatewayFlow() {
        return f -> f
                .handle(Jpa.updatingGateway(this.entityManagerFactory),
                        e -> e.transactional(true))
                .channel(c -> c.queue("persistResults"));
    }

    @Bean
    public IntegrationFlow retrievingGatewayFlow() {
        return f -> f
                .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                        .jpaQuery("from Student s where s.id = :id")
                        .expectSingleResult(true)
                        .parameterExpression("id", "payload"))
                .channel(c -> c.queue("retrieveResults"));
    }

"Inspired" by the complexity of transaction support configuration for the Spring Integration JPA components (in practice, a programmatic `TransactionInterceptor`), we have introduced the `TransactionInterceptorBuilder`. In addition, we provide the `TransactionHandleMessageAdvice`, which allows you to start a transaction from any endpoint for the entire sub-flow, not only for `handleRequestMessage` as is the case with the regular `ConsumerEndpointSpec.advice()`. The main trick is done by the `HandleMessageAdvice`, recently introduced in Spring Integration Core: a marker interface that distinguishes advice applied to `handleRequestMessage` only from advice applied to the flow starting from the current `MessageHandler`. For convenience, a set of `.transactional()` methods has been added to the `ConsumerEndpointSpec`.
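To make the difference in advice scope more tangible, here is a minimal plain-Java sketch that uses no Spring types at all. Every name in it (`AdviceScopeSketch`, `adviseRequestOnly`, `adviseWholeSubFlow`, `inTransaction`) is hypothetical, the "transaction" is just a pair of log entries, and it assumes the downstream part of the flow runs on the same thread as the handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of advice scope; none of these names are Spring Integration API.
final class AdviceScopeSketch {

    final List<String> log = new ArrayList<>();

    // Stand-in for a transaction boundary: logs begin/commit around the given work.
    private void inTransaction(Runnable work) {
        log.add("begin");
        work.run();
        log.add("commit");
    }

    // Regular advice: only the handler's own request handling is wrapped.
    void adviseRequestOnly(String payload) {
        String[] reply = new String[1];
        inTransaction(() -> reply[0] = handle(payload));
        downstream(reply[0]);
    }

    // HandleMessageAdvice style: the handler and everything it sends downstream are wrapped.
    void adviseWholeSubFlow(String payload) {
        inTransaction(() -> downstream(handle(payload)));
    }

    private String handle(String payload) {
        log.add("handle");
        return payload.toUpperCase();
    }

    private void downstream(String reply) {
        log.add("downstream:" + reply);
    }

    public static void main(String[] args) {
        AdviceScopeSketch requestOnly = new AdviceScopeSketch();
        requestOnly.adviseRequestOnly("foo");
        System.out.println(requestOnly.log); // [begin, handle, commit, downstream:FOO]

        AdviceScopeSketch wholeSubFlow = new AdviceScopeSketch();
        wholeSubFlow.adviseWholeSubFlow("foo");
        System.out.println(wholeSubFlow.log); // [begin, handle, downstream:FOO, commit]
    }
}
```

In the first case the commit happens before the reply travels further, so a failure downstream cannot roll the handler's work back. In the second case the downstream step sits inside the same boundary.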
The Scatter-Gather EI pattern now has its own Java DSL API:

    @Bean
    public IntegrationFlow scatterGatherFlow() {
        return f -> f
                .scatterGather(scatterer -> scatterer
                                .applySequence(true)
                                .recipientFlow(m -> true,
                                        sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true,
                                        sf -> sf.handle((p, h) -> Math.random() * 10))
                                .recipientFlow(m -> true,
                                        sf -> sf.handle((p, h) -> Math.random() * 10)),
                        gatherer -> gatherer
                                .releaseStrategy(group ->
                                        group.size() == 3 ||
                                                group.getMessages()
                                                        .stream()
                                                        .anyMatch(m -> (Double) m.getPayload() > 5)),
                        scatterGather -> scatterGather
                                .gatherTimeout(10_000));
    }

Here the `scatterer` is just a `RecipientListRouter`, the `gatherer` is an `AggregatingMessageHandler`, and the last `Consumer` accepts options for the `ScatterGatherHandler`.
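The release condition used by the gatherer in the sample can be read in isolation. Here is a minimal plain-Java sketch of the same predicate, with a list of `Double` payloads standing in for the message group. The class `ReleaseStrategySketch` and the method `canRelease` are hypothetical names for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the gatherer's release strategy; not Spring Integration API.
final class ReleaseStrategySketch {

    // Release when all three replies have arrived, or as soon as any reply exceeds 5.
    static boolean canRelease(List<Double> payloads) {
        return payloads.size() == 3
                || payloads.stream().anyMatch(p -> p > 5);
    }

    public static void main(String[] args) {
        System.out.println(canRelease(Arrays.asList(1.0, 2.0)));      // false
        System.out.println(canRelease(Arrays.asList(1.0, 7.5)));      // true
        System.out.println(canRelease(Arrays.asList(1.0, 2.0, 3.0))); // true
    }
}
```

So the group does not have to wait for every recipient: one sufficiently large reply is enough to release it early.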
The `.routeToRecipients()` API now provides more configuration variants for recipients:

    .routeToRecipients(r -> r
            .recipient("foo-channel", "'foo' == payload")
            .recipient("bar-channel", m ->
                    m.getHeaders().containsKey("recipient")
                            && (boolean) m.getHeaders().get("recipient"))
            .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
                    f -> f.<String, String>transform(String::toUpperCase)
                            .channel(c -> c.queue("recipientListSubFlow1Result")))
            .recipientFlow((String p) -> p.startsWith("baz"),
                    f -> f.transform("Hello "::concat)
                            .channel(c -> c.queue("recipientListSubFlow2Result")))
            .recipientFlow(new FunctionExpression<Message<?>>(m ->
                            "bax".equals(m.getPayload())),
                    f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
            .defaultOutputToParentFlow())

Previously the `.route()` operator made the next `.channel()` in the `IntegrationFlow` the `defaultOutputChannel` of the `Router`. User experience showed that making this decision unconditionally isn’t reasonable. We reworked `.route()` to align it with the standard `AbstractMessageRouter` behaviour. The `.defaultOutputChannel()` and `.defaultSubFlowMapping()` options have been added to utilize the `default` logic for the `Router`. To roll back to the previous behavior, `.defaultOutputToParentFlow()` is available, as you may have noticed in the `.routeToRecipients()` sample above.
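The fallback logic described here can be illustrated with a small plain-Java model. This is a simplified sketch and not the Spring Integration classes: `RouterDefaultSketch` and its methods are hypothetical, channels are plain strings, and the exception type is chosen only for the example. It assumes that a router with no matching mapping and no default output fails, while a configured default output receives the message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of router fallback; not the Spring Integration classes.
final class RouterDefaultSketch {

    private final Map<String, String> channelMappings = new HashMap<>();
    private String defaultOutputChannel; // null means no fallback is configured

    RouterDefaultSketch channelMapping(String key, String channel) {
        this.channelMappings.put(key, channel);
        return this;
    }

    RouterDefaultSketch defaultOutputChannel(String channel) {
        this.defaultOutputChannel = channel;
        return this;
    }

    // Returns the name of the channel a message with this routing key would go to.
    String route(String key) {
        String channel = this.channelMappings.get(key);
        if (channel != null) {
            return channel;
        }
        if (this.defaultOutputChannel != null) {
            return this.defaultOutputChannel;
        }
        throw new IllegalStateException("No channel resolved for '" + key
                + "' and no default output channel defined");
    }

    public static void main(String[] args) {
        RouterDefaultSketch strict = new RouterDefaultSketch()
                .channelMapping("foo", "fooChannel");
        System.out.println(strict.route("foo")); // fooChannel

        RouterDefaultSketch lenient = new RouterDefaultSketch()
                .channelMapping("foo", "fooChannel")
                .defaultOutputChannel("parentFlow");
        System.out.println(lenient.route("bar")); // parentFlow
    }
}
```

In this model, the old `.route()` behaviour corresponds to always having a default output that points at the rest of the parent flow, whereas now that choice has to be made explicitly.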
See the commit history for the `1.2.0.M2` version for more information. And always read the JavaDocs to understand the API you use!
We expect the first (and, we hope, the last) Release Candidate for version `1.2` in a couple of weeks, after some adoption of Spring Integration 4.3.2 and Spring Boot 1.4.1. That is soon because `spring-integration-java-dsl` will move to the Spring Integration Core `5.0` and Java 8 code base. The current `1.2` version will still be supported, but only for bug fixes.
Project Page | Documentation | Issues | Help