This article is part of a blog series that explores the newly redesigned Spring Cloud Stream applications based on Java Functions. In this post, we will look at the Elasticsearch sink that allows us to index records in Elasticsearch, and its corresponding Consumer function.
Here are all the previous parts of this blog series.
Before we look at the Elasticsearch sink application, let us see the consumer function that powers the sink. As we have seen previously in other sink applications, the consumer is a standard
java.util.function.Consumer which accepts a
Message<?>. The consumer relies upon Spring Boot’s support for Elasticsearch which auto configures a
RestHighLevelClient from Elasticsearch. The consumer supports messages with the following types of payload.
XContentBuilder from Elasticsearch.
When using the consumer, the Elasticsearch index to use is given by the property
You may set the Elasticsearch ID to use for each message by setting the
INDEX_ID message header. Alternately,
you can set the
elasticsearch.consumer.id property, which accepts a SpEL expression. If neither of these is set, Elasticsearch will auto-generate an ID.
By setting the property
true, we can make the indexing operation asynchronous.
We can inject the consumer function in the application and invoke its
accept method directly to index records to ElasticSearch.
For e.g, let’s say we inject the consumer bean in an application as below.
@Autowired ElasticsearchConsumer elasticsearchConsumer
Then we can use the following
java.util.Map to index a record.
Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("age", 100); jsonMap.put("dateOfBirth", 1471466076564L); jsonMap.put("fullName", "John Doe"); final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build(); elasticsearchConsumer.accept(message);
The same information on the map above can be provided as plain
JSON or using XContentBuilder from Elasticsearch.
As we have seen in the previous blogs, the consumer function becomes more powerful when combining with Spring Cloud Stream to make it a sink application. It has inherent capabilities to communicate with a middleware technology in a seamless manner. The sink application consumes data from a middleware system such as Apache Kafka or RabbitMQ and sends to Elasticsearch. We already provide out of the box variants of Elasticsearch for Kafka and RabbitMQ.
Let us run through the steps for running the standalone Elasticsearch sink application for Apache Kafka.
First, go ahead and download the sink application. Since the sink is not generally available yet, let us use the latest milestone release.
Before running the application, ensure that you have Easticsearch running. Here is a quick way to start a single-node Elasticsearch cluster in a docker container.
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
We will also need to make sure that Kafka is running.
Then run the application as below:
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar --spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
By providing the input destination property, we are asking the sink to receive data from the Kafka topic
data-in and sends the data to the Elasticsearch index
Send some test data to the Kafka topic. For example, here is how you can send it using the Kafka console producer script if you have Kafka running locally on port
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
Then send the following
We can verify that the data is indexed by invoking the following endpoint.
Similarly, we can also download the RabbitMQ variant of the Elasticsearch sink application and run it against a RabbitMQ cluster.
As we have seen several times previously in this series, these Spring Cloud Stream applications become further powerful and resilient when it is run as part of a data pipeline on Spring Cloud Data Flow.
The Elasticsearch that we saw above can be combined with a number of other applications. For instance, a TCP source app may be receiving data from a source and then dump the data to a middleware destination, from where the Elasticsearch sink consumes and indexes. This index may be then used by an analytics application to generate a dashboard. This is just one example and there are several such use cases. Spring Cloud Data Flow makes the orchestration of these pipelines seamless for the user. We encourage you to take a look at the steps we laid out in previous blogs on how we can deploy apps on Spring Cloud Data Flow. Using those same steps, the Elasticsearch sink application can also be deployed.
In this blog, we saw how Elasticsearch consumer function and its corresponding Spring Cloud Stream sink work. The consumer function can be injected into custom applications for combining with other business logic. The sink application is provided out of the box for use with Kafka and RabbitMQ middleware variants.
We still have a few more epsidoses coming up in this blog series. Please stay tuned.