Introducing Spring for Apache Hadoop

Engineering | Costin Leau | February 29, 2012 | ...

I am happy to announce that the first milestone release (1.0.0.M1) of the Spring for Apache Hadoop project is available, and to talk about some of the work we have been doing over the last few months. Part of the Spring Data umbrella, Spring for Apache Hadoop supports developing applications based on Apache Hadoop technologies by leveraging the capabilities of the Spring ecosystem. Whether one is writing stand-alone, vanilla MapReduce applications, interacting with data from multiple data stores across the enterprise, coordinating a complex workflow of HDFS, Pig, or Hive jobs, or anything in between, Spring for Apache Hadoop stays true to the Spring philosophy, offering a simplified programming model and addressing the "accidental complexity" caused by the infrastructure. Spring for Apache Hadoop provides a powerful tool in the developer's arsenal for dealing with big data volumes.

MapReduce Jobs

The Hello world for Apache Hadoop is the word count example - a simple use-case that exposes the base Apache Hadoop capabilities. When using Spring for Apache Hadoop, the word count example looks as follows:
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />

<!-- define the job -->
<hdp:job id="word-count" 
  input-path="/input/" output-path="/output/"
  mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
  reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" 
                  p:jobs-ref="word-count"/>

Notice how the creation and submission of the job configuration is handled by the IoC container. Whether the Apache Hadoop configuration needs to be tweaked or the reducer needs extra parameters, all the configuration options remain available. This allows you to start small and have the configuration grow alongside the app. The configuration can be as simple or as advanced as the developer needs it to be, taking advantage of Spring container functionality such as property placeholders and environment support:

<hdp:configuration resources="classpath:/my-cluster-site.xml">
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
    electric=sea
</hdp:configuration>

<context:property-placeholder location="classpath:hadoop.properties" />
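
To make the placeholder mechanics concrete, here is a minimal plain-Java sketch of what happens to the nested properties above, assuming the element body is read as java.util.Properties text and each ${...} is looked up first in hadoop.properties and then in the JVM system properties. The class name, method names and the hd.fs value are hypothetical; Spring's own placeholder support does considerably more than this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration only; not part of Spring or Spring for Apache Hadoop.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${key} with its value from 'source', falling back to the
    // JVM system properties; unknown keys are left untouched.
    static String resolve(String text, Properties source) {
        Matcher matcher = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = source.getProperty(key, System.getProperty(key));
            String replacement = (value != null) ? value : matcher.group();
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Parses the element body as Properties text and resolves every value.
    static Properties parse(String body, Properties source) {
        Properties raw = new Properties();
        try {
            raw.load(new StringReader(body));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Properties resolved = new Properties();
        for (String name : raw.stringPropertyNames()) {
            resolved.setProperty(name, resolve(raw.getProperty(name), source));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Stands in for hadoop.properties; the value is an assumption.
        Properties hadoopProperties = new Properties();
        hadoopProperties.setProperty("hd.fs", "hdfs://localhost:9000");

        String body = "fs.default.name=${hd.fs}\n"
                + "hadoop.tmp.dir=file://${java.io.tmpdir}\n"
                + "electric=sea\n";

        Properties configuration = parse(body, hadoopProperties);
        for (String name : configuration.stringPropertyNames()) {
            System.out.println(name + " = " + configuration.getProperty(name));
        }
    }
}
```

The point of the sketch is only the order of events: the placeholders are replaced before the values ever reach the Apache Hadoop configuration, so the cluster-specific details can live in a separate properties file.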

<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
  <hdp:classpath value="/cp/some-library.jar#library.jar" />
  <hdp:cache value="/cache/some-archive.tgz#main-archive" />
  <hdp:cache value="/cache/some-resource.res" />
</hdp:cache>

(the word count example is part of the Spring for Apache Hadoop distribution - feel free to download it and experiment).
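
For readers who have not met the word count example before, the following is a plain-Java sketch of what the mapper and reducer referenced in the job definition compute, with no Hadoop dependency at all. The class and method names are hypothetical, and the in-memory grouping merely stands in for the shuffle that Apache Hadoop performs between the two phases.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical, Hadoop-free illustration of the word count logic.
class WordCountSketch {

    // Map step: split a line on whitespace and emit a (word, 1) pair per token.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            pairs.add(new AbstractMap.SimpleEntry<String, Integer>(tokens.nextToken(), 1));
        }
        return pairs;
    }

    // Reduce step: sum all the values emitted for a single word.
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Driver: group the mapped pairs by word (the "shuffle"), then reduce each group.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            counts.put(group.getKey(), reduce(group.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {be=2, do=1, not=1, or=1, to=3}
        System.out.println(wordCount(Arrays.asList("to be or not to be", "to do")));
    }
}
```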

Spring for Apache Hadoop does not require you to rewrite your MapReduce jobs in Java; you can use non-Java streaming jobs seamlessly. They are just objects (or, as Spring calls them, beans) that are created, configured, wired and managed by the framework just like any other, in a consistent, coherent manner. The developer can mix and match according to her preferences and requirements without having to worry about integration issues.

<hdp:streaming id="streaming-env" 
  input-path="/input/" output-path="/output/"
  mapper="${path.cat}" reducer="${path.wc}">
  <hdp:cmd-env>
    EXAMPLE_DIR=/home/example/dictionaries/
  </hdp:cmd-env>
</hdp:streaming>

Existing Apache Hadoop Tool implementations are also supported; in fact, rather than specifying custom Apache Hadoop properties through the command line, one can simply inject them:

<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

The configuration above executes Tutorial1 of Twitter's Scalding (a Scala DSL on top of the Cascading library; see below). Note that there is no dedicated support code in either Spring for Apache Hadoop or Scalding - only the standard Apache Hadoop APIs are being used.

Working with HBase/Hive/Pig

Speaking of DSLs, it is quite common to use higher-level abstractions when interacting with Apache Hadoop - popular choices include HBase, Hive or Pig. Spring for Apache Hadoop provides integration for all of these, allowing easy configuration and consumption of these data sources inside a Spring app:
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
    foo=bar
</hdp:hbase-configuration>

<!-- create a Pig instance using custom properties
    and execute a script (using given arguments) at startup -->
     
<hdp:pig properties-location="pig-dev.properties">
   <script location="org/company/pig/script.pig">
     <arguments>electric=tears</arguments>
   </script>
</hdp:pig>

Through Spring for Apache Hadoop, one gets not only a powerful IoC container but also access to Spring's portable service abstractions. Take the popular JdbcTemplate: one can use it on top of Hive's JDBC client:

<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
    class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
    c:driver-ref="hive-driver" c:url="${hive.url}"/>

<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
    c:data-source-ref="hive-ds"/>

Cascading

Spring also supports a Java-based, type-safe configuration model. One can use it as an alternative or complement to declarative XML configuration, for example with Cascading:
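import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import cascading.operation.text.DateParser;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

@Configuration
public class CascadingConfig {
    @Value("${cascade.sec}") private String sec;
    
    // parses the "time" field into a timestamp stored in the "ts" field
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), 
                 "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    // groups the parsed tuples by their "ts" field
    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        return new GroupBy(tsCountPipe, new Fields("ts"));
    }
}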
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig"/>

<bean id="cascade"
    class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" 
    p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />

The example above mixes both programmatic and declarative configurations: the former to create the individual Cascading pipes and the latter to wire them together into a flow.
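
The pattern handed to DateParser above looks like the timestamp format found in Apache-style access logs. Assuming DateParser interprets its pattern with java.text.SimpleDateFormat syntax (worth checking against the Cascading Javadoc for the version in use), the following stand-alone sketch shows what such a pattern accepts and the value it yields. The class name, method name and sample timestamp are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

// Hypothetical illustration of the date pattern; it does not use Cascading.
class DatePatternSketch {

    // Same pattern string as in the tsPipe() bean above.
    static final String PATTERN = "dd/MMM/yyyy:HH:mm:ss Z";

    // Converts a timestamp in PATTERN form to milliseconds since the epoch.
    static long toEpochMillis(String text) {
        // English locale so that month abbreviations such as "Feb" are recognised.
        SimpleDateFormat format = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        format.setLenient(false);
        try {
            return format.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not in " + PATTERN + " form: " + text, e);
        }
    }

    public static void main(String[] args) {
        // prints 1330510530000
        System.out.println(toEpochMillis("29/Feb/2012:10:15:30 +0000"));
    }
}
```

Because the pattern includes the zone offset (Z), the same instant written with a different offset produces the same value, which is what one wants before grouping on the resulting field.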

Using Spring's portable service abstractions

The same applies to Spring's excellent task/scheduling support, which can be used to submit jobs at certain times:
<task:scheduler id="myScheduler" pool-size="10"/>

<task:scheduled-tasks scheduler="myScheduler">
 <!-- run once a day, at midnight -->
 <task:scheduled ref="word-count" method="submit" cron="0 0 0 * * *"/>
</task:scheduled-tasks>

The configuration above uses a simple JDK Executor instance - excellent for POC (http://en.wikipedia.org/wiki/Proof_of_Concept) development. One can easily replace it (a one-liner) in production with a more comprehensive solution such as a dedicated scheduler or a WorkManager implementation - another example of Spring's powerful service abstractions.
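
To show what the scheduler abstraction saves you from writing, here is a rough JDK-only sketch of "run once a day, at midnight" built directly on a ScheduledExecutorService. The class and method names are hypothetical and the Runnable is a stand-in for the real job submission; unlike a cron trigger, the fixed 24-hour period used here drifts across daylight-saving changes.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not how Spring implements its scheduling support.
class MidnightSchedulerSketch {

    // Time remaining from 'now' until the start of the next calendar day.
    static Duration untilNextMidnight(LocalDateTime now) {
        return Duration.between(now, now.toLocalDate().plusDays(1).atStartOfDay());
    }

    // Schedules 'job' for the next midnight and every 24 hours after that.
    static ScheduledFuture<?> scheduleDaily(ScheduledExecutorService scheduler,
                                            Runnable job, LocalDateTime now) {
        long initialDelayMillis = untilNextMidnight(now).toMillis();
        long periodMillis = TimeUnit.DAYS.toMillis(1);
        return scheduler.scheduleAtFixedRate(job, initialDelayMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Pool size mirrors pool-size="10" in the XML above.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
        Runnable submitJob = () -> System.out.println("submitting job"); // stand-in

        LocalDateTime now = LocalDateTime.now();
        ScheduledFuture<?> handle = scheduleDaily(scheduler, submitJob, now);
        System.out.println("first run in " + untilNextMidnight(now));

        // A real application would keep running; the demo cancels and exits.
        handle.cancel(false);
        scheduler.shutdownNow();
    }
}
```

With the declarative version, swapping the scheduler implementation or changing the trigger is a configuration change rather than a code change.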

HDFS/Scripting

A common task when interacting with HDFS is preparing the file system, such as cleaning the output directory to avoid overwriting data, or moving all input files under the same naming scheme or folder. Spring for Apache Hadoop addresses the issue by fully embracing Apache Hadoop's fs commands, such as FS Shell and DistCp, and exposing them as proper Java APIs. Combine that with JVM scripting (whether it is Groovy, JRuby or Rhino/JavaScript) to form a powerful combination:

<hdp:script language="groovy">
  inputPath = "/user/gutenberg/input/word/"
  outputPath = "/user/gutenberg/output/word/"

  if (fsh.test(inputPath)) {
    fsh.rmr(inputPath)
  }

  if (fsh.test(outputPath)) {
    fsh.rmr(outputPath)
  }

  fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>

Summary

This post just scratches the surface of the features available in Spring for Apache Hadoop; I have not mentioned the Spring Batch integration, which provides tasklets for various Apache Hadoop interactions, or the use of Spring Integration for event triggering - more about that in a future entry. Let us know what you think and what you need, and give us feedback: download the code, fork the source, report issues, post on the forum or send us a tweet.

