Tuesday, June 21, 2011

How to create partitions on the fly in hive tables based on data? OR How can you implement Dynamic Partitions in hive for larger tables?


The straight answer depends on your business case, but partitions are usually the better approach. Let us look at a business scenario that illustrates the issue.

Problem: I have a huge table in my EDW that holds 5 billion rows. Every record has a column pay_location, which has 300 distinct values across the table. I need to process this data within the Hadoop environment, and at any given time the processing involves data only from certain pay_locations.

Table Schema in DWH: Invoice_Details

Column Name       Data Type
Invoice_Id        double
Invoice_Date      Date
Invoice_Amount    double
Pay_Location      VarChar
Paid_Date         Date


Solution: We can accomplish this in 2 easy steps
Step 1: Sqoop import the table from the DWH into hive
Step 2: Analyze the data using Hive QL
First we need to Sqoop import the data into the hive table 'invoice_details_hive' using the basic Sqoop import command as follows.
sqoop import --driver <driver name> --connect <connection string> --username <username> -P --table Invoice_Details --split-by Invoice_Id --num-mappers <num of mappers> --warehouse-dir <hdfs dir> --hive-import --hive-table invoice_details_hive
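
For illustration only, here is how the same command could look against a hypothetical MySQL based EDW; the driver class, host, database, user, mapper count and warehouse directory below are placeholder values you would replace with your own.

sqoop import --driver com.mysql.jdbc.Driver --connect jdbc:mysql://dwh-host:3306/edw --username etl_user -P --table Invoice_Details --split-by Invoice_Id --num-mappers 8 --warehouse-dir /user/hive/staging --hive-import --hive-table invoice_details_hive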

We'll look into the second part here in more detail: how to analyze the data effectively and efficiently in hive. From our requirement it is clear that a partitioned table approach suits our data set; since the analysis is done pay_location by pay_location, we can partition on pay_location itself.
Now how do we do the partitioning? Simple: we need to create a partitioned table and load data into each partition. So first we create an equivalent partitioned table in hive

CREATE TABLE invoice_details_hive_partitioned (Invoice_Id double, Invoice_Date string, Invoice_Amount double, Paid_Date string) PARTITIONED BY (pay_location string);

Once the table is created we need to load data into the partitions of invoice_details_hive_partitioned from invoice_details_hive. How can we do it? Can we go ahead with individual insert statements for each pay_location, like the following?

INSERT OVERWRITE TABLE invoice_details_hive_partitioned PARTITION (pay_location='USA')
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date FROM invoice_details_hive idh WHERE idh.pay_location='USA';

If we follow this approach we would have to write 300 insert statements, as there are 300 distinct values for pay_location in the invoice_details_hive table. This type of implementation is called STATIC PARTITIONS. In our scenario static partitions won't serve the purpose, or rather they are far too tedious. Instead we'd use the concept of DYNAMIC PARTITIONS, available from hive 0.6 onwards. With dynamic partitions we need just a single insert overwrite statement to create and load data into all the partitions.

INSERT OVERWRITE TABLE invoice_details_hive_partitioned PARTITION (pay_location)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date, idh.pay_location FROM invoice_details_hive idh;

This single query implements the dynamic partitioning for you. When you use dynamic partitions, the last column(s) of the select on the source table must be the column(s) used for partitioning the destination table (here idh.pay_location), in the same order as in the PARTITIONED BY clause.
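
For example, if we ever wanted to partition on two columns, say pay_location plus a hypothetical paid_year derived from Paid_Date (assuming Sqoop imported the dates as yyyy-MM-dd strings), the select list would end with those two expressions in the PARTITIONED BY order. A minimal sketch:

CREATE TABLE invoice_details_hive_part_2 (Invoice_Id double, Invoice_Date string, Invoice_Amount double) PARTITIONED BY (pay_location string, paid_year string);

INSERT OVERWRITE TABLE invoice_details_hive_part_2 PARTITION (pay_location, paid_year)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.pay_location, substr(idh.Paid_Date, 1, 4)
FROM invoice_details_hive idh;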

When you try executing the query you may see hive throwing fatal errors, such as dynamic partition mode being strict and dynamic partitions not being enabled. So we need to set the following parameters in the hive shell (a complete session sketch follows this list):
1. set hive.exec.dynamic.partition=true;
Enables dynamic partitions; by default it is false.
2. set hive.exec.dynamic.partition.mode=nonstrict;
We are using dynamic partitioning without any static partition (a hive table can be partitioned on multiple columns), so we have to enable nonstrict mode. In strict mode a dynamic partition can only be used together with a static partition.
3. set hive.exec.max.dynamic.partitions.pernode=300;
The default value is 100; adjust it to the number of partitions expected in your case.
4. set hive.exec.max.created.files=150000;
The default value is 100000, but larger tables can exceed it, so we may have to raise it.
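
Putting it together, a typical hive session for our example would look like the sketch below; the value 300 matches the number of distinct pay_location values in our table, so adjust it to your own data.

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=300;
set hive.exec.max.created.files=150000;

INSERT OVERWRITE TABLE invoice_details_hive_partitioned PARTITION (pay_location)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date, idh.pay_location
FROM invoice_details_hive idh;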

In practical scenarios I did find dynamic partitioning failing with the above query on really large tables, throwing a java error after the first map phase completes. This could be due to the large number of files created during that map phase. However, a slight modification of the job can help you overcome it: group the records by pay_location on the map side and write them out on the reduce side, i.e. use a map-reduce job rather than a map-only job. You can do this within the hive query itself using DISTRIBUTE BY, so the modified query would be

FROM invoice_details_hive idh
INSERT OVERWRITE TABLE invoice_details_hive_partitioned PARTITION (pay_location)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date, idh.pay_location
DISTRIBUTE BY pay_location;

With this approach you don't need to override the hive.exec.max.created.files parameter.
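
Once the load completes you can verify the partitions and then confine further analysis to just the pay_locations you need; for instance (USA here is only an illustrative partition value):

SHOW PARTITIONS invoice_details_hive_partitioned;

SELECT count(*), sum(Invoice_Amount)
FROM invoice_details_hive_partitioned
WHERE pay_location='USA';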

2 comments:

  1. Very nice article, thanks Bejoy

  2. Very helpful.
    Just a note:
    If running via Amazon's EMR service, setting the parameters via the interactive hive terminal doesn't have any effect.

    No worries, just update the hive/conf/hive-site.xml file and all is good.

    edit hive/conf/hive-site.xml and add:

    <property>
      <name>hive.exec.max.dynamic.partitions</name>
      <value>100000</value>
    </property>

    <property>
      <name>hive.exec.max.dynamic.partitions.pernode</name>
      <value>100000</value>
    </property>

    <property>
      <name>hive.exec.dynamic.partition</name>
      <value>true</value>
    </property>

    <property>
      <name>hive.exec.dynamic.partition.mode</name>
      <value>nonstrict</value>
    </property>

    https://forums.aws.amazon.com/message.jspa?messageID=322898
