Dear Spring community,
We are pleased to announce that the Spring Integration Java DSL 1.1 Milestone 1 is now available. Use the Milestone Repository with Maven or Gradle to try it in early access.
compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"
To be honest, many of the planned features for 1.1 are not implemented yet, but thanks to encouragement from our pal Josh Long and the recent announcement about the Apache Kafka support (Spring Integration Kafka Support 1.1 Release, Spring XD 1.1.1 Release), we've released this Milestone 1 mainly to showcase the Apache Kafka support in the Java Configuration DSL.
We'll look at that, and other, features from this release in this post.
Let's start with a "trivial" sample from the KafkaTests class in the Spring Integration Java DSL:
@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
    return new DefaultConnectionFactory(
            new ZookeeperConfiguration(zookeeper.connectString()));
}
@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
    MetadataStoreOffsetManager offsetManager =
            new MetadataStoreOffsetManager(connectionFactory);
    // start reading at the end of the
    offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
    return offsetManager;
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(
        ConnectionFactory connectionFactory,
        OffsetManager offsetManager) {
    return IntegrationFlows
            .from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
                    .autoCommitOffset(false)
                    .payloadDecoder(String::new)
                    .keyDecoder(b -> Integer.valueOf(new String(b)))
                    .configureListenerContainer(c ->
                            c.offsetManager(offsetManager)
                                    .maxFetch(100)))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults"))
            .get();
}
EmbeddedZookeeper is part of the Apache Kafka test artifact (testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test' in our case) and, along with other utilities such as kafka.utils.TestUtils, it is very useful for unit testing.
ConnectionFactory and OffsetManager come from the Spring Integration Kafka project; here they are declared as plain beans and injected into the flow definition.
The main part of this configuration is the IntegrationFlow bean definition. The Spring Integration Java DSL provides a namespace factory, Kafka, which utilizes IntegrationComponentSpec implementations for the Spring Integration Kafka adapters, like KafkaMessageDrivenChannelAdapterSpec for the KafkaMessageDrivenChannelAdapter.
The spec lets us method-chain options through to the underlying KafkaMessageDrivenChannelAdapter instance.
Note the .payloadDecoder(String::new) line. The kafka.serializer.Decoder is a Scala trait that is compiled to a Java interface (not a class!), so we can represent it here as a Java 8 lambda or method reference.
.configureListenerContainer() is a lambda-aware method that separates out the KafkaMessageListenerContainer-specific options.
The other self-explanatory factory methods from the Kafka namespace factory are .inboundChannelAdapter(...) for the KafkaHighLevelConsumerMessageSource polling adapter and .outboundChannelAdapter(...) for the KafkaProducerMessageHandler. Please refer to their JavaDocs for more information.
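To illustrate why a single-method interface can be supplied as a lambda or a constructor reference, here is a minimal plain-Java sketch. BytesDecoder is a hypothetical stand-in written for this example (it is not the Kafka type); it only mirrors the shape of a decoder with one fromBytes(byte[]) method.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a single-method decoder interface (NOT the Kafka type).
@FunctionalInterface
interface BytesDecoder<T> {
    T fromBytes(byte[] bytes);
}

class DecoderLambdaSketch {

    // Constructor reference: the String(byte[]) constructor matches fromBytes(byte[]).
    static final BytesDecoder<String> PAYLOAD_DECODER = String::new;

    // Lambda: read the bytes as text, then parse them as an integer key.
    static final BytesDecoder<Integer> KEY_DECODER = b -> Integer.valueOf(new String(b));

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] key = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(PAYLOAD_DECODER.fromBytes(payload));
        System.out.println(KEY_DECODER.fromBytes(key));
    }
}
```

If the decoder were an abstract class rather than an interface, neither form would compile, which is why the "interface, not a class" detail matters.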
For more information, check out Josh Long's post on Using Apache Kafka for Integration and Data Processing Pipelines with Spring!
A lot of the great feedback from the community (Webinar Replay: Introducing the Java DSL for Spring Integration) was around the bean method invocation components (services, transformers, routers, etc.), and we heard you loud and clear: component method selection has been improved. Here is a sample that is analogous to a <int:service-activator input-channel="greetingChannel" ref="greetingService"/> in the XML configuration:
@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {
    @Autowired
    private GreetingService greetingService;
    @Bean
    public IntegrationFlow greetingFlow() {
        return IntegrationFlows.from("greetingChannel")
                .handle(this.greetingService)
                .get();
    }
}
@Component
public class GreetingService {
    public void greeting(String payload) {
        System.out.println("Hello " + payload);
    }
}
Here, the greeting method will automatically be selected by the framework. There is an alternative that takes a methodName argument to specify a method in the case of ambiguity. Similar POJO method invocation EIP-methods have been introduced for many other EIP implementations like transform(Object service, String methodName), split(Object service), etc.
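The idea of "select the method automatically, or by name when ambiguous" can be sketched with plain reflection. This is an illustration only and not the framework's actual selection algorithm; MethodSelectionSketch, its nested services, and the rule "exactly one public declared method, otherwise an explicit name is required" are all assumptions made for this example.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class MethodSelectionSketch {

    static class GreetingService {
        public String greeting(String payload) {
            return "Hello " + payload;
        }
    }

    static class AmbiguousService {
        public String upper(String payload) { return payload.toUpperCase(); }
        public String lower(String payload) { return payload.toLowerCase(); }
    }

    // Collects public methods declared on the service, optionally filtered by name,
    // and insists on exactly one candidate.
    static Method select(Object service, String methodName) {
        List<Method> candidates = new ArrayList<>();
        for (Method m : service.getClass().getDeclaredMethods()) {
            boolean nameMatches = methodName == null || m.getName().equals(methodName);
            if (Modifier.isPublic(m.getModifiers()) && !m.isSynthetic() && nameMatches) {
                candidates.add(m);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one candidate method but found " + candidates.size());
        }
        return candidates.get(0);
    }

    // Invokes the selected method with the payload as its only argument.
    static Object invoke(Object service, String methodName, Object payload) {
        try {
            return select(service, methodName).invoke(service, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(invoke(new GreetingService(), null, "World"));
        System.out.println(invoke(new AmbiguousService(), "lower", "World"));
    }
}
```

With a single candidate no name is needed; with two candidates the sketch fails fast unless a method name is given, which is the situation the methodName variant addresses.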
The Spring Integration Java DSL also respects Spring Integration messaging annotations like @ServiceActivator, @Router, @Filter, etc., and even @Payload and @Header. Please refer to the IntegrationFlowDefinition JavaDocs for more information.
It shouldn't be a surprise that, since IntegrationFlow is an interface, we can provide a direct implementation of it as a custom component, and it works as-is in the Spring Integration Java DSL environment:
@Component
public class MyFlow implements IntegrationFlow {
    @Override
    public void configure(IntegrationFlowDefinition<?> f) {
        f.<String, String>transform(String::toUpperCase);
    }
}
This is similar to the @Bean definitions, but this approach helps our components stay more loosely coupled.
But, wait, there's more! IntegrationFlow implementations (like lambdas in the @Bean definition case) are limited to DirectChannel input channels. We went further here and introduced the IntegrationFlowAdapter. Here's my favorite sample to demonstrate how it can be used:
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
    private final AtomicBoolean invoked = new AtomicBoolean();
    // Fires once: returns a date on the first call and null afterwards.
    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }
    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(this, "messageSource",
                    e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                .split(this)
                .transform(this)
                .aggregate(a -> a.processor(this, null), null)
                .enrichHeaders(Collections.singletonMap("foo", "FOO"))
                .filter(this)
                .handle(this)
                .channel(c -> c.queue("myFlowAdapterOutput"));
    }
    public String messageSource() {
        return "B,A,R";
    }
    @Splitter
    public String[] split(String payload) {
        return StringUtils.commaDelimitedListToStringArray(payload);
    }
    @Transformer
    public String transform(String payload) {
        return payload.toLowerCase();
    }
    @Aggregator
    public String aggregate(List<String> payloads) {
        return payloads.stream().collect(Collectors.joining());
    }
    @Filter
    public boolean filter(@Header Optional<String> foo) {
        return foo.isPresent();
    }
    @ServiceActivator
    public String handle(String payload, @Header String foo) {
        return payload + ":" + foo;
    }
}
Of course, without the POJO method invocation support (see above) it wouldn't be possible to build the flow so easily.
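To make the data flow of MyFlowAdapter concrete, the following plain-Java sketch applies the same steps by hand, without any Spring Integration classes. It only mirrors the POJO methods of the sample; the class name FlowStepsSketch, the use of String.split in place of StringUtils, and the Map standing in for message headers are assumptions for illustration. Polling, correlation and channels are deliberately left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class FlowStepsSketch {

    static String run(String source) {
        // split: one element per comma-separated token
        List<String> parts = Arrays.asList(source.split(","));
        // transform: lower-case each element
        List<String> lowered = parts.stream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
        // aggregate: join the elements back into one payload
        String aggregated = lowered.stream().collect(Collectors.joining());
        // enrichHeaders: a Map stands in for the message headers here
        Map<String, String> headers = Collections.singletonMap("foo", "FOO");
        // filter: let the message through only if the "foo" header is present
        Optional<String> foo = Optional.ofNullable(headers.get("foo"));
        if (!foo.isPresent()) {
            return null;
        }
        // handle: combine payload and header value
        return aggregated + ":" + foo.get();
    }

    public static void main(String[] args) {
        System.out.println(run("B,A,R")); // prints "bar:FOO"
    }
}
```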
The Spring Framework and Spring Integration have supported dynamic languages for a long time now, and that support is mostly tied to XML Spring configuration. It may look strange to deal with scripts (like Groovy, Ruby, JavaScript, etc.) from Java code, but we find it a useful tool for reloading functionality at runtime, and for cases when Java lambdas aren't dynamic enough. Let's look at the Scripts namespace factory in the Spring Integration Java DSL:
@Configuration
@EnableIntegration
public class ScriptsConfiguration {
    @Value("com/my/project/integration/scripts/splitterScript.groovy")
    private Resource splitterScript;
    @Bean
    public PollableChannel results() {
        return new QueueChannel();
    }
    @Bean
    public IntegrationFlow scriptSplitter() {
        return f -> f
                .split(Scripts.script(this.splitterScript)
                        .refreshCheckDelay(10000)
                        .variable("foo", "bar"))
                .channel(results());
    }
}
This scripting support allows us to deal only with external resources, which can be changed and reloaded at runtime. The inline scripts, which are supported by the Spring Integration Scripting module, don't make sense here because we have Java 8 lambdas for those cases.
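The reload-at-runtime idea behind a refresh check delay can be sketched in plain Java: re-read the external file only when the delay has elapsed and its modification time has changed. This is a simplified illustration under stated assumptions, not the framework's implementation; RefreshableScriptSketch and its injected clock are hypothetical names introduced for this example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.function.LongSupplier;

class RefreshableScriptSketch {

    private final Path file;
    private final long refreshCheckDelayMillis;
    private final LongSupplier clock; // injected so the example does not need to sleep
    private long lastCheck;
    private long lastModified;
    private String cached;

    RefreshableScriptSketch(Path file, long refreshCheckDelayMillis, LongSupplier clock) {
        this.file = file;
        this.refreshCheckDelayMillis = refreshCheckDelayMillis;
        this.clock = clock;
    }

    // Returns the cached script text, re-reading the file only when a check is due
    // and the file's modification time differs from the one seen last.
    synchronized String getScript() {
        long now = clock.getAsLong();
        if (cached == null || now - lastCheck >= refreshCheckDelayMillis) {
            lastCheck = now;
            try {
                long modified = Files.getLastModifiedTime(file).toMillis();
                if (cached == null || modified != lastModified) {
                    cached = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                    lastModified = modified;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return cached;
    }

    public static void main(String[] args) throws IOException {
        Path script = Files.createTempFile("splitterScript", ".groovy");
        Files.write(script, "v1".getBytes(StandardCharsets.UTF_8));
        long[] now = {0L};
        RefreshableScriptSketch source = new RefreshableScriptSketch(script, 10_000, () -> now[0]);
        System.out.println(source.getScript()); // first read loads the file

        long stamp = Files.getLastModifiedTime(script).toMillis();
        Files.write(script, "v2".getBytes(StandardCharsets.UTF_8));
        // Force a clearly different timestamp, independent of file system granularity.
        Files.setLastModifiedTime(script, FileTime.fromMillis(stamp + 60_000));

        now[0] = 5_000;
        System.out.println(source.getScript()); // delay not elapsed: cached text is returned
        now[0] = 10_000;
        System.out.println(source.getScript()); // delay elapsed and file changed: re-read
        Files.delete(script);
    }
}
```

The trade-off is the usual one for polling: a longer delay means fewer file system checks but a longer window in which a stale script is used.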
The Wire Tap EIP is implemented as a ChannelInterceptor in Spring Integration and can be injected into any MessageChannel as an interceptor like this:
@Bean
public MessageChannel myChannel() {
    return MessageChannels.direct()
            .interceptor(new WireTap(loggerChannel()))
            .get();
}
The IntegrationFlow definition allows us to omit MessageChannel declarations between EIP components, so we've introduced an inline .wireTap() EIP-method to allow a WireTap injection for those anonymous channels. Here are some samples:
@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlows.from("tappedChannel1")
            .wireTap("tapChannel",
                    wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .channel("nullChannel")
            .get();
}
@Bean
public IntegrationFlow wireTapFlow2() {
    return f -> f
            .wireTap(sf -> sf
                    .<String, String>transform(String::toUpperCase)
                    .channel(c -> c.queue("wireTapSubflowResult")))
            .channel("nullChannel");
}
Please see the JavaDocs of the IntegrationFlowDefinition.wireTap() methods for more information, and don't miss our test cases on the project page on GitHub.
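The wire-tap idea itself, independent of the DSL, can be sketched in a few lines of plain Java: every message continues to the main subscriber, and a copy is sent to the tap when the selector accepts it. TappedChannel is a hypothetical class written for this illustration and does not correspond to any Spring Integration type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class WireTapSketch {

    // Hypothetical minimal channel: one subscriber plus an optional, selective tap.
    static class TappedChannel<T> {
        private final Consumer<T> subscriber;
        private final Predicate<T> selector;
        private final Consumer<T> tap;

        TappedChannel(Consumer<T> subscriber, Predicate<T> selector, Consumer<T> tap) {
            this.subscriber = subscriber;
            this.selector = selector;
            this.tap = tap;
        }

        void send(T message) {
            if (selector.test(message)) {
                tap.accept(message); // the tap sees only the selected messages
            }
            subscriber.accept(message); // the main flow continues regardless
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        List<String> tapped = new ArrayList<>();
        TappedChannel<String> channel =
                new TappedChannel<>(delivered::add, "foo"::equals, tapped::add);
        channel.send("foo");
        channel.send("bar");
        System.out.println(delivered); // prints [foo, bar]
        System.out.println(tapped);    // prints [foo]
    }
}
```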
There's much to do for the 1.1 release, like further simplification of the .aggregate() (and similar) configuration, the ability to inject external sub-flows, the ability to configure IntegrationComponentSpec implementations as a separate @Bean to simplify the target flow definitions, more protocol-specific Namespace Factories and more. Don't hesitate to reach us via StackOverflow, JIRA and GitHub issues to share your thoughts and ideas!
Project Page | JIRA | Issues | [Contributions](https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration tag)