Engineering
Releases
News and Events

Java DSL for Spring Integration 1.2 Milestone 2 is available

I’m pleased to announce that the Java DSL for Spring Integration 1.2 M2 is available now!

First of all I’d like to thank everyone who created issues, raised Pull Requests, provided feedback or just asked questions on StackOverflow. Especial thanks for early adopters since the previous Milestone 1. With their help we have improved and fixed some issues with runtime flow registration.

The artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.M2 is available in the Milestone repo. So, give it a shot and don’t hesitate to raise a GH issue for any feedback!

Some highlights of the current iteration:

JPA support

After many Community requests we finally introduced the Jpa Factory and corresponding `IntegrationComponentSpec`s to provide a fluent API for the Spring Integration JPA components:

@Autowired
private EntityManagerFactory entityManagerFactory;

@Bean
public IntegrationFlow pollingAdapterFlow() {
    return IntegrationFlows
            .from(Jpa.inboundAdapter(this.entityManagerFactory)
                    .entityClass(StudentDomain.class)
                    .maxResults(1)
                    .expectSingleResult(true),
                e -> e.poller(p -> p.trigger(new OnlyOnceTrigger())))
            .channel(c -> c.queue("pollingResults"))
            .get();
}

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
            .handle(Jpa.updatingGateway(this.entityManagerFactory),
                    e -> e.transactional(true))
            .channel(c -> c.queue("persistResults"));
}

@Bean
public IntegrationFlow retrievingGatewayFlow() {
    return f -> f
            .handle(Jpa.retrievingGateway(this.entityManagerFactory)
                    .jpaQuery("from Student s where s.id = :id")
                    .expectSingleResult(true)
                    .parameterExpression("id", "payload"))
            .channel(c -> c.queue("retrieveResults"));
}

Mid-flow transaction support

"Inspired" by the complexity of transaction support configuration for the Spring Integration JPA components (actually programmatic TransactionalInterceptor), we have introduced TransactionInterceptorBuilder. In addition we provide the TransactionHandleMessageAdvice which allows to start transaction from any endpoint for the entire sub-flow, not only the handleRequestMessage as it is in case of regular ConsumerEndpointSpec.advice(). Actually the main trick is done by the HandleMessageAdvice, recently introduced in Spring Integration Core, which is a marker interface to distinguish advice for the handleRequestMessage only or for the flow starting from the current MessageHandler. For convenience the bunch of .transactional() methods have been added to the ConsumerEndpointSpec.

Scatter-Gather support

The Scatter-Gather EI pattern now has its own Java DSL API:

@Bean
public IntegrationFlow scatterGatherFlow() {
    return f -> f
      .scatterGather(scatterer -> scatterer
         .applySequence(true)
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10))
         .recipientFlow(m -> true,
                     sf -> sf.handle((p, h) -> Math.random() * 10)),
      gatherer -> gatherer
         .releaseStrategy(group ->
                group.size() == 3 ||
                      group.getMessages()
                          .stream()
                          .anyMatch(m -> (Double) m.getPayload() > 5)),
      scatterGather -> scatterGather
        	 .gatherTimeout(10_000));
}

Where the scatterer is just a RecipientListRouter, gatherer - an AggregatingMessageHandler, and the last Consumer accept options for the ScatterGatherHandler.

More routers improvements

The .routeToRecipients() API now provides more configuration variants for recipients:

.routeToRecipients(r -> r
    .recipient("foo-channel", "'foo' == payload")
    .recipient("bar-channel", m ->
        m.getHeaders().containsKey("recipient")
            && (boolean) m.getHeaders().get("recipient"))
    .recipientFlow("'foo' == payload or 'bar' == payload or 'baz' == payload",
        f -> f.<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("recipientListSubFlow1Result")))
    .recipientFlow((String p) -> p.startsWith("baz"),
        f -> f.transform("Hello "::concat)
            .channel(c -> c.queue("recipientListSubFlow2Result")))
    .recipientFlow(new FunctionExpression<Message<?>>(m ->
                                   "bax".equals(m.getPayload())),
        f -> f.channel(c -> c.queue("recipientListSubFlow3Result")))
    .defaultOutputToParentFlow())

Previously the .route() operator made the next .channel() in the IntegrationFlow as a defaultOutputChannel of the Router. According to the user experience that doesn’t sound reasonable to make such a decision unconditional. We reworked .route() to align it with the standard AbstractMessageRouter behaviour. The .defaultOutputChannel() and .defaultSubFlowMapping() have been added to utilize the default logic for the Router. To rallback to the previous behavior the .defaultOutputToParentFlow() is present, as you noticed by the .routeToRecipients() sample above.

See commit history for 1.2.0.M2 version for more information. And always read JavaDocs to understand the API you use!

Next Steps

We expect the first (and hope the last) Release Candidate for version 1.2 over a couple weeks, after some adoption for Spring Integration 4.3.2 and Spring Boot 1.4.1. It’s soon enough as spring-integration-java-dsl will move to the Spring Integration Core 5.0 and Java 8 code base. The current 1.2 version will be still supported, but just for bug fixes.

comments powered by Disqus