Monday, September 19, 2011

Joins with plain Map Reduce or MultipleInputs



Being a map reduce developer I’d never recommend to write joins of data sets using custom map reduce code. You have very intelligent and powerful tools handy in hadoop like hive and pig that can easily join huge data sets with the choice of join like inner, outer etc. But if such a scenario arises where you need to do join using map reduce you should be able to accomplish that with your knowledge on basic map reduce programming.
                Let us look into a mocked up example for the same.

Problem Statement: A retailer has a customer data base and he need to do some promotions based on the same. He chooses bulk sms as the choice of his promotion which is done by a third part for him. And once the sms is pushed the sms provider returns the delivery status back to the retailer. Now let us look into more details which makes things a little complicated
We have 3 input files as follows
1.       UserDetails.txt
                                 i.            Every record is of the format ‘mobile number , consumer name’
2.       DeliveryDetails.txt
                                 i.            Every record is of the format ‘mobile number, delivery status code’
3.       DeliveryStatusCodes.txt
                                 i.            Every record is of the format ‘delivery status code, status message’
The retailer has a consumer data base(UserDetails.txt)  from which only the mobile number are provided to a bulk sms provider. He can’t reveal the customer name due to security reasons. Once the messages are pushed the sms provider sends back a report of the mobile numbers with status code (DeliveryDetails.txt) and also a look up file that relates every status code to the corresponding Status message (DeliveryStatusCodes.txt).
 The requirement is that for meaningful information we need the consumer name along with its corresponding status message. And we need to obtain the same from these 3 files.

Sample Inputs
File 1 – UserDetails.txt
123 456, Jim
456 123, Tom
789 123, Harry
789 456, Richa

File 2 – DeliveryDetails.txt
123 456, 001
456 123, 002
789 123, 003
789 456, 004

File 3 – DeliveryStatusCodes.txt
001, Delivered
002, Pending
003, Failed
004, Resend

Expected Output
Jim, Delivered
Tom, Pending
Harry, Failed
Richa, Resend


Solution : Using core MapReduce
1.       Use two different mapper classes for both processing the  initial inputs from UserDetails.txt and DeliveryDetails.txt, The Key value output from the mappers should be as follows
a)      UserDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
b)      DeliveryDetails.txt
                                                         i.            Key(Text) – mobile number
                                                       ii.            Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code
So here since the two files needs to be parsed separately using two mappers. I’m using
UserFile Mapper.java to process UserDetails.txt and
DeliveryFileMapper.java to process DeliveryDetails.txt
In map reduce API, I’m using MulipleInputFormat to specify which input to go into which mapper. But the ouput key value pairs from the mapper go into the same reducer, for the Reducer to identify the source of the value we are prepending the values ‘CD’ or ‘DR’.

2.       On the reducer end use distributed cache to distribute the DeliveryStatusCodes.txt. Parse the file and load the contents into HashMap with Key being the status code and value being the status message

3.       On the reducer every key would be having two values one with prefix ‘CD’ and other ‘DR’. (For simplicity let us assume only 2 values, in real time it can be more). Identify the records and from CD get the customer name corresponding to the cell number (input key) and from DR get the status code. On obtaining the status code do a look up on the HashMap to get the status message. So finally the output Key values from the reducer would be as follows
a)      Key : Customer Name
b)      Value : Status Message

Let’s just look at the source code

Mapper Class1: UserFileMapper.java

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UserFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process Consumer Details
    private String cellNumber,customerName,fileTag="CD~";
   
    /* map method that process ConsumerDetails.txt and frames the initial key value pairs
       Key(Text) – mobile number
       Value(Text) – An identifier to indicate the source of input(using ‘CD’ for the customer details file) + Customer Name
     */
    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        customerName = splitarray[1].trim();
       
      //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+customerName));
     }
}

Mapper Class2:DeliverFileMapper.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class DeliveryFileMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
    //variables to process delivery report
    private String cellNumber,deliveryCode,fileTag="DR~";
   
   /* map method that process DeliveryReport.txt and frames the initial key value pairs
    Key(Text) – mobile number
    Value(Text) – An identifier to indicate the source of input(using ‘DR’ for the delivery report file) + Status Code*/

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
       //taking one line/record at a time and parsing them into key value pairs
        String line = value.toString();
        String splitarray[] = line.split(",");
        cellNumber = splitarray[0].trim();
        deliveryCode = splitarray[1].trim();
       
        //sending the key value pair out of mapper
        output.collect(new Text(cellNumber), new Text(fileTag+deliveryCode));
     }
}

Reducer Class:SmsReducer.java

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SmsReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
      
       //Variables to aid the join process
       private String customerName,deliveryReport;
       /*Map to store Delivery Codes and Messages
       Key being the status code and vale being the status message*/
       private static Map<String,String> DeliveryCodesMap= new HashMap<String,String>();
      
       public void configure(JobConf job)
       {
              //To load the Delivery Codes and Messages into a hash map
              loadDeliveryStatusCodes();
             
       }


       public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
    {
        while (values.hasNext())
        {
             String currValue = values.next().toString();
             String valueSplitted[] = currValue.split("~");
             /*identifying the record source that corresponds to a cell number
             and parses the values accordingly*/
             if(valueSplitted[0].equals("CD"))
             {
               customerName=valueSplitted[1].trim();
             }
             else if(valueSplitted[0].equals("DR"))
             {
              //getting the delivery code and using the same to obtain the Message
               deliveryReport = DeliveryCodesMap.get(valueSplitted[1].trim());
             }
        }
        
        //pump final output to file
        if(customerName!=null && deliveryReport!=null)
        {
               output.collect(new Text(customerName), new Text(deliveryReport));
        }
        else if(customerName==null)
               output.collect(new Text("customerName"), new Text(deliveryReport));
        else if(deliveryReport==null)
               output.collect(new Text(customerName), new Text("deliveryReport"));
        
    }
      
      
       //To load the Delivery Codes and Messages into a hash map
    private void loadDeliveryStatusCodes()
    {
       String strRead;
       try {
              //read file from Distributed Cache
                     BufferedReader reader = new BufferedReader(new FileReader("DeliveryStatusCodes.txt"));
                     while ((strRead=reader.readLine() ) != null)
                     {
                           String splitarray[] = strRead.split(",");
                           //parse record and load into HahMap
                           DeliveryCodesMap.put(splitarray[0].trim(), splitarray[1].trim());
                          
                     }
              }
              catch (FileNotFoundException e) {
              e.printStackTrace();
              }catch( IOException e ) {
                       e.printStackTrace();
                }
             
       }
}

Driver Class: SmsDriver.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.lib.MultipleInputs;

import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class SmsDriver extends Configured implements Tool
{
       public int run(String[] args) throws Exception {

              //get the configuration parameters and assigns a job name
              JobConf conf = new JobConf(getConf(), SmsDriver.class);
              conf.setJobName("SMS Reports");

              //setting key value types for mapper and reducer outputs
              conf.setOutputKeyClass(Text.class);
              conf.setOutputValueClass(Text.class);

              //specifying the custom reducer class
              conf.setReducerClass(SmsReducer.class);

              //Specifying the input directories(@ runtime) and Mappers independently for inputs from multiple sources
              MultipleInputs.addInputPath(conf, new Path(args[0]), TextInputFormat.class, UserFileMapper.class);
              MultipleInputs.addInputPath(conf, new Path(args[1]), TextInputFormat.class, DeliveryFileMapper.class);
             
              //Specifying the output directory @ runtime
              FileOutputFormat.setOutputPath(conf, new Path(args[2]));

              JobClient.runJob(conf);
              return 0;
       }

       public static void main(String[] args) throws Exception {
              int res = ToolRunner.run(new Configuration(), new SmsDriver(),
                           args);
              System.exit(res);
       }
}


Let us go in for a small code walk through, I’m not going in for each and every line of code as it is commented and is bit self-explanatory. The few points to keep in mind

1.       The only difference in the code is that we are using MultipleInputFormat instead of FileInputFormat. This is necessary as we use two mappers and we need the output of the two mappers to be processed by a single reducer
2.       When we normally execute our map reduce with the hadoop jar command the last two arguments on the command line represent the input and output dir in hdfs. But here instead of two we’d have three input locations and one output location.
3.       The second key thing to be noted here is that in place of input locations don’t provide the full path with file names. Provide the input directories instead. Load the two files in two separate directories and provide the corresponding paths to mappers.
4.       Since my driver is getting the arguments from command line, the order of arguments is also very critical. Make sure that the input directories always point to their corresponding mappers itself.

You can run the above example with the following command on CLI as
hadoop jar /home/bejoys/samples/ smsMarketing.jar com.bejoy.samples.smsmarketing.SmsDriver  -files /home/bejoys/samples/ DeliveryStatusCodes.txt /userdata/bejoys/samples/sms/consumerdata /userdata/bejoys/samples/sms/deliveryinformation /userdata/bejoys/samples/sms/output  

Note:
                     i.            Since the join the happening on reduce, it is termed as a reduce side join.
                   ii.            This is a very basic approach to implement joins in map reduce and is for those who have a basic knowledge on map reduce programming. You can implement it in more sophisticated manner in mapreduce frame work using DataJoin Mappers and Reducers with TaggedMap Output Types.

But if it is a join then I’d strongly recommend you to go in with Pig or Hive as both of these are highly optimized for implementing joins. Also you can eliminate the coding effort you need to put in. It is not exaggerating if I say I can implement the same in a single step using hive. Let us just check it out

Using Hive
1.       Load the data into 3 hive tables
2.       Perform join using hive QL

Creating Hive tables to store the files
CREATE TABLE customer_details (cellNumber String,consumerName String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/ConsumerDetails.txt' INTO TABLE customer_details;

CREATE TABLE delivery_report (cellNumber String,statusCode int)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryReport.txt' INTO TABLE delivery_report;

CREATE TABLE status_codes (statusCode int,statusMessage String)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

LOAD DATA LOCAL INPATH '/home/bejoys/samples/DeliveryStatusCodes.txt' INTO TABLE status_codes;

Hive Query to execute Join operation on data sets

Select cd.consumerName,sc.statusMessage FROM customer_details cd
JOIN delivery_report dr ON (cd.cellNumber = dr.cellNumber) JOIN
status_codes sc ON(dr.statusCode = sc.statusCode);

You can optimize the hive Query again for performance boosting. Refer Optimizing Joins in hive

25 comments:

  1. Thanks a lot for this nice post Bejoy..Very helpful.

    ReplyDelete
  2. I am not able to open the last link ( Optimizing Joins in hive ). It says Not Found.

    ReplyDelete
  3. Hey Raihan

    Thanks for pointing out the broken link. The issue is fixed. Please share your feedbacks in future as well.

    ReplyDelete
  4. Bejoy,

    Can you please explain how does the data flow happen from mapper till the output file in terms of datanode. (i.e. how exactly the mapper reads the file and where the intermediate results are stored.

    ReplyDelete
  5. This comment has been removed by the author.

    ReplyDelete
  6. Hi Krishnan

    On a high level, in case of plain mapreduce, the map tasks are allocated by JobTracker considering data locality. The map tasks process the files and then stored the intermediate output on the local file system. The reduce tasks then pick its share of map outputs from individual nodes where map tasks are executed and then performs the reduce functionality.

    You have the in memory sort phase happening on the mapper side which sorts the keys that should go into each reducer. Also there is a merge phase happening on the reduce side for the map outputs from various mappers corresponding to that reducer before the reduce() method is called.

    Hadoop Definitive guide by Tom White describes this in great detail.

    Now coming to this particular example, as I mentioned before every reducer gets some specific set of keys for processing and a key will go into a single reducer only in default. So here similar keys(here it is mobile number) go into same reducer and there the user details and delivery details based on a mobile number are joined.

    If you are looking out for more specific details feel free to reply back.

    ReplyDelete
  7. Loading DeliveryStatusCodes.txt in memory does not seem to be idea solution. Reducer would crash if size of DeliveryStatusCodes.txt is more than memory allocated to reducer.

    ReplyDelete
  8. Hi Abshiek

    You are right. Use of DC and in memory joins are possible only if one of the tables/data set is small enough to fit in memory. For joining larger data sets you always have an option of reduce side joins.

    From my practical experience I have come across a bunch full of use cases where the actual data size is in terms of TBs and reference/look up data size in terms of MBs. With this in memory join you can gain tremendous performance improvement.

    You can take a look a look at in memory joins in mapper and there by you can completely avoid reduce phase.Too much of performance gain.
    http://kickstarthadoop.blogspot.in/2011/05/hadoop-for-dependent-data-splits-using.html


    In memory joins are possible even if the data size of smaller table is in terms of GBs as well. You can take a look at how bucketed joins are implemented in hive.


    ReplyDelete
  9. It's nice post..
    But please also post to implement this in pig.
    So difference between pig and havi can even be understood.

    ReplyDelete
  10. I am getting an error.
    How do you run this in pseudo-distributed mode.
    If I give local path:
    ERROR security.UserGroupInformation: PriviledgedActionException as:mhduser cause:org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:54310/usr/local/projects/data/UserDetails.txt


    And if i give hdfs path:
    Exception in thread "main" java.io.FileNotFoundException: File /user/mhduser/data/DeliveryStatusCodes.txt does not exist.
    at org.apache.hadoop.util.GenericOptionsParser.validateFiles(GenericOptionsParser.java:379)

    ReplyDelete
  11. This comment has been removed by the author.

    ReplyDelete
  12. My program didn't find DeliverStatuesCodes.txt

    cloudera@cloudera-vm:/tmp/test/joins$ hadoop fs -lsr | grep .txt
    -rw-r--r-- 1 cloudera supergroup 51 2013-05-14 17:53 /user/cloudera/DeliveryStatusCodes.txt
    -rw-r--r-- 1 cloudera supergroup 51 2013-05-14 17:47 /user/cloudera/joins/deliverydetails/DeliveryDetails.txt
    -rw-r--r-- 1 cloudera supergroup 55 2013-05-14 17:47 /user/cloudera/joins/userdetails/UserDetails.txt

    cloudera@cloudera-vm:/tmp/test/joins$ ls -lrt
    total 26684
    -rw-r--r-- 1 cloudera cloudera 27311796 2013-05-14 17:45 MapReduceJobs.jar
    -rw-r--r-- 1 cloudera cloudera 51 2013-05-14 17:45 DeliveryStatusCodes.txt

    cloudera@cloudera-vm:/tmp/test/joins$
    hadoop jar MapReduceJobs.jar joins/userdetails joins/deliverydetails joins/output

    What am i doing possibly wrong?

    ReplyDelete
    Replies
    1. put in folder where your jar is located....

      Delete
  13. Good work…unique site and interesting too… keep it up…looking forward for more updates.cheap bulk sms india |cheap bulk sms in indore |bulk voice sms in india |long code sms service india

    ReplyDelete
  14. This comment has been removed by the author.

    ReplyDelete
  15. when i am doing it in eclipse using mapreduce it shows compile time exception when am i adding second input path.can u sugest why?

    ReplyDelete
  16. This comment has been removed by the author.

    ReplyDelete
  17. This comment has been removed by the author.

    ReplyDelete
  18. Hi Bejoy,
    Thanks for good article on MR joins.
    I think you should use KeyValueLongInputFormat type because your key from mappers are numeric so it should be specific for sorting for accuracy.

    Any thoughts ?

    ReplyDelete
  19. Hi,
    Here is my sample program joining 2 datasets.
    The program has 2 mappers and 1 reducer joining the values obtained from 2 different mappers having 2 different files as input.

    I am getting an error in the hadoop jar command.

    command: hadoop jar /home/rahul/Downloads/testjars/datajoin.jar DataJoin /user/rahul/cust.txt /user/rahul/delivery.txt /user/rahul/output


    Error: Inavalid number of arguments Datajoin

    It is actually expecting only 1 input path and 1 output path whereas in my command I have 2 inputs for 2 different mappers and 1 output.

    Can anyone help me out ?
    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class DataJoin {

    public static class TokenizerMapper1
    extends Mapper{

    private Text word = new Text();

    public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {

    String itr[] = value.toString().split("::");
    word.set(itr[0].trim());
    context.write(word,new Text("CD~" + itr[1]));
    }

    }



    public static class TokenizerMapper2
    extends Mapper{

    private Text word = new Text();

    public void map(Object key, Text value, Context context
    ) throws IOException, InterruptedException {

    String itr[] = value.toString().split("::");
    word.set(itr[0].trim());
    context.write(word,new Text("DD~" + itr[1]));
    }

    }




    public static class IntSumReducer
    extends Reducer {
    private Text result = new Text();

    public void reduce(Text key, Iterable values,
    Context context
    ) throws IOException, InterruptedException {
    String sum="";


    for (Text val : values) {
    sum += val.toString();
    }
    result.set(sum);
    context.write(key, result);
    }
    }

    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
    System.err.println("Usage: DataJoin ");
    System.exit(2);
    }
    Job job = new Job(conf, "Data Join");
    job.setJarByClass(DataJoin.class);


    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    MultipleInputs.addInputPath(job, new Path(otherArgs[0]), TextInputFormat.class, TokenizerMapper1.class);
    MultipleInputs.addInputPath(job, new Path(otherArgs[1]), TextInputFormat.class, TokenizerMapper2.class);

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    }

    ReplyDelete
  20. Hi,

    Here the output of both the maps would be grouped or not ? Shouldn't the input to the reducer be like this :

    key 123456
    value CD~JIM,DR~001
    ?

    If they are not grouped then reducer can ideally work on only one key value pair at a time . How are we processing 2 key value pairs here ? Plz answer my queries

    ReplyDelete
  21. Thanks a lot for nice example and awesome explanation !

    ReplyDelete
  22. The same error is continuing though the input file is present in the path.

    Exception in thread "main" java.io.FileNotFoundException: File /user/hduser/DeliveryDetails.txt does not exist.
    at org.apache.hadoop.util.GenericOptionsParser.validateFiles(GenericOptionsParser.java:397)
    at org.apache.hadoop.util.GenericOptionsParser.processGeneralOptions(GenericOptionsParser.java:301)
    at org.apache.hadoop.util.GenericOptionsParser.parseGeneralOptions(GenericOptionsParser.java:431)
    at org.apache.hadoop.util.GenericOptionsParser.(GenericOptionsParser.java:170)
    at org.apache.hadoop.util.GenericOptionsParser.(GenericOptionsParser.java:153)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:64)
    at SmsDriver.main(SmsDriver.java:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

    ReplyDelete