Tuesday, May 24, 2011

Hadoop for dependent data splits / Using Distributed Cache in Hadoop Map Reduce


Hadoop is known for processing independent data slices, but what about dependent data? Suppose that to process employees' salaries we need their location and grade details, and those details live in separate files. While processing each line of the employee file, we then need to look up information from the location and grade files. How do we achieve this? Use the distributed cache.

Distributed Cache
                Hadoop has the concept of a distributed cache that all task trackers (nodes) can access. When we want to make some common data available to every task tracker, we use the distributed cache. A file placed in the cache is copied to each task tracker, so every task can read its own local copy. Reference data needed for lookups is therefore placed in the distributed cache before the tasks start. The main point to note is that the files chosen for distribution should be small. On a medium-sized cluster, a distributed file shouldn't be more than about 100 MB (this value can vary from cluster to cluster).
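
The lookup idea can be sketched in plain Java, outside Hadoop: load a small reference file into a HashMap once, then consult it for every record. The class name LookupTable, its method names, the id,name,amount layout and the inline sample rows are assumptions made for illustration; in a real job the lines would be read from the cached file on the task's local disk.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Hadoop: builds an in-memory lookup
// from comma-separated reference lines of the form id,name,amount.
final class LookupTable {

    // Maps the first column (id) to the third column (amount).
    static Map<String, Double> load(List<String> lines) {
        Map<String, Double> table = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length < 3) {
                continue; // skip blank or malformed lines
            }
            table.put(fields[0].trim(), Double.parseDouble(fields[2].trim()));
        }
        return table;
    }

    // Returns the amount for an id, or 0.0 when the id is unknown.
    static double amountFor(Map<String, Double> table, String id) {
        return table.getOrDefault(id.trim(), 0.0);
    }

    public static void main(String[] args) {
        Map<String, Double> locations =
                load(Arrays.asList("L105,Bangalore,200000", "not a valid row"));
        System.out.println(amountFor(locations, "L105")); // known id
        System.out.println(amountFor(locations, "L999")); // unknown id falls back to 0.0
    }
}
```

Falling back to 0.0 for an unknown id is only one possible policy; failing the record instead may be the better choice when a missing reference row points to bad data.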

Solution to our Problem
                In our scenario, say we have 1 million employees across 25 locations and 71 grades. Even a crude analysis shows that the location and grade data are tiny compared to the employee data. So our approach is to process the employee data that is in HDFS and look up the two reference data sets from the distributed cache.
                For mock calculation purposes, I add a location allowance and a grade-based bonus (both annual figures, paid out in monthly parts) to each employee's pay, on top of the basic salary calculation defined in the previous example.
                So besides the employee.txt file, we have two more input files.
1.       location.txt, which has the location id, location name and annual bonus
2.       grade.txt, which has the grade id, grade and annual bonus

employee.txt
10007~James Wilson~L105~G06~110000~22~8
10100~Roger Williams~L103~G09~145000~20~8

location.txt
L105,Bangalore,200000
L103,Hyderabad,160000

grade.txt
G06,D3,450000
G09,F3,500000

Mapper Class - SalaryCalculatorMapper.java

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SalaryCalculatorMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, NullWritable>
{
             //Variables for Map Reduce
             private Text word = new Text();
          
           //Variables for business calculations
           private String employeeId,employeeName,locationId,gradeId;
           private Double monthlyPay,daysWorked,monthlyHolidays,currentMonthPay,locationAllowance,gradeBonus;
          
           //data structures for storing location and grade details
           private static Map<String, String> locationMap = new HashMap<String, String>();
           private static Map<String, String> gradeMap = new HashMap<String, String>();
          
           //variables for processing reference files
           private String locId,locName,locAllowance,grId,grade,grBonus;
          
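           public void configure(JobConf job)
           {
                  //files passed with -files are available in the task's working directory under their own names
                  try {
                        processFile(new File("location.txt"));
                        processFile(new File("grade.txt"));
                  } catch (FileNotFoundException e) {
                        //without the reference data every lookup would fail, so stop the task here
                        throw new RuntimeException("Reference file missing from distributed cache", e);
                  }
            }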
           public void processFile(File fFile) throws FileNotFoundException {

                  Scanner scanner = new Scanner(fFile);
                  try {
                        // read each line exactly once, then dispatch on the file name
                        // (this also keeps the loop from spinning on an unexpected file name)
                        while (scanner.hasNextLine()) {
                              String line = scanner.nextLine();
                              if(fFile.getName().equals("location.txt"))
                                    processLineLocation(line);
                              else if(fFile.getName().equals("grade.txt"))
                                    processLineGrade(line);
                        }
                  } finally {
                        // ensure the underlying stream is always closed
                        scanner.close();
                  }
            }

            public void processLineLocation(String aLine) {
                  // use a second Scanner to parse the content of each line

                  try {
                        Scanner scanner = new Scanner(aLine);
                        scanner.useDelimiter(",");
                        if (scanner.hasNext()) {
                              locId = scanner.next();
                              locName = scanner.next();
                              locAllowance = scanner.next();
                              //we don't need the location name for our computations, hence it is not stored in the map
                              locationMap.put(locId.trim(),locAllowance.trim());
                        }
                  } catch (Exception e) {
                        // a malformed line (for example, fewer than three fields) is logged and skipped
                        e.printStackTrace();
                  }
            }
           
            public void processLineGrade(String aLine) {
                  // use a second Scanner to parse the content of each line
                  try {
                        Scanner scanner = new Scanner(aLine);
                        scanner.useDelimiter(",");
                        if (scanner.hasNext()) {
                              grId = scanner.next();
                              grade = scanner.next();
                              grBonus = scanner.next();
                              //we don't need the grade name for our computations, hence it is not stored in the map
                              gradeMap.put(grId.trim(),grBonus.trim());
                        }
                  } catch (Exception e) {
                        // a malformed line (for example, fewer than three fields) is logged and skipped
                        e.printStackTrace();
                  }
            }

          
           public void map(LongWritable key, Text value, OutputCollector<Text, NullWritable> output, Reporter reporter) throws IOException 
           {
             /* passing each input line to processRecord and
              getting the formatted output record back */
             String outputText=processRecord(value.toString());

             //wrapping the String in the reusable Text object
             word.set(outputText);

             //sending the record as key, with no value, to the output collector
             output.collect(word, NullWritable.get());
           }
          
           public String processRecord(String record)
           {
             StringBuilder outputRecord=new StringBuilder("");
             
             try {
                        Scanner scanner = new Scanner(record);
                        //setting the delimiter used in input file
                        scanner.useDelimiter("~");
                        if ( scanner.hasNext() )
                        {
                          employeeId = scanner.next();
                          employeeName=scanner.next();
                          locationId=scanner.next();
                          gradeId=scanner.next();
                          monthlyPay=Double.parseDouble(scanner.next());
                          daysWorked=Double.parseDouble(scanner.next());
                          monthlyHolidays=Double.parseDouble(scanner.next());
                          
                          //Initializing the calculated salary as 0
                          currentMonthPay=0.0;
                          
                          //Computing location allowance and grade bonus
                          locationAllowance = Double.parseDouble(locationMap.get(locationId.trim()));
                          gradeBonus = Double.parseDouble(gradeMap.get(gradeId.trim()));
                          
                          //monthly salary computations
                          if((daysWorked+monthlyHolidays)!=30)
                                currentMonthPay=(monthlyPay/30)*(daysWorked+monthlyHolidays);
                          else
                                currentMonthPay=monthlyPay;
                          
                          //total salary for a month
                          currentMonthPay = currentMonthPay+(locationAllowance/12) +(gradeBonus/12);
                          
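                          //rounding up to 2 decimal places using BigDecimal
                          //(valueOf avoids exposing the binary noise of new BigDecimal(double) to the round-up)
                          BigDecimal bd = BigDecimal.valueOf(currentMonthPay);
                          bd = bd.setScale(2,java.math.RoundingMode.UP);//2 -  decimal places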
                          currentMonthPay = bd.doubleValue();
                          
                        //get output in the format Employee Id~Employee Name~Monthly Pay~Days Worked~Calculated Salary
                        outputRecord.append(employeeId);
                        outputRecord.append("~");
                        outputRecord.append(employeeName);
                        outputRecord.append("~");
                        outputRecord.append(monthlyPay.toString());
                        outputRecord.append("~");
                        outputRecord.append(daysWorked.toString());
                        outputRecord.append("~");
                        outputRecord.append(currentMonthPay.toString());
                       
                         
                        } 
                       
              }
               catch (Exception e) {
                              e.printStackTrace();
                        }
               
               return outputRecord.toString();
           }
}
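
The arithmetic inside processRecord can be followed more easily in isolation. Below is a minimal standalone sketch of it, assuming the same rules as the mapper (a 30-day month, annual allowance and bonus divided by 12, rounding up to 2 decimal places). The class name MonthlyPaySketch and the method monthlyPay are hypothetical, and nothing here depends on Hadoop.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Standalone sketch of the mapper's salary arithmetic; the class and
// method names are hypothetical and exist only for this illustration.
final class MonthlyPaySketch {

    static BigDecimal monthlyPay(double basePay, double daysWorked, double holidays,
                                 double annualLocationAllowance, double annualGradeBonus) {
        double paidDays = daysWorked + holidays;
        // Full pay when paid days add up to 30, otherwise pro-rated by paid days.
        double pay = (paidDays != 30) ? (basePay / 30) * paidDays : basePay;
        // Annual amounts are spread over 12 months.
        pay = pay + (annualLocationAllowance / 12) + (annualGradeBonus / 12);
        // Round up to 2 decimal places.
        return BigDecimal.valueOf(pay).setScale(2, RoundingMode.UP);
    }

    public static void main(String[] args) {
        // 10007~James Wilson~L105~G06~110000~22~8, with L105 -> 200000 and G06 -> 450000
        System.out.println(monthlyPay(110000, 22, 8, 200000, 450000));
    }
}
```

For the James Wilson sample row, 22 days worked plus 8 holidays gives 30, so the base pay stays at 110000; adding 200000/12 and 450000/12 and rounding up prints 164166.67.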

Driver Class - SalaryCalculator.java

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



public class SalaryCalculator extends Configured implements Tool {

      public int run(String[] args) throws Exception {

            JobConf conf = new JobConf(getConf(), SalaryCalculator.class);
            conf.setJobName("SalaryCalculator");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(NullWritable.class);

            conf.setMapperClass(SalaryCalculatorMapper.class);
           

            FileInputFormat.addInputPath(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
            return 0;
      }

      public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new SalaryCalculator(),
                        args);
            System.exit(res);
      }
}

Reducer Class
No reducer class is required if you don't need one; at run time the default reducer (an identity reducer) is substituted in the map reduce execution. The point to note is that when you don't specify a reducer class, the default reducer has the same input and output key/value types as the mapper's output key/value types. If you need different key/value types as reducer output, you need to define a custom reducer. In the run command below, mapred.reduce.tasks=0 is set, so the job is map-only and the mapper output is written straight to the output directory.

How to run the map reduce job?
                                Follow these steps in order to run the job
1.       Pack the 2 classes into a jar (salarycalc.jar) and copy it to some location in LFS (/home/bejoys/samples/)
2.       Copy the input file to some location in LFS (/home/bejoys/samples/employee.txt); location.txt and grade.txt go in the same directory
3.       Copy the input into HDFS
hadoop fs -copyFromLocal /home/bejoys/samples/employee.txt /userdata/bejoys/samples/salcalc/input/
4.       Run the jar
hadoop jar /home/bejoys/samples/salarycalc.jar com.bejoy.hadoop.samples.salcal.SalaryCalculator -files /home/bejoys/samples/location.txt,/home/bejoys/samples/grade.txt -D mapred.reduce.tasks=0 /userdata/bejoys/samples/salcalc/input/ /userdata/bejoys/samples/salcalc/output/
5.       Retrieve the output on to LFS
hadoop fs -getmerge /userdata/bejoys/samples/salcalc/output /home/bejoys/samples/salcal_output.txt
Paths under /home/bejoys are locations in LFS; paths under /userdata/bejoys are locations in HDFS.
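
The value given to -files in the run step is a single comma-separated argument, so a stray space splits it into two arguments on the command line. As a rough sketch, a small pre-flight helper could build that value and reject paths that would break it. The class name FilesArgument and its build method are hypothetical and are not part of Hadoop.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight helper, not part of Hadoop: builds the value
// passed to the -files option from a list of local paths.
final class FilesArgument {

    static String build(List<String> paths) {
        for (String path : paths) {
            // Whitespace would make the shell split the list into separate
            // arguments; a comma inside a path would be read as a separator.
            boolean hasWhitespace = path.chars().anyMatch(Character::isWhitespace);
            if (path.isEmpty() || hasWhitespace || path.contains(",")) {
                throw new IllegalArgumentException("Invalid path: '" + path + "'");
            }
        }
        return String.join(",", paths);
    }

    public static void main(String[] args) {
        System.out.println(build(Arrays.asList(
                "/home/bejoys/samples/location.txt",
                "/home/bejoys/samples/grade.txt")));
    }
}
```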

Points to be noted here
1.       NullWritable
If you look at the example, it is purely parallel processing and doesn't really need the key/value concept, but map reduce programming supports only key/value pairs. Here we treat the whole record as the key, and since we have no value we use NullWritable.
                Unlike other Writable classes, we don't create a new instance (new NullWritable()); instead we fetch the shared instance with NullWritable.get(). This is because, unlike other Writables in Hadoop, NullWritable is a singleton.

2.       -files
This option is used at run time to distribute files through the distributed cache. Only small files should be placed in the distributed cache. When you specify multiple files, separate them with commas, and make sure there are no spaces between the file names and the commas. The task trackers copy these files to their local file system before the tasks execute.
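
Returning to point 1, the singleton idea behind NullWritable.get() can be illustrated in plain Java. This is only a sketch of the pattern, not Hadoop's actual NullWritable source; the class name EmptyValue is hypothetical.

```java
// Illustration of the singleton pattern only; this is NOT Hadoop's
// NullWritable source. The class name EmptyValue is hypothetical.
final class EmptyValue {

    private static final EmptyValue INSTANCE = new EmptyValue();

    // Private constructor: callers cannot write new EmptyValue().
    private EmptyValue() { }

    // Every caller receives the same shared instance.
    static EmptyValue get() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        // Both calls return the same object, so this prints true.
        System.out.println(EmptyValue.get() == EmptyValue.get());
    }
}
```

Since the object carries no data, one shared instance is enough for every record a mapper emits, which avoids allocating a new placeholder per output pair.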

17 comments:

  1. Hi Ramanjaneya

    You can get a variety of examples relating to different practical cases in my blog. Just browse through :)

  2. Simple, nice explanation.
    What if the lookup files are very big and cannot be cached?
    Here is a scenario: http://blog.matthewrathbone.com/2013/02/09/real-world-hadoop-implementing-a-left-outer-join-in-hadoop-map-reduce.html
    But I could not understand it at all, since I have just started learning Hadoop. It would be great if you could explain this in a simpler way, as you always do.

  3. well crafted, you really nailed it !!

  4. Yes, really good. You have used the old API; are you planning to explain the examples using the new Hadoop API?
