// CSC 369: Distributed Computing
// Alex Dekhtyar

import org.apache.hadoop.io.IntWritable;    // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable;   // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;           // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;        // input format: N lines of text per map task
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;       // class for "pointing" at output file
import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object

// Exception handling
import java.io.IOException;

public class CombinerTest {

  // Mapper Class

  // Mapper type parameters: <input key, input value, output key, output value>
  public static class TMapper
      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String v = value.toString();
      // skip blank lines and comment lines that start with '#'
      if (!v.isEmpty() && v.charAt(0) != '#') {
        String[] record = v.split("\\s+");
        if (record.length == 2) {
          // each edge "from to" contributes +1 to the source node and -1 to
          // the destination node, so the reducer's sum for a node is
          // (out-degree - in-degree)
          long from = Long.parseLong(record[0]);
          long to = Long.parseLong(record[1]);
          context.write(new LongWritable(from), new LongWritable(1));
          context.write(new LongWritable(to), new LongWritable(-1));
        }
      }
    } // map

  } // TMapper

  // Reducer Class

  // Reducer type parameters: <input key, input value, output key, output value>
  public static class TReducer
      extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

    @Override // we are overriding the Reducer's reduce() method
    public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {

      long cn = 0;
      for (LongWritable val : values) {
        cn = cn + val.get();
      } // for
      context.write(key, new LongWritable(cn));
    } // reduce

  } // TReducer

  // MapReduce Driver
  // we do everything here in main()

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 50000);

    // step 1: get a new MapReduce Job object
    Job job = Job.getInstance(conf);

    // step 2: register the MapReduce class
    job.setJarByClass(CombinerTest.class);

    // step 3: set input and output files
    NLineInputFormat.addInputPath(job, new Path("/data/", args[0]));   // put what you need as input file
    FileOutputFormat.setOutputPath(job, new Path("./test/", "tcout")); // put what you need as output file
    job.setInputFormatClass(NLineInputFormat.class);

    // step 4: register mapper, combiner, and reducer
    job.setMapperClass(TMapper.class);
    // the reducer doubles as the combiner: summation is associative and
    // commutative, so partial sums computed on the map side are safe
    job.setCombinerClass(TReducer.class);
    job.setReducerClass(TReducer.class);

    // set number of reducers
    job.setNumReduceTasks(5);

    // step 5: set up output information
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LongWritable.class);
    job.setOutputKeyClass(LongWritable.class);   // specify the output class (what reduce() emits) for key
    job.setOutputValueClass(LongWritable.class); // specify the output class (what reduce() emits) for value

    // step 6: set up other job parameters at will
    job.setJobName("Twitter Graph Test with Combiners");

    // step 7: ?
    // step 8: profit
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  } // main()

} // CombinerTest