This guide walks you through the process of executing a Spring Batch job and its partitioned steps on Hadoop YARN. We also simulate an error in a partitioned step to demonstrate a job restart.

What you’ll build

You’ll build a simple Hadoop YARN application with Spring Hadoop and Spring Boot. This application contains a job with two master steps whose actual execution is done on YARN as partitioned steps. We also simulate an error in a step execution to demonstrate a job restart, so that step execution continues from the failed steps.

What you’ll need

Testing this sample application doesn’t require an existing or running Hadoop instance.

How to complete this guide

Like most Spring Getting Started guides, you can start from scratch and complete each step, or you can bypass basic setup steps that are already familiar to you. Either way, you end up with working code.

To start from scratch, move on to Set up the project.

To skip the basics, do the following:

Download and unzip the source repository for this guide, or clone it using Git:
git clone https://github.com/spring-guides/gs-yarn-batch-restart.git

cd into gs-yarn-batch-restart/initial

Jump ahead to Create a Remote Batch Step.

When you’re finished, you can check your results against the code in gs-yarn-batch-restart/complete.

Set up the project

First you set up a basic build script. You can use any build system you like when building apps with Spring, but the code you need to work with Gradle is included here. If you’re not familiar with it, refer to Building Java Projects with Gradle.

Create the directory structure

In a project directory of your choosing, create the following subdirectory structure:

├── gs-yarn-batch-restart-appmaster
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── appmaster
├── gs-yarn-batch-restart-container
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── container
├── gs-yarn-batch-restart-client
│   └── src
│       └── main
│           ├── resources
│           └── java
│               └── hello
│                   └── client
└── gs-yarn-batch-restart-dist
    └── src
        └── test
            └── java
                └── hello

for example, with the following commands on *nix systems:

mkdir -p gs-yarn-batch-restart-appmaster/src/main/resources
mkdir -p gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster
mkdir -p gs-yarn-batch-restart-container/src/main/resources
mkdir -p gs-yarn-batch-restart-container/src/main/java/hello/container
mkdir -p gs-yarn-batch-restart-client/src/main/resources
mkdir -p gs-yarn-batch-restart-client/src/main/java/hello/client
mkdir -p gs-yarn-batch-restart-dist/src/test/java/hello

Create a Gradle build file

Below is the initial Gradle build file. If you are using Spring Tool Suite (STS), you can import the guide directly.

build.gradle

buildscript {
    repositories {
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.3.3.RELEASE")
    }
}

allprojects {
    apply plugin: 'base'
}

subprojects { subproject ->
    apply plugin: 'java'
    apply plugin: 'eclipse'
    apply plugin: 'idea'
    version =  '0.1.0'
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/libs-release" }
    }
    dependencies {
        compile("org.springframework.data:spring-yarn-boot:2.4.0.RELEASE")
    }
    task copyJars(type: Copy) {
        from "$buildDir/libs"
        into "$rootDir/gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/"
        include "**/*.jar"
    }
    configurations {
        compile.exclude group: "org.slf4j", module: "slf4j-log4j12"
        runtime.exclude group: "org.slf4j", module: "slf4j-log4j12"
    }
    assemble.doLast {copyJars.execute()}
}

project('gs-yarn-batch-restart-client') {
    apply plugin: 'spring-boot'
}

project('gs-yarn-batch-restart-appmaster') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
        runtime("org.hsqldb:hsqldb:2.3.1")
        runtime("commons-dbcp:commons-dbcp:1.2.2")
    }
}

project('gs-yarn-batch-restart-container') {
    apply plugin: 'spring-boot'
    dependencies {
        compile("org.springframework.data:spring-yarn-batch:2.4.0.RELEASE")
        runtime("org.springframework.boot:spring-boot-starter-batch:1.3.3.RELEASE")
    }
}

project('gs-yarn-batch-restart-dist') {
    dependencies {
        compile project(":gs-yarn-batch-restart-client")
        compile project(":gs-yarn-batch-restart-appmaster")
        compile project(":gs-yarn-batch-restart-container")
        testCompile("org.hsqldb:hsqldb:2.3.1")
        testCompile("org.springframework.data:spring-yarn-boot-test:2.4.0.RELEASE")
        testCompile("org.hamcrest:hamcrest-core:1.2.1")
        testCompile("org.hamcrest:hamcrest-library:1.2.1")
    }
    test.dependsOn(':gs-yarn-batch-restart-client:assemble')
    test.dependsOn(':gs-yarn-batch-restart-appmaster:assemble')
    test.dependsOn(':gs-yarn-batch-restart-container:assemble')
    clean.doLast {ant.delete(dir: "target")}
}

task wrapper(type: Wrapper) {
    gradleVersion = '1.11'
}

In the above Gradle build file we simply create three different jars: client, appmaster and container, each having classes for its specific role. These jars are then repackaged by Spring Boot’s Gradle plugin to create executable jars.

You also need a settings.gradle file to define the sub-projects.

settings.gradle

include 'gs-yarn-batch-restart-client','gs-yarn-batch-restart-appmaster','gs-yarn-batch-restart-container','gs-yarn-batch-restart-dist'

Create a Remote Batch Step

First we create an HdfsTasklet class.

gs-yarn-batch-restart-container/src/main/java/hello/container/HdfsTasklet.java

package hello.container;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.hadoop.fs.FsShell;

public class HdfsTasklet implements Tasklet {

	private static final Log log = LogFactory.getLog(HdfsTasklet.class);

	@Autowired
	private Configuration configuration;

	@Override
	public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
		String fileName = chunkContext.getStepContext().getStepName().replaceAll("\\W", "");
		FsShell shell = new FsShell(configuration);
		boolean exists = shell.test("/tmp/" + fileName);
		shell.close();
		if (exists) {
			log.info("File /tmp/" + fileName + " exist");
			log.info("Hello from HdfsTasklet ok");
			return RepeatStatus.FINISHED;
		} else {
			log.info("Hello from HdfsTasklet fail");
			throw new RuntimeException("File /tmp/" + fileName + " doesn't exist");
		}
	}

}
  • We @Autowired Hadoop’s Configuration to be able to use FsShell.

  • We simply check if a file exists in HDFS and throw a RuntimeException if the file doesn’t exist. This is done to simulate an error in a Tasklet. If the file does exist, we simply return FINISHED from the Tasklet.

  • To figure out which file name to use, we access the step name, which looks something like remoteStep1:partition0, and remove any illegal file name characters, as shown in the sketch below.

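To make the file naming concrete, here is a small standalone sketch. It is not part of the guide’s sources; it only demonstrates how a partitioned step name maps to the HDFS path that HdfsTasklet checks.

public class StepNameSketch {

	public static void main(String[] args) {
		// A partitioned step execution gets a name like "remoteStep1:partition0".
		String stepName = "remoteStep1:partition0";
		// Strip all non-word characters, here just the colon.
		String fileName = stepName.replaceAll("\\W", "");
		// Prints /tmp/remoteStep1partition0, the file the tasklet expects in HDFS.
		System.out.println("/tmp/" + fileName);
	}
}
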
Next we create a ContainerApplication class.

gs-yarn-batch-restart-container/src/main/java/hello/container/ContainerApplication.java

package hello.container;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.batch.BatchAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnRemoteBatchProcessing;

@Configuration
@EnableAutoConfiguration(exclude = { BatchAutoConfiguration.class })
@EnableYarnRemoteBatchProcessing
public class ContainerApplication {

	@Autowired
	private StepBuilderFactory steps;

	@Bean
	public Step remoteStep1() throws Exception {
		return steps
			.get("remoteStep1")
			.tasklet(tasklet())
			.build();
	}

	@Bean
	public Step remoteStep2() throws Exception {
		return steps
			.get("remoteStep2")
			.tasklet(tasklet())
			.build();
	}

	@Bean
	public Tasklet tasklet() {
		return new HdfsTasklet();
	}

	public static void main(String[] args) {
		SpringApplication.run(ContainerApplication.class, args);
	}

}
  • We simply create two steps named remoteStep1 and remoteStep2 and attach HdfsTasklet to those steps.

Next we create an application.yml file for the container app.

gs-yarn-batch-restart-container/src/main/resources/application.yml

spring:
    batch:
        job:
            enabled: false
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        batch:
            enabled: true
  • We disable batch functionality in Spring Boot core in order to use YARN-specific features.

  • We add Hadoop configuration for HDFS. This can be customized for accessing a real cluster.

  • We enable batch processing on YARN by using the spring.yarn.batch.enabled property.

Create a Batch Job

Now we create an AppmasterApplication class.

gs-yarn-batch-restart-appmaster/src/main/java/hello/appmaster/AppmasterApplication.java

package hello.appmaster;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.SimplePartitioner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.yarn.batch.config.EnableYarnBatchProcessing;
import org.springframework.yarn.batch.partition.StaticPartitionHandler;

@Configuration
@EnableAutoConfiguration
@EnableYarnBatchProcessing
public class AppmasterApplication {

	@Autowired
	private JobBuilderFactory jobFactory;

	@Autowired
	private StepBuilderFactory stepFactory;

	@Bean
	public Job job() throws Exception {
		return jobFactory.get("job")
			.incrementer(jobParametersIncrementer())
			.start(master1())
			.next(master2())
			.build();
	}

	@Bean
	public JobParametersIncrementer jobParametersIncrementer() {
		return new RunIdIncrementer();
	}

	@Bean
	protected Step master1() throws Exception {
		return stepFactory.get("master1")
			.partitioner("remoteStep1", partitioner())
			.partitionHandler(partitionHandler1())
			.build();
	}

	@Bean
	protected Step master2() throws Exception {
		return stepFactory.get("master2")
			.partitioner("remoteStep2", partitioner())
			.partitionHandler(partitionHandler2())
			.build();
	}

	@Bean
	protected Partitioner partitioner() {
		return new SimplePartitioner();
	}

	@Bean
	protected PartitionHandler partitionHandler1() {
		StaticPartitionHandler handler = new StaticPartitionHandler();
		handler.setStepName("remoteStep1");
		handler.setGridSize(2);
		return handler;
	}

	@Bean
	protected PartitionHandler partitionHandler2() {
		StaticPartitionHandler handler = new StaticPartitionHandler();
		handler.setStepName("remoteStep2");
		handler.setGridSize(2);
		return handler;
	}

	public static void main(String[] args) {
		SpringApplication.run(AppmasterApplication.class, args);
	}

}
  • We simply create two master steps named master1 and master2. Then we configure those steps to be partitioned on YARN and we set the grid size to two.

Next we create an application.yml file for the appmaster.

gs-yarn-batch-restart-appmaster/src/main/resources/application.yml

spring:
    batch:
        job:
            enabled: false
    datasource:
        url: jdbc:hsqldb:hsql://localhost:9001/testdb
        driverClassName: org.hsqldb.jdbcDriver
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-restart
        applicationDir: /app/gs-yarn-batch-restart/
        batch:
            enabled: true
            name: job
            jobs:
              - name: job
                enabled: true
                next: true
                failNext: false
                restart: true
                failRestart: false
        appmaster:
            keepContextAlive: false
            launchcontext:
                archiveFile: gs-yarn-batch-restart-container-0.1.0.jar
  • Again, we disable batch functionality in Spring Boot core in order to use YARN-specific features.

  • We add Hadoop configuration for HDFS. This can be customized for accessing a real cluster.

  • We enable batch processing on YARN by using the spring.yarn.batch.enabled property.

  • We define a job named job to run automatically.

  • We enable the job named job and allow it to do a next operation, indicating that job execution should not fail if the job parameters cannot be incremented.

  • We enable job restart, indicating that the job should not fail if it cannot be restarted. The sketch below illustrates the underlying semantics.

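To make the next and restart behavior concrete, below is a minimal, hypothetical sketch of the plain Spring Batch calls that these properties automate on YARN. It assumes an already configured JobLauncher together with the job and RunIdIncrementer beans defined above; it is not part of the guide’s sources.

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.RunIdIncrementer;

public class RestartSemanticsSketch {

	// Hypothetical helper, not part of this guide's sources.
	public static void runTwice(JobLauncher jobLauncher, Job job) throws Exception {
		// next: increment run.id so that a fresh job instance is started.
		JobParameters params = new RunIdIncrementer().getNext(new JobParameters());

		// First launch: the remoteStep2 partitions fail, so the execution ends FAILED.
		JobExecution first = jobLauncher.run(job, params);
		System.out.println("first run: " + first.getStatus());

		// restart: launching again with the SAME parameters resumes the failed
		// job instance; completed steps are skipped and only failed steps run again.
		JobExecution second = jobLauncher.run(job, params);
		System.out.println("second run: " + second.getStatus());
	}
}
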
Create a Yarn Client

Now we create a ClientApplication class.

gs-yarn-batch-restart-client/src/main/java/hello/client/ClientApplication.java

package hello.client;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.yarn.client.YarnClient;

@EnableAutoConfiguration
public class ClientApplication {

	public static void main(String[] args) {
		SpringApplication.run(ClientApplication.class, args)
			.getBean(YarnClient.class)
			.submitApplication();
	}

}
  • @EnableAutoConfiguration tells Spring Boot to start adding beans based on classpath settings, other beans, and various property settings.

  • Specific auto-configuration for Spring YARN components takes place in the same way it does in a regular Spring Boot app.

The main() method uses Spring Boot’s SpringApplication.run() method to launch an application. From there, we simply request a bean of type YarnClient and execute its submitApplication() method. What happens next depends on application configuration, which we go through later in this guide. Did you notice that there wasn’t a single line of XML?

Next we create an application.yml file for the client.

gs-yarn-batch-restart-client/src/main/resources/application.yml

spring:
    hadoop:
        fsUri: hdfs://localhost:8020
        resourceManagerHost: localhost
    yarn:
        appName: gs-yarn-batch-restart
        applicationDir: /app/gs-yarn-batch-restart/
        client:
            files:
              - "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar"
              - "file:target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar"
            launchcontext:
                archiveFile: gs-yarn-batch-restart-appmaster-0.1.0.jar
  • Here we simply define all the application files that are needed for submission.

Build the Application

For Gradle, simply execute the clean and build tasks.

./gradlew clean build

To skip existing tests if any:

./gradlew clean build -x test

For Maven, simply execute the clean and package goals.

mvn clean package

To skip existing tests if any:

mvn clean package -DskipTests=true

The listing below shows the files after a successful Gradle build.

gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-appmaster-0.1.0.jar
gs-yarn-batch-restart-dist/target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-container-0.1.0.jar

Run the Application

Now that you’ve successfully compiled and packaged your application, it’s time for the fun part: running it on Hadoop YARN.

Because the Spring Batch job status needs to be persisted, a database is required. We’ve bundled an instance of HSQL, which is easy to run in in-memory mode. In a separate terminal window, run the following:

$ cd db/
$ unzip hsqldb-2.3.1.zip
$ cd hsqldb-2.3.1/hsqldb/data/
$ java -cp ../lib/hsqldb.jar org.hsqldb.server.Server --database.0 mem:testdb --dbname.0 testdb --silent false --trace true

Note: If you build this from scratch you can download the HSQL zip file from http://sourceforge.net/projects/hsqldb/files/hsqldb/hsqldb_2_3/.

Back to setting up for actually running the application. First, create two empty files, /tmp/remoteStep1partition0 and /tmp/remoteStep1partition1, in HDFS:

$ hdfs dfs -touchz /tmp/remoteStep1partition0
$ hdfs dfs -touchz /tmp/remoteStep1partition1

Then run the application:

$ cd gs-yarn-batch-restart-dist
$ java -jar target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar

If you check the application status from the YARN resource manager, you’ll see that the application FAILED because the partitioned steps of the second phase failed. Now create the files /tmp/remoteStep2partition0 and /tmp/remoteStep2partition1:

$ hdfs dfs -touchz /tmp/remoteStep2partition0
$ hdfs dfs -touchz /tmp/remoteStep2partition1

Run the application again:

$ java -jar target/gs-yarn-batch-restart-dist/gs-yarn-batch-restart-client-0.1.0.jar

You should now see that the application finished successfully and that only the previously failed partitioned steps were executed.

Test the Application

Below is a class which can be used to execute this application as a JUnit test without a running Hadoop cluster.

gs-yarn-batch-restart-dist/src/test/java/hello/AppIT.java

package hello;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import hello.client.ClientApplication;

import java.io.File;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.hsqldb.Server;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.Resource;
import org.springframework.data.hadoop.fs.FsShell;
import org.springframework.yarn.boot.test.junit.AbstractBootYarnClusterTests;
import org.springframework.yarn.test.context.MiniYarnClusterTest;
import org.springframework.yarn.test.junit.ApplicationInfo;
import org.springframework.yarn.test.support.ContainerLogUtils;

@MiniYarnClusterTest
public class AppIT extends AbstractBootYarnClusterTests {

	@Test
	public void testApp() throws Exception {
		FsShell shell = new FsShell(getConfiguration());
		shell.touchz("/tmp/remoteStep1partition0");
		shell.touchz("/tmp/remoteStep1partition1");

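		// First run: the master1 partitions succeed but the master2 partitions fail,
		// so the appmaster plus four worker containers produce 10 log files (2 ok, 2 fail).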
		ApplicationInfo info1 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
		assertThat(info1.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
		assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info1.getApplicationId()), 10, 2, 2);

		shell.touchz("/tmp/remoteStep2partition0");
		shell.touchz("/tmp/remoteStep2partition1");
		shell.close();

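		// Second run restarts the job: only the two failed remoteStep2 partitions are
		// re-executed, so the appmaster plus two containers produce 6 log files (2 ok, 0 fail).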
		ApplicationInfo info2 = submitApplicationAndWait(ClientApplication.class, new String[0], 4, TimeUnit.MINUTES);
		assertThat(info2.getYarnApplicationState(), is(YarnApplicationState.FINISHED));
		assertLogs(ContainerLogUtils.queryContainerLogs(getYarnCluster(), info2.getApplicationId()), 6, 2, 0);
	}

	private void assertLogs(List<Resource> resources, int numResources, int numOk, int numFail) throws Exception {
		int ok = 0;
		int fail = 0;
		assertThat(resources, notNullValue());
		assertThat(resources.size(), is(numResources));
		for (Resource res : resources) {
			File file = res.getFile();
			String content = ContainerLogUtils.getFileContent(file);
			if (file.getName().endsWith("stdout")) {
				assertThat(file.length(), greaterThan(0l));
				if (file.getName().equals("Container.stdout")) {
					assertThat(content, containsString("Hello from HdfsTasklet"));
					if (content.contains("Hello from HdfsTasklet ok")) {
						ok++;
					}
					if (content.contains("Hello from HdfsTasklet fail")) {
						fail++;
					}
				}
			} else if (file.getName().endsWith("stderr")) {
				assertThat("stderr file is not empty: " + content, file.length(), is(0l));
			}
		}
		assertThat("Failed to find ok's from logs", ok, is(numOk));
		assertThat("Failed to find fail's from logs", fail, is(numFail));
	}

	private Server server = null;

	@Before
	public void startDb() {
		if (server == null) {
			server = new Server();
			server.setSilent(false);
			server.setTrace(true);
			server.setDatabaseName(0, "testdb");
			server.setDatabasePath(0, "mem:testdb");
			server.start();
		}
	}

	@After
	public void stopDb() {
		if (server != null) {
			server.stop();
			server = null;
		}
	}

}

Running the JUnit test doesn’t require an existing database instance because, as seen in the example above, an HSQL instance is created within the test itself.

Summary

Congratulations! You’ve just developed a Spring YARN application!

See Also

The following guides may also be helpful:

Want to write a new guide or contribute to an existing one? Check out our contribution guidelines.

All guides are released with an ASLv2 license for the code, and an Attribution, NoDerivatives creative commons license for the writing.