Engineering
Releases
News and Events

Spring Integration Extension for AWS 1.1.0 M1 Available

On behalf of the Spring Integration community I’d like to announce the first Milestone of Spring Integration Extension for Amazon Web Services version 1.1. Its artifact is spring-integration-aws.1.1.0.M1, which is available in the Milestone Repository.

Of course, first of all, big thanks to you, the community, for your contributions!

Some highlights of the features included to this Milestone:

Kinesis Support

The KinesisMessageDrivenChannelAdapter and KinesisMessageHandler are provided to integrate with the Amazon Kinesis. The former is pretty simple and allow to emit data into a Kinesis stream. All the information for the target PutRecordRequest can be determined from the request Message:

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler kinesisMessageHandler() {
    KinesisMessageHandler kinesisMessageHandler =
                new KinesisMessageHandler(amazonKinesis());
    kinesisMessageHandler.setAsyncHandler(asyncHandler());
    kinesisMessageHandler.setStream("my_stream");
    kinesisMessageHandler.
             setPartitionKeyExpressionString("headers[aws_partitionKey]");
    return kinesisMessageHandler;
}

By default it uses SerializingConverter to convert the request data to the byte[]. The com.amazonaws.handlers.AsyncHandler can be used for asynchronous putRecordAsync() result reaction.

The KinesisMessageDrivenChannelAdapter provides a comprehensive Kinesis stream data ingestion implementation, including sequenceNumber checkpointing and resharding support. The concurrency option can be used for strict order records processing in the downstream flow. The provided shards are distributed between threads in that case. If concurrency isn’t provided, internal ShardConsumer s are performed on the consumerExecutor directly:

@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
    adapter.setOutputChannel(kinesisChannel());
    adapter.setCheckpointStore(checkpointStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
}

If CheckpointMode is manual, the AwsHeaders.CHECKPOINTER message header is populated to each emitted message. It is an instance of Checkpointer and its checkpoint() can be used in the downstream flow to checkpoint a sequenceNumber for processed records in the shard.

Note
The Amazon Kinesis Channel Adapters implementation is fully based on the standard aws-java-sdk-kinesis API and doesn’t use Kinesis Client Library.

S3 Streaming MessageSource

To avoid local file system limitation, which might not be even available especially in cloud environment, alongside with the regular S3InboundFileSynchronizingMessageSource, a S3StreamingMessageSource has been introduced:

@Bean
@InboundChannelAdapter(value = "s3FilesChannel",
                 poller = @Poller(fixedDelay = "1000"))
S3StreamingMessageSource s3InboundStreamingMessageSource(AmazonS3 amazonS3) {
    S3SessionFactory s3SessionFactory = new S3SessionFactory(amazonS3);
    S3RemoteFileTemplate s3FileTemplate =
                         new S3RemoteFileTemplate(s3SessionFactory);
    S3StreamingMessageSource s3MessageSource =
                        new S3StreamingMessageSource(s3FileTemplate,
                                    Comparator.comparing(FileInfo::getFilename));
    s3MessageSource.setRemoteDirectory("/myBucket");
    s3MessageSource.setFilter(
                      new S3PersistentAcceptOnceFileListFilter(
                                             new SimpleMetadataStore(),
                                             "streaming"));
    return s3MessageSource;
}

This message source produces an InputStream for the remote S3 object as message payload and is fully similar to the FTP Streaming Inbound Channel Adapter.

Our next plans to provide MetadataStore implementation for the Amazon DynamoDB, make SnsMessageHandler and SqsMessageHandler based on async client implementation. Another challenge is before us - Amazon Kinesis Binder implementation for Spring Cloud Stream.

Do not hesitate to contact with us via any available communication channel!

comments powered by Disqus