WordCount MapReduce example using Hive on local and EMR

Hive is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop-compatible file systems.

In short, Hive lets you run Hadoop MapReduce jobs using SQL-like statements.

Here is a WordCount example I did using Hive. It first shows how to run it on your local machine, then how to run it on Amazon EMR.


1. Install Hive.

First you need to install Hadoop on your local machine; here is a post on how to do it. After you have installed Hadoop, you can follow this official tutorial to install Hive.

*2. This step may not be needed. If you get an error saying the IP address cannot be accessed, go to your Hadoop folder, edit conf/core-site.xml, and change fs.default.name from the IP address to your hostname (for me it's hdfs://localhost.localdomain:9000).
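If you prefer to script the change in step 2, here is a minimal Python sketch that sets a property in conf/core-site.xml using the standard library's ElementTree. The helper names set_hadoop_property and get_hadoop_property are hypothetical (my own), and the sketch assumes the usual <configuration><property><name/><value/></property></configuration> layout. ElementTree drops XML comments when it rewrites the file, so keep a backup.

```python
import xml.etree.ElementTree as ET


def set_hadoop_property(conf_path, name, value):
    """Set (or add) a <property> in a Hadoop *-site.xml file, in place.

    Sketch only: XML comments in the original file are not preserved.
    """
    tree = ET.parse(conf_path)
    root = tree.getroot()  # the <configuration> element
    for prop in root.findall('property'):
        if prop.findtext('name') == name:
            prop.find('value').text = value
            break
    else:
        # property not present yet: append a new <property> element
        prop = ET.SubElement(root, 'property')
        ET.SubElement(prop, 'name').text = name
        ET.SubElement(prop, 'value').text = value
    tree.write(conf_path)


def get_hadoop_property(conf_path, name):
    """Return the value of a property, or None if it is not set."""
    root = ET.parse(conf_path).getroot()
    for prop in root.findall('property'):
        if prop.findtext('name') == name:
            return prop.findtext('value')
    return None
```

For example, set_hadoop_property('conf/core-site.xml', 'fs.default.name', 'hdfs://localhost.localdomain:9000'), then restart Hadoop so the daemons pick up the new value.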

3. Write the mapper and reducer for our WordCount example. Here I use Python, but you can use any scripting language you like.

Mapper: (word_count_mapper.py)

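#!/usr/bin/python
import sys

# Hive pipes each input line (raw_lines.line) to stdin
for line in sys.stdin:
    # split on any whitespace; this also skips empty strings from repeated spaces
    words = line.strip().split()
    # write the (word, 1) tuples to stdout, tab-separated
    for word in words:
        print('%s\t%s' % (word, 1))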

Reducer: (word_count_reducer.py)

#!/usr/bin/python
import sys

# maps words to their counts
word2count = {}

for line in sys.stdin:
    line = line.strip()

    try:
        # parse the input we got from the mapper: word<TAB>count
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # skip empty or malformed lines
        continue

    # add to the running total for this word
    if word in word2count:
        word2count[word] = word2count[word] + count
    else:
        word2count[word] = count

# write the tuples to stdout
# Note: they are unsorted
for word in word2count.keys():
    print('%s\t%s' % (word, word2count[word]))
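The reducer above keeps every word in a dictionary, so it works even on unsorted input but holds all distinct words in memory. Because the Hive query in step 4 uses cluster by word, each reducer receives the rows for a given word on adjacent lines, sorted by word. Under that assumption, a reducer could instead stream with itertools.groupby and use constant memory. This is an alternative sketch, not the script I actually ran; the function names parse and reduce_sorted are hypothetical.

```python
#!/usr/bin/python
import sys
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from 'word<TAB>count' lines, skipping bad ones."""
    for line in lines:
        try:
            word, count = line.strip().split('\t', 1)
            yield word, int(count)
        except ValueError:
            continue


def reduce_sorted(lines):
    """Sum counts per word, assuming equal words arrive on adjacent lines."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


if __name__ == '__main__':
    for word, total in reduce_sorted(sys.stdin):
        print('%s\t%s' % (word, total))
```

The trade-off: if the input is not grouped (for example, you drop cluster by from the query), this version emits the same word more than once, while the dictionary version stays correct.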

4. Write the Hive script word_count.hql. Note: you can also run the following statements one by one in the Hive console.

drop table if exists raw_lines;

-- create table raw_lines, which reads all the lines in '/user/inputs' (a path on your local HDFS)
create external table if not exists raw_lines(line string)
stored as textfile
location '/user/inputs';

drop table if exists word_count;

-- create table word_count, the output table, stored as a tab-separated text file in '/user/outputs' (a path on your local HDFS)
create external table if not exists word_count(word string, count int)
row format delimited fields terminated by '\t'
lines terminated by '\n' stored as textfile location '/user/outputs/';

-- add the mapper and reducer scripts as resources; change your/local/path to where you saved them
add file your/local/path/word_count_mapper.py;
add file your/local/path/word_count_reducer.py;

from (
        from raw_lines
        map raw_lines.line
        --call the mapper here
        using 'word_count_mapper.py'
        as word, count
        cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
--call the reducer here
using 'word_count_reducer.py'
as word,count;
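To see what the from (... map ... cluster by word) ... reduce query does, here is a single-process Python emulation. The functions mapper, reducer, and run_local are hypothetical stand-ins that mirror the two scripts; cluster by is modeled as distribute by (hash partitioning across reducers) plus sort by (sorting within each reducer), and Python's hash() is only a stand-in for Hadoop's own partitioner.

```python
from collections import defaultdict


def mapper(line):
    """Same logic as word_count_mapper.py: emit (word, 1) for each word."""
    return [(word, 1) for word in line.split()]


def reducer(pairs):
    """Same logic as word_count_reducer.py: sum the counts per word."""
    totals = defaultdict(int)
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


def run_local(lines, num_reducers=2):
    """Emulate 'map ... cluster by word ... reduce' on one machine."""
    partitions = [[] for _ in range(num_reducers)]
    for line in lines:
        for word, count in mapper(line):
            # distribute by: the same word always lands in the same partition
            partitions[hash(word) % num_reducers].append((word, count))
    result = {}
    for part in partitions:
        part.sort()  # sort by: rows for one word become adjacent
        # word sets never overlap across partitions, so update() is safe
        result.update(reducer(part))
    return result
```

For example, run_local(['the quick fox', 'the lazy  dog']) counts "the" twice and every other word once, regardless of how many reducers you ask for.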

5. Put some text files into the HDFS directory ‘/user/inputs/’ using the Hadoop command line (hadoop dfs -copyFromLocal source destination).
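If you upload inputs often, a small Python wrapper can save typing. This sketch assumes the hadoop command is on your PATH and uses hadoop fs, the generic file system shell that works the same way as hadoop dfs for HDFS paths; copy_to_hdfs_command and copy_to_hdfs are hypothetical helper names.

```python
import subprocess


def copy_to_hdfs_command(local_paths, hdfs_dir='/user/inputs/'):
    """Build the argv for 'hadoop fs -copyFromLocal <local files...> <hdfs_dir>'."""
    return ['hadoop', 'fs', '-copyFromLocal'] + list(local_paths) + [hdfs_dir]


def copy_to_hdfs(local_paths, hdfs_dir='/user/inputs/'):
    """Run the copy; raises CalledProcessError if hadoop exits non-zero."""
    subprocess.run(copy_to_hdfs_command(local_paths, hdfs_dir), check=True)
```

With several source files, the destination has to be an existing directory, so create /user/inputs first if it is not there yet.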

6. Run your script!

hive -f word_count.hql

The script creates the 2 tables, points the raw_lines table at the input data, adds the mapper and reducer scripts as resources, runs the MapReduce job, and stores the result in the word_count table, whose text file you can find in ‘/user/outputs’.
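To inspect the result, you could first copy it out of HDFS (for example with hadoop fs -getmerge /user/outputs word_count.txt) and parse it in Python. This sketch accepts either a tab or Hive's default field delimiter Ctrl-A ('\x01'), since the separator depends on how the word_count table was declared; parse_word_count_output is a hypothetical helper.

```python
def parse_word_count_output(lines):
    """Parse rows of the word_count table's text file into a {word: count} dict.

    Accepts a tab or Hive's default Ctrl-A field delimiter; blank lines are skipped.
    """
    counts = {}
    for line in lines:
        line = line.rstrip('\n')
        if not line:
            continue
        sep = '\t' if '\t' in line else '\x01'
        word, count = line.split(sep, 1)
        counts[word] = int(count)
    return counts
```

For example, open('word_count.txt') can be passed straight in, and sorted(counts.items(), key=lambda kv: -kv[1])[:10] then gives the ten most frequent words.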

If you get a safe mode error, you can turn safe mode off manually:

hadoop dfsadmin -safemode leave
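The NameNode normally leaves safe mode on its own once enough blocks have been reported, so forcing it off is mostly a shortcut for a single-node test setup. A minimal Python sketch that only forces it when needed, assuming hadoop is on your PATH and that dfsadmin -safemode get prints a line like "Safe mode is ON" or "Safe mode is OFF" (the function names are hypothetical):

```python
import subprocess


def is_safe_mode_on(report):
    """Interpret 'dfsadmin -safemode get' output such as 'Safe mode is OFF'."""
    return any(line.strip().startswith('Safe mode is ON')
               for line in report.splitlines())


def leave_safe_mode_if_needed():
    """Query safe mode and run '-safemode leave' only when it reports ON."""
    report = subprocess.run(['hadoop', 'dfsadmin', '-safemode', 'get'],
                            check=True, capture_output=True, text=True).stdout
    if is_safe_mode_on(report):
        subprocess.run(['hadoop', 'dfsadmin', '-safemode', 'leave'], check=True)
    return report
```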

In your mapper and reducer scripts, PLEASE do not forget to add “#!/usr/bin/python” as the first line. I forgot it and got the error below, which cost me half an hour to figure out:

Starting Job = job_201206131927_0006, Tracking URL = http://domU-12-31-39-03-BD-57.compute-1.internal:9100/jobdetails.jsp?jobid=job_201206131927_0006
Kill Command = /home/hadoop/bin/hadoop job  -Dmapred.job.tracker= -kill job_201206131927_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-06-13 20:56:15,119 Stage-1 map = 0%,  reduce = 0%
2012-06-13 20:57:10,489 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201206131927_0006 with errors
Error during job, obtaining debugging information...
Examining task ID: task_201206131927_0006_m_000002 (and more) from job job_201206131927_0006
Exception in thread "Thread-120" java.lang.RuntimeException: Error while reading from task log url
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:130)
at org.apache.hadoop.hive.ql.exec.JobDebugger.showJobFailDebugInfo(JobDebugger.java:211)
at org.apache.hadoop.hive.ql.exec.JobDebugger.run(JobDebugger.java:81)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL:
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1436)
at java.net.URL.openStream(URL.java:1010)
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:120)
... 3 more
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
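Since the log above does not point at the real cause, a quick pre-flight check on the scripts before running hive -f can help. This hypothetical check_streaming_script helper looks only at the first line: it flags a missing shebang and a shebang ending in a Windows CRLF (which makes the interpreter path "/usr/bin/python\r"). Another commonly used option is to call the interpreter explicitly in the query, e.g. using 'python word_count_mapper.py', which does not depend on the shebang.

```python
import sys


def check_streaming_script(path):
    """Return a list of problems found on a script's first line (empty if none)."""
    with open(path, 'rb') as f:
        first_line = f.readline()
    problems = []
    if not first_line.startswith(b'#!'):
        problems.append('missing shebang, e.g. #!/usr/bin/python')
    elif first_line.endswith(b'\r\n'):
        # '/usr/bin/python\r' is not a valid interpreter path
        problems.append('shebang line ends with a Windows CRLF')
    return problems


if __name__ == '__main__':
    # usage: python check_scripts.py word_count_mapper.py word_count_reducer.py
    for script in sys.argv[1:]:
        for problem in check_streaming_script(script):
            print('%s: %s' % (script, problem))
```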

Amazon EMR

Running a Hive script on EMR is actually very simple. I will use pictures to show how I did it.

Here is the code I modified for EMR:

create external table if not exists raw_lines(line string)
stored as textfile
location '${INPUT}';

create external table if not exists word_count(word string, count int)
row format delimited fields terminated by '\t'
lines terminated by '\n'
location '${OUTPUT}';

from (
        from raw_lines
        map raw_lines.line
        using '${SCRIPT}/word_count_mapper.py'
        as word, count
        cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
using '${SCRIPT}/word_count_reducer.py'
as word,count;

Note that the script uses the INPUT, OUTPUT, and SCRIPT variables: INPUT and OUTPUT are set by EMR automatically in step 2 below, and SCRIPT is set by me in the Extra args.

All files are stored in S3.
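Hive replaces ${NAME} in the script with the value defined on the command line as -d NAME=value; my understanding is that this is how the EMR Hive step hands over INPUT and OUTPUT. To preview what the script will look like after substitution, Python's string.Template uses the same ${NAME} syntax. This is only an approximation (render_hive_script is a hypothetical helper, and the S3 paths in the example are placeholders).

```python
from string import Template


def render_hive_script(script_text, variables):
    """Approximate Hive's ${VAR} substitution for -d VAR=value definitions.

    safe_substitute leaves unknown ${VAR} placeholders untouched.
    Unlike Hive, Template also turns '$$' into '$'.
    """
    return Template(script_text).safe_substitute(variables)
```

For example, render_hive_script(open('word_count.hql').read(), {'INPUT': 's3://my-bucket/input', 'OUTPUT': 's3://my-bucket/output', 'SCRIPT': 's3://my-bucket/scripts'}) shows the table locations and script paths the job would actually use.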

1. Create EMR job:

2. Set the Hive script path and arguments

3. Set instances

4. Set Log path

5. Run the Job!
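The console steps above can also be expressed as an API call. This sketch builds one EMR step that runs the Hive script with the three variables, based on the argument layout AWS documents for Hive steps run through command-runner.jar on current EMR releases; the 2012-era console I used worked differently, and hive_step plus all S3 paths are hypothetical.

```python
def hive_step(script_s3, input_s3, output_s3, script_dir_s3,
              name='Hive word count'):
    """Build one EMR step dict that runs a Hive script with -d variables."""
    args = ['hive-script', '--run-hive-script', '--args',
            '-f', script_s3,
            '-d', 'INPUT=' + input_s3,
            '-d', 'OUTPUT=' + output_s3,
            '-d', 'SCRIPT=' + script_dir_s3]
    return {
        'Name': name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': args},
    }
```

To submit it to a running cluster, pass the returned dict in the Steps list of boto3's EMR client method add_job_flow_steps(JobFlowId=..., Steps=[...]).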

