This guide walks you through the process of executing a Spring Batch job on Hadoop YARN.

What you’ll build

You’ll build a simple Hadoop YARN application with Spring Hadoop, Spring Batch and Spring Boot.

What you’ll need

Testing this sample application does not require an existing or running Hadoop instance.

How to complete this guide

Like most Spring Getting Started guides, you can start from scratch and complete each step, or you can bypass basic setup steps that are already familiar to you. Either way, you end up with working code.

To start from scratch, move on to Set up the project.

To skip the basics, do the following:

When you’re finished, you can check your results against the code in gs-yarn-batch-processing/complete.

Set up the project

First you set up a basic build script. You can use any build system you like when building apps with Spring, but the code you need to work with Gradle is included here. If you’re not familiar with it, refer to Building Java Projects with Gradle.

Create the directory structure

In a project directory of your choosing, create the following subdirectory structure:

├── gs-yarn-batch-processing-appmaster
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── appmaster
├── gs-yarn-batch-processing-container
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── container
├── gs-yarn-batch-processing-client
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── client
└── gs-yarn-batch-processing-dist
    └── src
        └── test
            └── java
                └── hello

For example, on *nix systems you can create the directories with:

mkdir -p gs-yarn-batch-processing-appmaster/src/main/resources
mkdir -p gs-yarn-batch-processing-appmaster/src/main/java/hello/appmaster
mkdir -p gs-yarn-batch-processing-container/src/main/resources
mkdir -p gs-yarn-batch-processing-container/src/main/java/hello/container
mkdir -p gs-yarn-batch-processing-client/src/main/resources
mkdir -p gs-yarn-batch-processing-client/src/main/java/hello/client
mkdir -p gs-yarn-batch-processing-dist/src/test/java/hello

Create a Gradle build file

Below is the initial Gradle build file. If you are using Spring Tool Suite (STS), you can import the guide directly.

build.gradle

buildscript {
    repositories {
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.3.RELEASE")
    }
}

allprojects {
    apply plugin: 'base'
}

subprojects { subproject ->
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'idea'
    version =  '0.1.0'
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        compile("org.springframework.data:spring-yarn-boot:2.4.0.RELEASE")
    }
    task copyJars(type: Copy) {
        from "$buildDir/libs"
        into "$rootDir/gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/"
        include "**/*.jar"
    }
    configurations {
        compile.exclude group: "org.slf4j", module: "slf4j-log4j12"
        runtime.exclude group: "org.slf4j", module: "slf4j-log4j12"
    }
    assemble.doLast {copyJars.execute()}
}

project('gs-yarn-batch-processing-client') {
    apply plugin: 'spring-boot'
}

project('gs-yarn-batch-processing-appmaster') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
    }
}

project('gs-yarn-batch-processing-container') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
    }
}

project('gs-yarn-batch-processing-dist') {
    dependencies {
        compile project(":gs-yarn-batch-processing-client")
        compile project(":gs-yarn-batch-processing-appmaster")
        compile project(":gs-yarn-batch-processing-container")
        testCompile("org.springframework.data:spring-yarn-boot-test:2.4.0.RELEASE")
        testCompile("org.hamcrest:hamcrest-core:1.2.1")
        testCompile("org.hamcrest:hamcrest-library:1.2.1")
    }
    test.dependsOn(':gs-yarn-batch-processing-client:assemble')
    test.dependsOn(':gs-yarn-batch-processing-appmaster:assemble')
    test.dependsOn(':gs-yarn-batch-processing-container:assemble')
    clean.doLast {ant.delete(dir: "target")}
}

task wrapper(type: Wrapper) {
    gradleVersion = '1.11'
}

In the above Gradle build file we simply create three different jars, each containing the classes for its specific role. These jars are then repackaged by Spring Boot's Gradle plugin into executable jars.

You also need a settings.gradle file to define the sub-projects.

settings.gradle

include 'gs-yarn-batch-processing-client','gs-yarn-batch-processing-appmaster','gs-yarn-batch-processing-container','gs-yarn-batch-processing-dist'

Spring Batch Intro

Many batch processing problems can be solved with single-threaded, single-process jobs, so it is always a good idea to check whether that meets your needs before thinking about more complex implementations. When you are ready to implement a job with some parallel processing, Spring Batch offers a range of options. At a high level there are two modes of parallel processing: single-process, multi-threaded; and multi-process.
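As a conceptual sketch, the single-process, multi-threaded mode can be pictured as splitting the work into a grid of partitions and running each on its own thread inside one JVM. The plain-Java class below is purely illustrative (it uses no Spring Batch API; the class and method names are made up for this example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSketch {

	// Run one "step execution" per partition on a thread pool and collect
	// the results, roughly analogous to what a multi-threaded partition
	// handler does within a single process.
	public static List<String> runPartitions(int gridSize) throws Exception {
		ExecutorService pool = Executors.newFixedThreadPool(gridSize);
		List<Future<String>> futures = new ArrayList<>();
		for (int i = 0; i < gridSize; i++) {
			final int partition = i;
			// Each task stands in for one partitioned step execution.
			futures.add(pool.submit(() -> "partition" + partition + ":FINISHED"));
		}
		List<String> results = new ArrayList<>();
		for (Future<String> f : futures) {
			results.add(f.get());
		}
		pool.shutdown();
		return results;
	}

	public static void main(String[] args) throws Exception {
		System.out.println(runPartitions(2));
	}
}
```

The multi-process mode is the same idea, except each partition runs in a separate process, which is exactly what Spring Hadoop does when it launches remote steps in YARN containers.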

Spring Hadoop contains support for running Spring Batch jobs on a Hadoop cluster using YARN. For better parallel processing, Spring Batch partitioned steps can be executed on YARN as remote steps.

The starting point for running a Spring Batch job on YARN is always the Application Master, whether or not the job uses partitioning. When partitioning is not used, the whole job runs within the Application Master and no additional YARN containers are launched. It may seem a bit odd to run something on YARN without using containers, but remember that the Application Master itself is just a resource allocated from the Hadoop cluster, running in a YARN container.

In order to run Spring Batch jobs on a Hadoop cluster, a few constraints exist:

  • Job Context - the Application Master is the main entry point for running the job.

  • Job Repository - the Application Master needs access to a job repository, held either in memory or in a relational database. These are the two types natively supported by Spring Batch.

  • Remote Steps - due to the nature of how Spring Batch partitioning works, remote steps need access to the job repository.

Let’s take a quick look at how Spring Batch partitioning is handled. The concept of running a partitioned job involves three things: remote steps, a partition handler, and a partitioner. Oversimplifying a little, a remote step looks like any other step from a user’s point of view. Spring Batch itself does not contain implementations for any proprietary grid or remoting fabrics. It does, however, provide a useful implementation of PartitionHandler that executes steps locally in separate threads of execution, using the TaskExecutor strategy from Spring. Spring Hadoop provides an implementation that executes steps remotely on a Hadoop cluster using YARN.

For more background information about Spring Batch partitioning, read the Spring Batch reference documentation.
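The partitioner's job is to produce one execution context per partition, keyed by a partition name; the partition handler then hands each context to a copy of the remote step. The plain-Java sketch below mimics the shape of that contract without any Spring classes (Spring Batch's real Partitioner returns a Map of ExecutionContext objects; the class and method names here are illustrative):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionerSketch {

	// Mimics the shape of a partitioner: return one (empty) execution
	// context per partition, keyed by a generated partition name. Plain
	// Maps stand in for Spring Batch's ExecutionContext here.
	public static Map<String, Map<String, Object>> partition(int gridSize) {
		Map<String, Map<String, Object>> contexts = new LinkedHashMap<>();
		for (int i = 0; i < gridSize; i++) {
			contexts.put("partition" + i, new HashMap<>());
		}
		return contexts;
	}

	public static void main(String[] args) {
		// With a grid size of 2, a partition handler would launch two
		// remote step executions, one per context below.
		System.out.println(partition(2).keySet());
	}
}
```

In this guide the appmaster uses a grid size of 2, which is why two remote steps end up running in YARN containers.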

Create a Remote Batch Step

Here you create a PrintTasklet class.

gs-yarn-batch-processing-container/src/main/java/hello/container/PrintTasklet.java

package hello.container;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class PrintTasklet implements Tasklet {

	private static final Log log = LogFactory.getLog(PrintTasklet.class);

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
			throws Exception {
		log.info("PrintTasklet says Hello");
		return RepeatStatus.FINISHED;
	}

}

A Tasklet in Spring Batch is one of the easiest concepts to use when something needs to be executed in a step as part of a job. This simple tasklet serves to demonstrate how easy it is to execute real partitioned steps on Hadoop YARN without introducing more complex job processing.

In the above PrintTasklet we simply write a log message when the Tasklet itself is executed.

Now it’s time to create a ContainerApplication class.

gs-yarn-batch-processing-container/src/main/java/hello/container/ContainerApplication.java

package hello.container;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnRemoteBatchProcessing;

@Configuration
@EnableAutoConfiguration(exclude = { BatchAutoConfiguration.class })
@EnableYarnRemoteBatchProcessing
public class ContainerApplication {

	@Autowired
	private StepBuilderFactory stepBuilder;

	@Bean
	protected Tasklet tasklet() {
		return new PrintTasklet();
	}

	@Bean
	protected Step remoteStep() throws Exception {
		return stepBuilder
			.get("remoteStep")
			.tasklet(tasklet())
			.build();
	}

	public static void main(String[] args) {
		SpringApplication.run(ContainerApplication.class, args);
	}

}
  • The @EnableYarnRemoteBatchProcessing annotation tells Spring to enable Batch processing for YARN containers. Spring Batch’s @EnableBatchProcessing is then included automatically, which means all the JavaConfig builders are available.

  • We @Autowired the step builder, which is then used to create steps as beans.

  • We define PrintTasklet as a @Bean.

  • We create a step as a @Bean and instruct it to execute a tasklet.

Next we create an application.yml file for the container.

gs-yarn-batch-processing-container/src/main/resources/application.yml

spring:
    batch:
        job:
            enabled: false
    hadoop:
        fsUri: hdfs://localhost:8020
    yarn:
        batch:
            enabled: true
  • We disable batch functionality in Spring Boot core in order to use YARN-specific features.

  • We add Hadoop configuration for HDFS. This can be customized to access a real cluster.

  • We enable batch processing on YARN by setting the spring.yarn.batch.enabled property to true.

Create a Batch Job

Here we create an AppmasterApplication class.

gs-yarn-batch-processing-appmaster/src/main/java/hello/appmaster/AppmasterApplication.java

package hello.appmaster;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnBatchProcessing;
import org.springframework.yarn.batch.partition.StaticPartitionHandler;

@Configuration
@EnableAutoConfiguration
@EnableYarnBatchProcessing
public class AppmasterApplication {

	@Autowired
	private JobBuilderFactory jobFactory;

	@Autowired
	private StepBuilderFactory stepFactory;

	@Bean
	public Job job() throws Exception {
		return jobFactory.get("job")
			.incrementer(jobParametersIncrementer())
			.start(master())
			.build();
	}

	@Bean
	public JobParametersIncrementer jobParametersIncrementer() {
		return new RunIdIncrementer();
	}

	@Bean
	protected Step master() throws Exception {
		return stepFactory
			.get("master")
			.partitioner("remoteStep", partitioner())
			.partitionHandler(partitionHandler())
			.build();
	}

	@Bean
	protected Partitioner partitioner() {
		return new SimplePartitioner();
	}

	@Bean
	protected PartitionHandler partitionHandler() {
		StaticPartitionHandler handler = new StaticPartitionHandler();
		handler.setStepName("remoteStep");
		handler.setGridSize(2);
		return handler;
	}

	public static void main(String[] args) {
		SpringApplication.run(AppmasterApplication.class, args);
	}

}
  • @EnableYarnBatchProcessing tells Spring to enable Batch processing for the appmaster.

  • We @Autowired the builders for steps and jobs.

Next we create an application.yml file for the appmaster.

gs-yarn-batch-processing-appmaster/src/main/resources/application.yml

spring:
    batch:
        job:
            enabled: false
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-processing
        applicationDir: /app/gs-yarn-batch-processing/
        batch:
            enabled: true
            name: job
            jobs:
              - name: job
                enabled: true
                next: true
        appmaster:
            keepContextAlive: false
            launchcontext:
                archiveFile: gs-yarn-batch-processing-container-0.1.0.jar
  • We disable batch functionality in Spring Boot core in order to use YARN-specific features.

  • We add Hadoop configuration for HDFS. This can be customized to access a real cluster.

  • We enable batch processing on YARN by setting the spring.yarn.batch.enabled property to true.

  • We define a job named job to run automatically.

  • We enable a job named job and allow it to perform a next operation, which uses the job’s JobParametersIncrementer to create fresh job parameters for each run.

Create a Yarn Client

Here we create a ClientApplication class.

gs-yarn-batch-processing-client/src/main/java/hello/client/ClientApplication.java

package hello.client;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.yarn.client.YarnClient;

@EnableAutoConfiguration
public class ClientApplication {

	public static void main(String[] args) {
		SpringApplication.run(ClientApplication.class, args)
			.getBean(YarnClient.class)
			.submitApplication();
	}

}

The ClientApplication is similar to what we’ve used in other guides and its only purpose is to submit a YARN application.

Here you create an application.yml file for the client.

gs-yarn-batch-processing-client/src/main/resources/application.yml

spring:
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-processing
        applicationDir: /app/gs-yarn-batch-processing/
        client:
            files:
              - "file:target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-container-0.1.0.jar"
              - "file:target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-appmaster-0.1.0.jar"
            launchcontext:
                archiveFile: gs-yarn-batch-processing-appmaster-0.1.0.jar
  • Here we simply define all the files of the application that are needed for submission.

Build the Application

For Gradle, simply execute the clean and build tasks.

./gradlew clean build

To skip existing tests if any:

./gradlew clean build -x test

For Maven, simply execute the clean and package goals.

mvn clean package

To skip existing tests if any:

mvn clean package -DskipTests=true

The listing below shows the files after a successful Gradle build.

gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-client-0.1.0.jar
gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-appmaster-0.1.0.jar
gs-yarn-batch-processing-dist/target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-container-0.1.0.jar

Run the Application

Now that we’ve successfully compiled and packaged our application, it’s time to do the fun part and execute it on Hadoop YARN.

Simply run the executable client jar.

$ cd gs-yarn-batch-processing-dist
$ java -jar target/gs-yarn-batch-processing-dist/gs-yarn-batch-processing-client-0.1.0.jar

If the application executes successfully, we should be able to see two steps executed on YARN.

Create a JUnit Test Class

Below is a class which can be used to execute this application as a JUnit test without a running Hadoop cluster.

gs-yarn-batch-processing-dist/src/test/java/hello/AppIT.java

package hello;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import hello.client.ClientApplication;

import java.io.File;
import java.util.List;
import java.util.Scanner;

import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.junit.Test;
import org.springframework.core.io.Resource;
import org.springframework.yarn.boot.test.junit.AbstractBootYarnClusterTests;
import org.springframework.yarn.test.context.MiniYarnClusterTest;
import org.springframework.yarn.test.junit.ApplicationInfo;
import org.springframework.yarn.test.support.ContainerLogUtils;

@MiniYarnClusterTest
public class AppIT extends AbstractBootYarnClusterTests {

	@Test
	public void testAppSubmission() throws Exception {

		ApplicationInfo info = submitApplicationAndWait(ClientApplication.class, new String[0]);
		assertThat(info.getYarnApplicationState(), is(YarnApplicationState.FINISHED));

		List<Resource> resources = ContainerLogUtils.queryContainerLogs(getYarnCluster(), info.getApplicationId());
		assertThat(resources, notNullValue());
		assertThat(resources.size(), is(6));

		for (Resource res : resources) {
			File file = res.getFile();
			if (file.getName().endsWith("stdout")) {
				// there has to be some content in stdout file
				assertThat(file.length(), greaterThan(0L));
				if (file.getName().equals("Container.stdout")) {
					Scanner scanner = new Scanner(file);
					String content = scanner.useDelimiter("\\A").next();
					scanner.close();
					// this is what container will log in stdout
					assertThat(content, containsString("PrintTasklet says Hello"));
				}
			} else if (file.getName().endsWith("stderr")) {
				String content = "";
				if (file.length() > 0) {
					Scanner scanner = new Scanner(file);
					content = scanner.useDelimiter("\\A").next();
					scanner.close();
				}
				// can't have anything in stderr files
				assertThat("stderr file is not empty: " + content, file.length(), is(0L));
			}
		}
	}

}

Summary

Congratulations! You’ve just developed a Spring YARN application executing a Spring Batch job!

Want to write a new guide or contribute to an existing one? Check out our contribution guidelines.

All guides are released with an ASLv2 license for the code, and an Attribution, NoDerivatives creative commons license for the writing.