{"id":432,"date":"2013-06-12T15:11:08","date_gmt":"2013-06-12T19:11:08","guid":{"rendered":"http:\/\/www.lichun.cc\/blog\/?p=432"},"modified":"2014-04-03T11:40:53","modified_gmt":"2014-04-03T15:40:53","slug":"use-a-lookup-hashmap-in-hive-script","status":"publish","type":"post","link":"https:\/\/www.lichun.cc\/blog\/2013\/06\/use-a-lookup-hashmap-in-hive-script\/","title":{"rendered":"Use a lookup HashMap in Hive script with UDF"},"content":{"rendered":"<p>I have been using custom jars for my MapReduce jobs for the past few years, and because it&#8217;s pure Java programming, I have a lot of flexibility. But writing Java results in a lot of code to maintain, and most of my MapReduce jobs are just joins with a little spice in them, so moving to Hive may be a better path.<\/p>\n<h1>Problem<\/h1>\n<p>The MapReduce job I face here is a left outer join of two different datasets on the same keys. Because it&#8217;s an outer join, there will be null values, and for those null values I want to look up default values to assign from a map.<\/p>\n<p>For example, I have two datasets:<br \/>\ndataset 1: KEY_ID CITY SIZE_TYPE<br \/>\ndataset 2: KEY_ID POPULATION<\/p>\n<p><!--more--><\/p>\n<p>I want a left outer join on KEY_ID; for those cities that cannot find a POPULATION, I will look up their SIZE_TYPE in a map to find the default value.<\/p>\n<p>Of course, I could treat the default-value map as a third dataset in the join. The reason I don&#8217;t want to is that datasets 1 &amp; 2 are huge (suppose millions of rows) while the map is tiny (just a few hundred rows), so joining all three datasets would hurt performance.<\/p>\n<p>In a custom jar, I can use the <strong>Distributed Cache<\/strong> to build this map. When I add the map file as a distributed cache file, it is distributed to every node, and each mapper or reducer can read it. 
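<\/p>\n<p>For reference, here is a minimal sketch of that custom-jar approach against the old Hadoop 1.x API (the cache path and the <code>defaultMap<\/code> field name are illustrative, not from an actual job of mine):<\/p>\n<pre>\/\/ Driver: register the map file so it is shipped to every node.\r\nDistributedCache.addCacheFile(new URI(\"\/cache\/defaultPopulation\"), job.getConfiguration());\r\n\r\n\/\/ Mapper: read the local copy once in setup() and build the lookup map.\r\nprivate Map&lt;String, Integer&gt; defaultMap = new HashMap&lt;String, Integer&gt;();\r\n\r\n@Override\r\nprotected void setup(Context context) throws IOException {\r\n    Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());\r\n    BufferedReader reader = new BufferedReader(new FileReader(cached[0].toString()));\r\n    String line;\r\n    while ((line = reader.readLine()) != null) {\r\n        String[] pair = line.split(\"\\t\");\r\n        defaultMap.put(pair[0], Integer.parseInt(pair[1]));\r\n    }\r\n    reader.close();\r\n}<\/pre>\n<p>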
So for each joined pair, I can simply use the SIZE_TYPE as the key to find its corresponding value.<\/p>\n<p>In Hive this is harder: we are writing a SQL-like script, the functionality is limited, and we cannot easily use a HashMap the way we would in Java.<\/p>\n<p>Luckily, Hive supports UDFs (user-defined functions) and <code>ADD FILE<\/code>, and we can combine them to do the same thing.<\/p>\n<p><code>ADD FILE filename<\/code> in Hive is the equivalent of adding a distributed cache file in Java: the file is distributed to every node.<\/p>\n<p>Writing a <code>UDF<\/code> means writing Java code that can be called from a Hive script. Here, our UDF will read the file and build a HashMap, and the HashMap is built only once per task.<\/p>\n<h1>Solution<\/h1>\n<p>Suppose we have two datasets and one map file (fields are separated by a single tab):<\/p>\n<p><b> City Dataset <\/b><\/p>\n<pre>1\tNew York\tMEDIUM\r\n2\tBeijing\tLARGE\r\n3\tChicago\tLARGE\r\n4\tTempe\tSMALL\r\n5\tLondon\tLARGE<\/pre>\n<p><b> Population Dataset <\/b><\/p>\n<pre>1\t8000000\r\n2\t20693000\r\n3\t2700000<\/pre>\n<p><b> Default Population Map File <\/b><\/p>\n<pre>LARGE\t1000000\r\nMEDIUM\t500000\r\nSMALL\t100000<\/pre>\n<p><b> Hive Script <\/b><\/p>\n<p>In the Hive script, I create three tables: two for the input datasets and one for the output.<\/p>\n<p>As you can see in the code, I use <code>ADD FILE \/home\/lichu\/Downloads\/defaultPopulation;<\/code> to add the <b>defaultPopulation<\/b> file to the distributed cache, and I use<\/p>\n<pre>ADD JAR \/home\/lichu\/UDF.jar;\r\nCREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';<\/pre>\n<p>to add the UDF jar file (I will show how to write this function in the next section) and create a temporary function from the class in that package. 
Then I use the &#8216;getDefaultPopulation&#8217; function in the SELECT block.<\/p>\n<pre>-- author: Chun\r\n-- email: me@lichun.cc\r\n\r\nSET city_input = '\/home\/lichu\/Downloads\/city';\r\nSET population_input = '\/home\/lichu\/Downloads\/population';\r\nSET city_population_output = '\/home\/lichu\/Downloads\/city_population_output';\r\n\r\nADD FILE \/home\/lichu\/Downloads\/defaultPopulation;\r\nADD JAR \/home\/lichu\/UDF.jar;\r\nCREATE TEMPORARY FUNCTION getDefaultPopulation AS 'test.GetDefaultPopulation';\r\n\r\nDROP TABLE IF EXISTS city;\r\nCREATE EXTERNAL TABLE city (\r\n     KEY_ID        INT,\r\n     NAME          STRING,\r\n     SIZE_TYPE     STRING\r\n)\r\nROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'\r\nSTORED AS TEXTFILE LOCATION ${hiveconf:city_input};\r\n\r\nDROP TABLE IF EXISTS population;\r\nCREATE EXTERNAL TABLE population (\r\n     KEY_ID        INT,\r\n     POPULATION    INT\r\n)\r\nROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'\r\nSTORED AS TEXTFILE LOCATION ${hiveconf:population_input};\r\n\r\nDROP TABLE IF EXISTS city_population;\r\nCREATE EXTERNAL TABLE city_population (\r\n     KEY_ID        INT,\r\n     NAME          STRING,\r\n     SIZE_TYPE     STRING,\r\n     POPULATION    INT\r\n)\r\nROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'\r\nSTORED AS TEXTFILE LOCATION ${hiveconf:city_population_output};\r\n\r\nFROM\r\n  (SELECT\r\n     c.KEY_ID,\r\n     c.NAME,\r\n     c.SIZE_TYPE,\r\n     CASE\r\n         WHEN p.POPULATION IS NULL\r\n             THEN getDefaultPopulation(c.SIZE_TYPE, '.\/defaultPopulation')\r\n         ELSE p.POPULATION\r\n     END AS POPULATION\r\n   FROM\r\n        city c\r\n   LEFT OUTER JOIN\r\n        population p\r\n   ON\r\n       (c.KEY_ID = p.KEY_ID)\r\n  ) joined\r\nINSERT OVERWRITE TABLE city_population\r\nSELECT *\r\n;<\/pre>\n<p><b> 
getDefaultPopulation UDF <\/b><\/p>\n<p>Here is the Java code for the <code>getDefaultPopulation<\/code> UDF. It&#8217;s very straightforward: notice that I check whether the map is null; if it is, I initialize it from the map file, otherwise I just reuse it.<\/p>\n<pre>package test;\r\n\r\nimport java.io.BufferedReader;\r\nimport java.io.FileNotFoundException;\r\nimport java.io.FileReader;\r\nimport java.io.IOException;\r\nimport java.util.HashMap;\r\nimport java.util.Map;\r\n\r\nimport org.apache.hadoop.hive.ql.exec.UDF;\r\nimport org.apache.hadoop.hive.ql.metadata.HiveException;\r\n\r\npublic class GetDefaultPopulation extends UDF {\r\n    \/\/ Built once per task, then reused for every row.\r\n    private Map&lt;String, Integer&gt; populationMap;\r\n\r\n    public Integer evaluate(String sizeType, String mapFile) throws HiveException {\r\n        if (populationMap == null) {\r\n            \/\/ First call: build the lookup map from the distributed cache file.\r\n            populationMap = new HashMap&lt;String, Integer&gt;();\r\n            try {\r\n                BufferedReader lineReader = new BufferedReader(new FileReader(mapFile));\r\n\r\n                String line = null;\r\n                while ((line = lineReader.readLine()) != null) {\r\n                    String[] pair = line.split(\"\\t\");\r\n                    String type = pair[0];\r\n                    int population = Integer.parseInt(pair[1]);\r\n                    populationMap.put(type, population);\r\n                }\r\n                lineReader.close();\r\n            } catch (FileNotFoundException e) {\r\n                throw new HiveException(mapFile + \" doesn't exist\");\r\n            } catch (IOException e) {\r\n                throw new HiveException(\"failed to process file \" + mapFile + \", please check its format\");\r\n            }\r\n        }\r\n\r\n        if (populationMap.containsKey(sizeType)) {\r\n            return populationMap.get(sizeType);\r\n        }\r\n\r\n        return null;\r\n    }\r\n}<\/pre>\n<p>Note that I extend the <b>UDF<\/b> class; your class must define at least one <b>evaluate<\/b> function, which Hive will look for when 
you call it in the script. After you finish the code, just export it as a JAR file from Eclipse or build the jar on the command line.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>I have been using custom jars for my MapReduce jobs for the past few years, and because it&#8217;s pure Java programming, I have a lot of flexibility. But writing Java results in a lot of code to maintain, and most of my MapReduce jobs are just joins with a little spice in them, so moving to [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_mi_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"jetpack_publicize_message":"","jetpack_is_tweetstorm":false,"jetpack_publicize_feature_enabled":true},"categories":[19],"tags":[16,54,83],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p2s9sh-6Y","jetpack_sharing_enabled":true,"jetpack_likes_enabled":true,"_links":{"self":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/432"}],"collection":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/comments?post=432"}],"version-history":[{"count":21,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/432\/revisions"}],"predecessor-version":[{"id":632,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/posts\/432\/revisions\/632"}],"wp:attachment":[{"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/media?parent=432"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/categories?p
ost=432"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.lichun.cc\/blog\/wp-json\/wp\/v2\/tags?post=432"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}