Monday, October 24, 2011

Enable Sorted Bucketing in Hive


From the Hive documentation we mostly get the impression that partitions are for grouping records, and buckets are for sampling, i.e. for evenly distributing records across multiple files. But can we also group records based on some columns/fields within buckets (the individual files of a bucketed table)?
Concepts get clearer when explained through examples, so I'm taking that route here. On a Hadoop assignment we once designed a hybrid solution where the final output landed in a partitioned Hive table. This final output had to be consumed by an Oracle DWH for some legacy applications. The handshake between the Hadoop and Oracle teams was that they wanted 'n' files for each sub-partition/folder, and the files should have data grouped based on a few columns in the table (country and continent). If the files are grouped, the Oracle load is much more efficient. How can we materialize such a solution?

1.       After the Hive operations, run a MapReduce job on the final folders that does the grouping
You do this by setting the number of reducers to 'n' for n output files while running it against each sub-folder, as sketched below. It is really not a good solution, because you have to run the MapReduce job for every sub-partition/folder, which is definitely a performance glitch.
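A rough HiveQL sketch of that approach (staging_table, the output path and the reducer count of 4 are illustrative assumptions; substitute your own source table and your 'n'):

-- force 4 reducers so that 4 output files are produced per run
set mapred.reduce.tasks = 4;

-- DISTRIBUTE BY routes all rows with the same country/continent
-- combination to the same reducer, i.e. to the same output file
INSERT OVERWRITE DIRECTORY '/home/test_dir/dt=2011-10-11/hour=13'
SELECT id, name, country, continent
FROM staging_table
WHERE dt = '2011-10-11' AND hour = '13'
DISTRIBUTE BY country, continent;

And this statement has to be repeated for every sub-folder, which is exactly the glitch mentioned above.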
2.       Bucketing in Hive
Use bucketing in Hive for the sub-partitions. It is not plain bucketing but sorted bucketing. Normally we enable bucketing in Hive during table creation as
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id INT,
  name STRING,
  country STRING,
  continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

When we go in for sorted/grouped bucketing, our DDL would look like

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id INT,
  name STRING,
  country STRING,
  continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) SORTED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

Now, to enforce bucketing while loading data into the table, we need to enable a Hive parameter as follows:
set hive.enforce.bucketing = true;
                     
With this DDL our requirement is satisfied: there are n individual files within each sub-partition, and the records are grouped into those n files based on country and continent, i.e. a particular combination of country and continent is present in only one file. Which combination lands in which file? That is decided by the hash partitioning function. If you want control over that, you need to write a custom hash partitioner and plug it into your Hive session.
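For completeness, a sketch of loading such a table (staging_table is a hypothetical source table; note that a plain LOAD DATA would not bucket the rows, so an INSERT ... SELECT is needed):

-- with hive.enforce.bucketing = true set as above, Hive spawns one
-- reducer per bucket; a row lands in file number hash(country, continent) mod n
INSERT OVERWRITE TABLE test_table PARTITION (dt='2011-10-11', hour='13')
SELECT id, name, country, continent
FROM staging_table
WHERE dt = '2011-10-11' AND hour = '13';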

NOTE: When we use partitions, the data is stored under individual directories/sub-directories in HDFS. But when we use buckets, the records are stored as files with a naming convention ranging from 0 to n-1.

NOTE: In partitioned tables, when we issue a query only the required partitions are scanned; there is no need to specify any hints in the Hive query. For bucketed tables it is not the case: you need to hint your Hive query if you want to scan only particular buckets, else the whole set of files is scanned. We hint the buckets using the TABLESAMPLE clause. For example, if we want to choose only the data from bucket 2:

SELECT * FROM test_table TABLESAMPLE(BUCKET 2 OUT OF n) WHERE dt='2011-10-11' AND hour='13';

10 comments:

  1. Hi,
    Very good post. I have one question:
    how do we know that the required data resides in bucket 2?

    Thanks,
    Tiru

  2. Hi Tiru,

    Bucketing is basically used for sampling, not for queries that point to a certain data group. Say you want to calculate the average of age (one column in the table) and you are not looking for the exact average but an approximate one; in such cases bucketing is used. Bucketing is also used to optimize joins.

    Also, when I mentioned '2 out of n' buckets it means: choose 2 buckets out of n. If I mention 'n out of n' then the whole data would be used as input.

    Regards
    Bejoy
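    To make the sampling point concrete, here is a hypothetical example (user_table, its age column and the 32-bucket layout on id are assumptions, not part of the post above):

    -- approximate average: scan a single bucket, i.e. roughly
    -- 1/32nd of the data, instead of the full table
    SELECT AVG(age)
    FROM user_table TABLESAMPLE(BUCKET 1 OUT OF 32 ON id);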

  3. Can you explain how bucketing can optimize a join?
    E.g.: I have 10 tables in Hive.
    I want to create a new table which will be the join result of all 10 tables.
    Another question: can I do the join of the 10 tables in a single query and store the output in one table? And what can I think about to optimize the join query in Hive (especially when it is a 10+ table join)? I need serious help on this.
    Please explain it with the help of some code.

  4. Hi Jeet,

    A join over 10 tables has limited scope for optimization. If you can break down the query, then you can utilize the various optimization techniques and apply them individually. I can comment on it in a more detailed manner only if I know your use case, the queries, the data volume involved for each table and your cluster statistics.

    What is a bucketed map join?
    If two tables are bucketed on the same key/keys on which the join is done, and one of the tables is medium sized, then you can benefit from a bucketed map join. When an input split from the large table is processed by a mapper, the corresponding bucket from the smaller table can be loaded into memory to achieve a map-side join, thereby eliminating the reduce phase. This speeds up the join operation to a great extent.
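    A hedged sketch of what that looks like in practice (the table and column names are made up; both tables are assumed to be CLUSTERED BY the join key, with bucket counts that are equal or multiples of each other):

    set hive.optimize.bucketmapjoin = true;

    -- the MAPJOIN hint asks Hive to stream the big table and load only
    -- the matching bucket of the small table into memory in each mapper
    SELECT /*+ MAPJOIN(u) */ t.txn_id, t.amount, u.name
    FROM transactions t
    JOIN users u ON t.user_id = u.user_id;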

  5. How would we know how many buckets a partitioned table has?

    Is there any query that shows this before we use (bucket 3 out of n)?
    Please can anybody help me on this.

  6. To know the partitioning and bucketing information of an existing table, use
    DESCRIBE FORMATTED/EXTENDED
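    For example, against the table above:

    -- 'Num Buckets' and 'Bucket Columns' appear in the detailed output
    DESCRIBE FORMATTED test_table;

    -- lists the available dt/hour partitions
    SHOW PARTITIONS test_table;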

  7. Hi, I am not able to execute any of the above queries, even after loading the data.

  8. Hi Bejoy, would you be willing to explain why the "cluster by" - which is a synonym for (distribute by && sort by) - would then require the (apparently redundant) "sort by" as well?

    Replies
    1. The following is directly from the hive wiki:

      >> Cluster By is a short-cut for both Distribute By and Sort By. https://cwiki.apache.org/Hive/languagemanual-sortby.html
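      In other words, at the query level the two forms below should behave the same (illustrative only):

      SELECT id, name, country FROM test_table CLUSTER BY country;

      -- is shorthand for
      SELECT id, name, country FROM test_table
      DISTRIBUTE BY country SORT BY country;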

  9. Bejoy,

    This was really useful information. I have a question which I am posing as a use case.

    Say there exists a bunch of ids that come from an external source, and there is a table on HDFS that has buckets hashed by id and ordered by id. There is a need to delete the records corresponding to the external ids.

    1. Can I delete the records from a specific bucket?
    2. How can I make this process optimal?
    3. If deletions can take place at the bucket level, what happens if a few records are deleted? Does it create a new bucket with the remaining data, or are the deleted rows marked as null?
    4. What happens if all the rows in a bucket are deleted?
