// CSC 369: Distributed Computing
// Alex Dekhtyar

// FindMax: use of the Top K MapReduce pattern to find the
// largest number in a sequence.

import org.apache.hadoop.io.LongWritable;  // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;          // Hadoop's serialized String wrapper class

import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;  // class for "pointing" at input file(s); gives each map task N lines
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file

import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object

// Exception handling
import java.io.IOException;

public class FindMax {

  // Mapper Class
  // Type parameters: <input key, input value, output key, output value>
  public static class MaxMapper
       extends Mapper< LongWritable, Text, LongWritable, Text > {

    private int currentMax = Integer.MIN_VALUE; // current max in a given split
                                                // (MIN_VALUE, not 0, so all-negative input works)

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String v = value.toString();
      int n = Integer.parseInt(v.trim());
      if (n > currentMax) {
        currentMax = n;
      }
    } // map

    // cleanup() runs once per map task, after the last map() call:
    // emit only the local max of this split, under a single shared key
    @Override
    protected void cleanup(Context context)
        throws IOException, InterruptedException {
      LongWritable key = new LongWritable(1);
      Text val = new Text(Integer.toString(currentMax));
      context.write(key, val);
    } // cleanup

  } // MaxMapper

  // Reducer Class
  // Type parameters: <input key, input value, output key, output value>
  public static class MaxReducer
        extends Reducer< LongWritable, Text, LongWritable, LongWritable > {

    @Override // we are overriding the Reducer's reduce() method
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

      int maxVal = Integer.MIN_VALUE; // global max across all splits
      int n;
      int j = 0;                      // number of per-split maxima seen

      for (Text val : values) {
        n = Integer.parseInt(val.toString());
        if (n > maxVal) {
          maxVal = n;
        }
        j = j + 1;
      } // for

      // emit the global max as the key and the number of splits as the value
      context.write(new LongWritable(maxVal), new LongWritable(j));
    } // reduce

  } // MaxReducer
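  // Design note: MaxMapper does "in-mapper combining" -- it tracks the running
  // max inside map() and emits a single (key, localMax) pair from cleanup(),
  // so only one record per split crosses the network to the reducer. A naive
  // mapper would instead forward every number; a minimal sketch of that
  // alternative (not used here) might look like:
  //
  //   public void map(LongWritable key, Text value, Context context)
  //       throws IOException, InterruptedException {
  //     context.write(new LongWritable(1), value); // ship every value to the reducer
  //   }
  //
  // Because every map task emits the same key (1), all the local maxima land
  // in one reduce() call, which is what lets MaxReducer compute a global max.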
  // MapReduce Driver
  // we do everything here in main()
  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 100); // lines of input per map task

    // step 1: get a new MapReduce Job object
    Job job = Job.getInstance(conf); // job = new Job() is now deprecated

    // step 2: register the MapReduce class
    job.setJarByClass(FindMax.class);

    // step 3: set input and output files
    NLineInputFormat.addInputPath(job, new Path("/data/", "numbers.txt")); // put what you need as input file
    FileOutputFormat.setOutputPath(job, new Path("./test/", "max-out"));   // put what you need as output file
    job.setInputFormatClass(NLineInputFormat.class);

    // step 4: register mapper and reducer
    job.setMapperClass(MaxMapper.class);
    job.setReducerClass(MaxReducer.class);

    // step 5: set up output information
    job.setMapOutputKeyClass(LongWritable.class);  // class of the keys map() emits
    job.setMapOutputValueClass(Text.class);        // class of the values map() emits
    job.setOutputKeyClass(LongWritable.class);     // class of the keys reduce() emits
    job.setOutputValueClass(LongWritable.class);   // class of the values reduce() emits

    // step 6: set up other job parameters at will
    job.setJobName("Max of Array of Numbers");

    // step 7: run the job and wait for it to finish
    // step 8: profit
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  } // main()

} // FindMax
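// Example usage (a sketch; the jar name and paths below are assumptions,
// adjust them to your cluster):
//
//   $ hadoop jar FindMax.jar FindMax
//   $ hdfs dfs -cat ./test/max-out/part-r-00000
//
// With /data/numbers.txt containing one integer per line, the single output
// line holds the largest number, tab-separated from the count of per-split
// maxima the reducer examined.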