// CSC 369: Distributed Computing
// Alex Dekhtyar
// NLineInputFormat test

import java.io.IOException;
import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object


// Import NLineInput Format

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
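
// NLineInputFormat reads plain text like the default TextInputFormat (keys are byte
// offsets into the file, values are the lines themselves), but it caps each input
// split at N lines, so every map task receives at most N lines of input.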


public class NLTest {

  // Using LongWritable for keys, and Text for values
 
  // Identity mapper: passes each (byte offset, line) record through unchanged
  public static class NLMapper
       extends Mapper< LongWritable, Text, LongWritable, Text > {

    @Override
    public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

      context.write(key, value);
    } // map

  } // Mapper class


  // Identity reducer: emits every value grouped under its key
  public static class NLReducer
        extends Reducer< LongWritable, Text, LongWritable, Text > {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
         throws IOException, InterruptedException {

      for (Text val : values) {
        context.write(key, val);
      } // for

    } // reduce

  } // Reducer class


//  MapReduce Driver

  public static void main(String[] args) throws Exception {

      // Set the number of input lines per split (i.e., per map task)
  
      Configuration conf = new Configuration();
      conf.setInt("mapreduce.input.lineinputformat.linespermap",4);
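      // Note: once the Job object exists, the same setting can also be applied with
      // NLineInputFormat.setNumLinesPerSplit(job, 4).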


      Job  job = Job.getInstance(conf);  //  job = new Job() is now deprecated
      job.setJarByClass(NLTest.class);  
      NLineInputFormat.addInputPath(job, new Path("./test/", "data"));     // replace with your input path
      FileOutputFormat.setOutputPath(job, new Path("./test/", "nl-out"));  // replace with your output path
  
      // Override default Input Format

      job.setInputFormatClass(NLineInputFormat.class);   
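
      // The default is TextInputFormat (one split per HDFS block); NLineInputFormat only
      // changes how the input is split, so map() still receives (byte offset, line) pairs.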

      job.setMapperClass(NLMapper.class);
      job.setReducerClass(NLReducer.class);
      job.setOutputKeyClass(LongWritable.class); // specify the output class (what reduce() emits) for key
      job.setOutputValueClass(Text.class); // specify the output class (what reduce() emits) for value
      job.setJobName("N Lines Input Test Version");
      System.exit(job.waitForCompletion(true) ? 0:1);

  } // main()

} // MapReduce driver
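
// A possible way to run the job (the jar name below is a placeholder; package and
// name it to match your setup):
//
//   hadoop jar nltest.jar NLTest
//
// With linespermap = 4, a 12-line ./test/data input is split across 3 map tasks, and
// ./test/nl-out contains each input line prefixed by its byte offset in the file.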