We all know that Hive read/write data in folder level, the limit here is that by default it will only read/write the files from/to the folder specified. But sometimes, our input data are organized by using subfolders, then Hive cannot read them if you only specify the root folder; or you want to output to separate folders instead of putting all the output data in the same folder.
For example, we have sales data dumped to hdfs(or s3), and their path structure is like sales/city=BEIJING/day=20140401/data.tsv , as you can see, the data is partitioned by city and day, although we can copy all the data.tsv to the same folder, we need to do the copy and change the filename to avoid conflict, it will be a pain if the files are a lot and huge. On the other hand, even if we do copy all the data.tsv to the same folder, when output, we want to separate the output to different folders by city and day, how to do that?
Can hive be smart enough to read all the subfolder’s data and output to separate folders? The answer is Yes.
Just like MultipleInputs, Hadoop also supports MultipleOutputs, thanks to the equality, we can output different data/format in the same MapReduce job.
It’s very easy to use this useful feature, as before, I will mainly use Java code to demonstrate the usage, hope the code can explain itself 🙂
Note: I wrote and ran the following code using Hadoop 1.0.3, but it should be working in 0.20.205 as well
SecondarySort is a technique that you can control the order of inputs that comes to Reducers.
For example, we wants to Join two different datasets, One dataset contains the attribute that the other dataset need to use. For simplicity, we call the first dataset ATTRIBUTE set, the other dataset DATA set. Since the ATTRIBUTE set is also very large, it’s not practical to put it in the Distributed Cache.
Now we want to join these two tables, for each record in the DATA set, we get its ATTRIBUTE. If we don’t use SecondarySort, after the map step, the DATA and ATTRIBUTE will come in arbitrary order, so if we want to append the ATTRIBUTE to each DATA, we need to store all the DATA in memory, and later when we meet the ATTRIBUTE, we then assign the ATTRIBUTE to DATA.
I was using custom jar for my mapreduce job in the past few years, and because it’s pure java programming, I have a lot of flexibility. But writing java results in a lot of code to maintain, and most of the mapreduce jobs are just joining with a little spice in it, so moving to Hive may be a better path.
The mapreduce job I face here is to left outer join two different datasets using the same keys, because it’s a outer join, there will be null values, and for these null values, I want to lookup the default values to assign from a map.
For example, I have two datasets:
dataset 1: KEY_ID CITY SIZE_TYPE
dataset 2: KEY_ID POPULATION
DistributedCache is a very useful Hadoop feature that enables you to pass resource files to each mapper or reducer.
For example, you have a file stopWordList.txt that contains all the stop words you want to exclude when you do word count. And In your reducer, you want to check each value passed by mapper, if the value appears in the stop word list, we pass it and goes to the next value.
In order to use DistributedCache, first you need to set the file in the job configuration driver:
Hive is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop compatible file systems.
In short, you can run a Hadoop MapReduce using SQL-like statements with Hive.
Here is an WordCount example I did using Hive. The example first shows how to do it on your Local machine, then I will show how to do it using Amazon EMR.
1. Install Hive.
First you need to install Hadoop on your local, here is a post for how to do it. After you installed Hadoop, you can use this official tutorial.
There are few documents about the Amazon EMR Java API usage, in case you are finding codes configuring and starting EMR job using Java, here are the codes I use:
(a brief official sample can be found here, the file contains the following code is Here)
MultipleInputs is a feature that supports different input formats in the MapReduce.
For example, we have two files with different formats:
(1) First file format:
(2) Second file format:
In order to read the custom format, we need to write Record Class, RecordReader, InputFormat for each one.
InputFormat is needed by MultipleInputs, an InputFormat use RecordReader to read the file and return value, the value is a Record Class instance
Here is the implementation:
GenericWritable is another Hadoop feature that let you pass values with different types to the reducer, it’s a wrapper for Writable instances.
Suppose we have different input formats (see MultipleInput), one is FirstClass and another one is SecondClass.(note: you can have multiple, not just 2). And you want to include both of them in your reducer based on the same key value, here is what you can do:
We use the same code used in MultipleInput.