Spring Team
Artem Bilan

Artem Bilan

Spring Integration Team

Philadelphia, PA

Blog Posts by Artem Bilan

Spring Integration 5.0 Milestone 3 Available

The Spring Integration team is pleased to announce that the third milestone for the Spring Integration 5.0 release (5.0.0.M3) is now available.

53 JIRAs (and some GitHub issues) made into this release, including bug fixes and a number of new features. Some highlights since the previous Milestone 2:

  • Initial implementation for a Spring Integration Testing Framework - the @SpringIntegrationTest annotation for test classes and MockIntegration factory help you to write unit tests for integration flows and channel adapters. We intend to flush out this capability with more features before GA, including more mocking, verifications and some send-and-receive utilities to test components in isolation. Feedback is welcome!

  • POJO handler method invocations (@ServiceActivator, @Transformer etc., or such methods invoked from XML definitions) now use InvocableHandlerMethod by default. Together with the ConfigurableCompositeMessageConverter and @Default utilities that allows us to implement conditional method invocation scenarios based on the Content-Type and target method arguments resolution. To restore the previous SpEL-based behavior, the @UseSpelInvoker method-level annotation is provided.

  • A based on the WebFlux WebClient ReactiveHttpRequestExecutingMessageHandler implementation is provided. Together with a ReactiveChannel as an outputChannel options it provides backpressure manner for remote HTTP service consumption.

  • The (S)FTP (and AWS S3) Inbound Channel Adapters can now restore file tree locally. For that purpose a new, Files.walk() based, RecursiveDirectoryScanner is introduced. The useWatchService option is also provided.

  • Web Services Gateways now can exchange WebServiceMessage s directly as the inbound/outbound payload. This allows the support of MTOM via direct access to WebServiceMessage properties. The UnmarshallingTransformer can now process a MimeMessage as the payload to unmarshal it into an object graph with attachments.

  • The reply producing MessageHandler now has a fallback to the replyChannel header from the reply message, if there is no outputChannel or replyChannel in the request message headers. This allows the implementation of business process-like scenarios when the next step is determined by the result of current calculations.

Read more...

Spring Integration Extension for AWS 1.1.0 M1 Available

On behalf of the Spring Integration community I’d like to announce the first Milestone of Spring Integration Extension for Amazon Web Services version 1.1. Its artifact is spring-integration-aws.1.1.0.M1, which is available in the Milestone Repository.

Of course, first of all, big thanks to you, the community, for your contributions!

Some highlights of the features included to this Milestone:

Kinesis Support

The KinesisMessageDrivenChannelAdapter and KinesisMessageHandler are provided to integrate with the Amazon Kinesis. The former is pretty simple and allow to emit data into a Kinesis stream. All the information for the target PutRecordRequest can be determined from the request Message:

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler() {
    KinesisMessageHandler kinesisMessageHandler =
                new KinesisMessageHandler(amazonKinesis());
    kinesisMessageHandler.setAsyncHandler(asyncHandler());
    kinesisMessageHandler.setStream("my_stream");
    kinesisMessageHandler.
             setPartitionKeyExpressionString("headers[aws_partitionKey]");
    return kinesisMessageHandler;
}

By default it uses SerializingConverter to convert the request data to the byte[]. The com.amazonaws.handlers.AsyncHandler can be used for asynchronous putRecordAsync() result reaction.

The KinesisMessageDrivenChannelAdapter provides a comprehensive Kinesis stream data ingestion implementation, including sequenceNumber checkpointing and resharding support. The concurrency option can be used for strict order records processing in the downstream flow. The provided shards are distributed between threads in that case. If concurrency isn’t provided, internal ShardConsumer s are performed on the consumerExecutor directly:

@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
    adapter.setOutputChannel(kinesisChannel());
    adapter.setCheckpointStore(checkpointStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
}

If CheckpointMode is manual, the AwsHeaders.CHECKPOINTER message header is populated to each emitted message. It is an instance of Checkpointer and its checkpoint() can be used in the downstream flow to checkpoint a sequenceNumber for processed records in the shard.

Note
The Amazon Kinesis Channel Adapters implementation is fully based on the standard aws-java-sdk-kinesis API and doesn’t use Kinesis Client Library.
Read more...

Spring for Apache Kafka 1.1.3 available now

It is my pleasure to announce that the Spring for Apache Kafka 1.1.3 maintenance release is available now.

As usual, thanks to the community for any feedback and contribution as always. Looking forward for more!

This release contains several bug fixes, including proper offset commit handling when using a BatchListener; therefore an upgrade is highly recommended.

Right now master has been switched to the version 2.0 for Java 8 and Spring Framework 5.0 code base. We have some plans for high-level API for Kafka Streams and Reactor Kafka support.

Read more...

Spring Integration Extension for Hazelcast 1.0.0 M2 Available

On behalf of the Spring Integration community I’d like to announce the second Milestone of Spring Integration Extension for Hazelcast and its artifact is spring-integration-hazelcast.1.0.0.M2, which is available in the Milestone Repository.

The project has been around for some time and there has not been so much activity since the previous Milestone 1 and it only recently gained enough community traction to warrant a release. So, first of all, big thanks to you, the community, for your contributions!

Read more...

Spring Integration Extension for SMB 0.5.0 is Available

On behalf of the Spring Integration team I’d like to announce release of one more Spring Integration Extension. This time it is Spring Integration for Server Message Block and its artifact is spring-integration-smb.0.5.0.RELEASE, which is available in the Release Repository and Maven Central.

The project has been around for some time but only recently gained enough community traction to warrant a release. So, first of all, big thanks to you, the community, for your contributions!

The Java CIFS Client Library has been chosen as a Java implementation for the CIFS/SMB networking protocol. Its SmbFile abstraction is simply wrapped to the Spring Integration "Remote File" foundations like SmbSession, SmbRemoteFileTemplate etc.

Read more...

Spring Integration 5.0 Milestone 2 Available

On behalf of the Spring Integration team I’d like to announce the Second Milestone of Spring Integration 5.0, which is available in the Milestone Repository.

Some highlights of this release since the previous Milestone.

Of course, first of all, big thanks to you, the community, for your contributions!

MongoDb Improvements

  • MongoDbOutboundGateway - for performing queries or any arbitrary operation on the collection

  • An initial Java DSL support for MongoDB components

  • The MongoDb component now can use org.springframework.data.mongodb.core.query.Query API in their expressions

@Bean
public IntegrationFlow mongoDbGatewayFlow() {
    return f -> f
        .handle(MongoDb.outboundGateway(this.mongoTemplate)
                            .collectionCallback(MongoCollection::count)
                            .collectionNameFunction(m ->
                                           m.getHeaders().get("collection")));
}
Read more...

Spring AMQP 1.7 RC1 available now

On behalf of the Spring Integration team I’d like to announce the first (and final) Release Candidate of Spring AMQP 1.7, which is available in the Milestone Repository.

This release is mainly an intermediate between version 1.6.x and 2.0 for Spring Boot 1.5 and IO Platform Brussels compatibility.

Anyway there are several improvements and new features to explain here:

  • Upgrade to Amqp Client 4.0 with appropriate RabbitConnectionFactoryBean changes

  • Upgrade to Log4j 2.7 and required braking change fix for the log4j2.AmqpAppender

  • Upgrade to Spring Retry 1.2 with important StatefulRetryOperationsInterceptor.setUseRawKey(true) for backward compatibility

  • a new spring-rabbit-junit artifact is provided with several utilities (like BrokerRunning @Rule) which can be useful for testing Spring AMQP applications

  • The SimpleMessageListenerContainer can now be started without queues to listen to at all. They can be provided later at runtime via addQueues()

  • a ConnectionNameStrategy is provided for the ConnectionFactory to allow to identify application connections in the Broker or other monitoring and tracing tools

Read more...

Java DSL for Spring Integration 1.2 Release is available

Dear Spring Community,

It’s my pleasure to announce that the Java DSL for Spring Integration 1.2 GA is now available.

The artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.RELEASE is available in the Release repo and Maven Central.

Since the previous Release Candidate 1 we have received some feedback and these additional features have been added:

Thread Barrier support

A new .barrier() and its mirror .trigger() EIP-methods have been added to the IntegrationFlow definition:

private static final String BARRIER = "barrier";

@Bean
public IntegrationFlow barrierFlow() {
    return f -> f
        .barrier(10000, b -> b
                .correlationStrategy(
                         new HeaderAttributeCorrelationStrategy(BARRIER))
                 .outputProcessor(g ->
                         g.getMessages()
                                 .stream()
                                 .skip(1)
                                 .findFirst()
                                 .get()))
         .channel(c -> c.queue("barrierResults"));
}

@Bean
public IntegrationFlow releaseBarrierFlow(
                          MessageTriggerAction barrierTriggerAction) {
    return IntegrationFlows.from((Channels c) -> c.queue("releaseChannel"))
        .trigger(barrierTriggerAction,
            e -> e.poller(p -> p.fixedDelay(100)))
        .get();
}
Read more...

Java DSL for Spring Integration 1.2 Release Candidate 1 is available

Dear Spring Community,

I’m pleased to announce that the Java DSL for Spring Integration 1.2 RC1 is available now.

Since the previous Milestone 2 we had a good deep feedback for our new features and some API has been broken to reflect real requirements.

As usual, big thanks to everyone who created issues, raised Pull Requests, provided feedback or just asked questions on StackOverflow.

The artifact org.springframework.integration:spring-integration-java-dsl:1.2.0.RC1 is available in the Milestone repo. So, give it a shot for last chance to raise a GH issue for any feedback!

Read more...

Spring for Apache Kafka 1.1 GA and Spring Integration Kafka 2.1 GA are Available

I am pleased to announce that the Spring for Apache Kafka 1.1.0.RELEASE is now available in the spring release repo and Maven Central.

Due to some community requirements, we decided to bypass the Release Candidate (RC) phase and released General Availability (GA) immediately.

There are not many changes since the previous Milestone 2, but here is a summary of all Spring Kafka 1.1 changes:

  • Apache Kafka 0.10 upgrade;

  • The batch of messages support via BatchMessageListener as well as via @KafkaListener annotation configuration;

  • The null payload concept support via KafkaNull placeholder object;

  • You can now perform seek operations from the listener - this allows setting an initial offset when partitions are assigned by Kafka when using group management. You can also perform arbitrary seek operations after initialization;

  • Allow setting the initial offset to be relative to the current offset;

  • The KafkaTemplate now provides access to the metrics and partitionsFor methods on the Producer.

Read more...