{"id":272,"date":"2012-06-14T15:09:48","date_gmt":"2012-06-14T19:09:48","guid":{"rendered":"http:\/\/www.lichun.cc\/blog\/?p=272"},"modified":"2013-11-25T18:47:07","modified_gmt":"2013-11-25T22:47:07","slug":"wordcount-mapreduce-example-using-hive-on-local-and-emr","status":"publish","type":"post","link":"https:\/\/www.lichun.cc\/blog\/2012\/06\/wordcount-mapreduce-example-using-hive-on-local-and-emr\/","title":{"rendered":"Wordcount mapreduce example using Hive on local and EMR"},"content":{"rendered":"<p><a href=\"http:\/\/hive.apache.org\/\" target=\"_blank\">Hive<\/a> is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop compatible file systems.<\/p>\n<p>In short, you can run a Hadoop MapReduce using SQL-like statements with Hive.<\/p>\n<p>Here is an WordCount example I did using Hive. The example first shows how to do it on your <b>Local<\/b> machine, then I will show how to do it using Amazon EMR.<\/p>\n<h1>Local<\/h1>\n<p>1. Install Hive.<\/p>\n<p>First you need to install Hadoop on your local, <a title=\"Build Local Single Node Hadoop Cluster on Linux\" href=\"http:\/\/www.lichun.cc\/blog\/2012\/05\/build-local-haoop-cluster-on-linux\/\" target=\"_blank\">here is a post<\/a> for how to do it. After you installed Hadoop, you can use <a href=\"https:\/\/cwiki.apache.org\/confluence\/display\/Hive\/GettingStarted#GettingStarted-InstallationandConfiguration\" target=\"_blank\">this official tutorial<\/a>.<\/p>\n<p><!--more--><\/p>\n<p>*2. This step may not needed, in case you meet error says the IP address cannot be accessed, go to your Hadoop folder, edit the <code>conf\/core-site.xml<\/code>, change <b>fs.default.name<\/b> from IP to your hostname (for me it&#8217;s <code>http:\/\/localhost.localdomain:9000<\/code><\/p>\n<p>3. Write mapper &amp; reducer for our WordCount example, here I use <b>python<\/b>, you can use any script languages you like.<\/p>\n<p>Mapper: (word_count_mapper.py)<\/p>\n<pre>#!\/usr\/bin\/python\r\nimport sys\r\n\r\nfor line in sys.stdin:\r\n\tline = line.strip();\r\n\twords = line.split(\" \");\r\n\t# write the tuples to stdout\r\n\tfor word in words:\r\n\t\tprint '%s\\t%s' % (word, \"1\")<\/pre>\n<p>Reducer: (word_count_reducer.py)<\/p>\n<pre>#!\/usr\/bin\/python\r\nimport sys\r\n\r\n# maps words to their counts\r\nword2count = {}\r\n\r\nfor line in sys.stdin:\r\n    line = line.strip()\r\n\r\n    # parse the input we got from mapper.py\r\n    word, count = line.split('\\t', 1)\r\n    # convert count (currently a string) to int\r\n    try:\r\n        count = int(count)\r\n    except ValueError:\r\n        continue\r\n\r\n    try:\r\n        word2count[word] = word2count[word]+count\r\n    except:\r\n        word2count[word] = count\r\n\r\n# write the tuples to stdout\r\n# Note: they are unsorted\r\nfor word in word2count.keys():\r\n    print '%st%s'% ( word, word2count[word] )<\/pre>\n<p>4. Write the Hive script <b>word_count.hql<\/b>. 
4. Write the Hive script **word_count.hql**. Note: you can also run the following statements line by line in the Hive console.

```sql
drop table if exists raw_lines;

-- create table raw_lines and read all the lines under '/user/inputs' on your local HDFS
create external table if not exists raw_lines(line string)
ROW FORMAT DELIMITED
stored as textfile
location '/user/inputs';

drop table if exists word_count;

-- create table word_count, the output table, which will be written to '/user/outputs' on your local HDFS as a text file
create external table if not exists word_count(word string, count int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
lines terminated by '\n'
STORED AS TEXTFILE
LOCATION '/user/outputs/';

-- add the mapper & reducer scripts as resources; change your/local/path accordingly
add file your/local/path/word_count_mapper.py;
add file your/local/path/word_count_reducer.py;

from (
        from raw_lines
        map raw_lines.line
        -- call the mapper here
        using 'word_count_mapper.py'
        as word, count
        cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
-- call the reducer here
using 'word_count_reducer.py'
as word, count;
```

5. Put some text files on HDFS under '/user/inputs/' using the Hadoop command line (`hadoop dfs -copyFromLocal <source> <destination>`).

6. Run the script:

```sh
hive -f word_count.hql
```

The script creates the two tables, reads the input data into the **raw_lines** table, adds the mapper and reducer scripts as resources, runs the MapReduce job, and stores the result in the **word_count** table, whose text file you can find under '/user/outputs'.
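As an aside, the same job can be expressed in pure HiveQL using the built-in `split` and `explode` functions, with no external scripts at all. This is just a sketch for comparison, not what the job above runs:

```sql
-- equivalent word count without the Python scripts
insert overwrite table word_count
select word, count(1)
from (select explode(split(line, ' ')) as word from raw_lines) words
group by word;
```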
In case you hit the safe mode error, you can leave safe mode manually:

```sh
hadoop dfsadmin -safemode leave
```

IMPORTANT: do not forget to put `#!/usr/bin/python` on the first line of your script files. I forgot to add it and got the error below, which cost me half an hour to figure out:

```
Starting Job = job_201206131927_0006, Tracking URL = http://domU-12-31-39-03-BD-57.compute-1.internal:9100/jobdetails.jsp?jobid=job_201206131927_0006
Kill Command = /home/hadoop/bin/hadoop job  -Dmapred.job.tracker=10.249.190.165:9001 -kill job_201206131927_0006
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2012-06-13 20:56:15,119 Stage-1 map = 0%,  reduce = 0%
2012-06-13 20:57:10,489 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201206131927_0006 with errors
Error during job, obtaining debugging information...
Examining task ID: task_201206131927_0006_m_000002 (and more) from job job_201206131927_0006
Exception in thread "Thread-120" java.lang.RuntimeException: Error while reading from task log url
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:130)
at org.apache.hadoop.hive.ql.exec.JobDebugger.showJobFailDebugInfo(JobDebugger.java:211)
at org.apache.hadoop.hive.ql.exec.JobDebugger.run(JobDebugger.java:81)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.io.IOException: Server returned HTTP response code: 400 for URL: http://10.248.42.34:9103/tasklog?taskid=attempt_201206131927_0006_m_000000_2&start=-8193
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1436)
at java.net.URL.openStream(URL.java:1010)
at org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor.getErrors(TaskLogProcessor.java:120)
... 3 more
Counters:
FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec
```

# Amazon EMR

Running a Hive script on EMR is actually very simple. I will walk through it with screenshots.

Here is the script modified for EMR:

```sql
create external table if not exists raw_lines(line string)
ROW FORMAT DELIMITED
stored as TEXTFILE
LOCATION '${INPUT}';

create external table if not exists word_count(word string, count int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
lines terminated by '\n'
STORED AS TEXTFILE
LOCATION '${OUTPUT}';

from (
        from raw_lines
        map raw_lines.line
        using '${SCRIPT}/word_count_mapper.py'
        as word, count
        cluster by word) map_output
insert overwrite table word_count
reduce map_output.word, map_output.count
using '${SCRIPT}/word_count_reducer.py'
as word, count;
```

Note that the script uses INPUT, OUTPUT, and SCRIPT variables. INPUT and OUTPUT are set automatically by EMR in step 2 below; SCRIPT is set by me in the **Extra args** field.

All files are stored in S3.
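Those `${...}` placeholders are ordinary Hive variables, so you can dry-run the parameterized script outside EMR by defining them with the Hive CLI's `-d` flag. A sketch under assumptions: the file name `word_count_emr.hql` and the paths are made up, and on a single-node setup SCRIPT should point at the directory holding the two Python files:

```sh
hive -f word_count_emr.hql \
     -d INPUT=/user/inputs \
     -d OUTPUT=/user/outputs \
     -d SCRIPT=/your/local/path
```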
1. Create the EMR job:

![hive-emr-1](https://i0.wp.com/www.lichun.cc/blog/wp-content/uploads/2012/06/hive-emr-1.png)

2. Set the Hive script path and arguments:

![hive-emr-2](https://i0.wp.com/www.lichun.cc/blog/wp-content/uploads/2012/06/hive-emr-2.png)

3. Set the instances:

![hive-emr-3](https://i0.wp.com/www.lichun.cc/blog/wp-content/uploads/2012/06/hive-emr-3.png)

4. Set the log path:

![hive-emr-4](https://i0.wp.com/www.lichun.cc/blog/wp-content/uploads/2012/06/hive-emr-4.png)

5. Run the job!
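Either way, the result ends up as plain tab-separated text files in the output location: under the S3 OUTPUT path on EMR, or under '/user/outputs' on the local setup, where you can peek at it with:

```sh
hadoop dfs -cat /user/outputs/*
```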