Spring Cloud Stream 2.0 - Polled Consumers

This is the second blog in a series of pre-release blogs in preparation for Spring Cloud Stream 2.0.0.RELEASE.


Spring Cloud Stream 2.0 introduces polled consumers, where the application can control message processing rates.


Spring Cloud Stream has the concepts of producers and consumers; when using the messaging paradigm, MessageChannels are bound to destinations (e.g. Kafka topics, RabbitMQ exchanges/queues). To date, on the consumer side, messages have been delivered whenever an idle consumer is available. In effect, the broker controls the rate of delivery; usually, the next message is delivered immediately after the current one is processed.

Version 2.0 adds polled consumers, supported by the Kafka and RabbitMQ binders. The application asks for each message when it is ready to process one, and so sets the consumption rate itself.


With polled consumers, instead of binding a MessageChannel to the destination, we bind a PollableMessageSource; for example, a PolledProcessor binding might be configured like so:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

The message source has a method:

boolean poll(MessageHandler handler);

The message is not acknowledged until the handler’s handleMessage method exits.

The MessageHandler is the interface from spring-messaging; you can provide one of the standard Spring Integration message handlers, or your own implementation (often a lambda). Because the handleMessage method takes a Message<?> argument, there is no type information and the payload will not be converted.
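To make the contract concrete, here is a rough, self-contained sketch of the polling semantics described above. InMemoryPollableSource is a hypothetical stand-in written for this illustration, not the Spring Cloud Stream PollableMessageSource. It models only two things: poll returns false when no message is waiting, and a message is acknowledged only after the handler returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical stand-in for a pollable source; NOT the Spring Cloud Stream API.
class InMemoryPollableSource {

    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> acknowledged = new ArrayList<>();

    // Simulates a message arriving at the destination.
    void publish(String payload) {
        pending.add(payload);
    }

    // Returns false when nothing is waiting. Otherwise runs the handler and
    // acknowledges the message only after the handler has returned.
    boolean poll(Consumer<String> handler) {
        String next = pending.peek();
        if (next == null) {
            return false;
        }
        // In this stand-in, an exception thrown here leaves the message pending.
        handler.accept(next);
        pending.remove();
        acknowledged.add(next);
        return true;
    }

    List<String> acknowledged() {
        return acknowledged;
    }
}

class PollSemanticsDemo {
    public static void main(String[] args) {
        InMemoryPollableSource source = new InMemoryPollableSource();
        source.publish("hello");
        // The first poll finds a message; the second finds the source empty.
        boolean first = source.poll(p -> System.out.println("Received: " + p));
        boolean second = source.poll(p -> System.out.println("Received: " + p));
        System.out.println(first + " " + second);
    }
}
```

Error handling in the real binders is configurable and is not modelled here; the stand-in simply lets the exception propagate.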

However, message conversion as discussed in the first blog in this series can be applied to polled consumers as well. In order to communicate the type information to the conversion service, we provide a parameterized type reference in the overloaded poll() method:

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type);

The message payload is then converted to that type. The type can be simple, for example with a content type of text/plain:

  • new ParameterizedTypeReference<String>() {}

or more complex, for example with a JSON content type:

  • new ParameterizedTypeReference<Map<String, Foo>>() {}
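The trailing {} in these expressions creates an anonymous subclass, which is what lets the generic argument survive type erasure. The sketch below shows the idiom with a hypothetical TypeRef class (a stand-in for illustration, not Spring's ParameterizedTypeReference), using Integer in place of Foo so the example is self-contained.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;

// Hypothetical stand-in that illustrates the "super type token" idiom.
abstract class TypeRef<T> {

    private final Type type;

    protected TypeRef() {
        // The anonymous subclass created by "{ }" records T in its generic superclass.
        Type superclass = getClass().getGenericSuperclass();
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalStateException("TypeRef must be created with a type argument");
        }
        this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeRefDemo {
    public static void main(String[] args) {
        Type simple = new TypeRef<String>() { }.getType();
        Type complex = new TypeRef<Map<String, Integer>>() { }.getType();
        // Both the raw type and its type arguments are available at runtime.
        System.out.println(simple.getTypeName());
        System.out.println(complex.getTypeName());
    }
}
```

With the full generic type available at runtime, a converter can tell a Map<String, Integer> apart from a plain Map, which a Class object alone cannot express.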

Putting It All Together

The following simple Spring Boot application provides a complete example; it receives String payloads, converts them to upper case and forwards the result to another destination.

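@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
      LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
      MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          // Convert to upper case and forward to the output destination.
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        // Pause between polls; the interval here is an illustrative choice.
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}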



Applications can now control the rate at which messages are consumed.
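One way an application might use this control is to poll less often while the destination is idle. The following is a minimal sketch under stated assumptions: BackoffPoller, its delay values and the doubling policy are all hypothetical, and pollOnce stands in for a call such as source.poll(handler).

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; the names and the backoff policy are illustrative only.
class BackoffPoller {

    private final long minDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffPoller(long minDelayMs, long maxDelayMs) {
        if (minDelayMs <= 0 || maxDelayMs < minDelayMs) {
            throw new IllegalArgumentException("require 0 < minDelayMs <= maxDelayMs");
        }
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.currentDelayMs = minDelayMs;
    }

    // Reset to the minimum delay after a message; double (up to the cap) when idle.
    long nextDelay(boolean processedMessage) {
        currentDelayMs = processedMessage
                ? minDelayMs
                : Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }

    // Runs a fixed number of polls, sleeping between them.
    // pollOnce stands in for source.poll(handler).
    void run(BooleanSupplier pollOnce, int iterations) throws InterruptedException {
        for (int i = 0; i < iterations; i++) {
            Thread.sleep(nextDelay(pollOnce.getAsBoolean()));
        }
    }
}
```

In a polling loop, the value returned by nextDelay would be the time to sleep before the next poll, so a busy destination is drained quickly and an idle one is checked less often.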

For more information, see Using Polled Consumers in the reference manual.

We encourage you to provide feedback using one of the following facilities:
- Project’s GitHub Issues
- Stack Overflow channel
- Gitter channel

