// CSC 369: Distributed Computing

// Alex Dekhtyar

// Two Hadoop jobs chained together

// Section 1: Imports

import org.apache.hadoop.io.LongWritable;                       // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;                               // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper;                      // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer;                     // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;                         // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                               // Hadoop's implementation of directory path/filename

import java.io.IOException;

public class filter {

    // First MapReduce Mapper: emits each input line (assumed to hold a single
    // word) as the key, with a count of 1
    public static class counterMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new LongWritable(1));
        } // map

    } // counterMapper

    // First MapReduce Reducer: sums up the 1s emitted for each word
    public static class counterReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // output the word with the number of its occurrences
            long sum = 0;
            for (LongWritable one : values) {
                sum = sum + one.get();
            }
            context.write(key, new LongWritable(sum));
        } // reduce

    } // counterReducer

    // MapReduce Mapper 2: emits (1, 1) for every line of Job #1's output,
    // i.e., one record per unique word
    public static class totalCountMapper
            extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(1), new LongWritable(1));
        } // map

    } // totalCountMapper

    // MapReduce Reducer 2: every record shares the key 1, so a single reduce
    // call sees all of them
    public static class totalCountReducer
            extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

        @Override
        public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0; // count number of unique words
            for (LongWritable one : values) {
                sum = sum + one.get();
            }
            context.write(key, new LongWritable(sum));
        } // reduce

    } // totalCountReducer

    // MapReduce Driver
    public static void main(String[] args) throws Exception {

        // MapReduce Job #1
        Job job = Job.getInstance();
        job.setJarByClass(filter.class);

        FileInputFormat.addInputPath(job, new Path("./test/", "words"));    // put what you need as input file
        FileOutputFormat.setOutputPath(job, new Path("./test/", "counts")); // put what you need as output file

        job.setMapperClass(counterMapper.class);
        job.setReducerClass(counterReducer.class);

        job.setOutputKeyClass(Text.class);           // specify the output class (what reduce() emits) for key
        job.setOutputValueClass(LongWritable.class); // specify the output class (what reduce() emits) for value

        job.setJobName("Chains");

        // Bail out if Job #1 fails; otherwise Job #2 would read missing or stale output
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
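        // Chaining note: Job #1 writes its reducer output under ./test/counts
        // as part-r-NNNNN files. Job #2 below reads part-r-00000 directly,
        // which assumes Job #1 ran with a single reducer (the default). If the
        // number of reducers is raised, point Job #2 at the whole directory
        // instead, e.g.:
        //
        //     FileInputFormat.addInputPath(countAllJob, new Path("./test/counts"));
        //
        // (FileInputFormat skips files whose names start with "_" or ".", such
        // as the _SUCCESS marker, by default.)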
        // MapReduce Job #2
        Job countAllJob = Job.getInstance();
        countAllJob.setJarByClass(filter.class);

        // Note how we are chaining inputs: Job #2 reads the output file of Job #1
        FileInputFormat.addInputPath(countAllJob, new Path("./test/counts", "part-r-00000"));
        FileOutputFormat.setOutputPath(countAllJob, new Path("./test/", "totals"));

        countAllJob.setMapperClass(totalCountMapper.class);
        countAllJob.setReducerClass(totalCountReducer.class);

        countAllJob.setOutputKeyClass(LongWritable.class);
        countAllJob.setOutputValueClass(LongWritable.class);

        countAllJob.setJobName("Count em All!");

        System.exit(countAllJob.waitForCompletion(true) ? 0 : 1);

    } // main()

} // filter
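// A minimal run sketch (illustrative only; the jar name is an assumption, not
// part of the original code). Assuming the class is packaged into filter.jar
// and Hadoop is installed:
//
//     hadoop jar filter.jar filter
//
// Job #1 ("Chains") reads ./test/words and writes per-word counts to
// ./test/counts; Job #2 ("Count em All!") then reads that output and writes
// the total number of unique words to ./test/totals.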