Friday, February 10, 2012

Enable Multiple threads in a mapper aka MultithreadedMapper

As the name suggests, it is a map task that spawns multiple threads. A map task can be considered a process that runs within its own JVM boundary. MultithreadedMapper spawns multiple threads within the same map task. Don’t confuse this with running multiple tasks within the same JVM (that is achieved with JVM reuse). When I say a task has multiple threads, the task still uses the input split as defined by the input format, and the record reader reads the input just as in a normal map task. The multithreading happens after this stage: once a record has been read, the processing is divided among multiple threads (i.e. the input IO is not multithreaded; multiple threads come into the picture only after that).
MultithreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could help speed up the task. If the job is IO intensive, running multiple tasks is much better than multithreading, because with multiple tasks multiple IO reads happen in parallel.
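To make the read-then-fan-out idea concrete, here is a minimal sketch in plain Java (no Hadoop classes involved). It is only an illustration under assumptions: FanOutSketch and runMultithreaded are made-up names, and the real MultithreadedMapper may differ in detail. A single shared reader hands out records one at a time under a lock, so reading stays sequential, and only the map calls run in parallel.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

// Illustrative stand-in, not Hadoop code: one sequential reader, many map threads.
class FanOutSketch {

    // Applies mapFn to every record from `reader` using `threads` worker threads.
    static <I, O> List<O> runMultithreaded(Iterator<I> reader, Function<I, O> mapFn, int threads) {
        ConcurrentLinkedQueue<O> output = new ConcurrentLinkedQueue<>();

        Runnable worker = () -> {
            while (true) {
                I record;
                // Reading is serialized: only one thread touches the reader at a time.
                synchronized (reader) {
                    if (!reader.hasNext()) {
                        return;
                    }
                    record = reader.next();
                }
                // The (possibly CPU-heavy) map call runs outside the lock, in parallel.
                output.add(mapFn.apply(record));
            }
        };

        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(worker, "map-thread-" + i);
            pool.add(t);
            t.start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        return new ArrayList<>(output);
    }
}
```

Calling FanOutSketch.runMultithreaded(lines.iterator(), String::length, 4) would compute line lengths on four threads. The output order is not guaranteed in this sketch, since whichever thread finishes first writes first.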
Let us see how we can use MultithreadedMapper. The way to do it differs between the old mapreduce API and the new API.
In the old API, enable the multithreaded map runner as
-D = org.apache.hadoop.mapred.lib.MultithreadedMapRunner

In the new API, your mapper class should subclass (extend) instead of org.apache.hadoop.mapreduce.Mapper. MultithreadedMapper has a different implementation of the run() method.

You can set the number of threads within a mapper in MultithreadedMapper by
MultithreadedMapper.setNumberOfThreads(job, n); or = n
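As a rough illustration of the new-API setup, the driver below is a sketch under assumptions, not a definitive recipe. MultithreadedDriver, HeavyMapper, the job name, the thread count of 8 and the two command-line paths are all hypothetical, and Job.getInstance assumes a Hadoop 2.x or later client library on the classpath. Note that in this sketch the business logic lives in an ordinary Mapper subclass, while MultithreadedMapper is registered as the job's mapper and told which class to run in its threads through its static setters.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver class, for illustration only.
public class MultithreadedDriver {

    // Hypothetical mapper: emits each input line together with its byte length.
    // A real candidate for multithreading would do CPU-heavy work here.
    public static class HeavyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new IntWritable(value.getLength()));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "multithreaded-mapper-sketch");
        job.setJarByClass(MultithreadedDriver.class);

        // The job runs MultithreadedMapper, which in turn runs HeavyMapper in its threads.
        job.setMapperClass(MultithreadedMapper.class);
        MultithreadedMapper.setMapperClass(job, HeavyMapper.class);
        MultithreadedMapper.setNumberOfThreads(job, 8); // assumed value, tune per workload

        job.setNumberOfReduceTasks(0); // map-only job, to keep the sketch short
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));   // input path from the command line
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output path from the command line

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Since several threads run the map logic inside one task, any state they share has to be thread safe.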

Note: Don’t assume that a multithreaded mapper is better than normal map reduce just because it spawns fewer JVMs and fewer processes. If a mapper is loaded with lots of threads, the chances of that JVM crashing are higher, and the cost of re-executing such a hadoop task would be terribly high.
Don’t use MultithreadedMapper to control the number of JVMs spawned; if that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter, whose default value is 1, meaning no JVM reuse across tasks.
The threads live at the bottom level, i.e. within a map task; the higher levels of the hadoop framework, such as the job, have no knowledge of them.

Thursday, February 9, 2012

Query HBase tables using Hive / Mount an HBase table into Hive

You can use HBase as the data store for your Hive table. When creating the Hive table, we need to specify a mapping between the two. What needs to be provided:
1.       The HBase table name
2.       The mapping between HBase Column Family:Qualifier and Hive columns
If you are mapping an entire HBase column family to a Hive column, the data type of that Hive column has to be Map. Also, in the DDL the table has to be specified as External.

Example of mapping an HBase table (employee_hbase) to a Hive table (employee_hive)

CREATE EXTERNAL TABLE employee_hive(key INT, value1 STRING, value2 MAP<STRING,STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val,cf2:")
TBLPROPERTIES("" = "employee_hbase");

Use Compression with Mapreduce

Hadoop is intended for storing large data volumes, so compression becomes practically mandatory. Several compression formats are available, such as gzip, bzip2 and LZO. Of these, bzip2 (in the latest versions) and LZO are splittable; of the two, bzip2 offers better compression, but decompressing it is expensive. Weighing both space and time, LZO is the more advisable choice. LZO also supports indexing, which again helps when using Hive on your data. When running mapreduce with compression, we need to know at least the following:
1.       How to run map reduce on compressed data
2.       How to produce compressed output from mapreduce

Running Mapreduce on compressed data
It is very straightforward; there is no need to implement a custom input format. You can use any input format with compression. The only step is to add the compression codec to the value of io.compression.codecs.

If you are using LZO, for example, your value would look something like
io.compression.codecs  =,, com.hadoop.compression.lzo.LzopCodec

Then configure and run your map reduce jobs as you normally do on uncompressed files. When a map task is about to process a file that is compressed, it checks io.compression.codecs and uses a suitable codec from there to read the file.
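The lookup described above can be pictured with a small plain-Java sketch. This is a simplified stand-in, not Hadoop's own code: CodecLookupSketch, open, readAll and gzip are made-up names, and since LZO is not part of the JDK, the gzip and deflate streams from java.util.zip stand in for the registered codecs. The idea it illustrates is that a codec is picked by matching the file name's extension, and a file with no known extension is read as uncompressed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import java.util.zip.InflaterInputStream;

// Illustrative stand-in for extension-based codec selection, not Hadoop code.
class CodecLookupSketch {

    // Wraps the raw stream in a decompressor chosen from the file extension.
    // Unknown extensions are treated as uncompressed input.
    static InputStream open(String fileName, InputStream raw) throws IOException {
        if (fileName.endsWith(".gz")) {
            return new GZIPInputStream(raw);
        }
        if (fileName.endsWith(".deflate")) {
            return new InflaterInputStream(raw);
        }
        return raw;
    }

    // Reads the whole (possibly compressed) file content as UTF-8 text.
    static String readAll(String fileName, byte[] bytes) {
        try (InputStream in = open(fileName, new ByteArrayInputStream(bytes))) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper that produces gzip-compressed bytes, used to try the sketch out.
    static byte[] gzip(String text) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }
}
```

The reading code never needs to know which format it was given; the file name alone decides, which is why the job setup stays the same as for uncompressed input.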

Produce compressed data from map reduce
It is again straightforward, and you can achieve it by setting the following parameters (using LZO here).
mapred.output.compression.codec= com.hadoop.compression.lzo.LzopCodec

Your output is then compressed with LZO. Here too, you can use this with any normal output format.

Index LZO files
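It is possible with just 2 lines of code:

//Run the LZO indexer on files in hdfs
LzoIndexer indexer = new LzoIndexer(fs.getConf());
indexer.index(lzoPath); //lzoPath is a placeholder: an org.apache.hadoop.fs.Path pointing at the .lzo file to index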

Compress Intermediate output (map output)
Compressing the intermediate output is also a good idea in map reduce. The map outputs have to be copied across nodes to the reducers, and compressing them saves network bandwidth and transfer time. Just specify the following configuration parameters as hadoop.compression.lzo.LzoCodec