Tuesday, June 21, 2011

How to create patritions on the fly in hive tables based on data? OR How can you implement Dynamic Partitions in hive for Larger Tables?

Straight answer depends on your business case but still partitions would be a better approach. Let us look at a business scenario on this issue

Problem: I have a huge table in my EDW that holds 5 billion rows. Every record has a column pay_location which has 300 distinct values across the table. Need to do some processing on the same within Hadoop environment and at a time my processing involves data only from certain pay_locations.

Table Schema in DWH
Column Name
Data Type

Solution: We can accomplish the same in 2 easy steps
Step 1: SQOOP import the table into hive from DWH
Step 2: Analyze the data using Hive QL
                Fist we need to SQOOP import the data into hive table ‘invoice_details’ using the basic SQOOP import command as follows.
sqoop import --driver <driver name> --connect <connection string> --username <username> -P --table Invoice_Details --split-by Invoice_Id --num-mappers <num of mappers> --warehouse-dir <hdfs dir> --hive-import --hive-table invoice_details_hive

We’d look into the second part here in more detail. How to effectively and efficiently analyze the same in hive. In our requirement it is clear that we can go ahead with a partitioned table approach for our data set, as the data analysis is made pay_location by pay_location we can do the partition based on pay_location itself.
Now how can we do the partition? Simple, need to create a partitioned table and load data into each partition. So first we can create an equivalent partitioned table in hive

CREATE TABLE invoice_details_hive _partitioned(Invoice_Id double, Invoice_Date string, Invoice_Amount double,Paid_Date string)PARTITIONED BY(pay_location string);

Once table creation is completed we need to load data into the partitions on invoice_details_hive _partitioned from invoice_details_hive . How can we do it? Can we go ahead for individual insert statements for each pay_location like?

INSERT OVERWRITE TABLE invoice_details_hive _partitioned PARTITION(pay_location=’USA’)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date FROM invoice_details_hive idh WHERE pay_location=’USA’;

If we follow this approach we may have to go in for 300 insert statements as there are 300 distinct values for pay_location in invoice_details_hive table. This type of implementation can be called as STATIC PARTIONS. But in our scenario static partitions won’t serve the purpose or rather it is too tedious. We’d have to implement the concept of DYNAMIC PARTITIONS introduced from hive 0.6 onwards. With Dynamic partitions we just need a single Insert Overwrite statement to create and load data into all the partitions.

INSERT OVERWRITE TABLE invoice_details_hive _partitioned PARTITION(pay_location)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date, idh.pay_location FROM invoice_details_hive idh;

This Single Query would implement dynamic partition for you, when you use dynamic partitions the last column from the select query on the source table should be column used for partitioning in the destination table (idh.pay_location)

When you try executing the query you can see hive throwing some fatal errors, like dynamic partition mode is strict and dynamic partition not enabled. So we need to set the following parameters in hive shell
1.       set hive.exec.dynamic.partition=true;
To enable dynamic partitions, by default it is false
2.       set hive.exec.dynamic.partition.mode=nonstrict;
We are using the dynamic partition without a static partition (A table can be partitioned based on multiple columns in hive) in such case we have to enable the non strict mode. In strict mode we can use dynamic partition only with a Static Partition.
3.       set hive.exec.max.dynamic.partitions.pernode=300;
The default value is 100, we have to modify the same according to the possible no of partitions that would come in your case
4.       set hive.exec.max.created.files=150000;
The default values is 100000 but for larger tables it can exceed the default, so we may have to update the same

In practical scenarios I did find the Dynamic Partition not working with the above query on really large tables and shooting a java print error after completion of first map process. This could be due to the larger number of files created on the first map process. However a slight modification of the job can help you overcome the same, group the records in your hive query on the map process and process them on the reduce side. ie use a map reduce process to achieve your goal rather than two map process. You can implement the same in your hive query itself with the usage of DISTRIBUTE BY, so the modified query would be

FROM invoice_details_hive idh
INSERT OVERWRITE TABLE invoice_details_hive _partitioned PARTITION(pay_location)
SELECT idh.Invoice_Id, idh.Invoice_Date, idh.Invoice_Amount, idh.Paid_Date, idh.pay_location
DISTRIBUTE BY pay_location;

With this approach you don’t need to overwrite the hive.exec.max.created.files parameter.

Sunday, June 12, 2011

How does your search provider get you some common search terms as you type? OR How do the auto suggest feature work well with search providers?

It isn’t a core hadoop technological area, but still being map reduce programmers dealing on large data volumes we should be aware on search engines as these scenarios  involve very large volume of data, to be more specific the whole of data in web. It’d seem kind of easy but if you think more deeply over, it is not as simple as you think. If you have already thought about this, there would be at least a few questions you missed asking yourself like, what is the data structure that auto suggest uses to retrieve data from? Does it involve client side processing? Let us answer the whole scenario with a series of related questions.  A basic knowledge on search engines is always great before we go ahead, so let us start with basic working of web searches.

How does a basic search work?
                When we do a search we are not actually searching the web but instead we are searching the index table created by the search provider. So now we should know how the data is sourced into these index tables. It is an outcome of the pre process that the search provider does on web pages to deliver you most relevant details on your searches. There is web crawler or rather the spider that runs across the web getting pages linked to a particular page and there by following links on the new page and so on, with this process the search provider gets all the related web pages then indexes the made on the pages and stores them on data base or rather a huge distributed data base. It is from this index data that you get your search results on the fly.
This video could throw more light on the search process within google http://www.google.com/howgoogleworks/

Let us go the area that has to be addressed.
 How does this Auto suggest feature work?
                Before answering this question, you have noticed this feature in google search. What is it called in google?  Google Instant. Now lets us just drill down on the working possibilities such an auto suggest. (What mentioned here are the possibilities and is not related to the working of any search provider). It is hard to fetch as you type from a database that holds the whole web data, but it is proved possible. First, apart from the indexed data the searches from every region/county are also stored, and you get instant suggestions based on this as well as if you are logged in, then based on your previous searches as well. The instant suggestions are more related to these data than the data indexed from spider.

What would be the data structure used to store this data?
                If I’m storing the search key words one by one as a whole, then fetching them in micro second is a challenge because we may have to go in for sub string operations pattern matching operations on strings etc. With all these operations under the hood the results can't be instant.So here we need a faster data structure which would aid retrievals and inserts in range of micro or nano seconds. What would you go for the same in java? We may think of some data structure that could store elements as individual characters(not as a whole string) like an array, a list or a tree. Isn’t a linked data structure like a tree beneficial to do the job? Let us look into a sample tree now

From this tree correlate the fact that you are searching for the word benz, if there is no lower limit to the number of characters from where the auto suggest has to work, then when you type ‘b’ you would have the words like


Then when you go for ‘be’, you would get


What do you infer from this? When you type the letters you retrieve the  the sub elements of the same are retrieved from the tree till its leaf. Remember, it is not just leaf nodes that are being displayed intermediate nodes are also chosen as well. Not all of the sub nodes are chosen, there are pretty much prerequisites, filtering and ranking used while such a tree is constructed and results/nodes are fetched.

What is the most suitable tree implementation here?
                It is a Binary search tree? Can be, but there could be better implementations of tree that is faster than a BSE in terms of insertions and retrievals. Such an implementation is a Trie or a pefix tree.

What is a trie or a prefix tree?
                Now that is a whole topic as such. Wiki could better explain you on this. Refer

Where does the request processing happens in auto suggest, on client or server ?
                Can the processing be on client side? To process the request we need data. It would be pointless to fetch a relatively large volume of data to client and do the processing. So in auto suggest there is an ajax GET request passed to sever to fetch the most relevant data on every key stroke. We can call this as ‘search before you type’ ie when you are googling for ‘apple‘ and as soon as you type ’ap’ itself, it fetches you apple, apple ipod etc .So in short request processing takes place on the server by means of ajax calls.

NOTE: This post defines just one suitable implementation of auto suggest feature in search engines. There would be more sophisticated implications and implementations of the same which would vary from provider to provider.

Thursday, June 9, 2011

Extract date in required formats from hive tables

Problem: My hive table has date in the format ‘yyyy-MM-dd HH:mm:ss’ which was obtained from  a database table through SQOOP. I need it in my query of the format ‘yyyy-MM-dd ’.

Solution: Use a regex expression to extract the required value.
When we sqoop in the date value to hive from rdbms, the data type hive uses to store that date is String. You can use a substring functions to achieve the same, the most easiest way would be to use the regexp_extract() function in hive.

To get the date alone in ‘yyyy-MM-dd ’ format we can go in for
You can use it as Select *, regexp_extract(column_datetime,'(.*\-.*\-.*)\\s(.*)',1) from sample_table;

**column_datetime is a column of type string that stores date in the format ‘yyyy-MM-dd HH:mm:ss’ which is part of hive table sample_table

Some possible cases
Case: 01
If you want the timestamp alone regexp_extract(column_datetime,'(.*)\\s(.*)',2)

Case: 02
If you want the year and month alone of the format ‘yyyy-MM’ then use regexp_extract(column_datetime,'(.*\-.*)\-.*',1)

Case: 03
All these were mere String operations done using regular expressions now if I want my date in the format ‘dd-MM- yyyy’ it’d be better to go in for a combination date functions and string concatenation functions. We can achieve our goal using this combination of functions

concat(year(column_datetime),'-',month(column_datetime),'-',day(column_datetime) )

So we can use the same in our query  like
Select *, concat(day(column_datetime),'-',month(column_datetime),'-', year(column_datetime))
 from sample_table;

But there would be issues in using this that if dates and month values are less than 10 it would be represented with a single character, we can overcome the same with the use of case functionality as

concat(CASE WHEN day(install_datetime) < 10 THEN concat('0',day(install_datetime)) ELSE trim(day(install_datetime)) END,'-',CASE WHEN month(install_datetime) < 10 THEN concat('0',month(install_datetime)) ELSE trim(month(install_datetime)) END,'-',year(install_datetime))

A sample usage in hive QL would be as
select *,concat(CASE WHEN day(install_datetime) < 10 THEN concat('0',day(install_datetime)) ELSE trim(day(install_datetime)) END,'-',CASE WHEN month(install_datetime) < 10 THEN concat('0',month(install_datetime)) ELSE trim(month(install_datetime)) END,'-',year(install_datetime)) from sample_table;

Thursday, June 2, 2011

Analyzing Apache logs with Pig

Analyzing log files, churning them and extracting meaningful information is a potential use case in Hadoop. We don’t have to go in for MapReduce programming for these analyses; instead we can go for tools like Pig and Hive for this log analysis. I’d just give you a start off on the analysis part. Let us consider Pig for apache log analysis. Pig has some built in libraries that would help us load the apache log files into pig and also some cleanup operation on string values from crude log files. All the functionalities are available in the piggybank.jar mostly available under pig/contrib/piggybank/java/ directory. As the first step we need to register this jar file with our pig session then only we can use the functionalities in our Pig Latin
1.       Register PiggyBank jar
REGISTER /usr/lib/pig/contrib/piggybank/java/piggybank.jar;
Once we have registered the jar file we need to define a few functionalities to be used in our Pig Latin. For any basic apache log analysis we need a loader to load the log files in a column oriented format in pig, we can create a apache log loader as
2.       Define a log loader
DEFINE ApacheCommonLogLoader org.apache.pig.piggybank.storage.apachelog.CommonLogLoader();
(Piggy Bank has other log loaders as well)
In apache log files the default format of date is ‘dd/MMM/yyyy:HH:mm:ss Z’ . But such a date won’t help us much in case of log analysis we may have to extract date without time stamp. For that we use DateExtractor()
3.       Define Date Extractor
DEFINE DayExtractor org.apache.pig.piggybank.evaluation.util.apachelogparser.DateExtractor('yyyy-MM-dd');
Once we have the required functionalities with us we need to first load the log file into pig
4.       Load apachelog file into pig
--load the log files from hdfs into pig using CommonLogLoader
logs = LOAD '/userdata/bejoys/pig/p01/access.log.2011-01-01' USING ApacheCommonLogLoader AS (ip_address, rfc, userId, dt, request, serverstatus, returnobject, referersite, clientbrowser);

Now we are ready to dive in for the actual log analysis. There would be multiple information you need to extract out of a log; we’d see a few of those common requirements out here

Note: you need to first register the jar, define the classes to be used and load the log files into pig before trying out any of the pig latin below

Requirement 1: Find unique hits per day
PIG Latin
--Extracting the day alone and grouping records based on days
grpd = GROUP logs BY DayExtractor(dt) as day;
--looping through each group to get the unique no of userIds
cntd = FOREACH grpd
                tempId =  logs.userId;
                uniqueUserId = DISTINCT tempId;
                GENERATE group AS day,COUNT(uniqueUserId) AS cnt;
--sorting the processed records based on no of unique user ids in descending order
srtd = ORDER cntd BY cnt desc;
--storing the final result into a hdfs directory
STORE srtd INTO '/userdata/bejoys/pig/ApacheLogResult1';

Requirement 1: Find unique hits to websites (IPs) per day
PIG Latin

--Extracting the day alone and grouping records based on days and ip address
grpd = GROUP logs BY (DayExtractor(dt) as day,ip_address);
--looping through each group to get the unique no of userIds
cntd = FOREACH grpd
                tempId =  logs.userId;
                uniqueUserId = DISTINCT tempId;
                GENERATE group AS day,COUNT(uniqueUserId) AS cnt;
--sorting the processed records based on no of unique user ids in descending order
srtd = ORDER cntd BY cnt desc;
--storing the final result into a hdfs directory
STORE srtd INTO '/userdata/bejoys/pig/ ApacheLogResult2 ';

Note: When you use pig latin in grunt shell we need to know a few factors
1.       When we issue a pig statement in grunt and press enter only the semantic check is being done, no execution is triggered.
2.       All the pig statements are executed only after the STORE command is submitted, ie map reduce programs would be triggered only after STORE is submitted
3.       Also in this case you don’t have to load the log files again and again to pig once it is loaded we can use the same for all related operations in that session. Once you are out of the grunt shell the loaded files are lost, you’d have to perform the register and log file loading steps all over again.

Wednesday, June 1, 2011

Implementing basic SQL Update statement in Hive

Hive is not meant for point to point queries and hence sql update functionality would be least required in hive that should be the reason hive doesn’t have update functionality for rows or rather individual columns in a row. There would be cases you find a much more suitable use case in hive, but the same can’t be implemented as it includes an update statement on a few rows specified by a condition. We can implement the basic sql row update in hive by following a few series of steps as follows

1.       Load all the rows that has to be updated into a hdfs dir
a.       Here in the select statement that fetches the rows that satisfy the condition, we specify the column names one by one rather than a Select *
b.      In place of the colum whose value is to be updated we include the new value hard coded
2.       Remove these rows loaded into hdfs dir in step 1 from table
a.       Do an insert overwrite on the same table to include only those record that do not satisfies the condition
3.       Load the updated rows from hdfs dir in Step into into table

Let’s look at the same through an example
                We have a hive table employee with the following columns employeeId, employeeName, experienceMonths, salary and visaEligibility. We need to do an operation equivalent to the SQL query below in hive.

SQL Query
update table employee set visaEligibility = 'YES' where experienceMonths >36 ;

Equivalent Hive QL
#load the rows that fall in the condition into an hdfs dir with the corresponding column value modified
INSERT OVERWRITE DIRECTORY '/userdata/bejoy/employeetemp' SELECT employeeId,employeeName, experienceMonths ,salary,'YES' FROM employee WHERE  experienceMonths > =36;

#removes the records from table that has been loaded into hdfs dir
#use the negate condition to the one provided in previous step
INSERT OVERWRITE TABLE employee SELECT * FROM employee WHERE experienceMonths < 36;

#load the rows from hdfs dir on to table on top of its current contents
LOAD DATA INPATH '/userdata/bejoy/employeetemp' INTO TABLE employee;

A better Approach
                When I understood some more functionalities of hive, it was evident to me that there is a better way to solve this out using CASE statement. The same result could be obtained in a single hive query as stated below

INSERT OVERWRITE TABLE employee SELECT employeeId,employeeName, experienceMonths ,salary, CASE WHEN experienceMonths >=36 THEN ‘YES’ ELSE visaEligibility END AS visaEligibility FROM employee;

The approaches and solutions described in this post are for non partitioned tables. Ideally in real rime scenarios the data volume would be too large to handle in a single partition so we have to go in for multiple partitions. But the same approaches could be used out there as well.