{"id":63,"date":"2012-05-24T10:28:49","date_gmt":"2012-05-24T14:28:49","guid":{"rendered":"http:\/\/lichun.cc\/blog\/?p=63"},"modified":"2012-05-24T10:28:49","modified_gmt":"2012-05-24T14:28:49","slug":"hadoop-multipleinputs-usage","status":"publish","type":"post","link":"https:\/\/www.lichun.cc\/blog\/2012\/05\/hadoop-multipleinputs-usage\/","title":{"rendered":"Hadoop MultipleInputs sample usage"},"content":{"rendered":"<p>MultipleInputs lets a single MapReduce job read several input paths, each with its own InputFormat and Mapper.<\/p>\n<p>For example, suppose we have two files with different formats (fields are separated by a tab, and the first field is used as the key):<\/p>\n<p><span style=\"color: #ff6600;\">(1) First file format:<\/span><\/p>\n<p><span style=\"color: #ff6600;\">KEY VALUE<\/span><\/p>\n<p><span style=\"color: #ff6600;\">(2) Second file format:<\/span><\/p>\n<p><span style=\"color: #ff6600;\">KEY VALUE ADDITIONAL<\/span><\/p>\n<p>To read each custom format, we need to write a <span style=\"color: #ff6600;\">Record\u00a0Class, RecordReader, and InputFormat<\/span> for it.<\/p>\n<p>MultipleInputs needs an InputFormat for each input path. The InputFormat creates a RecordReader, which reads the file and returns key\/value pairs whose values are instances of the record class.<\/p>\n<p>Here is the implementation:<br \/>\n<!--more--><br \/>\n<span style=\"color: #ff0000;\">(If you put all of these classes in one file as nested classes, each one needs the &#8220;static&#8221; modifier so that Hadoop can instantiate it through reflection. I wrote them in one file to make testing easier.)<\/span><\/p>\n<p><span style=\"color: #ff0000;\">My Hadoop version is 0.20.2; earlier versions may have bugs.<\/span><\/p>\n<p><a href=\"http:\/\/lichun.cc\/blog\/wp-content\/uploads\/2012\/08\/MultipleInputsTest.zip\">Download My Test Code<\/a><\/p>\n<p><strong>1. 
First file format:<\/strong><\/p>\n<p>(1) Record class (must implement Writable)<\/p>\n<pre>public static class FirstClass implements Writable {\n\tprivate String value;\n\n\t\/\/ no-argument constructor: Hadoop creates Writables by reflection before calling readFields()\n\tpublic FirstClass() {\n\t\tthis.value = \"TEST\";\n\t}\n\n\tpublic FirstClass(String val) {\n\t\tthis.value = val;\n\t}\n\n\t@Override\n\tpublic void readFields(DataInput in) throws IOException {\n\t\tif (null == in) {\n\t\t\tthrow new IllegalArgumentException(\"in cannot be null\");\n\t\t}\n\t\tString value = in.readUTF();\n\t\tthis.value = value.trim();\n\t}\n\n\t@Override\n\tpublic void write(DataOutput out) throws IOException {\n\t\tif (null == out) {\n\t\t\tthrow new IllegalArgumentException(\"out cannot be null\");\n\t\t}\n\t\tout.writeUTF(this.value);\n\t}\n\n\t@Override\n\tpublic String toString() {\n\t\treturn \"FirstClass\\t\" + value;\n\t}\n}<\/pre>\n<p>(2) RecordReader<\/p>\n<pre>public static class FirstClassReader extends RecordReader&lt;Text, FirstClass&gt; {\n\t\/\/ delegate the actual line reading to Hadoop's LineRecordReader\n\tprivate LineRecordReader lineRecordReader = null;\n\tprivate Text key = null;\n\tprivate FirstClass valueFirstClass = null;\n\n\t@Override\n\tpublic void close() throws IOException {\n\t\tif (null != lineRecordReader) {\n\t\t\tlineRecordReader.close();\n\t\t\tlineRecordReader = null;\n\t\t}\n\t\tkey = null;\n\t\tvalueFirstClass = null;\n\t}\n\n\t@Override\n\tpublic Text getCurrentKey() throws IOException, InterruptedException {\n\t\treturn key;\n\t}\n\n\t@Override\n\tpublic FirstClass getCurrentValue() throws IOException, InterruptedException {\n\t\treturn valueFirstClass;\n\t}\n\n\t@Override\n\tpublic float getProgress() throws IOException, InterruptedException {\n\t\treturn lineRecordReader.getProgress();\n\t}\n\n\t@Override\n\tpublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {\n\t\tclose();\n\t\tlineRecordReader = new 
LineRecordReader();\n\t\tlineRecordReader.initialize(split, context);\n\t}\n\n\t@Override\n\tpublic boolean nextKeyValue() throws IOException, InterruptedException {\n\t\tif (!lineRecordReader.nextKeyValue()) {\n\t\t\tkey = null;\n\t\t\tvalueFirstClass = null;\n\t\t\treturn false;\n\t\t}\n\n\t\t\/\/ otherwise, take the line and parse it: KEY&lt;tab&gt;VALUE\n\t\tText line = lineRecordReader.getCurrentValue();\n\t\tString str = line.toString();\n\t\tSystem.out.println(\"FirstClass:\" + str); \/\/ debug output\n\t\tString[] arr = str.split(\"\\t\", -1);\n\t\tkey = new Text(arr[0].trim());\n\t\tvalueFirstClass = new FirstClass(arr[1].trim());\n\n\t\treturn true;\n\t}\n}<\/pre>\n<p>(3) InputFormat<\/p>\n<pre>public static class FirstInputFormat extends FileInputFormat&lt;Text, FirstClass&gt; {\n\t@Override\n\tpublic RecordReader&lt;Text, FirstClass&gt; createRecordReader(InputSplit split, TaskAttemptContext context)\n\t\t\tthrows IOException, InterruptedException {\n\t\treturn new FirstClassReader();\n\t}\n}<\/pre>\n<p><strong>2. Second file format:<\/strong><\/p>\n<p>(1) Record class (must implement Writable)<\/p>\n<pre>public static class SecondClass implements Writable {\n\n\tprivate String value;\n\tprivate int additional;\n\n\tpublic SecondClass() {\n\t\tthis.value = \"TEST\";\n\t\tthis.additional = 0;\n\t}\n\n\tpublic SecondClass(String val, int addi) {\n\t\tthis.value = val;\n\t\tthis.additional = addi;\n\t}\n\n\t@Override\n\tpublic void readFields(DataInput in) throws IOException {\n\t\tif (null == in) {\n\t\t\tthrow new IllegalArgumentException(\"in cannot be null\");\n\t\t}\n\t\t\/\/ fields must be read in the same order write() writes them\n\t\tString value = in.readUTF();\n\t\tint addi = in.readInt();\n\t\tthis.value = value.trim();\n\t\tthis.additional = addi;\n\t}\n\n\t@Override\n\tpublic void write(DataOutput out) throws IOException {\n\t\tif (null == out) {\n\t\t\tthrow new IllegalArgumentException(\"out cannot be null\");\n\t\t}\n\t\tout.writeUTF(this.value);\n\t\tout.writeInt(this.additional);\n\t}\n\n\t@Override\n\tpublic String toString() 
{\n\t\treturn \"SecondClass\\t\" + value + \"\\t\" + additional;\n\t}\n}<\/pre>\n<p>(2) RecordReader<\/p>\n<pre>public static class SecondClassReader extends RecordReader&lt;Text, SecondClass&gt; {\n\t\/\/ delegate the actual line reading to Hadoop's LineRecordReader\n\tprivate LineRecordReader lineRecordReader = null;\n\tprivate Text key = null;\n\tprivate SecondClass valueSecondClass = null;\n\n\t@Override\n\tpublic void close() throws IOException {\n\t\tif (null != lineRecordReader) {\n\t\t\tlineRecordReader.close();\n\t\t\tlineRecordReader = null;\n\t\t}\n\t\tkey = null;\n\t\tvalueSecondClass = null;\n\t}\n\n\t@Override\n\tpublic Text getCurrentKey() throws IOException, InterruptedException {\n\t\treturn key;\n\t}\n\n\t@Override\n\tpublic SecondClass getCurrentValue() throws IOException, InterruptedException {\n\t\treturn valueSecondClass;\n\t}\n\n\t@Override\n\tpublic float getProgress() throws IOException, InterruptedException {\n\t\treturn lineRecordReader.getProgress();\n\t}\n\n\t@Override\n\tpublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {\n\t\tclose();\n\t\tlineRecordReader = new LineRecordReader();\n\t\tlineRecordReader.initialize(split, context);\n\t}\n\n\t@Override\n\tpublic boolean nextKeyValue() throws IOException, InterruptedException {\n\t\tif (!lineRecordReader.nextKeyValue()) {\n\t\t\tkey = null;\n\t\t\tvalueSecondClass = null;\n\t\t\treturn false;\n\t\t}\n\n\t\t\/\/ otherwise, take the line and parse it: KEY&lt;tab&gt;VALUE&lt;tab&gt;ADDITIONAL\n\t\tText line = lineRecordReader.getCurrentValue();\n\t\tString str = line.toString();\n\t\tSystem.out.println(\"SecondClass:\" + str); \/\/ debug output\n\t\tString[] arr = str.split(\"\\t\", -1);\n\t\tint addi = Integer.parseInt(arr[2].trim());\n\n\t\tkey = new Text(arr[0].trim());\n\t\tvalueSecondClass = new SecondClass(arr[1].trim(), addi);\n\n\t\treturn true;\n\t}\n}<\/pre>\n<p>(3) InputFormat<\/p>\n<pre>public static class SecondInputFormat extends FileInputFormat&lt;Text, SecondClass&gt; {\n\t@Override\n\tpublic RecordReader&lt;Text, SecondClass&gt; 
createRecordReader(InputSplit split, TaskAttemptContext context)\n\t\t\tthrows IOException, InterruptedException {\n\t\treturn new SecondClassReader();\n\t}\n}<\/pre>\n<p><strong>3. Write a Mapper for each file format<\/strong><br \/>\n(1) FirstMap: the output key type is Text and the output value type is Text (these outputs become the reducer&#8217;s inputs).<\/p>\n<pre>public static class FirstMap extends Mapper&lt;Text, FirstClass, Text, Text&gt; {\n\t@Override\n\tpublic void map(Text key, FirstClass value, Context context) throws IOException, InterruptedException {\n\t\tSystem.out.println(\"FirstMap:\" + key.toString() + \" \" + value.toString());\n\t\t\/\/ convert the record to Text so that both mappers emit the same value type\n\t\tcontext.write(key, new Text(value.toString()));\n\t}\n}<\/pre>\n<p>(2) SecondMap: the output key type is Text and the output value type is Text (these outputs become the reducer&#8217;s inputs).<\/p>\n<pre>public static class SecondMap extends Mapper&lt;Text, SecondClass, Text, Text&gt; {\n\t@Override\n\tpublic void map(Text key, SecondClass value, Context context) throws IOException, InterruptedException {\n\t\tSystem.out.println(\"SecondMap:\" + key.toString() + \" \" + value.toString());\n\t\tcontext.write(key, new Text(value.toString()));\n\t}\n}<\/pre>\n<p><strong>4. Write the Reducer.<\/strong><span style=\"color: #ff0000;\"> IMPORTANT: a job has only one reducer class, so all <strong>mappers<\/strong> must emit the same map output key\/value types. If your mappers need to output different value types, wrap them in a subclass of GenericWritable.<\/span><\/p>\n<pre>public static class MyReducer extends Reducer&lt;Text, Text, Text, Text&gt; {\n\t@Override\n\tpublic void reduce(Text key, Iterable&lt;Text&gt; values, Context context) throws IOException,\n\t\t\tInterruptedException {\n\t\tfor (Text value : values) {\n\t\t\tSystem.out.println(\"Reduce:\" + key.toString() + \" \" + value.toString());\n\t\t\tcontext.write(key, value);\n\t\t}\n\t}\n}<\/pre>\n<p>5. 
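In the driver, register each input path with its own InputFormat and Mapper through MultipleInputs:<\/p>\n<pre>import java.io.DataInput;\nimport java.io.DataOutput;\nimport java.io.IOException;\n\nimport org.apache.hadoop.conf.Configuration;\nimport org.apache.hadoop.fs.Path;\nimport org.apache.hadoop.io.Text;\nimport org.apache.hadoop.io.Writable;\nimport org.apache.hadoop.mapreduce.InputSplit;\nimport org.apache.hadoop.mapreduce.Job;\nimport org.apache.hadoop.mapreduce.Mapper;\nimport org.apache.hadoop.mapreduce.RecordReader;\nimport org.apache.hadoop.mapreduce.Reducer;\nimport org.apache.hadoop.mapreduce.TaskAttemptContext;\nimport org.apache.hadoop.mapreduce.lib.input.FileInputFormat;\nimport org.apache.hadoop.mapreduce.lib.input.LineRecordReader;\nimport org.apache.hadoop.mapreduce.lib.input.MultipleInputs;\nimport org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;\n\npublic class MultipleInputsTest {\n\t\/\/ ... the static nested classes from steps 1-4 go here ...\n\n\tpublic static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {\n\t\t\/\/ args: first input path, second input path, output path\n\t\tPath firstPath = new Path(args[0]);\n\t\tPath secondPath = new Path(args[1]);\n\t\tPath outputPath = new Path(args[2]);\n\n\t\tConfiguration conf = new Configuration();\n\n\t\tJob job = new Job(conf);\n\t\tjob.setJarByClass(MultipleInputsTest.class);\n\t\tjob.setJobName(\"MultipleInputs Test\");\n\n\t\tjob.setReducerClass(MyReducer.class);\n\n\t\t\/\/ output types of the mappers\n\t\tjob.setMapOutputKeyClass(Text.class);\n\t\tjob.setMapOutputValueClass(Text.class);\n\n\t\t\/\/ output types of the reducer\n\t\tjob.setOutputKeyClass(Text.class);\n\t\tjob.setOutputValueClass(Text.class);\n\n\t\t\/\/ use MultipleInputs to pair each input path with its own InputFormat and Mapper\n\t\tMultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);\n\t\tMultipleInputs.addInputPath(job, secondPath, SecondInputFormat.class, SecondMap.class);\n\n\t\tFileOutputFormat.setOutputPath(job, outputPath);\n\n\t\t\/\/ exit with a non-zero status if the job fails\n\t\tSystem.exit(job.waitForCompletion(true) ? 0 : 1);\n\t}\n}<\/pre>\n","protected":false},"excerpt":{"rendered":"<p>MultipleInputs is a feature that supports different input formats in the MapReduce. For example, we have two files with different formats: (1) First file format: VALUE (2) Second file format: VALUE ADDITIONAL In order to read the custom format, we need to write Record\u00a0Class, RecordReader, InputFormat for each one. 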
InputFormat is needed by MultipleInputs, an [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true},"categories":[19],"tags":[16,83,20],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p2s9sh-11","jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/63"}],"collection":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/comments?post=63"}],"version-history":[{"count":0,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/63\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/media?parent=63"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/categories?post=63"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/tags?post=63"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}