{"id":100,"date":"2013-11-18T19:33:19","date_gmt":"2013-11-18T23:33:19","guid":{"rendered":"http:\/\/lichun.cc\/blog\/?p=100"},"modified":"2014-04-17T17:13:26","modified_gmt":"2014-04-17T21:13:26","slug":"hadoop-use-secondarysort-to-keep-different-inputs-in-order","status":"publish","type":"post","link":"https:\/\/www.lichun.cc\/blog\/2013\/11\/hadoop-use-secondarysort-to-keep-different-inputs-in-order\/","title":{"rendered":"use Secondary Sort to keep different inputs in order in Hadoop"},"content":{"rendered":"<p><strong>SecondarySort<\/strong> is a technique that lets you control the order in which inputs arrive at the Reducers.<\/p>\n<p><b>For example<\/b>, suppose we want to join two different datasets, where one dataset contains the attributes that the other dataset needs. For simplicity, we call the first dataset the <strong>ATTRIBUTE<\/strong> set and the other the <strong>DATA<\/strong> set. Since the <strong>ATTRIBUTE<\/strong> set is also very large, it&#8217;s not practical to put it in the <strong>Distributed Cache<\/strong>.<\/p>\n<p>Now we want to join these two tables: for each record in the <strong>DATA<\/strong> set, we look up its <strong>ATTRIBUTE<\/strong>. If we don&#8217;t use SecondarySort, after the <strong>map<\/strong> step the <strong>DATA<\/strong> and <strong>ATTRIBUTE<\/strong> records will arrive in arbitrary order, so to append the ATTRIBUTE to each DATA record we would have to store all the DATA records in memory and assign the ATTRIBUTE to them only when it finally arrives.<\/p>\n<p><!--more--><\/p>\n<p>The <strong>issue<\/strong> here is that if the DATA set is huge, storing it all in memory becomes a serious problem: it slows the computation dramatically and can even crash the job. 
The root cause is that we don&#8217;t know when the ATTRIBUTE will arrive.<\/p>\n<p>If we can guarantee that the ATTRIBUTE comes first, then when we receive each DATA record from the input stream, we can append the ATTRIBUTE to it right away <strong>without storing it<\/strong> and write it to the output stream.<\/p>\n<p>(The two datasets have different input formats, so we will use <b>MultipleInputs<\/b> and <b>GenericWritable<\/b> here; check <a href=\"http:\/\/www.lichun.cc\/blog\/2012\/05\/hadoop-multipleinputs-usage\/\" target=\"_blank\">Here<\/a> to see how to use <b>MultipleInputs<\/b>, and <a href=\"http:\/\/www.lichun.cc\/blog\/2012\/05\/hadoop-genericwritable-sample-usage\/\" target=\"_blank\">Here<\/a> to see how to use <b>GenericWritable<\/b>. * <span style=\"color: red;\">This is my use case; your use case may not need MultipleInputs and GenericWritable but can still use SecondarySort.<\/span>)<\/p>\n<p>Here are the sample input datasets I use to demonstrate:<\/p>\n<pre><b>DataSet 1 (ATTRIBUTE)<\/b>:\r\nFormat  : KEY ATTR1  ATTR2\r\n\r\n<b>DataSet 2 (DATA)<\/b>:\r\nFormat  : KEY  DATA\r\n\r\nThe output format we expect is:\r\nFormat  : KEY  DATA  ATTR1  ATTR2\r\n<\/pre>\n<p>The following code shows how to achieve this:<\/p>\n<p>(1) First of all, let&#8217;s set up the record class\/RecordReader\/InputFormat for these two inputs (you can go to <a href=\"http:\/\/www.lichun.cc\/blog\/2012\/05\/hadoop-multipleinputs-usage\/\" target=\"_blank\">Here<\/a> for more details about MultipleInputs)<br \/>\n<b>Record Class<\/b><\/p>\n<pre>\r\n    public static class Attribute implements Writable {\r\n        private String key;\r\n        private String attr1, attr2;\r\n\r\n        public Attribute() {  \/\/ the empty constructor is required by Hadoop\r\n        }\r\n\r\n        public Attribute(String key, String attr1, String attr2) {\r\n            this.key = key;\r\n            this.attr1 = attr1;\r\n            this.attr2 = attr2;\r\n        }\r\n\r\n        @Override\r\n   
     public void readFields(DataInput in) throws IOException {\r\n            if (null == in) {\r\n                throw new IllegalArgumentException(\"in cannot be null\");\r\n            }\r\n            String key = in.readUTF();\r\n            String attr1 = in.readUTF();\r\n            String attr2 = in.readUTF();\r\n\r\n            this.key = key;\r\n            this.attr1 = attr1;\r\n            this.attr2 = attr2;\r\n        }\r\n\r\n        @Override\r\n        public void write(DataOutput out) throws IOException {\r\n            if (null == out) {\r\n                throw new IllegalArgumentException(\"out cannot be null\");\r\n            }\r\n            out.writeUTF(this.key);\r\n            out.writeUTF(this.attr1);\r\n            out.writeUTF(this.attr2);\r\n        }\r\n\r\n        @Override\r\n        public String toString() {\r\n            return \"Attribute\\t\" + key + \"\\t\" + attr1 + \"\\t\" + attr2;\r\n        }\r\n    }\r\n\r\n    public static class Data implements Writable {\r\n\r\n        private String key;\r\n        private String data;\r\n\r\n        public Data() {\r\n        }\r\n\r\n        public Data(String key, String data) {\r\n            this.key = key;\r\n            this.data = data;\r\n        }\r\n\r\n        @Override\r\n        public void readFields(DataInput in) throws IOException {\r\n            if (null == in) {\r\n                throw new IllegalArgumentException(\"in cannot be null\");\r\n            }\r\n            String key = in.readUTF();\r\n            String data = in.readUTF();\r\n\r\n            this.key = key;\r\n            this.data = data;\r\n        }\r\n\r\n        @Override\r\n        public void write(DataOutput out) throws IOException {\r\n            if (null == out) {\r\n                throw new IllegalArgumentException(\"out cannot be null\");\r\n            }\r\n            out.writeUTF(this.key);\r\n            out.writeUTF(this.data);\r\n        }\r\n\r\n        @Override\r\n        
public String toString() {\r\n            return \"Data\\t\" + key + \"\\t\" + data;\r\n        }\r\n    }\r\n<\/pre>\n<p><b>RecordReader<\/b><\/p>\n<pre>\r\n   public static class AttributeReader extends RecordReader<Text, Attribute> {\r\n        private LineRecordReader lineRecordReader = null;\r\n        private Text key = null;\r\n        private Attribute attribute = null;\r\n\r\n        @Override\r\n        public void close() throws IOException {\r\n            if (null != lineRecordReader) {\r\n                lineRecordReader.close();\r\n                lineRecordReader = null;\r\n            }\r\n            key = null;\r\n            attribute = null;\r\n        }\r\n\r\n        @Override\r\n        public Text getCurrentKey() throws IOException, InterruptedException {\r\n            return key;\r\n        }\r\n\r\n        @Override\r\n        public Attribute getCurrentValue() throws IOException, InterruptedException {\r\n            return attribute;\r\n        }\r\n\r\n        @Override\r\n        public float getProgress() throws IOException, InterruptedException {\r\n            return lineRecordReader.getProgress();\r\n        }\r\n\r\n        @Override\r\n        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {\r\n            close();\r\n            lineRecordReader = new LineRecordReader();\r\n            lineRecordReader.initialize(split, context);\r\n        }\r\n\r\n        @Override\r\n        public boolean nextKeyValue() throws IOException, InterruptedException {\r\n            if (!lineRecordReader.nextKeyValue()) {\r\n                key = null;\r\n                attribute = null;\r\n                return false;\r\n            }\r\n            \/\/ otherwise, take the line and parse it\r\n            Text line = lineRecordReader.getCurrentValue();\r\n            String str = line.toString();\r\n            String[] arr = str.split(\"\\t\", -1);\r\n\r\n            key = new 
Text(arr[0].trim());\r\n            attribute = new Attribute(arr[0].trim(), arr[1].trim(), arr[2].trim());\r\n\r\n            return true;\r\n        }\r\n    }\r\n\r\n    public static class DataReader extends RecordReader<Text, Data> {\r\n        private LineRecordReader lineRecordReader = null;\r\n        private Text key = null;\r\n        private Data data = null;\r\n\r\n        @Override\r\n        public void close() throws IOException {\r\n            if (null != lineRecordReader) {\r\n                lineRecordReader.close();\r\n                lineRecordReader = null;\r\n            }\r\n            key = null;\r\n            data = null;\r\n        }\r\n\r\n        @Override\r\n        public Text getCurrentKey() throws IOException, InterruptedException {\r\n            return key;\r\n        }\r\n\r\n        @Override\r\n        public Data getCurrentValue() throws IOException, InterruptedException {\r\n            return data;\r\n        }\r\n\r\n        @Override\r\n        public float getProgress() throws IOException, InterruptedException {\r\n            return lineRecordReader.getProgress();\r\n        }\r\n\r\n        @Override\r\n        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {\r\n            close();\r\n            lineRecordReader = new LineRecordReader();\r\n            lineRecordReader.initialize(split, context);\r\n        }\r\n\r\n        @Override\r\n        public boolean nextKeyValue() throws IOException, InterruptedException {\r\n            if (!lineRecordReader.nextKeyValue()) {\r\n                key = null;\r\n                data = null;\r\n                return false;\r\n            }\r\n\r\n            \/\/ otherwise, take the line and parse it\r\n            Text line = lineRecordReader.getCurrentValue();\r\n            String str = line.toString();\r\n            String[] arr = str.split(\"\\t\", -1);\r\n\r\n            key = new Text(arr[0].trim());\r\n  
          data = new Data(arr[0].trim(), arr[1].trim());\r\n\r\n            return true;\r\n        }\r\n    }\r\n<\/pre>\n<p><b>InputFormat<\/b><\/p>\n<pre>\r\n   public static class AttributeInputFormat extends FileInputFormat<Text, Attribute> {\r\n        @Override\r\n        public RecordReader<Text, Attribute> createRecordReader(InputSplit split, TaskAttemptContext context)\r\n                throws IOException, InterruptedException {\r\n            return new AttributeReader();\r\n        }\r\n    }\r\n\r\n    public static class DataInputFormat extends FileInputFormat<Text, Data> {\r\n        @Override\r\n        public RecordReader<Text, Data> createRecordReader(InputSplit split, TaskAttemptContext context)\r\n                throws IOException, InterruptedException {\r\n            return new DataReader();\r\n        }\r\n    }\r\n<\/pre>\n<p>(2) Now we need to use <b>GenericWritable<\/b> to pass the two different record types to the Reducer (go to <a href=\"http:\/\/www.lichun.cc\/blog\/2012\/05\/hadoop-genericwritable-sample-usage\/\" target=\"_blank\">Here<\/a> to read more on GenericWritable):<\/p>\n<pre>\r\n    public static class MyGenericWritable extends GenericWritable {\r\n        @SuppressWarnings(\"unchecked\")\r\n        private static Class<? extends Writable>[] CLASSES = (Class<? extends Writable>[]) new Class[] {\r\n                Attribute.class, Data.class };\r\n\r\n        \/\/ this empty constructor is required by Hadoop\r\n        public MyGenericWritable() {\r\n        }\r\n\r\n        public MyGenericWritable(Writable instance) {\r\n            set(instance);\r\n        }\r\n\r\n        @Override\r\n        protected Class<? extends Writable>[] getTypes() {\r\n            return CLASSES;\r\n        }\r\n\r\n        @Override\r\n        public String toString() {\r\n            return \"MyGenericWritable [getTypes()=\" + Arrays.toString(getTypes()) + \"]\";\r\n        }\r\n    }\r\n<\/pre>\n<p>(3) We need a mapper for each of the two record types. 
<b>NOTE: Here we set the order for the different inputs. In the following code I set 0 for Attribute and 1 for Data, so Attribute records will come first.<\/b><\/p>\n<pre>\r\n    public static class AttributeMap extends Mapper<Text, Attribute, SecondarySortableTextKey, MyGenericWritable> {\r\n        public void map(Text key, Attribute value, Context context) throws IOException, InterruptedException {\r\n            context.write(new SecondarySortableTextKey(key, new IntWritable(0)), new MyGenericWritable(value));\r\n        }\r\n    }\r\n\r\n    public static class DataMap extends Mapper<Text, Data, SecondarySortableTextKey, MyGenericWritable> {\r\n        public void map(Text key, Data value, Context context) throws IOException, InterruptedException {\r\n            context.write(new SecondarySortableTextKey(key, new IntWritable(1)), new MyGenericWritable(value));\r\n        }\r\n    }\r\n<\/pre>\n<p>(4) Write the Reducer. In the code below, we assume the attribute comes first and the data comes later. For each data record, if the attribute is null, it means there is no corresponding attribute for this data. 
<\/p>\n<pre>\r\n    public static class MyReducer extends Reducer<SecondarySortableTextKey, MyGenericWritable, Text, Text> {\r\n        public void reduce(SecondarySortableTextKey key, Iterable<MyGenericWritable> values, Context context)\r\n                throws IOException, InterruptedException {\r\n            Attribute attribute = null;\r\n            for (MyGenericWritable value : values) {\r\n                Writable rawValue = value.get();\r\n                if (rawValue instanceof Attribute) {\r\n                    attribute = (Attribute) rawValue;\r\n                }\r\n                if (rawValue instanceof Data) {\r\n                    Data data = (Data) rawValue;\r\n                    String result;\r\n                    if (attribute == null) {\r\n                        result = data.data;\r\n                    } else {\r\n                        result = data.data + \"\\t\" + attribute.attr1 + \"\\t\" + attribute.attr2;\r\n                    }\r\n                    context.write(key.getTextKey(), new Text(result));\r\n                }\r\n            }\r\n        }\r\n    }\r\n<\/pre>\n<p>(5) Create the secondary key wrapper class. In this class we include not only the original key, but also the sort order.<\/p>\n<pre>\r\n    public static class SecondarySortableTextKey implements WritableComparable<SecondarySortableTextKey> {\r\n        private Text textKey;\r\n        private IntWritable secondarySortOrder;\r\n\r\n        public SecondarySortableTextKey() {\r\n        }\r\n\r\n        public SecondarySortableTextKey(Text textKey, IntWritable secondarySortOrder) {\r\n            this.textKey = textKey;\r\n            this.secondarySortOrder = secondarySortOrder;\r\n        }\r\n\r\n        public Text getTextKey() {\r\n            return this.textKey;\r\n        }\r\n\r\n        @Override\r\n        public void write(DataOutput out) throws IOException {\r\n          
  textKey.write(out);\r\n            secondarySortOrder.write(out);\r\n        }\r\n\r\n        @Override\r\n        public void readFields(DataInput in) throws IOException {\r\n            Text textKey = new Text();\r\n            textKey.readFields(in);\r\n            IntWritable secondarySortOrder = new IntWritable();\r\n            secondarySortOrder.readFields(in);\r\n\r\n            this.textKey = textKey;\r\n            this.secondarySortOrder = secondarySortOrder;\r\n        }\r\n\r\n        @Override\r\n        public int compareTo(SecondarySortableTextKey secondarySortableTextKey) {\r\n            int comparisonResult = textKey.compareTo(secondarySortableTextKey.textKey);\r\n            if (0 == comparisonResult) {\r\n                comparisonResult = secondarySortOrder.compareTo(secondarySortableTextKey.secondarySortOrder);\r\n            }\r\n            return comparisonResult;\r\n        }\r\n    }\r\n<\/pre>\n<p>As you can see, the <b>secondarySortOrder<\/b> value determines the order.<br \/>\n(6) Set our own Partitioner. Since we are now using <b>SecondarySortableTextKey<\/b> as the map output key, the default partitioner would partition by the whole SecondarySortableTextKey; but we only want to change the input order, not the partitioning, so we should write a custom Partitioner:<\/p>\n<pre>\r\n    public static class SecondarySortableTextKeyPartitioner extends Partitioner<SecondarySortableTextKey, Writable> {\r\n        private static final HashPartitioner<Text, Writable> HASH_PARTITIONER = new HashPartitioner<Text, Writable>();\r\n\r\n        @Override\r\n        public int getPartition(SecondarySortableTextKey key, Writable value, int numPartitions) {\r\n            return HASH_PARTITIONER.getPartition(key.getTextKey(), value, numPartitions);\r\n        }\r\n    }\r\n<\/pre>\n<p>You see here we only use the original key to partition, instead of the SecondarySortableTextKey.<br \/>\n(7) Custom group comparator. 
The partitioner only guarantees that records with the same key go to the same Reducer; it doesn&#8217;t guarantee that they are grouped into a single reduce call, so we also need a grouping comparator.<\/p>\n<pre>\r\n    public static class SecondarySortableTextKeyGroupingComparator extends WritableComparator {\r\n        protected SecondarySortableTextKeyGroupingComparator() {\r\n            super(SecondarySortableTextKey.class, true);\r\n        }\r\n\r\n        @SuppressWarnings(\"rawtypes\")\r\n        @Override\r\n        public int compare(WritableComparable w1, WritableComparable w2) {\r\n            SecondarySortableTextKey k1 = (SecondarySortableTextKey) w1;\r\n            SecondarySortableTextKey k2 = (SecondarySortableTextKey) w2;\r\n\r\n            return k1.textKey.compareTo(k2.textKey);\r\n        }\r\n    }\r\n<\/pre>\n<p>As you can see, when grouping we only compare the original key, not the whole SecondarySortableTextKey.<br \/>\n(8) Finally, we write the sort comparator, which compares by the secondary order as well.<\/p>\n<pre>\r\n    public static class SecondarySortableTextKeySortComparator extends WritableComparator {\r\n        protected SecondarySortableTextKeySortComparator() {\r\n            super(SecondarySortableTextKey.class, true);\r\n        }\r\n\r\n        @SuppressWarnings(\"rawtypes\")\r\n        @Override\r\n        public int compare(WritableComparable w1, WritableComparable w2) {\r\n            SecondarySortableTextKey k1 = (SecondarySortableTextKey) w1;\r\n            SecondarySortableTextKey k2 = (SecondarySortableTextKey) w2;\r\n\r\n            int compare = k1.textKey.compareTo(k2.textKey);\r\n            if (compare == 0) {\r\n                compare = k1.secondarySortOrder.compareTo(k2.secondarySortOrder);\r\n            }\r\n            return compare;\r\n        }\r\n    }\r\n<\/pre>\n<p>Here we compare the key first; if the keys are the same, we compare the secondary order we set in the mapper. 
This guarantees that the record with the lower secondary order comes first.<br \/>\n(9) Finally, we write the driver to execute the MapReduce job; all the classes we wrote above are configured here.<\/p>\n<pre>\r\n   public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {\r\n        Path firstPath = new Path(args[0]);\r\n        Path secondPath = new Path(args[1]);\r\n        Path outputPath = new Path(args[2]);\r\n\r\n        Configuration conf = new Configuration();\r\n\r\n        Job job = new Job(conf);\r\n        job.setJarByClass(SecondarySortTest.class);\r\n        job.setJobName(\"SecondarySort Test\");\r\n\r\n        job.setReducerClass(MyReducer.class);\r\n\r\n        job.setMapOutputKeyClass(SecondarySortableTextKey.class);\r\n        job.setMapOutputValueClass(MyGenericWritable.class);\r\n\r\n        job.setOutputKeyClass(Text.class);\r\n        job.setOutputValueClass(Text.class);\r\n\r\n        job.setPartitionerClass(SecondarySortableTextKeyPartitioner.class);\r\n        job.setSortComparatorClass(SecondarySortableTextKeySortComparator.class);\r\n        job.setGroupingComparatorClass(SecondarySortableTextKeyGroupingComparator.class);\r\n\r\n        MultipleInputs.addInputPath(job, firstPath, AttributeInputFormat.class, AttributeMap.class);\r\n        MultipleInputs.addInputPath(job, secondPath, DataInputFormat.class, DataMap.class);\r\n\r\n        FileOutputFormat.setOutputPath(job, outputPath);\r\n\r\n        job.waitForCompletion(true);\r\n    }\r\n<\/pre>\n<p>To test this, I wrote some sample data:<\/p>\n<pre>\r\nAttribute Dataset:\r\n    1 green huge\r\n    2 yellow small\r\n\r\nData Dataset:\r\n    1 data1\r\n    2 data2\r\n    2 data22\r\n    3 data3\r\n\r\nUsing the above code, the result is:\r\n    1 data1 green huge\r\n    2 data2 yellow small\r\n    2 data22 yellow small\r\n    3 data3\r\n\r\nWhich is exactly what we want.\r\n<\/pre>\n<p>You can download 
the script and sample data from <a href=\"http:\/\/www.lichun.cc\/blog\/wp-content\/uploads\/2013\/11\/SecondarySortTest.zip\">here<\/a>.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>SecondarySort is a technique that you can control the order of inputs that comes to Reducers. For example, we wants to Join two different datasets, One dataset contains the attribute that the other dataset need to use. For simplicity, we call the first dataset ATTRIBUTE set, the other dataset DATA set. Since the ATTRIBUTE set [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true},"categories":[19],"tags":[16,83,25],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p2s9sh-1C","jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/100"}],"collection":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/comments?post=100"}],"version-history":[{"count":31,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/100\/revisions"}],"predecessor-version":[{"id":657,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/100\/revisions\/657"}],"wp:attachment":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/media?parent=100"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.l
ichun.cc\/blog\/wp-json\/wp\/v2\/categories?post=100"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/tags?post=100"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}