Tuesday, August 21, 2012

Performance tuning of hive queries

Hive performance optimization is a large topic on its own and is very specific to the queries you are using. In fact, each query in a query file needs separate performance tuning to get the best results.

I'll try to list a few general approaches used for performance optimization.

Limit the data flow down the queries
When you run a hive query, the volume of data that flows down to each level is the factor that decides performance. So if you are executing a script that contains a sequence of hive QL statements, make sure that the data filtering happens in the first few stages rather than carrying unwanted data to the bottom. This gives a significant performance gain, as the queries further down have much less data to crunch.

This is a common bottleneck when existing SQL jobs are ported to hive: we just try to execute the same sequence of SQL steps in hive, which becomes a bottleneck on performance. Understand the requirement or the existing SQL script and design your hive job with the data flow in mind.

Use hive merge files
Hive queries are compiled into map only and map reduce jobs. In a hive script there will be lots of hive queries. Assume one of your queries is compiled to a mapreduce job and the output files from the job are very small, say 10 MB. In such a case the subsequent query that consumes this data may generate a larger number of map tasks and would be inefficient. If you have more jobs on the same data set then all of them become inefficient. In such scenarios, if you enable merge files in hive, the first query runs a merge job at the end, thereby merging the small files into larger ones. This is controlled using the following parameters

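hive.merge.mapfiles=true (merges the output of map only jobs; true by default in hive)
hive.merge.mapredfiles=true (merges the output of map reduce jobs; false by default in hive)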

For more control over merge files you can tweak these properties as well
hive.merge.size.per.task (the max final size of a file after the merge task)
hive.merge.smallfiles.avgsize (the merge job is triggered only if the average output file size is less than the specified value)

The default values for the above properties are

When you enable merge, an extra map only job is triggered. Whether this job gets you an optimization or an overhead depends entirely on your use case and your queries.
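To make the interplay of the two size properties concrete, here is a minimal sketch in plain Java. It is a simplified model of the decision, not Hive's actual code: the class and method names are made up, and the two threshold values are assumptions chosen for illustration rather than values taken from this post.

```java
import java.util.Collections;
import java.util.List;

class MergeDecisionSketch {
    // Assumed thresholds in bytes, for illustration only.
    static final long SMALLFILES_AVGSIZE = 16_000_000L; // stands in for hive.merge.smallfiles.avgsize
    static final long SIZE_PER_TASK = 256_000_000L;     // stands in for hive.merge.size.per.task

    // True when the average output file size is below the small-files threshold.
    static boolean shouldMerge(List<Long> fileSizes, long smallFilesAvgSize) {
        if (fileSizes.isEmpty()) {
            return false;
        }
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        return total / fileSizes.size() < smallFilesAvgSize;
    }

    // Rough number of files left after merging, if each merged file holds up to sizePerTask bytes.
    static long mergedFileCount(List<Long> fileSizes, long sizePerTask) {
        long total = 0;
        for (long size : fileSizes) {
            total += size;
        }
        return (total + sizePerTask - 1) / sizePerTask; // integer division, rounded up
    }

    public static void main(String[] args) {
        List<Long> outputs = Collections.nCopies(20, 10_000_000L); // twenty files of 10 MB each
        System.out.println("merge triggered: " + shouldMerge(outputs, SMALLFILES_AVGSIZE));
        System.out.println("files after merge: " + mergedFileCount(outputs, SIZE_PER_TASK));
    }
}
```

With twenty 10 MB outputs the average is below the assumed threshold, so the merge would kick in and the next query would start from far fewer files. With a handful of large outputs the extra job would never be triggered.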

Join Optimizations
Joins are very expensive. Avoid them if possible. If a join is required, try to use join optimizations such as map joins, bucketed map joins etc.

There is still more left on hive query performance optimization; take this post as a baby step. More will be added to this post soon. :)

How to migrate a hive table from one hive instance to another or between hive databases

Hive has had the EXPORT/IMPORT feature since hive 0.8. With this feature you can export the metadata as well as the data of a table to a location in hdfs using the EXPORT command. The metadata is stored in JSON format alongside the data files. Data exported this way can be imported into another database or hive instance using the IMPORT command.

The syntax looks something like this:
EXPORT TABLE table_or_partition TO hdfs_path;
IMPORT [[EXTERNAL] TABLE table_or_partition] FROM hdfs_path [LOCATION [table_location]];

Some sample statements would look like:
EXPORT TABLE <table name> TO 'location in hdfs';

Use test_db;
IMPORT FROM 'location in hdfs';

Export and import can be applied on a partition basis as well:
EXPORT TABLE <table name> PARTITION (loc="USA") to 'location in hdfs';

The import command below imports into an external table instead of a managed one
IMPORT EXTERNAL TABLE <table name> FROM 'location in hdfs' LOCATION '/location/of/external/table';

Tuesday, May 22, 2012

Hive Hbase integration/ Hive HbaseHandler : Common issues and resolution

It is common that trying out hive hbase integration leads to lots of unexpected errors, even though hive and hbase are individually running without any issues. Some common issues are

1) jars not available
The following jars should be available on the hive auxpath
  1. /usr/lib/hive/lib/hive-hbase-handler-0.7.1-cdh3u2.jar
  2. /usr/lib/hive/lib/hbase-0.90.4-cdh3u2.jar
  3. /usr/lib/hive/lib/zookeeper-3.3.1.jar
These jars vary with the hbase, hive and zookeeper versions running on your cluster

2) zookeeper quorum not available for hive client
The ZooKeeper server host names should be specified so that the hive client can locate the active hbase master for the hbase connection

hbase.zookeeper.quorum=zk1,zk2,zk3

Where zk1,zk2,zk3 should be the actual hostnames of the ZooKeeper servers

These values can be set either in your hive session or permanently in your hive configuration files

1) Setting in hive client session
$ hive -auxpath /usr/lib/hive/lib/hive-hbase-handler-0.7.1-cdh3u2.jar:/usr/lib/hive/lib/hbase-0.90.4-cdh3u2.jar:/usr/lib/hive/lib/zookeeper-3.3.1.jar -hiveconf hbase.zookeeper.quorum=zk1,zk2,zk3

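2) Setting in hive-site.xml
Add hive.aux.jars.path (pointing to the jars listed above) and hbase.zookeeper.quorum (the ZooKeeper hostnames) as properties in hive-site.xml.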


Hive hbase integration still not working
Error thrown: Master not running

Check whether the HBase master is really down. If it is fine there could be other causes, a few common ones being
  • First, check whether hbase.zookeeper.quorum is set correctly; it shouldn't be localhost.
  • If multiple hbase clusters share the same zookeeper quorum, then the znode parent value will be different for each. In that case 'zookeeper.znode.parent' also has to be set in the hive configuration, to the correct value from hbase-site.xml.

Are compression codecs required on client nodes?

Sometimes, even with compression codecs available on all nodes in the cluster, some of our jobs fail with a class not found error for the compression codec.

Compression and decompression are done by the task trackers, but in certain cases the compression codecs are required on the client nodes as well.

Some scenarios are
 Total Order Partitioner
Before triggering the mapreduce job, the job needs to know the ranges of keys. Only then can it decide which range of keys should go to which reducer. This information is needed before the map tasks start, so the client first reads a random sample across the input data (the seek pattern could be something like read the first 20 MB, skip the next 200 MB, read the next 20 MB, etc.).

 Hive and Pig
To optimize jobs better, for example to distribute data uniformly across reducers and to determine the number of reducers, hive and pig do a quick seek over samples of the input data.

In both these cases, since a sample of the input data is read on the client side before the MR tasks start, the compression codec needs to be available on the client node as well if the data is compressed.
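To show why the client has to read data at all, here is a rough sketch in plain Java of what happens to the sampled keys: they are sorted, a few split points are picked, and every key is later routed to a reducer by comparing it with those split points. This is a toy model under my own assumptions, not Hadoop's TotalOrderPartitioner code; the class and method names are hypothetical and the sample is assumed to be non-empty.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SplitPointSketch {
    // Picks (numReducers - 1) evenly spaced split points from a sorted copy of the sample.
    static String[] splitPoints(List<String> sample, int numReducers) {
        List<String> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        String[] points = new String[numReducers - 1];
        for (int i = 1; i < numReducers; i++) {
            points[i - 1] = sorted.get(i * sorted.size() / numReducers);
        }
        return points;
    }

    // Keys below the first split point go to reducer 0, keys from the first
    // split point up to (but excluding) the second go to reducer 1, and so on.
    static int partitionFor(String key, String[] points) {
        int pos = Arrays.binarySearch(points, key);
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        // Keys the client would have sampled from the (possibly compressed) input.
        List<String> sample = Arrays.asList("g", "a", "d", "b", "i", "c", "h", "e", "f");
        String[] points = splitPoints(sample, 3);
        System.out.println("split points: " + Arrays.toString(points));
        System.out.println("reducer for key f: " + partitionFor("f", points));
    }
}
```

The sample keys in main() can only be obtained by actually decoding part of the input, which is exactly the step that needs the codec on the client.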

-libjars not working in custom mapreduce code, How to debug

Application developers often bump into this issue. They ship their custom jars with the map reduce job, but when the classes in those jars are referenced by the code, it throws a ClassNotFoundException.

For -libjars to work, your main class should satisfy the following two conditions.
1) Main class should implement the Tool interface and be launched through ToolRunner.run(), which parses generic options such as -libjars

//wrong usage - Tool interface not implemented
public class WordCount extends Configured {

//right usage
public class WordCount extends Configured implements Tool {

2) Main class should get the existing configuration using the getConf() method rather than creating a new Configuration instance.

//wrong usage - creating a new Configuration instance
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();

//right usage
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
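The reason behind the second condition is easier to see in a toy model. The sketch below is plain Java and deliberately does not use any Hadoop class: ToyConf, parseGenericOptions and the key "toy.libjars" are all made up for illustration. It only mimics the idea that the command line options are parsed into one configuration object before run() is called, so a freshly created configuration knows nothing about them.

```java
import java.util.HashMap;
import java.util.Map;

class LibJarsConfSketch {
    // Toy stand-in for a Hadoop Configuration: a plain key/value map.
    static class ToyConf {
        final Map<String, String> values = new HashMap<>();
    }

    // Toy stand-in for the option parsing done before run() is called:
    // it records the "-libjars" value under a made-up key.
    static ToyConf parseGenericOptions(String[] args) {
        ToyConf conf = new ToyConf();
        for (int i = 0; i + 1 < args.length; i++) {
            if ("-libjars".equals(args[i])) {
                conf.values.put("toy.libjars", args[i + 1]);
            }
        }
        return conf;
    }

    // Right usage: the job is built from the configuration that was handed in.
    static String jarsWithGetConf(ToyConf parsed) {
        return parsed.values.get("toy.libjars");
    }

    // Wrong usage: a fresh configuration has never seen the parsed options.
    static String jarsWithNewConf(ToyConf parsed) {
        ToyConf fresh = new ToyConf();
        return fresh.values.get("toy.libjars");
    }

    public static void main(String[] args) {
        ToyConf parsed = parseGenericOptions(new String[] {"-libjars", "custom.jar", "in", "out"});
        System.out.println("with getConf(): " + jarsWithGetConf(parsed));
        System.out.println("with new conf:  " + jarsWithNewConf(parsed));
    }
}
```

In the wrong usage the jar list is simply lost, which is why the job later fails to find the classes even though -libjars was passed on the command line.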

How to recover deleted files from hdfs/ Enable trash in hdfs

If you enable trash in hdfs, when an rmr is issued the file will still be available in trash for some period. Thereby you can recover accidentally deleted files. To enable hdfs trash
set fs.trash.interval to a value greater than 0 (0 disables trash)

This specifies how long a deleted file stays available in trash. There is another property (fs.trash.checkpoint.interval) that specifies the checkpoint interval: the NN checks the trash dir at every such interval and deletes all files older than the specified fs.trash.interval. For example, say you have fs.trash.interval as 60 mins and fs.trash.checkpoint.interval as 30 mins; then every 30 mins a check is performed that deletes all files that are more than 60 mins old.

fs.trash.checkpoint.interval should be equal to or less than fs.trash.interval

The value of fs.trash.interval is specified in minutes.

fs.trash.interval should be set on the client node as well as on the Name Node. On the Name Node it should be present for checkpointing purposes. The value on the client node decides whether an rmr issued from that client removes a file completely from hdfs or moves it to trash.

The trash dir by default is /user/X/.Trash
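The 60 mins / 30 mins example can be worked through with a little arithmetic. The sketch below is plain Java and models only the simplified description given in this post (a check runs at every multiple of the checkpoint interval and purges files that are at least fs.trash.interval old). It is not the Name Node's actual code, and the class and method names are made up.

```java
class TrashPurgeSketch {
    // Minute at which a file deleted at minute deletedAt gets purged, assuming checks
    // run at every multiple of checkpointInterval and purge files that have been in
    // trash for at least trashInterval minutes. All values are in minutes.
    static long purgeMinute(long deletedAt, long trashInterval, long checkpointInterval) {
        long earliest = deletedAt + trashInterval;
        long checks = (earliest + checkpointInterval - 1) / checkpointInterval; // rounded up
        return checks * checkpointInterval;
    }

    public static void main(String[] args) {
        long trashInterval = 60;      // fs.trash.interval from the example
        long checkpointInterval = 30; // fs.trash.checkpoint.interval from the example
        for (long deletedAt : new long[] {0, 10, 45}) {
            System.out.println("deleted at minute " + deletedAt + ", purged at minute "
                    + purgeMinute(deletedAt, trashInterval, checkpointInterval));
        }
    }
}
```

In this model a file can stay recoverable for somewhat longer than fs.trash.interval, because it is only removed at the next check after it has aged out. A smaller checkpoint interval narrows that gap.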

Friday, February 10, 2012

Enable Multiple threads in a mapper aka MultithreadedMapper

As the name suggests, it is a map task that spawns multiple threads. A map task can be considered a process that runs within its own jvm boundary. MultithreadedMapper spawns multiple threads within the same map task. Don't confuse this with multiple tasks within the same jvm (that is achieved with jvm reuse). When I say a task has multiple threads, the task still uses the input split as defined by the input format, and the record reader reads the input like in a normal map task. The multithreading happens after this stage: once the records are read, the processing is divided among multiple threads (i.e. the input IO is not multithreaded; the multiple threads come into the picture after that).
MultithreadedMapper is a good fit if your operation is highly CPU intensive and multiple threads getting multiple cycles could help speed up the task. If the job is IO intensive, then running multiple tasks is much better than multithreading, as with multiple tasks multiple IO reads happen in parallel.
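The split between single threaded reading and multithreaded processing can be sketched with plain java.util.concurrent. This is only an illustration of the idea, not how MultithreadedMapper is implemented: the class name, the run() helper and the expensiveMap() function are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class MultithreadedMapSketch {
    // Stand-in for a CPU heavy map function.
    static int expensiveMap(int record) {
        return record * record;
    }

    static List<Integer> run(List<Integer> split, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            // Reading stays sequential: one loop walks the split, like a single record reader.
            for (final Integer record : split) {
                Callable<Integer> task = () -> expensiveMap(record);
                // Only the processing of each record is handed to the worker threads.
                pending.add(pool.submit(task));
            }
            List<Integer> output = new ArrayList<>();
            for (Future<Integer> result : pending) {
                output.add(result.get()); // collected in submission order
            }
            return output;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4), 2));
    }
}
```

If expensiveMap() were cheap and the loop spent its time waiting on input, the extra threads would mostly sit idle, which is the IO intensive case where more tasks beat more threads.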
Let us see how we can use MultithreadedMapper. The way to do it differs between the old mapreduce API and the new API.
In the old API, enable the multithreaded map runner as
-D mapred.map.runner.class=org.apache.hadoop.mapred.lib.MultithreadedMapRunner

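In the new API, set org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper as the mapper class of the job and register your own mapper (a normal sub class of org.apache.hadoop.mapreduce.Mapper) with MultithreadedMapper.setMapperClass(job, YourMapper.class). MultithreadedMapper has a different implementation of the run() method, which executes your mapper on multiple threads.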

You can set the number of threads within a mapper in MultiThreadedMapper by
MultithreadedMapper.setNumberOfThreads(job, n); or
mapred.map.multithreadedrunner.threads = n

Note: Don't assume that a multithreaded mapper is better than normal map reduce just because it spawns fewer jvms and fewer processes. If a mapper is loaded with lots of threads, the chances of that jvm crashing are higher, and the cost of re-executing such a hadoop task would be terribly high.
Don't use MultithreadedMapper to control the number of jvms spawned. If that is your goal, tweak the mapred.job.reuse.jvm.num.tasks parameter, whose default value is 1, meaning no jvm reuse across tasks.
The threads live at the bottom level, i.e. within a map task, and the higher levels of the hadoop framework, like the job, know nothing about them.

Thursday, February 9, 2012

Query Hbase tables using hive/ Mount an Hbase table into hive

You can use Hbase as the data store for your hive table. On hive table creation we need to specify a mapping for the same. What needs to be provided:
1. The hbase table name
2. The mapping between hbase Column Family:Qualifier and hive columns
If you are mapping an hbase column family itself to a hive column, then the data type of that hive column has to be Map. Also, in the DDL the table has to be specified as External

Example of mapping a hbase table (employee_hbase) to a hive table employee_hive

CREATE EXTERNAL TABLE employee_hive(key INT, value1 STRING,value2  Map<STRING,STRING>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val,cf2:")
TBLPROPERTIES("hbase.table.name" = "employee_hbase");

Use Compression with Mapreduce

Hadoop is intended for storing large data volumes, so compression becomes a mandatory requirement here. There are different compression formats available like gzip, bzip2, LZO etc. Of these, bzip2 (the latest) and LZO (once indexed) are splittable; bzip2 offers better compression, but its decompression is expensive. When we look at both space and time, LZO is more advisable. LZO's indexing support also helps while using hive on your data. While running mapreduce with compression we need to know at least the following
1.       How to run map reduce on compressed data
2.       How to produce compressed output from mapreduce

Running Mapreduce on compressed data
It is very straightforward; there is no need to implement any custom input format. You can use any input format with compression. The only step is to add the compression codec to the value of io.compression.codecs

Suppose you are using LZO; then your value would look something like
io.compression.codecs  =  org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, com.hadoop.compression.lzo.LzopCodec

Then configure and run your map reduce jobs as you normally do on uncompressed files. When a map task wants to process a file that is compressed, it checks io.compression.codecs and uses a suitable codec from there, matched by the file extension, to read the file.
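How a "suitable codec" gets picked can be pictured as a lookup by file name suffix. The sketch below is plain Java and is not Hadoop's own lookup code: the class and method names are hypothetical, and the suffix table is my assumption of the default extensions for the three codecs in the example value above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CodecLookupSketch {
    // Assumed suffix to codec table for the codecs listed in io.compression.codecs above.
    static Map<String, String> defaultTable() {
        Map<String, String> table = new LinkedHashMap<>();
        table.put(".gz", "org.apache.hadoop.io.compress.GzipCodec");
        table.put(".deflate", "org.apache.hadoop.io.compress.DefaultCodec");
        table.put(".lzo", "com.hadoop.compression.lzo.LzopCodec");
        return table;
    }

    // Returns the codec class name whose suffix matches the file name,
    // or null when nothing matches (the file is then read as uncompressed).
    static String codecFor(String fileName, Map<String, String> table) {
        for (Map.Entry<String, String> entry : table.entrySet()) {
            if (fileName.endsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> table = defaultTable();
        System.out.println(codecFor("part-00000.lzo", table));
        System.out.println(codecFor("part-00000", table));
    }
}
```

The practical consequence is that a codec missing from io.compression.codecs has no entry in such a table, so its files would not be decompressed even though the jar is installed.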

Produce compressed data from map reduce
It is again straightforward and you can achieve it by setting the following parameters. (Using LZO here)
mapred.output.compress=true
mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

You get your output compressed in LZO. Here too, you can use this with any normal output format.

Index LZO files
It is possible with just 2 lines of code as

//Run the LZO indexer on files in hdfs
LzoIndexer indexer = new LzoIndexer(fs.getConf());

Compress Intermediate output (map output)
Compressing intermediate output is also a good idea in map reduce. The map outputs have to be copied across nodes to the reducers, and compressing them saves network bandwidth and transfer time. Just specify the following configuration parameters
mapred.compress.map.output=true
mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec