Use Secondary Sort to Keep Different Inputs in Order in Hadoop

Secondary sort is a technique that lets you control the order in which inputs arrive at the Reducers.

For example, suppose we want to join two different datasets, where one dataset contains attributes that the other dataset needs. For simplicity, call the first dataset the ATTRIBUTE set and the other the DATA set. Since the ATTRIBUTE set is also very large, it is not practical to put it in the Distributed Cache.

Now we want to join these two tables: for each record in the DATA set, we fetch its ATTRIBUTE. If we don't use secondary sort, the DATA and ATTRIBUTE records arrive at the reducer in arbitrary order after the map step. So to append the ATTRIBUTE to each DATA record, we would have to buffer all the DATA records in memory, and only when the ATTRIBUTE finally shows up could we assign it to them.

The issue is that if the DATA set is huge, buffering it all in memory is painful: it slows the computation dramatically, or even crashes the task. The root cause is that we don't know when the ATTRIBUTE will arrive.
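To make the problem concrete, here is a minimal sketch of that naive reducer (my own illustration, not part of the original job; it assumes the Attribute, Data, and MyGenericWritable classes defined later in this post, a plain Text map output key, and java.util imports):

    // Naive join: arrival order is unknown, so every DATA record must be
    // buffered until the ATTRIBUTE (which may come last) shows up.
    public static class NaiveJoinReducer extends Reducer<Text, MyGenericWritable, Text, Text> {
        public void reduce(Text key, Iterable<MyGenericWritable> values, Context context)
                throws IOException, InterruptedException {
            Attribute attribute = null;
            List<String> buffered = new ArrayList<String>();       // grows with the DATA set
            for (MyGenericWritable value : values) {
                Writable raw = value.get();
                if (raw instanceof Attribute) {
                    attribute = (Attribute) raw;
                } else if (raw instanceof Data) {
                    buffered.add(((Data) raw).data);               // cannot emit yet
                }
            }
            for (String d : buffered) {                            // second pass over the buffer
                String suffix = (attribute == null) ? "" : "\t" + attribute.attr1 + "\t" + attribute.attr2;
                context.write(key, new Text(d + suffix));
            }
        }
    }

Compare this with the streaming reducer in step (4) below, which never buffers.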

But if we can guarantee that the ATTRIBUTE comes first, then as each DATA record arrives from the input stream, we can append the ATTRIBUTE to it right away and write it to the output stream, without storing anything.

(The two datasets have different input formats, so we will use MultipleInputs and GenericWritable here; check Here to see how to use MultipleInputs, and Here to see how to use GenericWritable. Note that this is my use case; yours may need secondary sort without needing MultipleInputs or GenericWritable.)

Here are the sample input datasets I use to demonstrate:

Dataset 1 (ATTRIBUTE):
Format  : KEY  ATTR1  ATTR2

Dataset 2 (DATA):
Format  : KEY  DATA

The output format we expect is:
Format  : KEY  DATA  ATTR1  ATTR2

For example, the DATA record "1  data1" joined with the ATTRIBUTE record "1  green  huge" should produce "1  data1  green  huge".

The following code shows how to achieve this:

(1) First of all, let's set up the record classes, RecordReaders, and InputFormats for the two inputs (you can go to Here for more details about MultipleInputs).
Record Class

    public static class Attribute implements Writable {
        private String key;
        private String attr1, attr2;

        public Attribute() {  //the empty constructor is required by Hadoop
        }

        public Attribute(String key, String attr1, String attr2) {
            this.key = key;
            this.attr1 = attr1;
            this.attr2 = attr2;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            if (null == in) {
                throw new IllegalArgumentException("in cannot be null");
            }
            String key = in.readUTF();
            String attr1 = in.readUTF();
            String attr2 = in.readUTF();

            this.key = key;
            this.attr1 = attr1;
            this.attr2 = attr2;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            if (null == out) {
                throw new IllegalArgumentException("out cannot be null");
            }
            out.writeUTF(this.key);
            out.writeUTF(this.attr1);
            out.writeUTF(this.attr2);
        }

        @Override
        public String toString() {
            return "Attribute\t" + key + "\t" + attr1 + "\t" + attr2;
        }
    }

    public static class Data implements Writable {

        private String key;
        private String data;

        public Data() {
        }

        public Data(String key, String data) {
            this.key = key;
            this.data = data;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            if (null == in) {
                throw new IllegalArgumentException("in cannot be null");
            }
            String key = in.readUTF();
            String data = in.readUTF();

            this.key = key;
            this.data = data;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            if (null == out) {
                throw new IllegalArgumentException("out cannot be null");
            }
            out.writeUTF(this.key);
            out.writeUTF(this.data);
        }

        @Override
        public String toString() {
            return "Data\t" + key + "\t" + data;
        }
    }

RecordReader

    public static class AttributeReader extends RecordReader<Text, Attribute> {
        private LineRecordReader lineRecordReader = null;
        private Text key = null;
        private Attribute attribute = null;

        @Override
        public void close() throws IOException {
            if (null != lineRecordReader) {
                lineRecordReader.close();
                lineRecordReader = null;
            }
            key = null;
            attribute = null;
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Attribute getCurrentValue() throws IOException, InterruptedException {
            return attribute;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            close();
            lineRecordReader = new LineRecordReader();
            lineRecordReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!lineRecordReader.nextKeyValue()) {
                key = null;
                attribute = null;
                return false;
            }
            // otherwise, take the line and parse it
            Text line = lineRecordReader.getCurrentValue();
            String str = line.toString();
            String[] arr = str.split("\t", -1);

            key = new Text(arr[0].trim());
            attribute = new Attribute(arr[0].trim(), arr[1].trim(), arr[2].trim());

            return true;
        }
    }

    public static class DataReader extends RecordReader<Text, Data> {
        private LineRecordReader lineRecordReader = null;
        private Text key = null;
        private Data data = null;

        @Override
        public void close() throws IOException {
            if (null != lineRecordReader) {
                lineRecordReader.close();
                lineRecordReader = null;
            }
            key = null;
            data = null;
        }

        @Override
        public Text getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Data getCurrentValue() throws IOException, InterruptedException {
            return data;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineRecordReader.getProgress();
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            close();
            lineRecordReader = new LineRecordReader();
            lineRecordReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!lineRecordReader.nextKeyValue()) {
                key = null;
                data = null;
                return false;
            }

            // otherwise, take the line and parse it
            Text line = lineRecordReader.getCurrentValue();
            String str = line.toString();
            String[] arr = str.split("\t", -1);

            key = new Text(arr[0].trim());
            data = new Data(arr[0].trim(), arr[1].trim());

            return true;
        }
    }

InputFormat

    public static class AttributeInputFormat extends FileInputFormat<Text, Attribute> {
        @Override
        public RecordReader<Text, Attribute> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new AttributeReader();
        }
    }

    public static class DataInputFormat extends FileInputFormat<Text, Data> {
        @Override
        public RecordReader<Text, Data> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new DataReader();
        }
    }

(2) Now we need to use GenericWritable to pass the two different record types to the Reducer (go to Here to read more on GenericWritable):

    public static class MyGenericWritable extends GenericWritable {
        @SuppressWarnings("unchecked")
        private static Class<? extends Writable>[] CLASSES = (Class<? extends Writable>[]) new Class[] {
                Attribute.class, Data.class };

        // this empty constructor is required by Hadoop
        public MyGenericWritable() {
        }

        public MyGenericWritable(Writable instance) {
            set(instance);
        }

        @Override
        protected Class<? extends Writable>[] getTypes() {
            return CLASSES;
        }

        @Override
        public String toString() {
            return "MyGenericWritable [getTypes()=" + Arrays.toString(getTypes()) + "]";
        }
    }
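As a quick sanity check, the wrapper can be exercised outside Hadoop. The following is a hypothetical round-trip helper (my own addition, using plain java.io streams; not part of the job) that shows the type index byte being written and the wrapped instance coming back:

    // Hypothetical round-trip test for MyGenericWritable (no cluster needed).
    public static void testRoundTrip() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        MyGenericWritable written = new MyGenericWritable(new Attribute("1", "green", "huge"));
        written.write(new DataOutputStream(bytes));    // writes a type index byte, then the Attribute fields

        MyGenericWritable read = new MyGenericWritable();
        read.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(read.get());                // prints the Attribute's toString()
    }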

(3) We need a mapper for each input. NOTE: this is where we set the order of the different inputs. In the following code I use 0 for Attribute and 1 for Data, so Attribute records will come first.

    public static class AttributeMap extends Mapper<Text, Attribute, SecondarySortableTextKey, MyGenericWritable> {
        public void map(Text key, Attribute value, Context context) throws IOException, InterruptedException {
            context.write(new SecondarySortableTextKey(key, new IntWritable(0)), new MyGenericWritable(value));
        }
    }

    public static class DataMap extends Mapper<Text, Data, SecondarySortableTextKey, MyGenericWritable> {
        public void map(Text key, Data value, Context context) throws IOException, InterruptedException {
            context.write(new SecondarySortableTextKey(key, new IntWritable(1)), new MyGenericWritable(value));
        }
    }

(4) Write the Reducer. As you can see in the code, we assume the Attribute comes first and the Data records come later. For each Data record, if the attribute is still null, there is no corresponding attribute for this data.

    public static class MyReducer extends Reducer<SecondarySortableTextKey, MyGenericWritable, Text, Text> {
        public void reduce(SecondarySortableTextKey key, Iterable<MyGenericWritable> values, Context context)
                throws IOException, InterruptedException {
            Attribute attribute = null;
            for (MyGenericWritable value : values) {
                Writable rawValue = value.get();
                if (rawValue instanceof Attribute) {
                    attribute = (Attribute) rawValue;
                }
                if (rawValue instanceof Data) {
                    Data data = (Data) rawValue;
                    String result;
                    if (attribute == null) {
                        result = data.data;
                    } else {
                        result = data.data + "\t" + attribute.attr1 + "\t" + attribute.attr2;
                    }
                    context.write(key.getTextKey(), new Text(result));
                }
            }
        }
    }
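A subtlety worth noting: holding the attribute reference across loop iterations is safe here because the stock GenericWritable deserializes each value into a freshly created wrapped instance. If you were passing plain Writables as values instead, Hadoop reuses the value object across iterations of the values iterable, and you would need to copy the Attribute before advancing.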

(5) Create the secondary-sortable key wrapper class. It carries not only the original key but also the order.

    public static class SecondarySortableTextKey implements WritableComparable<SecondarySortableTextKey> {
        private Text textKey;
        private IntWritable secondarySortOrder;

        public SecondarySortableTextKey() {
        }

        public SecondarySortableTextKey(Text textKey, IntWritable secondarySortOrder) {
            this.textKey = textKey;
            this.secondarySortOrder = secondarySortOrder;
        }

        public Text getTextKey() {
            return this.textKey;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            textKey.write(out);
            secondarySortOrder.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            Text textKey = new Text();
            textKey.readFields(in);
            IntWritable secondarySortOrder = new IntWritable();
            secondarySortOrder.readFields(in);

            this.textKey = textKey;
            this.secondarySortOrder = secondarySortOrder;
        }

        @Override
        public int compareTo(SecondarySortableTextKey secondarySortableTextKey) {
            int comparisonResult = textKey.compareTo(secondarySortableTextKey.textKey);
            if (0 == comparisonResult) {
                comparisonResult = secondarySortOrder.compareTo(secondarySortableTextKey.secondarySortOrder);
            }
            return comparisonResult;
        }
    }

As you can see, the secondarySortOrder value is what breaks ties between records that share the same original key.
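An optional hardening (my own suggestion, not required by this job, since the custom partitioner below never calls the key's hashCode()): if you ever fall back to the default HashPartitioner, add hashCode()/equals() to SecondarySortableTextKey that hash on the original key only, so all records with the same key still land on the same reducer:

    // Optional methods for SecondarySortableTextKey: hash on textKey only.
    @Override
    public int hashCode() {
        return textKey.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SecondarySortableTextKey)) {
            return false;
        }
        SecondarySortableTextKey other = (SecondarySortableTextKey) obj;
        return textKey.equals(other.textKey)
                && secondarySortOrder.equals(other.secondarySortOrder);
    }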
(6) Set our own Partitioner. Since we now use SecondarySortableTextKey as the map output key, the default partitioner would partition on the whole SecondarySortableTextKey; but we only want to change the input order, not the partitioning, so we write a custom Partitioner:

    public static class SecondarySortableTextKeyPartitioner extends Partitioner<SecondarySortableTextKey, MyGenericWritable> {
        private static final HashPartitioner<Text, MyGenericWritable> HASH_PARTITIONER = new HashPartitioner<Text, MyGenericWritable>();

        @Override
        public int getPartition(SecondarySortableTextKey key, MyGenericWritable value, int numPartitions) {
            return HASH_PARTITIONER.getPartition(key.getTextKey(), value, numPartitions);
        }
        }
    }

You can see that we partition on the original key only, instead of the whole SecondarySortableTextKey.
(7) Custom grouping comparator. The partitioner only guarantees that records with the same key go to the same Reducer; it does not guarantee they arrive in one reduce() call as a single group (each distinct (key, order) pair would otherwise form its own group), so we write this comparator:

    public static class SecondarySortableTextKeyGroupingComparator extends WritableComparator {
        protected SecondarySortableTextKeyGroupingComparator() {
            super(SecondarySortableTextKey.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            SecondarySortableTextKey k1 = (SecondarySortableTextKey) w1;
            SecondarySortableTextKey k2 = (SecondarySortableTextKey) w2;

            return k1.textKey.compareTo(k2.textKey);
        }
    }

As you can see, for grouping we only compare the original key, not the whole SecondarySortableTextKey.
(8) Finally, we write the sort comparator, which takes the secondary order into account as well.

    public static class SecondarySortableTextKeySortComparator extends WritableComparator {
        protected SecondarySortableTextKeySortComparator() {
            super(SecondarySortableTextKey.class, true);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            SecondarySortableTextKey k1 = (SecondarySortableTextKey) w1;
            SecondarySortableTextKey k2 = (SecondarySortableTextKey) w2;

            int compare = k1.textKey.compareTo(k2.textKey);
            if (compare == 0) {
                compare = k1.secondarySortOrder.compareTo(k2.secondarySortOrder);
            }
            return compare;
        }
    }

Here we compare the original key first; only if the keys are equal do we compare the secondary order we set in the mapper. This guarantees that records with a lower secondary order come first.
(9) Write the driver to execute the MapReduce job; all the classes we wrote above are wired up here.

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Path attributePath = new Path(args[0]);
        Path dataPath = new Path(args[1]);
        Path outputPath = new Path(args[2]);

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySortTest.class);
        job.setJobName("MultipleInputs Test");

        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(SecondarySortableTextKey.class);
        job.setMapOutputValueClass(MyGenericWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setPartitionerClass(SecondarySortableTextKeyPartitioner.class);
        job.setSortComparatorClass(SecondarySortableTextKeySortComparator.class);
        job.setGroupingComparatorClass(SecondarySortableTextKeyGroupingComparator.class);

        System.out.println("start");

        MultipleInputs.addInputPath(job, attributePath, AttributeInputFormat.class, AttributeMap.class);
        MultipleInputs.addInputPath(job, dataPath, DataInputFormat.class, DataMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);
    }

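To run the job, package these classes into a jar (I'll assume a name like secondarysort.jar) and launch it with the standard hadoop jar command, passing the ATTRIBUTE input path, the DATA input path, and the output path as the three arguments the driver expects.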
To test this, I wrote the following sample data:

Attribute Dataset:
    1 green huge
    2 yellow small

Data Dataset:
    1 data1
    2 data2
    2 data22
    3 data3

Using the above code, the result is:
    1 data1 green huge
    2 data2 yellow small
    2 data22 yellow small
    3 data3

Which is exactly what we want. (Note that key 3 has no matching record in the ATTRIBUTE set, so data3 is emitted without attributes.)

You can download the script and sample data from here.
