There is little documentation on using the Amazon EMR Java API, so if you are looking for code that configures and starts an EMR job from Java, here is what I use.
(A brief official sample can be found here; the file containing the following code is here.)
The code adds two steps: one that enables debugging and one that runs my own job. You can delete the debugging step if you don't need it. (The code is adapted from something I found online; I've forgotten where, so thanks to the original author.)
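The listing is written against the first-generation AWS SDK for Java (classes such as AmazonElasticMapReduceClient and StepFactory), so you'll need the SDK jar on your classpath and an AwsCredentials.properties file in the working directory; the comment inside init() below shows the expected file format.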
import java.io.File;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowExecutionState;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.util.StepFactory;

public class EMRStarter {
private static final String HADOOP_VERSION = "0.20";
private static final int INSTANCE_COUNT = 2;
private static final String INSTANCE_TYPE = InstanceType.M1Small.toString();
private static final String FLOW_NAME = "EMR Test";
private static final String BUCKET_NAME = "my-bucket";
private static final String S3N_HADOOP_JAR = "s3://" + BUCKET_NAME + "/jar/InventoryStorageCost.jar"; // the job jar in S3
private static final String OUTPUT_DIR = "test";
private static final String S3N_LOG_URI = "s3://" + BUCKET_NAME + "/log/" + OUTPUT_DIR;
private static final String[] JOB_ARGS = new String[] {
"s3://path",
"s3://path",
"s3://path",
"arg"
};
private static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);
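// Terminal job flow states: once one of these is reached, polling can stop.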
private static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(
JobFlowExecutionState.COMPLETED, JobFlowExecutionState.FAILED, JobFlowExecutionState.TERMINATED);
static AmazonElasticMapReduce emr;
/**
 * The only information needed to create a client is security credentials
 * consisting of the AWS Access Key ID and Secret Access Key. All other
 * configuration, such as the service end points, is performed automatically.
 * Client parameters, such as proxies, can be specified in an optional
 * ClientConfiguration object when constructing a client.
 *
 * @see com.amazonaws.auth.BasicAWSCredentials
 * @see com.amazonaws.auth.PropertiesCredentials
 * @see com.amazonaws.ClientConfiguration
 */
private static void init() throws Exception {
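// AwsCredentials.properties is a plain Java properties file in the working
// directory; PropertiesCredentials expects these two entries:
//   accessKey=<your AWS Access Key ID>
//   secretKey=<your AWS Secret Access Key>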
File file = new File("AwsCredentials.properties");
AWSCredentials credentials = new PropertiesCredentials(file);
emr = new AmazonElasticMapReduceClient(credentials);
}
public static void main(String[] args) throws Exception {
System.out.println("===========================================");
System.out.println("Welcome to the Elastic Map Reduce!");
System.out.println("===========================================");
init();
StepFactory stepFactory = new StepFactory();
// create the debugging step
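// (when run as the first step of the job flow, it lets the AWS Management
// Console's debugging UI show step logs)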
StepConfig enableDebugging = new StepConfig().withName("Enable Debugging")
.withActionOnFailure("TERMINATE_JOB_FLOW").withHadoopJarStep(stepFactory.newEnableDebuggingStep());
try {
// Configure instances to use
JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
System.out.println("Using EMR Hadoop v" + HADOOP_VERSION);
instances.setHadoopVersion(HADOOP_VERSION);
System.out.println("Using instance count: " + INSTANCE_COUNT);
instances.setInstanceCount(INSTANCE_COUNT);
System.out.println("Using master instance type: " + INSTANCE_TYPE);
instances.setMasterInstanceType(INSTANCE_TYPE);
System.out.println("Using slave instance type: " + INSTANCE_TYPE);
instances.setSlaveInstanceType(INSTANCE_TYPE);
// Configure the job flow
System.out.println("Configuring flow: " + FLOW_NAME);
RunJobFlowRequest request = new RunJobFlowRequest(FLOW_NAME, instances);
System.out.println("tusing log URI: " + S3N_LOG_URI);
request.setLogUri(S3N_LOG_URI);
// Configure the Hadoop jar to use
System.out.println("tusing jar URI: " + S3N_HADOOP_JAR);
HadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);
System.out.println("tusing args: " + ARGS_AS_LIST);
jarConfig.setArgs(ARGS_AS_LIST);
// main step
StepConfig stepConfig = new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.lastIndexOf('/') + 1), jarConfig); // lastIndexOf so the step is named after the jar file itself
request.setSteps(Arrays.asList(enableDebugging, stepConfig));
// Run the job flow
RunJobFlowResult result = emr.runJobFlow(request);
// Check the status of the running job
String lastState = "";
STATUS_LOOP: while (true) {
DescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(Arrays.asList(result.getJobFlowId()));
DescribeJobFlowsResult descResult = emr.describeJobFlows(desc);
for (JobFlowDetail detail : descResult.getJobFlows()) {
String state = detail.getExecutionStatusDetail().getState();
if (isDone(state)) {
System.out.println("Job " + state + ": " + detail.toString());
break STATUS_LOOP;
} else if (!lastState.equals(state)) {
lastState = state;
System.out.println("Job " + state + " at " + new Date().toString());
}
}
Thread.sleep(10000); // poll every ten seconds
}
} catch (AmazonServiceException ase) {
System.out.println("Caught Exception: " + ase.getMessage());
System.out.println("Reponse Status Code: " + ase.getStatusCode());
System.out.println("Error Code: " + ase.getErrorCode());
System.out.println("Request ID: " + ase.getRequestId());
}
}
/**
 * @param value a job flow execution state name, as returned by the service
 * @return true if the state is terminal (COMPLETED, FAILED, or TERMINATED)
 */
public static boolean isDone(String value) {
JobFlowExecutionState state = JobFlowExecutionState.fromValue(value);
return DONE_STATES.contains(state);
}
}
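As the Javadoc above notes, client parameters such as proxies can be supplied through an optional ClientConfiguration when the client is constructed. A minimal sketch of what the last line of init() would become, assuming you sit behind a proxy (the host and port here are placeholders, not part of the original code):

// requires: import com.amazonaws.ClientConfiguration;
ClientConfiguration config = new ClientConfiguration();
config.setProxyHost("proxy.example.com"); // placeholder proxy host
config.setProxyPort(8080); // placeholder proxy port
emr = new AmazonElasticMapReduceClient(credentials, config);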