Hi guys, this is my fourth post related to Big Data. From now on I will try to post more programs rather than theoretical knowledge, so without wasting any more time, let's write some Hadoop programs 🙂

Scenario :-

We need to find the employee with the highest salary within each city.

Example data set :- (all values are separated by a single space)

003 Amit Delhi India 12000
004 Anil Delhi India 15000
005 Deepak Delhi India 34000
006 Fahed Agra India 45000
007 Ravi Patna India 98777
008 Avinash Punjab India 120000
009 Saajan Punjab India 54000
001 Harit Delhi India 20000
002 Hardy Agra India 20000

We will create the same program with the MapReduce, Pig, and Hive programming models.

MapReduce :-

As we know, MapReduce works on (key, value) pairs, so :-

For the map phase :-

our output key-value pair will be :- ( city , name + salary )

so that in the reduce phase we can split the value back into ( name , salary ) and compare the salary of each employee under the same key ( the city, which is exactly what we are grouping by ).

The output of the reduce phase will look like :-

( city , max(sal) and the name of the employee with that maximum salary )
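For example, after the map phase the record 007 Ravi Patna India 98777 becomes the pair ( Patna , Ravi 98777 ).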

Now let's create the program.

Mapper :-  class name :- empmap

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class empmap extends Mapper<Object, Text, Text, Text> {

    public void map(Object key, Text value, Context ctx) throws IOException, InterruptedException {
        // Each input line looks like: eno name city country salary
        String[] arr = value.toString().split("\\s+");
        // Emit (city, "name salary") so that employees are grouped by city
        ctx.write(new Text(arr[2]), new Text(arr[1] + " " + arr[4]));
    }
}
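All pairs with the same city land in the same reduce call, so the reducer for Delhi receives Amit 12000, Anil 15000, Deepak 34000, and Harit 20000 together.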

Reducer :-  class name :- empreduce

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class empreduce extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> itr, Context context) throws IOException, InterruptedException {
        int maxsal = 0;
        String name = "";
        String sal = "";

        // Each value is "name salary"; keep the employee with the highest salary seen so far
        for (Text val : itr) {
            String[] arr = val.toString().split("\\s+");
            if (maxsal < Integer.parseInt(arr[1])) {
                maxsal = Integer.parseInt(arr[1]);
                sal = arr[1];
                name = arr[0];
            }
        }
        context.write(key, new Text(name + " " + sal));
    }
}
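For the Delhi group above, maxsal finishes at 34000 and the reducer writes ( Delhi , Deepak 34000 ), which is exactly the Delhi line in the final output below.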

Main Class :-    class name :- empmain 

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class empmain {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "empmain");
        job.setJarByClass(empmain.class);
        job.setMapperClass(empmap.class);
        job.setReducerClass(empreduce.class);
        // Map output types match the final output types, so one pair of setters is enough
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
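If you do not have testemployee.jar yet, one minimal way to build it (assuming JAVA_HOME points to a JDK and the three .java files above are in the current directory) is :-

export HADOOP_CLASSPATH=$JAVA_HOME/lib/tools.jar
hadoop com.sun.tools.javac.Main empmap.java empreduce.java empmain.java
jar cf testemployee.jar *.class

A jar exported from your IDE works just as well.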

To run the program :- 

1. Copy the data set file (here the local file employee) into HDFS :-

hadoop fs -copyFromLocal employee /empdata1

2. Run the jar file :-

hadoop jar testemployee.jar empmain /empdata1 /maxsalempout

The output we get will be :-

a) run the following command :-

hadoop fs -ls /maxsalempout
15/07/07 16:15:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 1 user supergroup 0 2015-07-07 16:13 /maxsalempout/_SUCCESS
-rw-r--r-- 1 user supergroup 71 2015-07-07 16:13 /maxsalempout/part-r-00000

b) now view the result :-

hadoop fs -cat /maxsalempout/part-r-00000
15/07/07 16:16:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Agra Fahed 45000
Delhi Deepak 34000
Patna Ravi 98777
Punjab Avinash 120000

The above output is exactly what we were looking for.

Pig :- 

Let's create the same program in Pig :- filename :- emp.pig

In Pig we will use a join approach to get the desired output: group by city, compute the max salary per city, then join that back to the original data on (city, sal) :-

sdata = LOAD '/empdata1' USING PigStorage(' ') AS (eno:int, name:chararray, city:chararray, country:chararray, sal:int);
groupcity = GROUP sdata BY city;
d = FOREACH groupcity GENERATE group AS c2, MAX(sdata.sal) AS a2;
main = JOIN sdata BY (city, sal), d BY (c2, a2);
result = FOREACH main GENERATE $0, $1, $2, $3, $4;
DUMP result;
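Note that the join is on both city and salary: joining on salary alone could wrongly match an employee whose salary happens to equal another city's maximum. The final FOREACH keeps only the five sdata columns and drops the joined (c2, a2) pair.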

This looks very small in comparison to MapReduce, so Pig is a good fit for this kind of program.

Output :-

a) command :-

pig emp.pig

Result:-

(5,Deepak,Delhi,India,34000)
(6,Fahed,Agra,India,45000)
(7,Ravi,Patna,India,98777)
(8,Avinash,Punjab,India,120000)

Hive :- 

Let's create a program for Hive as well :-

We create a table emp with fields terminated by a single space (a sketch of the create and load statements follows the field list below).

our fields :-

(eno int,name String,city String,country String,sal int)
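In case the DDL is needed, a minimal sketch, assuming the data file is already in HDFS at /empdata1 (note that LOAD DATA INPATH moves the file into Hive's warehouse directory) :-

CREATE TABLE emp (eno INT, name STRING, city STRING, country STRING, sal INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ';

LOAD DATA INPATH '/empdata1' INTO TABLE emp;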

hive> select * from emp ;
OK
3 Amit Delhi India 12000
4 Anil Delhi India 15000
5 Deepak Delhi India 34000
6 Fahed Agra India 45000
7 Ravi Patna India 98777
8 Avinash Punjab India 120000
9 Saajan Punjab India 54000
1 Harit Delhi India 20000
2 Hardy Agra India 20000

query :- 

select name,city,sal from emp where emp.sal IN (select max(sal) from emp group by city);  

Result :-

Deepak Delhi 34000
Fahed Agra 45000
Ravi Patna 98777
Avinash Punjab 120000
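One caveat before we conclude: the IN clause above compares salaries only, so an employee whose salary equals another city's maximum would slip into the result (it also needs Hive 0.13 or later for subqueries in WHERE). A join that pairs each row with its own city's maximum avoids both issues; a sketch that gives the same result on this data set :-

select e.name, e.city, e.sal
from emp e
join (select city, max(sal) as msal from emp group by city) m
on (e.city = m.city and e.sal = m.msal);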

Conclusion :- From all of the above we can conclude that Hive is the most concise way to get this result, so for structured data sets and warehousing-style queries Hive is usually the best choice.

Thanks for reading,

Please like or comment, whether you found this post useful or a waste of time. I will be back with some more programming examples.

Thanks.

Cheers.