Hadoop MultipleInputs sample usage

MultipleInputs is a Hadoop feature that lets a single MapReduce job read several inputs, each with its own input format and its own mapper.

For example, we have two files with different formats:

(1) First file format:

KEY<TAB>VALUE

(2) Second file format:

KEY<TAB>VALUE<TAB>ADDITIONAL

To read these custom formats, we need to write a Record class, a RecordReader, and an InputFormat for each one.

MultipleInputs requires an InputFormat for each input path. The InputFormat uses a RecordReader to read the file and return values, and each value is an instance of the Record class.

Here is the implementation:

(If you write them all in one big file, as nested classes, you need to add the "static" modifier to each class. I wrote them in one big file to make testing easier.)

My Hadoop version is 0.20.2; lower versions may have bugs.

Download My Test Code

1. First file format:

(1) Record Class   (must implement Writable)

/**
 * Record type for the first input format: a single string value.
 *
 * <p>The value is serialized with {@link DataOutput#writeUTF(String)} and read
 * back with {@link DataInput#readUTF()}; it is trimmed when it is read.
 */
public static class FirstClass implements Writable {
    private String value;

    /** Creates a record holding the placeholder value {@code "TEST"}. */
    public FirstClass() {
        this.value = "TEST";
    }

    /**
     * Creates a record holding the given value.
     *
     * @param val the value to store (stored as-is, not trimmed)
     */
    public FirstClass(String val) {
        this.value = val;
    }

    /**
     * Replaces this record's value with the trimmed UTF string read from {@code in}.
     *
     * @param in the stream to read from
     * @throws IllegalArgumentException if {@code in} is null
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        if (null == in) {
            throw new IllegalArgumentException("in cannot be null");
        }
        // Assign directly instead of going through a local that shadows the field.
        this.value = in.readUTF().trim();
    }

    /**
     * Writes this record's value to {@code out} as a UTF string.
     *
     * @param out the stream to write to
     * @throws IllegalArgumentException if {@code out} is null
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        if (null == out) {
            throw new IllegalArgumentException("out cannot be null");
        }
        out.writeUTF(this.value);
    }

    /** Returns the class name and the value, separated by a tab. */
    @Override
    public String toString() {
        // The separator is a tab character; a bare "t" here would glue a stray letter onto the label.
        return "FirstClass\t" + value;
    }
}

(2) RecordReader

/**
 * Record reader for the first input format.
 *
 * <p>Delegates line reading to a {@link LineRecordReader} and splits each line
 * on tab characters: the first field becomes the key and the second field
 * becomes the {@link FirstClass} value.
 */
public static class FirstClassReader extends RecordReader<Text, FirstClass> {
	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private FirstClass valueFirstClass = null;

	/** Closes the wrapped line reader, if any, and clears the current key and value. */
	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		valueFirstClass = null;
	}

	/** Returns the key parsed by the last successful {@link #nextKeyValue()}, or null. */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	/** Returns the value parsed by the last successful {@link #nextKeyValue()}, or null. */
	@Override
	public FirstClass getCurrentValue() throws IOException, InterruptedException {
		return valueFirstClass;
	}

	/**
	 * Returns the wrapped line reader's progress, or {@code 0} when no reader is
	 * open (before {@link #initialize} or after {@link #close()}).
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (null == lineRecordReader) {
			return 0.0f;
		}
		return lineRecordReader.getProgress();
	}

	/** Releases any previously opened reader, then opens a new line reader on {@code split}. */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		close();
		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	/**
	 * Reads the next line and parses it into the current key and value.
	 *
	 * @return {@code true} if a record was read, {@code false} at end of input
	 * @throws IOException if reading fails or the line has fewer than two tab-separated fields
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			valueFirstClass = null;
			return false;
		}

		// otherwise, take the line and parse it
		String str = lineRecordReader.getCurrentValue().toString();
		System.out.println("FirstClass:" + str);

		// Split on the tab character; limit -1 keeps trailing empty fields.
		String[] arr = str.split("\t", -1);
		if (arr.length < 2) {
			// Report the offending line instead of failing with a bare array-index error.
			throw new IOException("Malformed FirstClass record, expected KEY<TAB>VALUE: " + str);
		}
		key = new Text(arr[0].trim());
		valueFirstClass = new FirstClass(arr[1].trim());

		return true;
	}
}

(3) InputFormat

/** File-based input format whose records are read by {@link FirstClassReader}. */
public static class FirstInputFormat extends FileInputFormat<Text, FirstClass> {
	/**
	 * Creates a new {@link FirstClassReader}. Neither argument is used here and
	 * {@code initialize} is not called on the returned reader.
	 *
	 * @param split   the input split (unused)
	 * @param context the task attempt context (unused)
	 * @return a freshly constructed reader
	 */
	@Override
	public RecordReader<Text, FirstClass> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		final RecordReader<Text, FirstClass> reader = new FirstClassReader();
		return reader;
	}
}

2. Second file format:

(1) Record Class   (must implement Writable)

/**
 * Record type for the second input format: a string value plus an integer.
 *
 * <p>Serialized as a UTF string followed by an int; the string is trimmed when
 * it is read back.
 */
public static class SecondClass implements Writable {

	private String value;
	private int additional;

	/** Creates a record holding the placeholder value {@code "TEST"} and additional {@code 0}. */
	public SecondClass() {
		this.value = "TEST";
		this.additional = 0;
	}

	/**
	 * Creates a record holding the given fields.
	 *
	 * @param val  the string value (stored as-is, not trimmed)
	 * @param addi the additional integer
	 */
	public SecondClass(String val, int addi) {
		this.value = val;
		this.additional = addi;
	}

	/**
	 * Replaces this record's fields with a UTF string (trimmed) and an int read from {@code in}.
	 *
	 * @param in the stream to read from
	 * @throws IllegalArgumentException if {@code in} is null
	 * @throws IOException if reading fails
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		if (null == in) {
			throw new IllegalArgumentException("in cannot be null");
		}
		// Read in the same order write() emits: string first, then int.
		this.value = in.readUTF().trim();
		this.additional = in.readInt();
	}

	/**
	 * Writes the string value and then the additional int to {@code out}.
	 *
	 * @param out the stream to write to
	 * @throws IllegalArgumentException if {@code out} is null
	 * @throws IOException if writing fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		if (null == out) {
			throw new IllegalArgumentException("out cannot be null");
		}
		out.writeUTF(this.value);
		out.writeInt(this.additional);
	}

	/** Returns the class name, the value and the additional int, separated by tabs. */
	@Override
	public String toString() {
		// The separators are tab characters; bare "t" literals would inject stray letters.
		return "SecondClass\t" + value + "\t" + additional;
	}
}

(2) RecordReader

/**
 * Record reader for the second input format.
 *
 * <p>Delegates line reading to a {@link LineRecordReader} and splits each line
 * on tab characters: the first field becomes the key, the second the string
 * value and the third the integer of the {@link SecondClass} value.
 */
public static class SecondClassReader extends RecordReader<Text, SecondClass> {

	private LineRecordReader lineRecordReader = null;
	private Text key = null;
	private SecondClass valueSecondClass = null;

	/** Closes the wrapped line reader, if any, and clears the current key and value. */
	@Override
	public void close() throws IOException {
		if (null != lineRecordReader) {
			lineRecordReader.close();
			lineRecordReader = null;
		}
		key = null;
		valueSecondClass = null;
	}

	/** Returns the key parsed by the last successful {@link #nextKeyValue()}, or null. */
	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	/** Returns the value parsed by the last successful {@link #nextKeyValue()}, or null. */
	@Override
	public SecondClass getCurrentValue() throws IOException, InterruptedException {
		return valueSecondClass;
	}

	/**
	 * Returns the wrapped line reader's progress, or {@code 0} when no reader is
	 * open (before {@link #initialize} or after {@link #close()}).
	 */
	@Override
	public float getProgress() throws IOException, InterruptedException {
		if (null == lineRecordReader) {
			return 0.0f;
		}
		return lineRecordReader.getProgress();
	}

	/** Releases any previously opened reader, then opens a new line reader on {@code split}. */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		close();

		lineRecordReader = new LineRecordReader();
		lineRecordReader.initialize(split, context);
	}

	/**
	 * Reads the next line and parses it into the current key and value.
	 *
	 * @return {@code true} if a record was read, {@code false} at end of input
	 * @throws IOException if reading fails, the line has fewer than three
	 *         tab-separated fields, or the third field is not an integer
	 */
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (!lineRecordReader.nextKeyValue()) {
			key = null;
			valueSecondClass = null;
			return false;
		}

		// otherwise, take the line and parse it
		String str = lineRecordReader.getCurrentValue().toString();
		System.out.println("SecondClass:" + str);

		// Split on the tab character; limit -1 keeps trailing empty fields.
		String[] arr = str.split("\t", -1);
		if (arr.length < 3) {
			// Report the offending line instead of failing with a bare array-index error.
			throw new IOException("Malformed SecondClass record, expected KEY<TAB>VALUE<TAB>ADDITIONAL: " + str);
		}

		int addi;
		try {
			// Trim like the other fields so surrounding whitespace does not break parsing.
			addi = Integer.parseInt(arr[2].trim());
		} catch (NumberFormatException e) {
			throw new IOException("Malformed SecondClass record, ADDITIONAL is not an integer: " + str, e);
		}

		key = new Text(arr[0].trim());
		valueSecondClass = new SecondClass(arr[1].trim(), addi);

		return true;
	}
}

(3) InputFormat

/** File-based input format whose records are read by {@link SecondClassReader}. */
public static class SecondInputFormat extends FileInputFormat<Text, SecondClass> {
	/**
	 * Creates a new {@link SecondClassReader}. Neither argument is used here and
	 * {@code initialize} is not called on the returned reader.
	 *
	 * @param split   the input split (unused)
	 * @param context the task attempt context (unused)
	 * @return a freshly constructed reader
	 */
	@Override
	public RecordReader<Text, SecondClass> createRecordReader(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		final RecordReader<Text, SecondClass> reader = new SecondClassReader();
		return reader;
	}
}

3. Now we write Mapper for each file format
(1) FirstMap: the output key type is Text and the output value type is Text (these outputs are the reducer's inputs)

public static class FirstMap extends Mapper<Text, FirstClass, Text, Text> {
	public void map(Text key, FirstClass value, Context context) throws IOException, InterruptedException {
		System.out.println("FirstMap:" + key.toString() + " " + value.toString());
		context.write(key, new Text(value.toString()));
	}
}

(2) SecondMap: the output key type is Text and the output value type is Text (these outputs are the reducer's inputs)

public static class SecondMap extends Mapper<Text, SecondClass, Text, Text> {
	public void map(Text key, SecondClass value, Context context) throws IOException, InterruptedException {
		System.out.println("SecondMap:" + key.toString() + " " + value.toString());
		context.write(key, new Text(value.toString()));
	}
}

4. Write the Reducer. IMPORTANT: a job can use only one reducer class, so if your mappers need to output different key/value types, you must wrap them in a GenericWritable.

public static class MyReducer extends Reducer<Text, Text, Text, Text> {
	public void reduce(Text key, Iterable values, Context context) throws IOException,
			InterruptedException {
		for (Text value : values) {
			System.out.println("Reduce:" + key.toString() + " " + value.toString());
			context.write(key, value);
		}
	}
}

5. In the Driver, we need to register each input path with its own input format and mapper using MultipleInputs

/**
 * Job driver: configures a job that reads two inputs with different input
 * formats and mappers, runs it, and exits with a status reflecting the outcome.
 *
 * @param args {@code args[0]} first input path, {@code args[1]} second input
 *             path, {@code args[2]} output path
 */
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
	if (args.length < 3) {
		// Fail with a usage message instead of an array-index error.
		System.err.println("Usage: MultipleInputsTest <first input> <second input> <output>");
		System.exit(2);
	}

	Path firstPath = new Path(args[0]);
	Path secondPath = new Path(args[1]);
	Path outputPath = new Path(args[2]);

	Configuration conf = new Configuration();

	Job job = new Job(conf);
	job.setJarByClass(MultipleInputsTest.class);
	job.setJobName("MultipleInputs Test");

	job.setReducerClass(MyReducer.class);

	// output types of the mappers
	job.setMapOutputKeyClass(Text.class);
	job.setMapOutputValueClass(Text.class);

	// output types of the reducer
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(Text.class);

	// use MultipleInputs to give each input path its own input format and mapper
	MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);
	MultipleInputs.addInputPath(job, secondPath, SecondInputFormat.class, SecondMap.class);

	FileOutputFormat.setOutputPath(job, outputPath);

	// Propagate job success or failure to the caller via the process exit status.
	System.exit(job.waitForCompletion(true) ? 0 : 1);
}

3 thoughts on “Hadoop MultipleInputs sample usage”

  1. sreenath

    The explanation is in detail along with the program. Thanks a lot !!!
    It would be great if you can provide sample input files to execute and check the program.

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>