// CSC 369: Distributed Computing
// Alex Dekhtyar

//  Two Hadoop Jobs Chained together

// Section 1: Imports
import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename
import java.io.IOException;


public class filter {

// First MapReduce Mapper

public static class counterMapper     
     extends Mapper< LongWritable, Text, Text, LongWritable > {


@Override
public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

   // key: byte offset of the line in the input file (unused here)
   // value: the line itself, treated as a single word; emit it with a count of 1
   context.write(value, new LongWritable(1));

 } // map

} //counterMapper
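// Note: the shuffle phase groups the (word, 1) pairs by key, so each distinct
// word reaches the reducer below together with all of its 1s. If an input line
// could contain several whitespace-separated words (an assumption; the "words"
// input used here appears to hold one word per line), the mapper would first
// split the line and emit one pair per token, e.g.:
//
//     for (String token : value.toString().split("\\s+")) {
//       context.write(new Text(token), new LongWritable(1));
//     }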

// First MapReduce Reducer

public static class counterReducer  
      extends  Reducer< Text, LongWritable, Text, LongWritable> {

@Override
public void reduce( Text key, Iterable<LongWritable> values, Context context)
     throws IOException, InterruptedException {

// output the word with the number of its occurrences
  long sum = 0; 

 for (LongWritable one : values) {
   sum += one.get();
 }
 context.write(key, new LongWritable(sum));

 } // reduce
} // reducer
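
// Optional: since these per-key sums are associative and commutative, the same
// class could also be registered as a combiner in the driver to reduce shuffle
// traffic (a sketch, not required for correctness):
//
//     job.setCombinerClass(counterReducer.class);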


// MapReduce Mapper 2

public static class totalCountMapper  
     extends Mapper< LongWritable, Text, LongWritable, LongWritable > {

@Override
public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

   // value is one "word<TAB>count" line from Job #1's output (ignored here);
   // emit a constant key of 1 so every record is counted in one reduce call
   context.write(new LongWritable(1), new LongWritable(1));

 } // map

} // totalCountMapper
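
// Every pair emitted above carries the constant key 1, so the framework routes
// all of them to a single reduce call. If desired, the driver could make the
// single partition explicit (a sketch using the countAllJob object defined below):
//
//     countAllJob.setNumReduceTasks(1);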

// MapReduce Reducer 2

public static class totalCountReducer 
      extends  Reducer< LongWritable, LongWritable, LongWritable, LongWritable> {

@Override
public void reduce( LongWritable key, Iterable<LongWritable> values, Context context)
     throws IOException, InterruptedException {

  long sum = 0; 

 // count the number of unique words (Job #1 produced one record per distinct word)

 for (LongWritable one : values) {
   sum += one.get();
 }
 
 context.write(key, new LongWritable(sum));

 } // reduce
} // reducer
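
// The output of this job is therefore a single line of the form "1<TAB>N",
// where N is the number of distinct words counted by Job #1.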



//  MapReduce Driver

public static void main(String[] args) throws Exception {

 // MapReduce Job #1

     Job job = Job.getInstance();
     job.setJarByClass(filter.class);  

     FileInputFormat.addInputPath(job, new Path("./test/", "words")); // point this at your input file(s)
     FileOutputFormat.setOutputPath(job, new Path("./test/","counts")); // point this at your output directory (must not already exist)
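     // A common alternative (a sketch, assuming the input and output paths are
     // passed on the command line) avoids hard-coding the paths:
     //
     //     FileInputFormat.addInputPath(job, new Path(args[0]));
     //     FileOutputFormat.setOutputPath(job, new Path(args[1]));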
     job.setMapperClass(counterMapper.class);
     job.setReducerClass(counterReducer.class);
     job.setOutputKeyClass(Text.class); // specify the output class (what reduce() emits) for key
     job.setOutputValueClass(LongWritable.class); // specify the output class (what reduce() emits) for value
     job.setJobName("Chains");
     if (!job.waitForCompletion(true)) {   // stop here if Job #1 fails; Job #2 depends on its output
        System.exit(1);
     }

 // MapReduce Job #2

   Job countAllJob = Job.getInstance();
   countAllJob.setJarByClass(filter.class);

   // Note how we are chaining inputs
   FileInputFormat.addInputPath(countAllJob, new Path("./test/counts", "part-r-00000"));
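   // The whole output directory of Job #1 could also be passed here, since
   // FileInputFormat's default filter skips files whose names start with "_"
   // or "." (such as _SUCCESS). A sketch:
   //
   //     FileInputFormat.addInputPath(countAllJob, new Path("./test/counts"));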
   FileOutputFormat.setOutputPath(countAllJob, new Path("./test/","totals")); 

   countAllJob.setMapperClass(totalCountMapper.class);
   countAllJob.setReducerClass(totalCountReducer.class);
   countAllJob.setOutputKeyClass(LongWritable.class);
   countAllJob.setOutputValueClass(LongWritable.class);
   countAllJob.setJobName("Count em All!");
    
   System.exit(countAllJob.waitForCompletion(true) ? 0 : 1);
   

  } // main()


} // class filter