Being a map reduce developer I’d never recommend to write joins of data sets using custom map reduce code. You have very intelligent and powerful tools handy in hadoop like hive and pig that can easily join huge data sets with the choice of join like inner, outer etc. But if such a scenario arises where you need to do join using map reduce you should be able to accomplish that with your knowledge on basic map reduce programming.
Let us look into a mocked up example for the same.
Problem Statement: A retailer has a customer data base and he need to do some promotions based on the same. He chooses bulk sms as the choice of his promotion which is done by a third part for him. And once the sms is pushed the sms provider returns the delivery status back to the retailer. Now let us look into more details which makes things a little complicated
We have 3 input files as follows
1. UserDetails.txt
i. Every record is of the format ‘mobile number , consumer name’
2. DeliveryDetails.txt
i. Every record is of the format ‘mobile number, delivery status code’
3. DeliveryStatusCodes.txt
i. Every record is of the format ‘delivery status code, status message’
The retailer has a consumer data base(UserDetails.txt) from which only the mobile number are provided to a bulk sms provider. He can’t reveal the customer name due to security reasons. Once the messages are pushed the sms provider sends back a report of the mobile numbers with status code (DeliveryDetails.txt) and also a look up file that relates every status code to the corresponding Status message (DeliveryStatusCodes.txt).
The requirement is that for meaningful information we need the consumer name along with its corresponding status message. And we need to obtain the same from these 3 files.
Sample Inputs
File 1 – UserDetails.txt
123 456, Jim
456 123, Tom
789 123, Harry
789 456, Richa
File 2 – DeliveryDetails.txt
123 456, 001
456 123, 002
789 123, 003
789 456, 004
File 3 – DeliveryStatusCodes.txt
001, Delivered
002, Pending
003, Failed
004, Resend
Expected Output
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend
Solution : Using core MapReduce
1. Use two different mapper classes for both processing the initial inputs from UserDetails.txt and DeliveryDetails.txt, The Key value output from the mappers should be as follows
a) UserDetails.txt
i. Key(Text) – mobile number
ii. Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
b) DeliveryDetails.txt
i. Key(Text) – mobile number
ii. Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code
So here since the two files needs to be parsed separately using two mappers. I’m using
UserFile Mapper.java to process UserDetails.txt and
DeliveryFileMapper.java to process DeliveryDetails.txt
In map reduce API, I’m using MulipleInputFormat to specify which input to go into which mapper. But the ouput key value pairs from the mapper go into the same reducer, for the Reducer to identify the source of the value we are prepending the values ‘CD’ or ‘DR’.
2. On the reducer end use distributed cache to distribute the DeliveryStatusCodes.txt. Parse the file and load the contents into HashMap with Key being the status code and value being the status message
3. On the reducer every key would be having two values one with prefix ‘CD’ and other ‘DR’. (For simplicity let us assume only 2 values, in real time it can be more). Identify the records and from CD get the customer name corresponding to the cell number (input key) and from DR get the status code. On obtaining the status code do a look up on the HashMap to get the status message. So finally the output Key values from the reducer would be as follows
a) Key : Customer Name
b) Value : Status Message
Let’s just look at the source code
Mapper Class1: UserFileMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class UserFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
//variables to process Consumer Details
private String cellNumber,customerName,fileTag="CD~";
/* map method that process ConsumerDetails.txt and frames the initial key value pairs
Key(Text) – mobile number
Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
*/
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
//taking one line/record at a time and parsing them into key value pairs
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
customerName = splitarray[1].trim();
//sending the key value pair out of mapper
output.collect(new Text(cellNumber), new Text(fileTag+customerName));
}
}
Mapper Class2:DeliverFileMapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class DeliveryFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
//variables to process delivery report
private String cellNumber,deliveryCode,fileTag="DR~";
/* map method that process DeliveryReport.txt and frames the initial key value pairs
Key(Text) – mobile number
Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code*/
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
//taking one line/record at a time and parsing them into key value pairs
String line = value.toString();
String splitarray[] = line.split(",");
cellNumber = splitarray[0].trim();
deliveryCode = splitarray[1].trim();
//sending the key value pair out of mapper
output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
}
}
Reducer Class:SmsReducer.java
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class SmsReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
//Variables to aid the join process
private String customerName,deliveryReport;
/*Map to store Delivery Codes and Messages
Key being the status code and vale being the status message*/
private static Map<String,String> DeliveryCodesMap= new HashMap<String,String>();
public void configure(JobConf job)
{
//To load the Delivery Codes and Messages into a hash map
loadDeliveryStatusCodes();
}
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
while (values.hasNext())
{
String currValue = values.next().toString();
String valueSplitted[] = currValue.split("~");
/*identifying the record source that corresponds to a cell number
and parses the values accordingly*/
if(valueSplitted[0].equals("CD"))
{
customerName=valueSplitted[1].trim();
}
else if(valueSplitted[0].equals("DR"))
{
//getting the delivery code and using the same to obtain the Message
deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
}
}
//pump final output to file
if(customerName!=null && deliveryReport!=null)
{
output.collect(new Text(customerName), new Text(deliveryReport));
}
else if(customerName==null)
output.collect(new Text("customerName"), new Text(deliveryReport));
else if(deliveryReport==null)
output.collect(new Text(customerName), new Text("deliveryReport"));
}
//To load the Delivery Codes and Messages into a hash map
private void loadDeliveryStatusCodes()
{
String strRead;
try {
//read file from Distributed Cache
BufferedReader reader = new BufferedReader(new FileReader("DeliveryStatusCodes.txt"));
while ((strRead=reader.readLine() ) != null)
{
String splitarray[] = strRead.split(",");
//parse record and load into HahMap
DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim());
}
}
catch (FileNotFoundException e) {
e.printStackTrace();
}catch( IOException e ) {
e.printStackTrace();
}
}
}
Driver Class: SmsDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleInputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class SmsDriver extends Configured implements Tool
{
public int run(String[] args) throws Exception {
//get the configuration parameters and assigns a job name
conf.setJobName("SMS Reports");
//setting key value types for mapper and reducer outputs
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
//specifying the custom reducer class
conf.setReducerClass(SmsReducer.class);
//Specifying the input directories(@ runtime) and Mappers independently for inputs from multiple sources
MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliveryFileMapper.class);
//Specifying the output directory @ runtime
FileOutputFormat.setOutputPath(conf, new Path(args[2]));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new SmsDriver(),
args);
System.exit(res);
}
}
Let us go in for a small code walk through, I’m not going in for each and every line of code as it is commented and is bit self-explanatory. The few points to keep in mind
1. The only difference in the code is that we are using MultipleInputFormat instead of FileInputFormat. This is necessary as we use two mappers and we need the output of the two mappers to be processed by a single reducer
2. When we normally execute our map reduce with the hadoop jar command the last two arguments on the command line represent the input and output dir in hdfs. But here instead of two we’d have three input locations and one output location.
3. The second key thing to be noted here is that in place of input locations don’t provide the full path with file names. Provide the input directories instead. Load the two files in two separate directories and provide the corresponding paths to mappers.
4. Since my driver is getting the arguments from command line, the order of arguments is also very critical. Make sure that the input directories always point to their corresponding mappers itself.
You can run the above example with the following command on CLI as
hadoop jar /home/bejoys/samples/ smsMarketing.jar com.bejoy.samples.smsmarketing.SmsDriver -files /home/bejoys/samples/ DeliveryStatusCodes.txt /userdata/bejoys/samples/sms/consumerdata /userdata/bejoys/samples/sms/deliveryinformation /userdata/bejoys/samples/sms/output
Note:
i. Since the join the happening on reduce, it is termed as a reduce side join.
ii. This is a very basic approach to implement joins in map reduce and is for those who have a basic knowledge on map reduce programming. You can implement it in more sophisticated manner in mapreduce frame work using DataJoin Mappers and Reducers with TaggedMap Output Types.
But if it is a join then I’d strongly recommend you to go in with Pig or Hive as both of these are highly optimized for implementing joins. Also you can eliminate the coding effort you need to put in. It is not exaggerating if I say I can implement the same in a single step using hive. Let us just check it out
Using Hive
1. Load the data into 3 hive tables
2. Perform join using hive QL
Creating Hive tables to store the files
CREATE TABLE customer_details (cellNumber String,consumerName String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH '/home/bejoys/samples/ConsumerDetails.txt' INTO TABLE customer_details;
CREATE TABLE delivery_report (cellNumber String,statusCode int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryReport.txt' INTO TABLE delivery_report;
CREATE TABLE status_codes (statusCode int,statusMessage String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryStatusCodes.txt' INTO TABLE status_codes;
Hive Query to execute Join operation on data sets
Select cd.consumerName,sc.statusMessage FROM customer_details cd
JOIN delivery_report dr ON (cd.cellNumber = dr.cellNumber) JOIN
status_codes sc ON(dr.statusCode = sc.statusCode);
You can optimize the hive Query again for performance boosting. Refer Optimizing Joins in hive
Thanks a lot for this nice post Bejoy..Very helpful.
ReplyDeletegood job
ReplyDeleteI am not able to open the last link ( Optimizing Joins in hive ). It says Not Found.
ReplyDeleteHey Raihan
ReplyDeleteThanks for pointing out the broken link. The issue is fixed. Please share your feedbacks in future as well.
Bejoy,
ReplyDeleteCan you please explain how does the data flow happen from mapper till the output file in terms of datanode. (i.e. how exactly the mapper reads the file and where the intermediate results are stored.
This comment has been removed by the author.
ReplyDeleteHi Krishnan
ReplyDeleteOn a high level, in case of plain mapreduce, the map tasks are allocated by JobTracker considering data locality. The map tasks process the files and then stored the intermediate output on the local file system. The reduce tasks then pick its share of map outputs from individual nodes where map tasks are executed and then performs the reduce functionality.
You have the in memory sort phase happening on the mapper side which sorts the keys that should go into each reducer. Also there is a merge phase happening on the reduce side for the map outputs from various mappers corresponding to that reducer before the reduce() method is called.
Hadoop Definitive guide by Tom White describes this in great detail.
Now coming to this particular example, as I mentioned before every reducer gets some specific set of keys for processing and a key will go into a single reducer only in default. So here similar keys(here it is mobile number) go into same reducer and there the user details and delivery details based on a mobile number are joined.
If you are looking out for more specific details feel free to reply back.
Loading DeliveryStatusCodes.txt in memory does not seem to be idea solution. Reducer would crash if size of DeliveryStatusCodes.txt is more than memory allocated to reducer.
ReplyDeleteHi Abshiek
ReplyDeleteYou are right. Use of DC and in memory joins are possible only if one of the tables/data set is small enough to fit in memory. For joining larger data sets you always have an option of reduce side joins.
From my practical experience I have come across a bunch full of use cases where the actual data size is in terms of TBs and reference/look up data size in terms of MBs. With this in memory join you can gain tremendous performance improvement.
You can take a look a look at in memory joins in mapper and there by you can completely avoid reduce phase.Too much of performance gain.
http://kickstarthadoop.blogspot.in/2011/05/hadoop-for-dependent-data-splits-using.html
In memory joins are possible even if the data size of smaller table is in terms of GBs as well. You can take a look at how bucketed joins are implemented in hive.
It's nice post..
ReplyDeleteBut please also post to implement this in pig.
So difference between pig and havi can even be understood.
cool
ReplyDeleteI am getting an error.
ReplyDeleteHow do you run this in pseudo-distributed mode.
If I give local path:
ERROR security.UserGroupInformation: PriviledgedActionException as:mhduser cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:54310/usr/local/projects/data/UserDetails.txt
And if i give hdfs path:
Exception in thread "main" java.io.FileNotFoundException: File /user/mhduser/data/DeliveryStatusCodes.txt does not exist.
at org.apache.hadoop.util.GenericOptionsParser.validateFiles(GenericOptionsParser.java:379)
This comment has been removed by the author.
ReplyDeleteMy program didn't find DeliverStatuesCodes.txt
ReplyDeletecloudera@cloudera-vm:/tmp/test/joins$ hadoop fs -lsr | grep .txt
-rw-r--r-- 1 cloudera supergroup 51 2013-05-14 17:53 /user/cloudera/DeliveryStatusCodes.txt
-rw-r--r-- 1 cloudera supergroup 51 2013-05-14 17:47 /user/cloudera/joins/deliverydetails/DeliveryDetails.txt
-rw-r--r-- 1 cloudera supergroup 55 2013-05-14 17:47 /user/cloudera/joins/userdetails/UserDetails.txt
cloudera@cloudera-vm:/tmp/test/joins$ ls -lrt
total 26684
-rw-r--r-- 1 cloudera cloudera 27311796 2013-05-14 17:45 MapReduceJobs.jar
-rw-r--r-- 1 cloudera cloudera 51 2013-05-14 17:45 DeliveryStatusCodes.txt
cloudera@cloudera-vm:/tmp/test/joins$
hadoop jar MapReduceJobs.jar joins/userdetails joins/deliverydetails joins/output
What am i doing possibly wrong?
put in folder where your jar is located....
DeleteGood work…unique site and interesting too… keep it up…looking forward for more updates.cheap bulk sms india |cheap bulk sms in indore |bulk voice sms in india |long code sms service india
ReplyDeleteThis comment has been removed by the author.
ReplyDeletewhen i am doing it in eclipse using mapreduce it shows compile time exception when am i adding second input path.can u sugest why?
ReplyDeleteThis comment has been removed by the author.
ReplyDeleteThis comment has been removed by the author.
ReplyDeleteHi Bejoy,
ReplyDeleteThanks for good article on MR joins.
I think you should use KeyValueLongInputFormat type because your key from mappers are numeric so it should be specific for sorting for accuracy.
Any thoughts ?
Hi,
ReplyDeleteHere is my sample program joining 2 datasets.
The program has 2 mappers and 1 reducer joining the values obtained from 2 different mappers having 2 different files as input.
I am getting an error in the hadoop jar command.
command: hadoop jar /home/rahul/Downloads/testjars/datajoin.jar DataJoin /user/rahul/cust.txt /user/rahul/delivery.txt /user/rahul/output
Error: Inavalid number of arguments Datajoin
It is actually expecting only 1 input path and 1 output path whereas in my command I have 2 inputs for 2 different mappers and 1 output.
Can anyone help me out ?
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class DataJoin {
public static class TokenizerMapper1
extends Mapper{
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String itr[] = value.toString().split("::");
word.set(itr[0].trim());
context.write(word,new Text("CD~" + itr[1]));
}
}
public static class TokenizerMapper2
extends Mapper{
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
String itr[] = value.toString().split("::");
word.set(itr[0].trim());
context.write(word,new Text("DD~" + itr[1]));
}
}
public static class IntSumReducer
extends Reducer {
private Text result = new Text();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
String sum="";
for (Text val : values) {
sum += val.toString();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: DataJoin ");
System.exit(2);
}
Job job = new Job(conf, "Data Join");
job.setJarByClass(DataJoin.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, TokenizerMapper1.class);
MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, TokenizerMapper2.class);
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Hi,
ReplyDeleteHere the output of both the maps would be grouped or not ? Shouldn't the input to the reducer be like this :
key 123456
value CD~JIM,DR~001
?
If they are not grouped then reducer can ideally work on only one key value pair at a time . How are we processing 2 key value pairs here ? Plz answer my queries
Thanks a lot for nice example and awesome explanation !
ReplyDeleteThe same error is continuing though the input file is present in the path.
ReplyDeleteException in thread "main" java.io.FileNotFoundException: File /user/hduser/DeliveryDetails.txt does not exist.
at org.apache.hadoop.util.GenericOptionsParser.validateFiles(GenericOptionsParser.java:397)
at org.apache.hadoop.util.GenericOptionsParser.processGeneralOptions(GenericOptionsParser.java:301)
at org.apache.hadoop.util.GenericOptionsParser.parseGeneralOptions(GenericOptionsParser.java:431)
at org.apache.hadoop.util.GenericOptionsParser.(GenericOptionsParser.java:170)
at org.apache.hadoop.util.GenericOptionsParser.(GenericOptionsParser.java:153)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:64)
at SmsDriver.main(SmsDriver.java:45)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)