Hive reads and writes data at the folder level. By default, it reads only the files directly inside the folder given as a table's location, and it writes all output files into a single folder. This becomes a limitation in two cases. If the input data is organized into subfolders, Hive will not read them when you specify only the root folder. And if you want the output split across separate folders, Hive will still put everything into one folder.
For example, suppose we have sales data dumped to HDFS (or S3) with paths like sales/city=BEIJING/day=20140401/data.tsv. The data is partitioned by city and day. We could copy every data.tsv into the same folder, but we would have to rename each file to avoid name conflicts, which is painful when there are many large files. And even if we did merge the input into one folder, we would still want the output split into separate folders by city and day. How can we do that?
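To make the key=value folder convention concrete, here is a small Python sketch that extracts the partition values from such a path. The helper name parse_partition_path is hypothetical (it is not part of Hive), and the sketch assumes the values need no escaping.

```python
def parse_partition_path(path):
    """Return the partition values encoded as key=value folders in a path."""
    spec = {}
    for segment in path.strip("/").split("/"):
        # Only key=value folder names are partition levels; the root
        # folder ("sales") and the file name ("data.tsv") are skipped.
        key, sep, value = segment.partition("=")
        if sep and key:
            spec[key] = value
    return spec


print(parse_partition_path("sales/city=BEIJING/day=20140401/data.tsv"))
# {'city': 'BEIJING', 'day': '20140401'}
```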
Can Hive be smart enough to read the data in all the subfolders and write the output to separate folders? The answer is yes.
Let's walk through an example. Given the SALES data and the PRODUCT_COSTS data, we want to compute the TOTAL PROFIT of each product in each city on each day.
SALES input format: ID SALE_CITY PRODUCT_NAME QUANTITY SALE_PRICE SNAPSHOT_DAY
PRODUCT_COSTS input format: ID PRODUCT_NAME PRODUCT_COST SNAPSHOT_DAY
The profit of each sales record is: (SALE_PRICE - PRODUCT_COST) * QUANTITY
PROFIT output format: PRODUCT_NAME QUANTITY TOTAL_PROFIT
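As a reference for what the query should produce, the following Python sketch computes the same aggregation in memory. It only illustrates the arithmetic and is not how Hive executes the query. The rows are hypothetical dicts keyed by the column names above, and the sketch assumes at most one PRODUCT_COSTS row per product per day (with duplicates, a SQL join would multiply the matching sales rows, while this lookup keeps only the last cost).

```python
from collections import defaultdict


def total_profit(sales, costs):
    """Sum QUANTITY and (SALE_PRICE - PRODUCT_COST) * QUANTITY per city, day and product."""
    # Cost lookup keyed like the join condition: (SNAPSHOT_DAY, PRODUCT_NAME).
    cost = {(c["SNAPSHOT_DAY"], c["PRODUCT_NAME"]): c["PRODUCT_COST"] for c in costs}
    totals = defaultdict(lambda: [0, 0.0])  # group -> [QUANTITY, TOTAL_PROFIT]
    for s in sales:
        key = (s["SNAPSHOT_DAY"], s["PRODUCT_NAME"])
        if key not in cost:
            continue  # inner join: a sale with no matching cost row is dropped
        group = (s["SALE_CITY"], s["SNAPSHOT_DAY"], s["PRODUCT_NAME"])
        totals[group][0] += s["QUANTITY"]
        totals[group][1] += (s["SALE_PRICE"] - cost[key]) * s["QUANTITY"]
    return {group: tuple(v) for group, v in totals.items()}


# Hypothetical sample rows (ID columns omitted).
sales = [
    {"SALE_CITY": "BEIJING", "PRODUCT_NAME": "apple", "QUANTITY": 10, "SALE_PRICE": 3.0, "SNAPSHOT_DAY": "20140401"},
    {"SALE_CITY": "BEIJING", "PRODUCT_NAME": "apple", "QUANTITY": 5, "SALE_PRICE": 4.0, "SNAPSHOT_DAY": "20140401"},
]
costs = [{"PRODUCT_NAME": "apple", "PRODUCT_COST": 2.0, "SNAPSHOT_DAY": "20140401"}]
print(total_profit(sales, costs))
# {('BEIJING', '20140401', 'apple'): (15, 20.0)}
```

Here the two Beijing sales contribute (3.0 - 2.0) * 10 + (4.0 - 2.0) * 5 = 20.0 of profit on a total quantity of 15.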
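First, we need to enable Hive's dynamic partition feature. Put the following two lines at the top of your Hive script:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
The nonstrict mode is needed because the default strict mode requires at least one partition column to be static, while the insert in this example makes both CITY and DAY dynamic.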
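For the input table, we set LOCATION to the root folder and declare the partition columns in the PARTITIONED BY clause when creating the table. The partition column names should match the keys used in the folder names (city=..., day=...).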
Here is the code:
DROP TABLE IF EXISTS SALES;
CREATE EXTERNAL TABLE SALES (
  ID BIGINT,
  SALE_CITY STRING,
  PRODUCT_NAME STRING,
  QUANTITY INT,
  SALE_PRICE DOUBLE,
  SNAPSHOT_DAY STRING
)
PARTITIONED BY (CITY STRING, DAY STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "/input/sales/";
Creating the table does not by itself register the partitions that already exist under the root folder. There are two ways to make Hive aware of them.
(a) Manually add all the partitions
Hive does not automatically discover partition folders that already exist under the table's location, so we have to add each one manually:
ALTER TABLE SALES ADD PARTITION (city='BEIJING', day='20140401');
ALTER TABLE SALES ADD PARTITION (city='BEIJING', day='20140402');
ALTER TABLE SALES ADD PARTITION (city='TAIYUAN', day='20140401');
ALTER TABLE SALES ADD PARTITION (city='TAIYUAN', day='20140402');
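This gets tedious when there are many more partitions. Newer Apache Hive releases offer the command MSCK REPAIR TABLE SALES to register all of them in one step; without that command, stock Hive leaves you with manual ALTER TABLE statements.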
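When the partition list is long, the statements can at least be generated instead of typed. This Python sketch assumes you already have a list of data file paths under the table's root folder (obtained however you like); the function names are hypothetical, and values containing single quotes are not escaped.

```python
def partition_spec(path):
    """Turn the key=value folders of a path into a Hive partition spec string."""
    parts = []
    for segment in path.strip("/").split("/"):
        key, sep, value = segment.partition("=")
        if sep and key:
            parts.append(f"{key}='{value}'")
    return ", ".join(parts)


def add_partition_statements(table, paths):
    """One ALTER TABLE ... ADD PARTITION per distinct partition, in first-seen order."""
    # dict.fromkeys removes duplicates (several files per folder) but keeps order.
    specs = dict.fromkeys(s for s in map(partition_spec, paths) if s)
    return [f"ALTER TABLE {table} ADD PARTITION ({spec});" for spec in specs]


paths = [
    "/input/sales/city=BEIJING/day=20140401/data.tsv",
    "/input/sales/city=TAIYUAN/day=20140402/data.tsv",
]
for stmt in add_partition_statements("SALES", paths):
    print(stmt)
```

The generated lines have the same shape as the hand-written statements above, so they can be pasted into the Hive script.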
(b) Use AWS EMR to run your Hive script
The Hive build shipped with AWS EMR has a convenient command that makes Hive recognize all the partitions automatically. Just add the following line after the table creation:
ALTER TABLE SALES RECOVER PARTITIONS;
Hive then registers every subfolder under the table's root folder that follows the partition naming rule (key=value) as a partition, and reads its data.
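Conceptually, partition recovery walks the root folder and treats every folder path made of key=value levels as a partition. The Python sketch below imitates that discovery on a local directory tree. It is not EMR's implementation; find_partitions is a hypothetical name, and the sketch assumes the partition keys are known in advance and that partition folders sit exactly at the depth of the last key.

```python
import os


def find_partitions(root, partition_keys):
    """List partition specs for folders laid out as key1=v1/key2=v2/... under root."""
    found = []
    for dirpath, _dirs, _files in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        if rel == ".":
            continue
        segments = rel.split(os.sep)
        if len(segments) != len(partition_keys):
            continue  # only folders at full partition depth are partitions
        spec = {}
        for key, segment in zip(partition_keys, segments):
            name, sep, value = segment.partition("=")
            if not sep or name.lower() != key.lower():
                break  # a level that does not follow key=value disqualifies the folder
            spec[key] = value
        else:
            found.append(spec)
    # Sort for a deterministic order; os.walk order depends on the filesystem.
    return sorted(found, key=lambda s: tuple(s[k] for k in partition_keys))
```

A folder such as _logs directly under the root is ignored because it is not at partition depth.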
Here is the code for the PRODUCT_COST table, which is not partitioned:
DROP TABLE IF EXISTS PRODUCT_COST;
CREATE EXTERNAL TABLE PRODUCT_COST (
  ID BIGINT,
  PRODUCT_NAME STRING,
  PRODUCT_COST DOUBLE,
  SNAPSHOT_DAY STRING
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "/input/product_cost/";
For the output table, we also need to declare the partition columns when creating the table:
DROP TABLE IF EXISTS PROFIT;
CREATE EXTERNAL TABLE PROFIT (
  PRODUCT_NAME STRING,
  QUANTITY INT,
  TOTAL_PROFIT DOUBLE
)
PARTITIONED BY (CITY STRING, DAY STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION "/output/";
When inserting data into the output table, we need to (1) tell Hive that we are writing to partitions, and (2) append the partition values to the end of each output row.
INSERT OVERWRITE TABLE PROFIT PARTITION (CITY, DAY)
SELECT
  s.PRODUCT_NAME,
  SUM(s.QUANTITY) QUANTITY,
  SUM((s.SALE_PRICE - p.PRODUCT_COST) * s.QUANTITY) TOTAL_PROFIT,
  s.SALE_CITY,
  s.SNAPSHOT_DAY
FROM SALES s
JOIN PRODUCT_COST p
  ON (s.SNAPSHOT_DAY = p.SNAPSHOT_DAY AND s.PRODUCT_NAME = p.PRODUCT_NAME)
GROUP BY s.SNAPSHOT_DAY, s.PRODUCT_NAME, s.SALE_CITY;
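For (1), we added PARTITION (CITY, DAY) after INSERT OVERWRITE TABLE PROFIT. Since no values are given, both partition columns are dynamic.
For (2), we appended SALE_CITY and SNAPSHOT_DAY as the last columns of the SELECT, in the same order as the partition columns are listed. Hive matches these trailing columns to the partition columns by position, not by name.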
Hive then splits the rows by their partition values and writes each partition to its own folder, named by the key=value rule. Here, the output goes to /output/city=BEIJING/day=20140401 …
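The routing step can be pictured with a small Python sketch: each SELECT output row is split into its data columns and its trailing partition values, and those values pick the folder. This imitates the behavior described above rather than Hive's actual writer; route_to_partitions is a hypothetical name, and real Hive would also escape special characters in the values.

```python
from collections import defaultdict


def route_to_partitions(rows, partition_cols, root="/output"):
    """Group rows into key=value folders using their trailing columns."""
    if not partition_cols:
        raise ValueError("need at least one partition column")
    n = len(partition_cols)
    folders = defaultdict(list)
    for row in rows:
        data, values = row[:-n], row[-n:]
        # Trailing values are matched to partition columns by position;
        # column names appear lowercased in the folder names.
        levels = [f"{col.lower()}={val}" for col, val in zip(partition_cols, values)]
        folders["/".join([root.rstrip("/")] + levels)].append(data)
    return dict(folders)


rows = [
    ("apple", 15, 20.0, "BEIJING", "20140401"),  # hypothetical SELECT output
    ("apple", 2, 3.0, "TAIYUAN", "20140401"),
]
for folder, data in route_to_partitions(rows, ["CITY", "DAY"]).items():
    print(folder, data)
```

Swapping the last two columns of a row would swap the folder levels' values, which is why the SELECT must list them in PARTITION order.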
I tested this sample code in my local environment and on an AWS EMR cluster, and it produced the correct output.