Friday, May 6, 2011

Hadoop for Parallel Computations - no reduce phase

In the word count example (word-count-hadoop-example) we saw two sets of parallel processes, a map and a reduce. The reduce process is basically an aggregation of values, or rather an operation on values that share the same key. Now consider a scenario where we just have a parallel process to perform, with no aggregation required. Here we go for the map operation alone and no reduce, i.e. we explicitly set the number of reducers to zero.

Why set reducers to zero?
                We know that between the map and reduce phases there is a key phase, the sort and shuffle. Sort and shuffle is responsible for sorting the keys in ascending order and then grouping the values that share the same key. This phase is really expensive, so if the reduce phase is not required we should avoid it, because eliminating the reduce phase eliminates the sort and shuffle phase as well. To eliminate the reduce phase we have to explicitly set mapred.reduce.tasks=0, because otherwise the job uses the default value, which your admin may have set for the cluster in conf/mapred-site.xml.
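To make the cost concrete, here is a minimal in-memory sketch of the work that sort and shuffle performs, written in plain Java rather than against the Hadoop API. The class name ShuffleSketch and the sample pairs are illustrative assumptions; real Hadoop does this across the network and on disk, which is where the expense comes from.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShuffleSketch {

    // Mimics sort and shuffle in memory: keys come out in ascending order
    // and every value emitted under the same key ends up in one list.
    static Map<String, List<Integer>> sortAndShuffle(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new TreeMap<>(); // TreeMap keeps keys sorted
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Hypothetical map output, in the order the mapper emitted it
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        mapOutput.add(new SimpleEntry<>("map", 1));
        mapOutput.add(new SimpleEntry<>("hadoop", 1));
        mapOutput.add(new SimpleEntry<>("reduce", 1));
        mapOutput.add(new SimpleEntry<>("hadoop", 1));

        // prints {hadoop=[1, 1], map=[1], reduce=[1]}
        System.out.println(sortAndShuffle(mapOutput));
    }
}
```

A map-only job skips all of this: each mapper writes its records straight to the output, unsorted and ungrouped.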

An example business scenario
                This is just a mock scenario to illustrate one way of using Hadoop for plain parallel processing. It needn’t be an ideal use case for Hadoop.
Problem Statement
                We have a database dump (extract) that contains the following details for each employee: Employee ID, Employee Name, Location Id, Grade Id, Monthly Pay, Days Worked in a Month and No of holidays in that month. We need to calculate each employee’s monthly salary based on the number of days worked. (Just imagine that the file size is huge.)

Proposed Solution
                Since we need to calculate the salary for such a large data set, and the data consists of independent chunks (i.e. we can process it line by line, and each line is an independent record), we can go for Hadoop.
(Some basic calculations are used here.)
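The idea of independent records can be sketched on a single machine with Java parallel streams. This is only an analogy under stated assumptions, not Hadoop code: the class MapOnlySketch, the method mapOnly and the placeholder records are all hypothetical, and a real map-only job writes one output file per map task instead of returning a single list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class MapOnlySketch {

    // Applies the per-record function to every line. parallelStream() lets the
    // runtime hand different lines to different threads, which is safe only
    // because no record depends on any other record.
    static List<String> mapOnly(List<String> lines, Function<String, String> perRecord) {
        return lines.parallelStream().map(perRecord).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Placeholder records; the per-record function here just upper-cases the line
        List<String> lines = Arrays.asList("a~1", "b~2", "c~3");

        // prints [A~1, B~2, C~3]
        System.out.println(mapOnly(lines, String::toUpperCase));
    }
}
```

No step combines values from different lines, so nothing resembling a reduce is needed.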

Input File - Employee.txt
                A two-line sample of the input file (db extract) is given below.
10007~James Wilson~L105~G06~110000~22~8
10100~Roger Williams~L103~G09~145000~20~8

Mapper Class - SalaryCalculatorMapper.java
package com.bejoy.hadoop.samples.salcal;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Scanner;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;

public class SalaryCalculatorMapper extends Mapper<LongWritable, Text, Text, Text>
{
             //Variables for Map Reduce
             private final static Text dummyVal = new Text("");
           private Text word = new Text();
          
           //Variables for business calculations
           private String employeeId,employeeName,locationId,gradeId;
           private Double monthlyPay,daysWorked,monthlyHolidays,currentMonthPay;
          
           public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
           {
            /* passing each input line to the processRecord method and
             getting back the formatted output record */
             String output=processRecord(value.toString());
               
             //wrapping the output String in the reusable Text object
                   word.set(output);
                   
                   //writing the key value pair to the context
                   context.write(word, dummyVal);
           }
          
           public String processRecord(String record)
           {
             StringBuilder outputRecord=new StringBuilder("");
             
             try {
                        Scanner scanner = new Scanner(record);
                        //setting the delimiter used in input file
                        scanner.useDelimiter("~");
                        if ( scanner.hasNext() )
                        {
                          employeeId = scanner.next();
                          employeeName=scanner.next();
                          locationId=scanner.next();
                          gradeId=scanner.next();
                          monthlyPay=Double.parseDouble(scanner.next());
                          daysWorked=Double.parseDouble(scanner.next());
                          monthlyHolidays=Double.parseDouble(scanner.next());
                          
                          //Initializing the calculated salary as 0
                          currentMonthPay=0.0;
                          
                          //salary computations
                          if((daysWorked+monthlyHolidays)!=30)
                                currentMonthPay=(monthlyPay/30)*(daysWorked+monthlyHolidays);
                          else
                                currentMonthPay=monthlyPay;
                          
                          //rounding using Big Decimal
                          BigDecimal bd = new BigDecimal(currentMonthPay);
                          bd = bd.setScale(2,BigDecimal.ROUND_UP);//2 -  decimal places
                          currentMonthPay = bd.doubleValue();
                          
                        //get output in the format Employee Id~Employee Name~Monthly Pay~Days Worked~Calculated Salary
                        outputRecord.append(employeeId);
                        outputRecord.append("~");
                        outputRecord.append(employeeName);
                        outputRecord.append("~");
                        outputRecord.append(monthlyPay.toString());
                        outputRecord.append("~");
                        outputRecord.append(daysWorked.toString());
                        outputRecord.append("~");
                        outputRecord.append(currentMonthPay.toString());
                       
                         
                        } 
                       
              }
               catch (Exception e) {
                              e.printStackTrace();
                        }
               
               return outputRecord.toString();
           }
}
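
As a sanity check on the salary rule in the mapper, the same computation can be reproduced outside Hadoop for the two sample records. This is a minimal sketch: the class SalarySketch is hypothetical, it uses String.split instead of Scanner, and it prints a shortened record (id, name, calculated pay) instead of the mapper's full output format. RoundingMode.UP is the enum equivalent of the BigDecimal.ROUND_UP constant used in the mapper.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

final class SalarySketch {

    // Same rule as the mapper: pay is prorated over a 30-day month unless
    // days worked plus holidays already add up to 30.
    static BigDecimal currentMonthPay(double monthlyPay, double daysWorked, double holidays) {
        double paidDays = daysWorked + holidays;
        double pay = (paidDays != 30) ? (monthlyPay / 30) * paidDays : monthlyPay;
        // Round away from zero to 2 decimal places, as the mapper does
        return new BigDecimal(pay).setScale(2, RoundingMode.UP);
    }

    // Parses one '~'-delimited record and returns Employee Id~Employee Name~Calculated Salary
    static String processRecord(String record) {
        String[] f = record.split("~");
        BigDecimal pay = currentMonthPay(
                Double.parseDouble(f[4]), Double.parseDouble(f[5]), Double.parseDouble(f[6]));
        return f[0] + "~" + f[1] + "~" + pay.toPlainString();
    }

    public static void main(String[] args) {
        // 22 + 8 = 30 days, so the full pay applies: prints 10007~James Wilson~110000.00
        System.out.println(processRecord("10007~James Wilson~L105~G06~110000~22~8"));
        // 20 + 8 = 28 days: 145000 / 30 * 28 = 135333.33..., rounded up:
        // prints 10100~Roger Williams~135333.34
        System.out.println(processRecord("10100~Roger Williams~L103~G09~145000~20~8"));
    }
}
```

Note that rounding up always moves away from zero, so 135333.333... becomes 135333.34 rather than 135333.33.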

Reducer Class – SalaryCalculatorReducer.java
package com.bejoy.hadoop.samples.salcal;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;

public class SalaryCalculatorReducer extends Reducer<Text, Text, Text, Text>
{
      //Reduce method for just outputting the key from mapper as the value from mapper is just an empty string    
      public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
           {
             context.write(key, new Text(""));
           }
}

Driver Class – SalaryCalculator.java
package com.bejoy.hadoop.samples.salcal;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;



public class SalaryCalculator extends Configured implements Tool
{
      public int run(String[] args) throws Exception
      {
            //getting configuration object and setting job name
            Configuration conf = getConf();
        Job job = new Job(conf, "Salary Calculator Demo");
       
        //setting the class names
        job.setJarByClass(SalaryCalculator.class);
        job.setMapperClass(SalaryCalculatorMapper.class);
        job.setReducerClass(SalaryCalculatorReducer.class);

        //setting the output data type classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //to accept the hdfs input and output dir at run time
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SalaryCalculator(), args);
        System.exit(res);
    }
}

How to run the map reduce job?
                                Follow these steps in order to run the job
1.       Pack these 3 files into a jar (salarycalc.jar) and copy it to some location in LFS (/home/bejoys/samples/)
2.       Copy the input file to some location in LFS (/home/bejoys/samples/employee.txt)
3.       Copy the input into HDFS
hadoop fs -copyFromLocal /home/bejoys/samples/employee.txt  /userdata/bejoys/samples/salcalc/input/
4.       Run the jar
hadoop jar /home/bejoys/samples/salarycalc.jar com.bejoy.hadoop.samples.salcal.SalaryCalculator /userdata/bejoys/samples/salcalc/input/ /userdata/bejoys/samples/salcalc/output/
5.       Retrieve the output on to LFS
hadoop fs -getmerge /userdata/bejoys/samples/salcalc/output /home/bejoys/samples/salcal_output.txt
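In the commands above, paths under /home/bejoys are locations in LFS (the local file system) and paths under /userdata/bejoys are locations in HDFS.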

Do we need a reducer here?
No, we don’t. In fact you don’t even need the SalaryCalculatorReducer class, but merely dropping this class and not setting the reducer class in the Job object won’t do the job. (Commenting out the line job.setReducerClass(SalaryCalculatorReducer.class) won’t serve the purpose.) Even if you don’t set your reducer class, the map reduce job still runs a default reducer, which passes the output keys and values from your mapper through unchanged, so the sort and shuffle phase still takes place.

How can we avoid the reduce operation?
Just set the number of reduce tasks to zero. You can do it in two ways
1.       Alter your driver class to add a line of code as
job.setNumReduceTasks(0);
2.       Altering the code and then packing it as a jar is a little time consuming, so we can do the same on the command line at run time. Alter your hadoop jar command to include the number of reducers as well
hadoop jar /home/bejoys/samples/salarycalc.jar com.bejoy.hadoop.samples.salcal.SalaryCalculator -D mapred.reduce.tasks=0 /userdata/bejoys/samples/salcalc/input/ /userdata/bejoys/samples/salcalc/output/


Do I have to specify an empty text as the mapper/reducer value?
                No, you don’t need to. You can use NullWritable instead. Refer to the post on Hadoop for processing dependent data splits.
