// CSC 369: Distributed Computing
// Alex Dekhtyar

// FindMax: use of the Top K MapReduce pattern to find the
// largest number in a sequence.

import org.apache.hadoop.io.LongWritable;  // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;          // Hadoop's serialized String wrapper class

import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;  // class for "pointing" at input file(s); gives each map task N lines
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file

import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object

// Exception handling
import java.io.IOException;

public class FindMax {

  // Mapper Class
  // Type parameters: <input key, input value, output key, output value>
  public static class MaxMapper
       extends Mapper< LongWritable, Text, LongWritable, Text > {

    private int currentMax = Integer.MIN_VALUE; // current max in a given split
                                                // (MIN_VALUE, not 0, so all-negative input works)

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String v = value.toString();
      int n = Integer.parseInt(v.trim());
      if (n > currentMax) {
        currentMax = n;
      }
    } // map

    // cleanup() runs once per map task, after the last map() call:
    // emit only the local max of this split, under a single shared key
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      LongWritable key = new LongWritable(1);
      Text val = new Text(Integer.toString(currentMax));
      context.write(key, val);
    } // cleanup

  } // MaxMapper

  // Reducer Class
  // Type parameters: <input key, input value, output key, output value>
  public static class MaxReducer
        extends Reducer< LongWritable, Text, LongWritable, LongWritable > {

    @Override // we are overriding the Reducer's reduce() method
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      int maxVal = Integer.MIN_VALUE; // global max across all splits
      int n;
      int j = 0;                      // number of per-split maxima seen

      for (Text val : values) {
        n = Integer.parseInt(val.toString());
        if (n > maxVal) {
          maxVal = n;
        }
        j = j + 1;
      } // for

      // emit the global max as the key and the number of splits as the value
      context.write(new LongWritable(maxVal), new LongWritable(j));
    } // reduce

  } // MaxReducer
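  // Design note: MaxMapper does "in-mapper combining" -- it tracks the running
  // max inside map() and emits a single (key, localMax) pair from cleanup(),
  // so only one record per split crosses the network to the reducer. A naive
  // mapper would instead forward every number; a minimal sketch of that
  // alternative (not used here) might look like:
  //
  //   public void map(LongWritable key, Text value, Context context)
  //       throws IOException, InterruptedException {
  //     context.write(new LongWritable(1), value); // ship every value to the reducer
  //   }
  //
  // Because every map task emits the same key (1), all the local maxima land
  // in one reduce() call, which is what lets MaxReducer compute a global max.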
  // MapReduce Driver
  // we do everything here in main()
  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 100); // lines of input per map task

    // step 1: get a new MapReduce Job object
    Job job = Job.getInstance(conf); // job = new Job() is now deprecated

    // step 2: register the MapReduce class
    job.setJarByClass(FindMax.class);

    // step 3: set input and output files
    NLineInputFormat.addInputPath(job, new Path("/data/", "numbers.txt")); // put what you need as input file
    FileOutputFormat.setOutputPath(job, new Path("./test/", "max-out"));   // put what you need as output file
    job.setInputFormatClass(NLineInputFormat.class);

    // step 4: register mapper and reducer
    job.setMapperClass(MaxMapper.class);
    job.setReducerClass(MaxReducer.class);

    // step 5: set up output information
    job.setMapOutputKeyClass(LongWritable.class);  // class of the keys map() emits
    job.setMapOutputValueClass(Text.class);        // class of the values map() emits
    job.setOutputKeyClass(LongWritable.class);     // class of the keys reduce() emits
    job.setOutputValueClass(LongWritable.class);   // class of the values reduce() emits

    // step 6: set up other job parameters at will
    job.setJobName("Max of Array of Numbers");

    // step 7: run the job and wait for it to finish
    // step 8: profit
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  } // main()

} // FindMax
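// Example usage (a sketch; the jar name and paths below are assumptions,
// adjust them to your cluster):
//
//   $ hadoop jar FindMax.jar FindMax
//   $ hdfs dfs -cat ./test/max-out/part-r-00000
//
// With /data/numbers.txt containing one integer per line, the single output
// line holds the largest number, tab-separated from the count of per-split
// maxima the reducer examined.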