// CSC 369: Distributed Computing
// Alex Dekhtyar
// Simple TextInputFormat test

// this program reads in a file, treats it as TextInputFormat input,
// and prints every (key, value) pair unchanged
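// For example (a hypothetical input), TextInputFormat hands the mapper the byte
// offset of each line as the key and the line's text as the value. Given a file
// containing:
//
//    a,1
//    b,2
//
// the mapper receives (0, "a,1") and (4, "b,2"); the second offset counts the
// first line's three characters plus its newline.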

import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;         // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename

import java.io.IOException;


public class FITest {

// Note: we use LongWritable keys and Text values everywhere (the types TextInputFormat produces)!

public static class FIMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

 @Override  // we are overriding the Mapper's map() method
 public void map(LongWritable key, Text value, Context context)
       throws IOException, InterruptedException {

       context.write(key, value);   // identity map: pass each (offset, line) pair through
 } // map

} // Mapper class


public static class FIReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

 @Override  // we are overriding the Reducer's reduce() method
 public void reduce(LongWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {

   for (Text val : values) {
      context.write(key, val);   // identity reduce: emit each value with its key
   } // for
 } // reduce

} // Reducer class
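
// Note: with Hadoop's default TextOutputFormat, each (key, value) pair written by
// the reducer appears in the output file as the key and value separated by a tab,
// e.g. "0<TAB>a,1" for the hypothetical input sketched at the top of this file.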


//  MapReduce Driver

public static void main(String[] args) throws Exception {

     Job  job = Job.getInstance();  
     job.setJarByClass(FITest.class);  

     // This handling of input DEFAULTS to TextInputFormat as the input format!
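     // The default could be made explicit like this (a sketch; one would also
     // import org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
     //
     //     job.setInputFormatClass(TextInputFormat.class);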

     FileInputFormat.addInputPath(job, new Path("./test/", "data.csv")); 
     FileOutputFormat.setOutputPath(job, new Path("./test/","fi-out")); 
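     // Note: Hadoop refuses to start a job whose output directory already exists,
     // so ./test/fi-out must be removed between runs
     // (e.g., hdfs dfs -rm -r ./test/fi-out when running against HDFS).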
     // ----------------------------------------------------------------------------

      job.setMapperClass(FIMapper.class);
      job.setReducerClass(FIReducer.class);
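       // the next two calls set the job's final (reducer) output types; since our
       // mapper emits the same types, separate setMapOutputKeyClass() /
       // setMapOutputValueClass() calls are not needed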
      job.setOutputKeyClass(LongWritable.class); 
      job.setOutputValueClass(Text.class); 
      job.setJobName("TextInputFile Test");

      System.exit(job.waitForCompletion(true) ? 0:1);


  } // main()


} // FITest class
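
// One way to compile and run this job (a sketch: assumes the hadoop command is on
// the PATH; the jar name fitest.jar is arbitrary):
//
//    javac -classpath "$(hadoop classpath)" FITest.java
//    jar cf fitest.jar FITest*.class
//    hadoop jar fitest.jar FITest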