// CSC 369: Distributed Computing

// Alex Dekhtyar

// Java Hadoop Template
// Map-side join using a distributed cache file

// Section 1: Imports

// Data containers for Map() and Reduce() functions
// You would import the data types needed for your keys and values

import java.util.ArrayList;
import java.util.ListIterator;

import org.apache.hadoop.io.IntWritable;   // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable;  // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;          // Hadoop's serialized String wrapper class

import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;         // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;         // class for standard text input
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;          // class for registering multiple inputs/mappers
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; // key-value input files
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;        // N lines of input per map task
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;       // class for "pointing" at output file

import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object
import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename

import java.net.URI;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.util.Set;
import java.util.Iterator;
import java.util.HashMap;

// Exception handling
import java.io.IOException;

public class dCacheDemo {

  public static class JoinMapper
      // Replace the four type labels here with actual Java class names
      extends Mapper<LongWritable, Text, LongWritable, Text> {

    // in-memory join table, loaded from the distributed cache file in setup()
    HashMap<String, String> users = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // retrieve distributed cache file info
      try {
        URI[] cacheFiles = context.getCacheFiles();
        URI fileURI = cacheFiles[0];
        Path filePath = new Path(fileURI);

        // USE getName() to extract the file name!!!!!!
        // The cache file is symlinked into the task's working directory
        // under its base name, so it can be opened as a local file.
        File f = new File(filePath.getName());
        BufferedReader file = new BufferedReader(new FileReader(f));

        String line = "";
        while ((line = file.readLine()) != null) {
          String[] record = line.split(",");
          String key = record[0];
          String value = record[1];
          users.put(key, value);
        }
        file.close();
      } catch (Exception e) {
        // surface setup problems in the job output for debugging
        context.write(new LongWritable(999), new Text(e.toString()));
      }
    } // setup

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String v = value.toString();
      String[] record = v.split(",");
      String user = record[0];
      String message = record[1];
      String name = "";

      // map-side join with the distributed cache content
      if (users.containsKey(user)) {
        name = users.get(user);
      } else {
        name = "ANONYMOUS";
      }

      String output = user + "," + message + "," + name;
      context.write(key, new Text(output));
    } // map

  } // mapper

  public static class JoinReducer
      // Replace the four type labels here with actual Java class names
      extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      NullWritable nll = NullWritable.get();
      // identity-style reducer: emit each joined record as the output key
      for (Text val : values) {
        context.write(val, nll);
      }
    }

  } // reducer

  public static void main(String[] args) throws Exception {

    // Step 0: let's get the configuration
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 6); // set the split size

    // step 1: get a new MapReduce Job object
    Job job = Job.getInstance(conf); // job = new Job() is now deprecated

    // step 2: register the MapReduce class
    job.setJarByClass(dCacheDemo.class);

    // step 3: set the distributed cache file and the input/output paths
    URI dcacheFile = new URI("/data/users.in");
    System.out.println(dcacheFile.toString());
    job.addCacheFile(dcacheFile);

    FileInputFormat.addInputPath(job, new Path("/data/", "messages.in"));
    FileOutputFormat.setOutputPath(job, new Path("./test/", "dcache-out")); // put what you need as the output path

    job.setInputFormatClass(NLineInputFormat.class);

    // step 4: register the mapper and the reducer
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);

    // step 5: set up output information
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);           // specify the output class (what reduce() emits) for key
    job.setOutputValueClass(NullWritable.class); // specify the output class (what reduce() emits) for value

    // step 6: set up other job parameters at will
    job.setJobName("Map Side Join With Distributed Cache");

    // step 7: ?
    // step 8: profit
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  } // main()

} // dCacheDemo
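// ------------------------------------------------------------------
// Usage sketch -- not part of the original template; the sample data,
// jar name, and exact commands below are illustrative assumptions.
//
// Assumed cache file /data/users.in ("userId,name" per line):
//   1,alice
//   2,bob
// Assumed input /data/messages.in ("userId,message" per line):
//   1,hello
//   3,who is there
// Expected output lines (message joined with the cached user name,
// or ANONYMOUS when the id is not in the cache):
//   1,hello,alice
//   3,who is there,ANONYMOUS
//
// One possible way to build and run, following the Hadoop MapReduce
// tutorial's single-file compile recipe:
//   hadoop com.sun.tools.javac.Main dCacheDemo.java
//   jar cf dcache.jar dCacheDemo*.class
//   hadoop jar dcache.jar dCacheDemo
//   hdfs dfs -cat test/dcache-out/part-r-00000
// ------------------------------------------------------------------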