{"id":205,"date":"2012-06-11T16:37:16","date_gmt":"2012-06-11T20:37:16","guid":{"rendered":"http:\/\/lichun.cc\/blog\/?p=205"},"modified":"2012-06-11T16:37:16","modified_gmt":"2012-06-11T20:37:16","slug":"creating-amazon-emr-job-using-java-api","status":"publish","type":"post","link":"https:\/\/www.lichun.cc\/blog\/2012\/06\/creating-amazon-emr-job-using-java-api\/","title":{"rendered":"Creating Amazon EMR job using Java API"},"content":{"rendered":"<p>There is little documentation on the Amazon EMR Java API, so in case you are looking for code that configures and starts an EMR job using Java, here is the code I use:<\/p>\n<p>(a brief official sample can be found <a href=\"http:\/\/docs.amazonwebservices.com\/ElasticMapReduce\/latest\/DeveloperGuide\/calling-emr-with-java-sdk.html\" target=\"_blank\">here<\/a>; the file containing the following code is <a href=\"http:\/\/lichun.cc\/blog\/wp-content\/uploads\/2012\/06\/EMRStarter.zip\">here<\/a>)<\/p>\n<p><!--more--><\/p>\n<p>The code includes two steps: one for debugging and one for my own job. You can delete the debugging step if you want. 
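<\/p>\n<p>The init() method below reads the credentials with PropertiesCredentials from a file named AwsCredentials.properties in the working directory. That file is a plain Java properties file with accessKey and secretKey entries; placeholder values are shown here, substitute your own:<\/p>\n<pre>\naccessKey=YOUR_ACCESS_KEY_ID\nsecretKey=YOUR_SECRET_ACCESS_KEY\n<\/pre>\n<p>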
(The code is adapted from some code I found online; I forget where exactly&#8230;thanks to the original author.)<\/p>\n<pre>\nimport java.io.File;\nimport java.util.Arrays;\nimport java.util.Date;\nimport java.util.List;\n\nimport com.amazonaws.AmazonServiceException;\nimport com.amazonaws.auth.AWSCredentials;\nimport com.amazonaws.auth.PropertiesCredentials;\nimport com.amazonaws.services.ec2.model.InstanceType;\nimport com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;\nimport com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;\nimport com.amazonaws.services.elasticmapreduce.model.*;\nimport com.amazonaws.services.elasticmapreduce.util.StepFactory;\n\npublic class EMRStarter {\n\n\tprivate static final String HADOOP_VERSION = \"0.20\";\n\tprivate static final int INSTANCE_COUNT = 2;\n\tprivate static final String INSTANCE_TYPE = InstanceType.M1Small.toString();\n\tprivate static final String FLOW_NAME = \"EMR Test\";\n\tprivate static final String BUCKET_NAME = \"my-bucket\";\n\tprivate static final String S3N_HADOOP_JAR = \"s3:\/\/\" + BUCKET_NAME + \"\/jar\/InventoryStorageCost.jar\"; \/\/ the job jar on S3\n\tprivate static final String outputDir = \"test\";\n\tprivate static final String S3N_LOG_URI = \"s3:\/\/\" + BUCKET_NAME + \"\/log\/\" + outputDir;\n\tprivate static final String[] JOB_ARGS = new String[] { \n\t\t\t\"s3:\/\/path\",\n\t\t\t\"s3:\/\/path\",\n\t\t\t\"s3:\/\/path\",\n\t\t\t\"arg\"\n\t\t\t};\n\n\tprivate static final List<String> ARGS_AS_LIST = Arrays.asList(JOB_ARGS);\n\tprivate static final List<JobFlowExecutionState> DONE_STATES = Arrays.asList(new JobFlowExecutionState[] {\n\t\t\tJobFlowExecutionState.COMPLETED, JobFlowExecutionState.FAILED, JobFlowExecutionState.TERMINATED \n        });\n\n\tstatic AmazonElasticMapReduce emr;\n\n\t\/**\n\t * The only information needed to create a client is a set of security credentials\n\t * consisting of the AWS Access Key ID and Secret Access Key. 
All other configuration, such as the service endpoints, is performed automatically.\n\t * Client parameters, such as proxies, can be specified in an optional ClientConfiguration object when\n\t * constructing a client.\n\t * \n\t * @see com.amazonaws.auth.BasicAWSCredentials\n\t * \n\t * @see com.amazonaws.auth.PropertiesCredentials\n\t * \n\t * @see com.amazonaws.ClientConfiguration\n\t *\/\n\n\tprivate static void init() throws Exception {\n\t\tFile file = new File(\"AwsCredentials.properties\");\n\t\tAWSCredentials credentials = new PropertiesCredentials(file);\n\t\temr = new AmazonElasticMapReduceClient(credentials);\n\t}\n\n\tpublic static void main(String[] args) throws Exception {\n\t\tSystem.out.println(\"===========================================\");\n\t\tSystem.out.println(\"Welcome to Elastic MapReduce!\");\n\t\tSystem.out.println(\"===========================================\");\n\n\t\tinit();\n\n\t\tStepFactory stepFactory = new StepFactory();\n\t\t\/\/ create the debugging step\n\t\tStepConfig enableDebugging = new StepConfig().withName(\"Enable Debugging\")\n\t\t\t\t.withActionOnFailure(\"TERMINATE_JOB_FLOW\").withHadoopJarStep(stepFactory.newEnableDebuggingStep());\n\n\t\ttry {\n\t\t\t\/\/ Configure instances to use\n\t\t\tJobFlowInstancesConfig instances = new JobFlowInstancesConfig();\n\n\t\t\tSystem.out.println(\"Using EMR Hadoop v\" + HADOOP_VERSION);\n\t\t\tinstances.setHadoopVersion(HADOOP_VERSION);\n\n\t\t\tSystem.out.println(\"Using instance count: \" + INSTANCE_COUNT);\n\t\t\tinstances.setInstanceCount(INSTANCE_COUNT);\n\n\t\t\tSystem.out.println(\"Using master instance type: \" + INSTANCE_TYPE);\n\t\t\tinstances.setMasterInstanceType(INSTANCE_TYPE);\n\n\t\t\tSystem.out.println(\"Using slave instance type: \" + INSTANCE_TYPE);\n\t\t\tinstances.setSlaveInstanceType(INSTANCE_TYPE);\n\n\t\t\t\/\/ Configure the job flow\n\t\t\tSystem.out.println(\"Configuring flow: \" + FLOW_NAME);\n\t\t\tRunJobFlowRequest request = new 
RunJobFlowRequest(FLOW_NAME, instances);\n\n\t\t\tSystem.out.println(\"\\tusing log URI: \" + S3N_LOG_URI);\n\t\t\trequest.setLogUri(S3N_LOG_URI);\n\n\t\t\t\/\/ Configure the Hadoop jar to use\n\t\t\tSystem.out.println(\"\\tusing jar URI: \" + S3N_HADOOP_JAR);\n\t\t\tHadoopJarStepConfig jarConfig = new HadoopJarStepConfig(S3N_HADOOP_JAR);\n\t\t\tSystem.out.println(\"\\tusing args: \" + ARGS_AS_LIST);\n\t\t\tjarConfig.setArgs(ARGS_AS_LIST);\n\n\t\t\t\/\/ main step, named after the jar file (everything after the last '\/')\n\t\t\tStepConfig stepConfig = new StepConfig(S3N_HADOOP_JAR.substring(S3N_HADOOP_JAR.lastIndexOf('\/') + 1), jarConfig);\n\t\t\trequest.setSteps(Arrays.asList(new StepConfig[] { enableDebugging, stepConfig }));\n\n\t\t\t\/\/ Run the job flow\n\t\t\tRunJobFlowResult result = emr.runJobFlow(request);\n\t\t\t\/\/ Check the status of the running job\n\t\t\tString lastState = \"\";\n\t\t\tSTATUS_LOOP: while (true) {\n\t\t\t\tDescribeJobFlowsRequest desc = new DescribeJobFlowsRequest(Arrays.asList(new String[] { result\n\t\t\t\t\t\t.getJobFlowId() }));\n\n\t\t\t\tDescribeJobFlowsResult descResult = emr.describeJobFlows(desc);\n\n\t\t\t\tfor (JobFlowDetail detail : descResult.getJobFlows()) {\n\t\t\t\t\tString state = detail.getExecutionStatusDetail().getState();\n\t\t\t\t\tif (isDone(state)) {\n\t\t\t\t\t\tSystem.out.println(\"Job \" + state + \": \" + detail.toString());\n\t\t\t\t\t\tbreak STATUS_LOOP;\n\t\t\t\t\t} else if (!lastState.equals(state)) {\n\t\t\t\t\t\tlastState = state;\n\t\t\t\t\t\tSystem.out.println(\"Job \" + state + \" at \" + new Date().toString());\n\t\t\t\t\t}\n\t\t\t\t}\n\n\t\t\t\tThread.sleep(10000);\n\n\t\t\t}\n\n\t\t} catch (AmazonServiceException ase) {\n\t\t\tSystem.out.println(\"Caught Exception: \" + ase.getMessage());\n\t\t\tSystem.out.println(\"Response Status Code: \" + ase.getStatusCode());\n\t\t\tSystem.out.println(\"Error Code: \" + ase.getErrorCode());\n\t\t\tSystem.out.println(\"Request ID: \" + ase.getRequestId());\n\t\t}\n\n\t}\n\n\t\/**\n\t * @param value a job flow execution state name\n\t * 
@return true if the given state is a terminal state\n\t *\/\n\n\tpublic static boolean isDone(String value) {\n\t\tJobFlowExecutionState state = JobFlowExecutionState.fromValue(value);\n\t\treturn DONE_STATES.contains(state);\n\t}\n\n}\n\n<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>There is little documentation on the Amazon EMR Java API, so in case you are looking for code that configures and starts an EMR job using Java, here is the code I use: (a brief official sample can be found here; the file containing the following code is here)<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true},"categories":[14],"tags":[40,41,42,83],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p2s9sh-3j","jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/205"}],"collection":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/comments?post=205"}],"version-history":[{"count":0,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/205\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/media?parent=205"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/categories?post=205"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/
wp\/v2\/tags?post=205"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}