// CSC 369: Distributed Computing
// Alex Dekhtyar
// Fixed-length record (FixedLengthInputFormat) input format test

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable; // Hadoop's serialized byte-array wrapper class
import org.apache.hadoop.io.LongWritable;  // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object

// Import FixedLengthInputFormat class

import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;


public class FixedRecordTest {

// FixedLengthInputFormat hands each record to the mapper as a
// (LongWritable byte offset, BytesWritable record bytes) pair,
// so LongWritable/BytesWritable are used as the key/value types throughout.
// (A text-decoding variant, FRTextMapper, is sketched after this class.)

public static class FRMapper     
     extends Mapper< LongWritable, BytesWritable, LongWritable, BytesWritable > {

@Override
public void map(LongWritable key, BytesWritable value, Context context)
      throws IOException, InterruptedException {

   context.write(key,value);

 } // map

} // Mapper Class
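
// Hedged, illustrative variant (not wired into the driver below): if the fixed-length
// records are known to contain UTF-8 text, the raw record bytes can be re-wrapped as
// Text before being emitted. The class name FRTextMapper is ours, not part of the
// original example; using it would also require job.setMapperClass(FRTextMapper.class)
// and job.setOutputValueClass(Text.class) in the driver.

public static class FRTextMapper
     extends Mapper< LongWritable, BytesWritable, LongWritable, Text > {

 @Override
 public void map(LongWritable key, BytesWritable value, Context context)
       throws IOException, InterruptedException {

    Text record = new Text();
    record.set(value.getBytes(), 0, value.getLength()); // copy only the valid bytes of the backing buffer
    context.write(key, record);

  } // map

} // FRTextMapper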


public static class FRReducer  
      extends  Reducer< LongWritable, BytesWritable, LongWritable, BytesWritable> {

@Override
public void reduce( LongWritable key, Iterable<BytesWritable> values, Context context)
     throws IOException, InterruptedException {

  String name = "";

  for (BytesWritable val : values) {
    
      context.write(key, val);
  } // for

 } // reduce

} // Reducer Class


//  MapReduce Driver

  public static void main(String[] args) throws Exception {


      // FixedLengthInputFormat requires the fixed record length (in bytes) to be set in the configuration before the job is created

      Configuration conf = new Configuration();
      FixedLengthInputFormat.setRecordLength(conf, 5);
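
      // Hedged note: setRecordLength() is a convenience setter; assuming the standard
      // Hadoop constant, the same value can be written directly into the configuration
      // via the FixedLengthInputFormat.FIXED_RECORD_LENGTH property key, e.g.:
      //
      //    conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 5);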
  


      Job  job = Job.getInstance(conf);  
      job.setJarByClass(FixedRecordTest.class);  
      FixedLengthInputFormat.addInputPath(job, new Path("./test/", "data.csv")); 
      FileOutputFormat.setOutputPath(job, new Path("./test/","fr-out")); 
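
      // Hedged note: the job will fail at submission if the output directory ./test/fr-out
      // already exists. One common, illustrative workaround (not part of the original
      // example) is to remove it first:
      //
      //    org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(conf);
      //    fs.delete(new Path("./test/", "fr-out"), true);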


      // Override the default input format (TextInputFormat) with FixedLengthInputFormat
      job.setInputFormatClass(FixedLengthInputFormat.class);   

      job.setMapperClass(FRMapper.class);
      job.setReducerClass(FRReducer.class);
      job.setOutputKeyClass(LongWritable.class); 
      job.setOutputValueClass(BytesWritable.class);
      job.setJobName("Fixed Record Test");
      System.exit(job.waitForCompletion(true) ? 0:1);

  } // main()

} // FixedRecordTest