Hadoop is known for processing independent slices of data, but what about dependent data? Suppose that to process employees' salaries we need data on their location as well as their grade, but the location and grade details are held in separate files. While processing each line of the employee file we then need to look up information from the location and grade files. How do we achieve this? Go for a distributed cache approach.
Distributed Cache
Hadoop has the concept of a distributed cache that all task trackers (nodes) can access. When we want to distribute some common data to every task tracker, we use the distributed cache: the file is copied to each node so that the tasks running there can read it locally. Reference data needed for lookups is placed in the distributed cache before the tasks start. The main point to note is that the files chosen for distribution should be small. As a rough guide, a file distributed on a medium-sized cluster shouldn't be more than about 100MB (this value can vary from cluster to cluster).
Solution to our Problem
In our scenario, say we have 1 million employees across 25 locations and 71 grades. Even on a very crude analysis, the location and grade data is tiny compared to the employee data. So our approach is to process the employee data that is in HDFS and look up the other two reference files from the distributed cache.
For mock calculation purposes I'm adding a location bonus and a monthly bonus based on grade for every employee, on top of the basic salary calculation defined in the previous example.
So other than the employee.txt file, we have two more input files.
1. location.txt which has details like location id, location name and annual bonus
2. grade.txt which has details like grade id, grade and annual bonus
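To make the lookup idea concrete, here is a minimal plain-Java sketch of turning such comma-separated reference lines into an in-memory map keyed by id. It is not the mapper itself: the class name ReferenceLookupSketch and the method loadReference are hypothetical names used only for illustration, and the sketch assumes every valid line has exactly three fields.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of the job's code)
public class ReferenceLookupSketch {

    // Parses "id,name,annualAmount" lines into a map of id -> annual amount.
    // Lines that do not have exactly three fields are skipped.
    public static Map<String, Double> loadReference(List<String> lines) {
        Map<String, Double> lookup = new HashMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (fields.length != 3) {
                continue;
            }
            lookup.put(fields[0].trim(), Double.parseDouble(fields[2].trim()));
        }
        return lookup;
    }

    public static void main(String[] args) {
        Map<String, Double> locations =
                loadReference(Arrays.asList("L105,Bangalore,200000"));
        // Known ids return their annual amount; unknown ids are simply absent
        System.out.println(locations.get("L105"));
        System.out.println(locations.containsKey("L999"));
    }
}
```

Because the whole reference file ends up in memory on every task, this only makes sense for small files, which is exactly the distributed cache restriction mentioned earlier.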
employee.txt
10007~James Wilson~L105~G06~110000~22~8
10100~Roger Williams~L103~G09~145000~20~8
location.txt
L105,Bangalore,200000
L103,Hyderabad,160000
grade.txt
G06,D3,450000
G09,F3,500000
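Before going through the mapper, it may help to see the arithmetic it applies to a single record. The following is a plain-Java sketch, assuming the same rules as the mapper listed next: pay is pro-rated over a 30-day month, one twelfth of each annual amount is added, and the result is rounded up to two decimals. The class and method names are hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper, for illustration only (not part of the job's code)
public class MonthlySalarySketch {

    // Pro-rates monthly pay over a 30-day month, adds one twelfth of the
    // annual location allowance and grade bonus, and rounds up to 2 decimals.
    public static double monthlySalary(double monthlyPay, double daysWorked,
                                       double monthlyHolidays,
                                       double annualLocationAllowance,
                                       double annualGradeBonus) {
        double paidDays = daysWorked + monthlyHolidays;
        double basePay = (paidDays != 30) ? (monthlyPay / 30) * paidDays : monthlyPay;
        double total = basePay + annualLocationAllowance / 12 + annualGradeBonus / 12;
        return new BigDecimal(total).setScale(2, RoundingMode.UP).doubleValue();
    }

    public static void main(String[] args) {
        // 10007~James Wilson~L105~G06~110000~22~8
        // with L105 -> 200000 and G06 -> 450000 from the reference files
        System.out.println(monthlySalary(110000, 22, 8, 200000, 450000)); // prints 164166.67
    }
}
```

For James Wilson, 22 days worked plus 8 holidays make a full 30-day month, so the base pay is 110000; adding 200000/12 and 450000/12 and rounding up gives 164166.67.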
Mapper Class - SalaryCalculatorMapper.java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class SalaryCalculatorMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, NullWritable> {

    // Variables for Map Reduce
    private Text word = new Text();

    // Variables for business calculations
    private String employeeId, employeeName, locationId, gradeId;
    private Double monthlyPay, daysWorked, monthlyHolidays, currentMonthPay, locationAllowance, gradeBonus;

    // Data structures for storing location and grade details
    private static Map<String, String> locationMap = new HashMap<String, String>();
    private static Map<String, String> gradeMap = new HashMap<String, String>();

    // Variables for processing reference files
    private String locId, locName, locAllowance, grId, grade, grBonus;

    // Called once per task before any map() call; the files shipped with
    // -files are read here by their plain names
    public void configure(JobConf job) {
        try {
            processFile(new File("location.txt"));
            processFile(new File("grade.txt"));
        } catch (FileNotFoundException e) {
            // Without the reference data every lookup would fail, so fail the task early
            throw new RuntimeException("Reference file not found in distributed cache", e);
        }
    }

    public void processFile(File fFile) throws FileNotFoundException {
        Scanner scanner = new Scanner(fFile);
        try {
            // Read each line once, then dispatch on the file name
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                if (fFile.getName().equals("location.txt"))
                    processLineLocation(line);
                else if (fFile.getName().equals("grade.txt"))
                    processLineGrade(line);
            }
        } finally {
            // Ensure the underlying stream is always closed
            scanner.close();
        }
    }

    public void processLineLocation(String aLine) {
        // Use a second Scanner to parse the content of each line
        try {
            Scanner scanner = new Scanner(aLine);
            scanner.useDelimiter(",");
            if (scanner.hasNext()) {
                locId = scanner.next();
                locName = scanner.next();
                locAllowance = scanner.next();
                // We don't need the location name for our computations, so it is not stored in the map
                locationMap.put(locId.trim(), locAllowance.trim());
            }
        } catch (Exception e) {
            // A malformed line is logged and skipped
            e.printStackTrace();
        }
    }

    public void processLineGrade(String aLine) {
        // Use a second Scanner to parse the content of each line
        try {
            Scanner scanner = new Scanner(aLine);
            scanner.useDelimiter(",");
            if (scanner.hasNext()) {
                grId = scanner.next();
                grade = scanner.next();
                grBonus = scanner.next();
                // We don't need the grade name for our computations, so it is not stored in the map
                gradeMap.put(grId.trim(), grBonus.trim());
            }
        } catch (Exception e) {
            // A malformed line is logged and skipped
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value, OutputCollector<Text, NullWritable> output,
            Reporter reporter) throws IOException {
        // Send each input line to processRecord and get the formatted result back
        String outputText = processRecord(value.toString());
        // Wrap the String in a Text
        word.set(outputText);
        // Send the record as key, with no value, to the output collector
        output.collect(word, NullWritable.get());
    }

    public String processRecord(String record) {
        StringBuilder outputRecord = new StringBuilder("");
        try {
            Scanner scanner = new Scanner(record);
            // Setting the delimiter used in the input file
            scanner.useDelimiter("~");
            if (scanner.hasNext()) {
                employeeId = scanner.next();
                employeeName = scanner.next();
                locationId = scanner.next();
                gradeId = scanner.next();
                monthlyPay = Double.parseDouble(scanner.next());
                daysWorked = Double.parseDouble(scanner.next());
                monthlyHolidays = Double.parseDouble(scanner.next());
                // Initializing the calculated salary as 0
                currentMonthPay = 0.0;
                // Looking up location allowance and grade bonus; an id missing from the
                // reference files throws here and is handled by the catch block below
                locationAllowance = Double.parseDouble(locationMap.get(locationId.trim()));
                gradeBonus = Double.parseDouble(gradeMap.get(gradeId.trim()));
                // Monthly salary computations
                if ((daysWorked + monthlyHolidays) != 30)
                    currentMonthPay = (monthlyPay / 30) * (daysWorked + monthlyHolidays);
                else
                    currentMonthPay = monthlyPay;
                // Total salary for a month
                currentMonthPay = currentMonthPay + (locationAllowance / 12) + (gradeBonus / 12);
                // Rounding up to 2 decimal places using BigDecimal
                BigDecimal bd = new BigDecimal(currentMonthPay);
                bd = bd.setScale(2, RoundingMode.UP);
                currentMonthPay = bd.doubleValue();
                // Output format: Employee Id~Employee Name~Monthly Pay~Days Worked~Calculated Salary
                outputRecord.append(employeeId);
                outputRecord.append("~");
                outputRecord.append(employeeName);
                outputRecord.append("~");
                outputRecord.append(monthlyPay.toString());
                outputRecord.append("~");
                outputRecord.append(daysWorked.toString());
                outputRecord.append("~");
                outputRecord.append(currentMonthPay.toString());
            }
        } catch (Exception e) {
            // A record that cannot be parsed or looked up yields an empty output line
            e.printStackTrace();
        }
        return outputRecord.toString();
    }
}
Driver Class – SalaryCalculator.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SalaryCalculator extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // Build the job from the configuration ToolRunner has already
        // populated with the generic options (-files, -D)
        JobConf conf = new JobConf(getConf(), SalaryCalculator.class);
        conf.setJobName("CostCenterSummary");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(NullWritable.class);
        conf.setMapperClass(SalaryCalculatorMapper.class);
        // args[0] is the HDFS input directory, args[1] the HDFS output directory
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SalaryCalculator(), args);
        System.exit(res);
    }
}
Reducer Class
You don't have to write a reducer class if you don't need one. If none is specified, the default reducer is substituted at run time, and its input and output key/value types are the same as the mapper's output key/value types. If you need different key/value types in the reducer output, you need to define your own custom reducer. Note that the run command below sets mapred.reduce.tasks=0, so this particular job runs map-only with no reduce phase at all.
How to run the map reduce job?
Follow these steps in order to run the job:
1. Pack the two classes into a jar (salarycalc.jar) and copy it to some location in LFS (/home/bejoys/samples/)
2. Copy the input file to some location in LFS (/home/bejoys/samples/employee.txt), along with location.txt and grade.txt
3. Copy the input into HDFS
hadoop fs -copyFromLocal /home/bejoys/samples/employee.txt /userdata/bejoys/samples/salcalc/input/
4. Run the jar
hadoop jar /home/bejoys/samples/salarycalc.jar com.bejoy.hadoop.samples.salcal.SalaryCalculator -files /home/bejoys/samples/location.txt,/home/bejoys/samples/grade.txt -D mapred.reduce.tasks=0 /userdata/bejoys/samples/salcalc/input/ /userdata/bejoys/samples/salcalc/output/
5. Retrieve the output on to LFS
hadoop fs -getmerge /userdata/bejoys/samples/salcalc/output /home/bejoys/samples/salcal_output.txt
Points to be noted here
1. NullWritable
This example uses MapReduce purely for parallel processing, so it has no real need for keys and values, but MapReduce programming supports only key/value pairs. Here we treat the whole record as the key, and since we have no value we use NullWritable.
Unlike other Writable classes, we don't create a new instance (new NullWritable()); we just get the existing instance (NullWritable.get()). This is because, unlike other Writables in Hadoop, NullWritable is a singleton.
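For readers new to the pattern, here is a minimal sketch of what a singleton with a get() accessor looks like in plain Java. EmptyValue is a hypothetical stand-in written for this illustration; it is not Hadoop's NullWritable source, only the same general idea.

```java
// Hypothetical stand-in for illustration; not Hadoop's NullWritable
public final class EmptyValue {

    // The single shared instance, created once when the class is loaded
    private static final EmptyValue INSTANCE = new EmptyValue();

    // Private constructor: callers cannot write "new EmptyValue()"
    private EmptyValue() {
    }

    // Every caller receives the same instance
    public static EmptyValue get() {
        return INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println(EmptyValue.get() == EmptyValue.get()); // prints true
    }
}
```

Since the object carries no data, sharing one instance across all records avoids creating a throwaway object for every output pair.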
2. -files
This option is used at run time to distribute files through the distributed cache. Only small files should be placed in the distributed cache. When you specify multiple files, separate them with commas, and make sure there are no spaces between the file names and the commas. These files are retrieved by the task trackers into their local file system before the tasks execute.
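If you build the command line from a script or a launcher program, a small helper can guard against the stray-space mistake. The sketch below is plain Java with hypothetical names (FilesOptionSketch, buildFilesArgument); it only assembles the string that follows -files and does not call any Hadoop API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, for illustration only
public class FilesOptionSketch {

    // Trims each path and joins them with commas and no spaces,
    // which is the form the -files option expects.
    public static String buildFilesArgument(List<String> paths) {
        for (String path : paths) {
            if (path.trim().isEmpty()) {
                throw new IllegalArgumentException("Empty path in -files list");
            }
        }
        return paths.stream().map(String::trim).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String value = buildFilesArgument(Arrays.asList(
                "/home/bejoys/samples/location.txt",
                " /home/bejoys/samples/grade.txt"));
        // The stray leading space on the second path has been removed
        System.out.println("-files " + value);
    }
}
```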