Engineering
Releases
News and Events

Spring Cloud Stream 2.0 - Polled Consumers

This is the second blog in a series of pre-release blogs in preparation for Spring Cloud Stream 2.0.0.RELEASE.

Preface

Spring Cloud Stream 2.0 introduces polled consumers, where the application can control message processing rates.

Introduction

Spring Cloud Stream has the concepts of producers and consumers; when using the messaging paradigm, MessageChannels are bound to destinations (e.g. Kafka topics, Rabbit Exchanges/Queues). To-date, on the consumer side, messages are delivered whenever an idle consumer is available. In effect, the broker controls the rate of delivery; usually, the next message is delivered immediately after the current one is processed.

2.0 introduces polled consumers, where the application can control the rate of message consumption.
Polled consumers are supported by the Kafka and RabbitMQ binders.

Details

With polled consumers, instead of binding a MessageChannel to the destination, we bind a PollableMessageSource; for example, a PolledProcessor binding might be configured like so:

public interface PolledProcessor {

    @Input
    PollableMessageSource destIn();

    @Output
    MessageChannel destOut();

}

The message source has a method:

boolean poll(MessageHandler handler);

The message is not acknowledged until the handler’s handleRequest method exits.

The MessageHandler is the interface from spring-messaging; you can provide one of the standard Spring Integration message handlers, or your own implementation (often a lambda). Because the handleMessage method takes a Message<?> argument, there is no type information and the payload will not be converted.

However, message conversion as discussed in the first blog in this series can be applied to polled consumers as well. In order to communicate the type information to the conversion service, we provide a parameterized type reference in the overloaded poll() method:

boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type)

And the message payload will be converted to the type, which can be simple, for example with a content type of text/plain:

  • new ParameterizedTypeReference<String>() {}

or more complex with, for example a JSON content type:

  • new ParameterizedTypeReference<Map<String, Foo>>() {}

Putting it all Together

The following simple Spring Boot application provides a complete example; it receives String payloads, converts them to upper case and forwards the result to another destination.

@SpringBootApplication
@EnableBinding(Blog2Application.PolledProcessor.class)
public class Blog2Application {

  private final Logger logger =
  	  LoggerFactory.getLogger(Blog2Application.class);

  public static void main(String[] args) {
    SpringApplication.run(Blog2Application.class, args);
  }

  @Bean
  public ApplicationRunner runner(PollableMessageSource source,
  	    MessageChannel dest) {
    return args -> {
      while (true) {
        boolean result = source.poll(m -> {
          String payload = (String) m.getPayload();
          logger.info("Received: " + payload);
          dest.send(MessageBuilder.withPayload(payload.toUpperCase())
              .copyHeaders(m.getHeaders())
              .build());
        }, new ParameterizedTypeReference<String>() { });
        if (result) {
          logger.info("Processed a message");
        }
        else {
          logger.info("Nothing to do");
        }
        Thread.sleep(5_000);
      }
    };
  }

  public static interface PolledProcessor {

    @Input
    PollableMessageSource source();

    @Output
    MessageChannel dest();

  }

}

Conclusion

Applications can now control the rate at which messages are consumed.

For more information, see Using Polled Consumers in the reference manual.

We encourage you to provide feedback using one of the following facilities:
- Project’s GitHub Issues
- Stack Overflow channel
- Gitter channel

Enjoy!

comments powered by Disqus