// CSC 369: Distributed Computing
// Alex Dekhtyar

// This program shows how to create multiple splits of the input file
// to run multiple mappers in parallel
// It also sets a number of reducers

// The payload is the "for each county find how many purchases were made" query

// Section 1: Imports


                  // Data containers for Map() and Reduce() functions

                  // You would import the data types needed for your keys and values
import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class


                 // For Map and Reduce jobs

import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function

                 // To start the MapReduce process

import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver


                // For File "I/O"

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; // input format that hands each mapper a fixed number of input lines
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename

// Configuration

import org.apache.hadoop.conf.Configuration;


// Exception handling

import java.io.IOException;


public class NLgroup {


// Mapper  Class 


public static class NLgMapper     // Need to replace the four type labels there with actual Java class names
     extends Mapper< LongWritable, Text, Text, LongWritable > {

// @Override   // we are overriding Mapper's map() method

// map methods takes three input parameters
// first parameter: input key 
// second parameter: input value
// third parameter: container for emitting output key-value pairs

public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

      String record[] = value.toString().split(",");
      String myKey = record[2];
      
      Text outKey = new Text(myKey);
      LongWritable outVal = new LongWritable(1);
      context.write(outKey, outVal);

 } // map


} // MyMapperClass


//  Reducer Class Template

public static class NLgReducer
      extends  Reducer< Text, LongWritable, Text, LongWritable> {

  // Reducer type parameters: <input key, input value, output key, output value>.
  // The second parameter of reduce() is an Iterable over the input value type.

  // Output value object reused across reduce() calls instead of allocating
  // one per key.
  private final LongWritable result = new LongWritable();

  /**
   * Emits (key, total) where total is the sum of all counts received for the key.
   *
   * The values are added up rather than merely counted, so the result stays
   * correct even if the incoming values are partial counts larger than one
   * (for example if this class were also registered as a combiner). With a
   * mapper that always emits 1 the output is the same as counting the values.
   *
   * @param key     grouping key shared by all values
   * @param values  the counts associated with the key
   * @param context container for emitting output key-value pairs
   */
  @Override  // we are overriding the Reducer's reduce() method
  public void reduce( Text key, Iterable<LongWritable> values, Context context)
       throws IOException, InterruptedException {

    long sum = 0;

    for (LongWritable val : values) {
      sum += val.get();
    } // for

    // emit final output
    result.set(sum);
    context.write(key, result);

  } // reduce


} // NLgReducer


//  MapReduce Driver


  // we do everything here in main()
  public static void main(String[] args) throws Exception {

     // Tuning knobs: lines handed to each mapper, and number of reduce tasks.
     final int linesPerMapper = 2500;
     final int reducerCount = 3;

     // Where the job reads from and writes to.
     final Path inputFile = new Path("/data/", "iowa.csv");
     final Path outputDir = new Path("./test/","counties");

     // Configuration: the lines-per-map setting is placed in it before the
     // Job is created from it.
     Configuration settings = new Configuration();
     settings.setInt(NLineInputFormat.LINES_PER_MAP, linesPerMapper);

     // Create the job from that configuration and identify it.
     Job countyJob = Job.getInstance(settings);
     countyJob.setJarByClass(NLgroup.class);
     countyJob.setJobName("Group by county");

     // Map and reduce implementations, plus the number of reduce tasks.
     countyJob.setMapperClass(NLgMapper.class);
     countyJob.setReducerClass(NLgReducer.class);
     countyJob.setNumReduceTasks(reducerCount);

     // Input: read the file through NLineInputFormat.
     countyJob.setInputFormatClass(NLineInputFormat.class);
     FileInputFormat.addInputPath(countyJob, inputFile);

     // Output: key/value classes emitted by reduce(), and the destination.
     countyJob.setOutputKeyClass(Text.class);
     countyJob.setOutputValueClass(LongWritable.class);
     FileOutputFormat.setOutputPath(countyJob, outputDir);

     // Run the job with progress reporting; exit code 0 on success, 1 otherwise.
     boolean succeeded = countyJob.waitForCompletion(true);
     System.exit(succeeded ? 0 : 1);

  } // main()


} // NLgroup