Friday, February 10, 2012

Enable Multiple threads in a mapper aka MultithreadedMapper


As the name suggests, a multithreaded mapper is a map task that spawns multiple threads. A map task can be thought of as a process running within its own JVM, and MultithreadedMapper spawns multiple threads within that same map task. Don’t confuse this with running multiple tasks within the same JVM, which is achieved with JVM reuse. A multithreaded task still works on the input split defined by the input format, and the record reader reads the input just as in a normal map task. The multithreading happens after this stage: once the records have been read, they are handed out to multiple threads for processing. In other words, the input IO is not multithreaded; the threads come into the picture only after it.
MultithreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could speed up the task. If the job is IO intensive, running multiple tasks is much better than multithreading, because with multiple tasks multiple IO reads happen in parallel.
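To make the split between single-threaded reading and multithreaded processing concrete, here is a small Python sketch. It is only a model of the idea, not Hadoop code: read_records, cpu_heavy_map and run_multithreaded_map are hypothetical names I made up, and Python threads do not give real CPU parallelism the way JVM threads do, so read it for the structure only.

```python
from concurrent.futures import ThreadPoolExecutor


def read_records(split):
    """Stand-in for the record reader: walks the split sequentially."""
    for offset, line in enumerate(split):
        yield offset, line


def cpu_heavy_map(record):
    """Hypothetical CPU-bound map function applied to one record."""
    _offset, line = record
    return line, sum(ord(ch) ** 2 for ch in line)


def run_multithreaded_map(split, num_threads=4):
    # One reader feeds the pool; only the map calls run in worker threads.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        return list(pool.map(cpu_heavy_map, read_records(split)))
```

The reader is consumed by the calling thread alone, which mirrors the point above: more threads do not add more parallel IO, they only share the work done after a record has been read.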
Let us see how we can use MultithreadedMapper. It is done differently in the old mapreduce API and the new API.
Old API
Enable the multithreaded map runner as
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner
Or
jobConf.setMapRunnerClass(org.apache.hadoop.mapred.lib.MultithreadedMapRunner.class);

New API
Set org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper as the mapper class of the job, and register your own mapper (which still extends org.apache.hadoop.mapreduce.Mapper and has to be thread safe) with MultithreadedMapper.setMapperClass(job, YourMapper.class). MultithreadedMapper has a different implementation of the run() method, which runs your mapper in a pool of threads.

You can set the number of threads within a mapper in MultithreadedMapper by
MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads = n
 


Note: Don’t assume that a multithreaded mapper is better than normal map reduce just because it spawns fewer JVMs and fewer processes. If a mapper is loaded with lots of threads, the chances of that JVM crashing are higher, and the cost of re-executing such a hadoop task would be terribly high.
Don’t use the multithreaded mapper to control the number of JVMs spawned. If that is your goal, you need to tweak the mapred.job.reuse.jvm.num.tasks parameter, whose default value is 1, meaning no JVM reuse across tasks.
The threads are at the bottom level, ie within a map task. The higher levels of the hadoop framework, like the job, know nothing about them.

Thursday, February 9, 2012

Query Hbase tables using hive/ Mount an Hbase table into hive


You can use Hbase as the data store for your hive table. On hive table creation we need to specify a mapping for the same. What needs to be provided:
1.       The hbase table name
2.       The mapping between hbase Column Family:Qualifier to hive Columns
If you are mapping a whole hbase column family to a hive column, then the data type of that hive column has to be Map. Also, in the DDL the table has to be specified as External.

Example of mapping a hbase table (employee_hbase) to a hive table employee_hive

CREATE EXTERNAL TABLE employee_hive(key INT, value1 STRING,value2  Map<STRING,STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val,cf2:")
TBLPROPERTIES("hbase.table.name" = "employee_hbase");
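The entries in hbase.columns.mapping line up with the hive columns by position. The Python sketch below only illustrates that pairing for the example above; pair_columns is a hypothetical helper of mine and not part of hive or hbase.

```python
def pair_columns(hive_columns, hbase_mapping):
    """Pair each hive column with its entry in hbase.columns.mapping, by position."""
    specs = [spec.strip() for spec in hbase_mapping.split(",")]
    if len(specs) != len(hive_columns):
        raise ValueError("mapping needs exactly one entry per hive column")
    paired = {}
    for column, spec in zip(hive_columns, specs):
        if spec == ":key":
            paired[column] = "row key"
        elif spec.endswith(":"):
            # A bare family maps every qualifier in it, hence the Map type.
            paired[column] = "whole column family " + spec[:-1]
        else:
            family, qualifier = spec.split(":", 1)
            paired[column] = "family " + family + ", qualifier " + qualifier
    return paired
```

For the DDL above, key pairs with the hbase row key, value1 with the single cell cf1:val, and value2 with everything under cf2, which is why value2 has to be a Map.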

Use Compression with Mapreduce


Hadoop is intended for storing large data volumes, so compression becomes almost mandatory here. Different compression formats are available, such as gzip, bzip2 and LZO. Of these, bzip2 (in recent releases) and LZO (once the files are indexed) are splittable. bzip2 offers better compression, but decompressing it is expensive. When we look at both space and time, LZO is more advisable. The LZO index also helps when you use hive on your data. While running mapreduce with compression we need to know at least the following
1.       How to run map reduce on compressed data
2.       How to produce compressed output from mapreduce

Running Mapreduce on compressed data
It is very straightforward; there is no need to implement any custom input format for the same. You can use any input format with compression. The only step is to add the compression codec to the value of io.compression.codecs

If you are using LZO, your value would look something like
io.compression.codecs  =  org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, com.hadoop.compression.lzo.LzopCodec

Then configure and run your map reduce jobs as you normally do on uncompressed files. When a map task has to process a compressed file, it checks io.compression.codecs and uses a suitable codec from there to read the file.
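As far as I know, the suitable codec is picked from the file name suffix. The Python sketch below models that lookup for the three codecs configured above. The table of extensions and pick_codec are my own illustration, not a Hadoop API.

```python
# Default file extensions of the codecs listed in io.compression.codecs above.
CODEC_EXTENSIONS = {
    "org.apache.hadoop.io.compress.GzipCodec": ".gz",
    "org.apache.hadoop.io.compress.DefaultCodec": ".deflate",
    "com.hadoop.compression.lzo.LzopCodec": ".lzo",
}


def pick_codec(file_name, io_compression_codecs):
    """Return the first configured codec whose extension matches the file name."""
    for codec in (c.strip() for c in io_compression_codecs.split(",")):
        extension = CODEC_EXTENSIONS.get(codec)
        if extension and file_name.endswith(extension):
            return codec
    return None  # no match: the file is read as plain, uncompressed data
```

This is also why a codec missing from io.compression.codecs causes trouble: in this model its files would simply be treated as uncompressed input.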

Produce compressed data from map reduce
It is again straightforward, and you can achieve the same by setting the following parameters. (Using LZO here)
mapred.output.compress=true
mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

You get your output compressed in LZO. Here too you can use the same with any normal output format.

Index LZO files
                It is possible with just 2 lines of code as

// Run the LZO indexer on files in hdfs
// (fs is a FileSystem handle, filePath is the Path of the .lzo file)
LzoIndexer indexer = new LzoIndexer(fs.getConf());
indexer.index(filePath);

Compress Intermediate output (map output)
Compressing intermediate output is also a good idea in map reduce. The map outputs have to be copied across nodes to the reducers, and compressing them saves network bandwidth and transfer time. Just specify the following configuration parameters
mapred.compress.map.output=true
mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec

Monday, October 24, 2011

Enable Sorted Bucketing in Hive


From the hive documents we mostly get the impression that for grouping records we go in for partitions, and for sampling purposes, ie for evenly distributing records across multiple files, we go in for buckets. But can we also group records based on some columns/fields within buckets (the individual files in buckets)?
Concepts get clearer when we explain them through examples, so I’m taking the same route here. Once, on a hadoop assignment, we designed a hadoop hybrid solution where the final output was in a hive partitioned table. This final output had to be consumed by an Oracle DWH for some legacy applications. The handshake between the hadoop and oracle teams was that they wanted ‘n’ files for each sub partition/folder, and the files should have data grouped based on a few columns in the table (country and continent). If the files are grouped, the oracle load is much more efficient. How can we get the solution materialized?

1.       After hive operations do a map reduce on the final folders that would do the Group by
You do this by setting the number of reducers to ‘n’ for n output files while running against each sub folder. It is really not a good solution, because you have to run the map reduce for all sub partitions/folders, which is definitely a performance hit.
2.       Bucketing in hive
Use bucketing in hive for the sub partitions. It is not plain bucketing but sorted bucketing. Normally we enable bucketing in hive during table creation as
CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  Id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

When we go into sorted bucketing/grouped bucketing, our DDL would look like

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  Id INT, name STRING, country STRING, continent STRING
)
PARTITIONED BY (dt STRING, hour STRING)
CLUSTERED BY (country, continent) SORTED BY (country, continent) INTO n BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'
LOCATION '/home/test_dir';

Now to enforce bucketing while loading data into the table, we need to enable a hive parameter as follows
set hive.enforce.bucketing = true;
For the SORTED BY clause to be honored during the load, the hive.enforce.sorting parameter can be enabled in the same way.

With this DDL our requirement is satisfied. Each sub partition holds n individual files, and the records are grouped into those n files based on country and continent, ie a particular combination of country and continent is present in only one file. Now if the question arises, which combination goes into which file? That is decided by the hash partitioning function. If you want control over that, you need to write your own custom hash partitioner and plug it into your hive session.
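The reason a given combination lands in only one file is that the bucket is computed from a hash of the clustering columns, modulo the number of buckets. A rough Python sketch of the idea follows. Hive's real hash function is different; I use zlib.crc32 here only to get a stable hash, and bucket_for and assign_buckets are hypothetical names.

```python
import zlib


def bucket_for(country, continent, num_buckets):
    """Map a (country, continent) combination to one bucket number in 0..n-1."""
    key = (country + "\x01" + continent).encode("utf-8")
    return zlib.crc32(key) % num_buckets


def assign_buckets(rows, num_buckets):
    """Group rows into buckets the way a bucketed load would, in miniature."""
    buckets = {number: [] for number in range(num_buckets)}
    for row in rows:
        number = bucket_for(row["country"], row["continent"], num_buckets)
        buckets[number].append(row)
    return buckets
```

The same input always yields the same bucket number, so all rows for one combination stay together. Several different combinations can still share a bucket when there are more combinations than buckets.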

NOTE: When we use partitions, data is stored under individual directories/sub directories in hdfs. But when we use buckets, the records are stored as files, one per bucket, numbered from 0 to n-1.

NOTE: In partitioned tables, when we issue a query only the required partitions are scanned; there is no need to specify any hints in your hive query. For bucketed tables that is not the case. You need to hint your hive query if you want to scan some particular buckets, else the whole set of files is scanned. We hint the buckets using the TABLESAMPLE clause in our hive query. For example, in our case, if we want to choose only the data from bucket 2
SELECT * FROM test_table TABLESAMPLE(BUCKET 2 OUT OF n) WHERE dt='2011-10-11' AND hour='13';

Include values during execution time in hive QL/ Dynamically substitute values in hive


When you play around with data warehousing, it is very common to come across scenarios where you’d like to supply values at run time. In production environments, when we have to set up a hive job, we usually write our series of hive operations in HQL in a file and trigger it using the hive -f option from a shell script or from a workflow management system like oozie. Let’s limit this discussion to triggering the hive job from a shell, as it is the basic one.
Say I have a hive job called hive_job.hql. Normally, from a shell, I’d trigger the hive job as
hive -f hive_job.hql

If I need to set some hive config parameters, say to enable compression in hive, then I’d include them as additional arguments
hive -f hive_job.hql  -hiveconf hive.exec.compress.output=true -hiveconf mapred.output.compress=true -hiveconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Now the final one. Say my hive QL does some operations on a date range, and this date range is varying/dynamic. The dates are to be accepted from the CLI each time. We can achieve this with the following steps
1.       Pass the variables as config parameters
2.       Refer to these config parameters in your hive query

Let me make it more specific with a small example. I need to perform some operation on records in a table called ‘test_table’ which has a column/field named ‘created_date’ (ie I need to filter records based on created_date). The date range for the operation is supplied at run time. It is achieved as
1.       Pass the variables as config parameters
Here we need to pass two parameters, the start date and the end date, to get all the records within a specific date range
hive -f hive_job.hql -hiveconf start_date=2011-01-01 -hiveconf end_date=2011-12-31
2.       Refer to these config parameters in your hive query
In our hive QL the start date and end date are written as placeholders, to be replaced by the actual values at execution time.
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';
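What hive does with the placeholders is plain text substitution before the query runs. The Python sketch below imitates that for placeholders written without spaces, like ${hiveconf:start_date}. The substitute function is my own stand-in, and raising an error for an undefined variable is my choice here, not necessarily what hive does.

```python
import re

# Matches ${hiveconf:name} and captures the variable name.
PLACEHOLDER = re.compile(r"\$\{hiveconf:([A-Za-z0-9_.]+)\}")


def substitute(query, hiveconf):
    """Replace every ${hiveconf:name} in the query with its value from hiveconf."""
    def replace(match):
        name = match.group(1)
        if name not in hiveconf:
            raise KeyError("no value passed for hiveconf variable " + name)
        return hiveconf[name]
    return PLACEHOLDER.sub(replace, query)
```

Since the substitution is purely textual, the quotes around the placeholder in the query are what make the substituted date a string literal.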


Let us look into one more example: deciding on the number of reducers at run time, with the previous set of requirements along with compression. I have 2 components here

·         Shell script that triggers the .hql
hive_trigger.sh
# The script accepts 3 parameters in order: number of reducers, start date and end date.
NUM_REDUCERS=$1
BEGIN_DATE=$2
CLOSE_DATE=$3
hive -f hive_job.hql -hiveconf mapred.reduce.tasks=$NUM_REDUCERS -hiveconf start_date=$BEGIN_DATE -hiveconf end_date=$CLOSE_DATE -hiveconf hive.exec.compress.output=true -hiveconf mapred.output.compress=true -hiveconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

·         HQL that accepts parameters at run time
hive_job.hql
SELECT * FROM test_table t1 WHERE t1.created_date >= '${hiveconf:start_date}' AND t1.created_date <= '${hiveconf:end_date}';

The components are ready; you can now trigger your shell script. For example
./hive_trigger.sh 50 2011-01-01 2011-12-31
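If the trigger lives in a workflow script rather than a bare shell script, it helps to validate the run time values before handing them to hive. Here is a minimal Python sketch that only builds the argument list for the three run time values. build_hive_command is a hypothetical helper, and the date format check assumes dates are always passed as YYYY-MM-DD, as in the example.

```python
from datetime import datetime


def build_hive_command(num_reducers, start_date, end_date, script="hive_job.hql"):
    """Validate the run time values and return the hive command as an argument list."""
    reducers = int(num_reducers)
    if reducers < 1:
        raise ValueError("number of reducers must be at least 1")
    for value in (start_date, end_date):
        datetime.strptime(value, "%Y-%m-%d")  # raises ValueError on a bad date
    return [
        "hive", "-f", script,
        "-hiveconf", "mapred.reduce.tasks=" + str(reducers),
        "-hiveconf", "start_date=" + start_date,
        "-hiveconf", "end_date=" + end_date,
    ]
```

On a machine where the hive CLI is installed, the returned list could be passed to subprocess.run(command, check=True).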

How to efficiently store data in hive/ Store and retrieve compressed data in hive


Hive is a data warehousing tool built on top of hadoop. The data corresponding to hive tables is stored as delimited files in hdfs. Since hive is used for data warehousing, the data for production system hive tables would definitely run to at least hundreds of gigs. Now naturally the question arises, how can we store this data efficiently? Definitely it has to be compressed. That raises a few more questions: how can we store compressed data in hive, and how can we process and retrieve compressed data in hive using hive QL?
Let’s look into these; it is fairly simple if you know hive. Before you use hive you need to enable a few parameters for dealing with compressed tables. They are the same compression enablers you use with map reduce, along with a hive parameter.

·         hive.exec.compress.output=true
·         mapred.output.compress=true
·         mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

Here I have used LZO as my compression in hdfs, hence the LzopCodec. Beyond setting these you don’t need to do anything else; use hive QL as you normally do with uncompressed data. I have tried this out successfully with Dynamic Partitions, Buckets etc. It works like any normal hive operation.
The input data for me, from conventional sources, was normal text, and this raw data was loaded into a staging table. From the staging table the cleansed data was loaded into the actual hive tables with some hive QL. The staging table gets flushed every time the data is loaded into the target hive table.