// CSC 369: Distributed Computing
// Alex Dekhtyar
// KeyValueTextInputFormat test


import java.io.IOException;
import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename

// Import KeyValueTextInputFormat

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; 
import org.apache.hadoop.conf.Configuration; 



public class KeyValueTest {

  /**
   * Job-configuration key that tells KeyValueTextInputFormat which separator splits each input
   * line into a key part and a value part.
   */
  private static final String KV_SEPARATOR_KEY =
      "mapreduce.input.keyvaluelinerecordreader.key.value.separator";

  /**
   * Identity mapper: forwards every (key, value) pair produced by KeyValueTextInputFormat
   * unchanged. Both keys and values are Text.
   */
  public static class KVMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    public void map(Text key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);
    } // map
  } // KVMapper

  /**
   * Emits exactly one record per key, carrying the last value the framework supplied for it.
   *
   * <p>No secondary sort is configured in this job, so when a key appears on several input lines
   * the value that ends up "last" is not something this code controls. If the iterable is empty,
   * an empty Text is written.
   */
  public static class KVReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      // The framework may reuse one Text instance across iterations, so copy the bytes instead
      // of keeping a reference. Text.set(Text) copies raw bytes, avoiding a String round-trip.
      Text last = new Text();
      for (Text val : values) {
        last.set(val);
      } // for
      context.write(key, last);
    } // reduce
  } // KVReducer

  /**
   * MapReduce driver.
   *
   * <p>Reads {@code test/data.csv} using a comma as the key/value separator. It then runs the
   * identity mapper and the last-value reducer. Output is written to {@code ./test/kv-output}.
   *
   * @param args ignored
   * @throws Exception if job setup or submission fails
   */
  public static void main(String[] args) throws Exception {
    // The separator is read by the input format from the job configuration,
    // so it must be set before the Job is created from this Configuration.
    Configuration conf = new Configuration();
    conf.set(KV_SEPARATOR_KEY, ",");

    Job job = Job.getInstance(conf);
    job.setJarByClass(KeyValueTest.class);
    job.setJobName("Key Value Input File Test");

    // Override the default input format so the mapper receives (Text, Text) pairs.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    // addInputPath is a static method declared on FileInputFormat; call it there directly.
    FileInputFormat.addInputPath(job, new Path("test/", "data.csv"));
    FileOutputFormat.setOutputPath(job, new Path("./test/", "kv-output"));

    job.setMapperClass(KVMapper.class);
    job.setReducerClass(KVReducer.class);
    // No separate map-output classes are set, so these also describe the mapper's output types.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  } // main()

} // KeyValueTest