// CSC 369: Distributed Computing
// Alex Dekhtyar
// Multiple input files
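//
// This program performs a reduce-side join of two input files, users.in
// and messages.in, on a shared user id: each file gets its own mapper,
// and a single reducer matches the tagged records by key.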

// Section 1: Imports
import java.util.ArrayList;

import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; // class for standard text input
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; // class for attaching several inputs, each with its own mapper
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; // key-value input files
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename
import java.io.IOException;


public class multiInMR {

// We have TWO mapper classes, one per input file.
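// users.in holds one "id,name" record per line (read as key/value pairs,
// split at the comma); messages.in holds one "id,message" record per line
// (read as plain text and split inside the mapper).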

// Mapper for User file
public static class UserMapper     
     extends Mapper< Text, Text, Text, Text > {

@Override
public void map(Text key, Text value, Context context)
      throws IOException, InterruptedException {

        String name = value.toString();
        String out = "A\t" + name;    // tag user records with "A" so the reducer can tell the two sources apart
        context.write(key, new Text(out));

 } // map

}  // mapper class
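
// For example (illustrative data), the users.in line "42,alice" reaches
// UserMapper as key="42", value="alice" and is emitted as ("42", "A\talice").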

// Mapper for messages file
public static class MessageMapper     
     extends Mapper< LongWritable, Text, Text, Text > {


@Override
public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

        String[] fields = value.toString().split(",");
        if (fields.length == 2) {
          String id = fields[0];
          String message = fields[1];
          String out = "B\t" + message;    // tag message records with "B"
          context.write(new Text(id), new Text(out));
        }

 } // map

} // MessageMapper
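
// For example, the messages.in line "42,hi there" becomes ("42", "B\thi there").
// A message that itself contains a comma splits into more than two fields and
// is silently dropped; split(",", 2) would keep such messages intact.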


//  Reducer: we only need one reducer class

public static class JoinReducer 
      extends  Reducer< Text, Text, Text, Text> {


@Override
public void reduce( Text key, Iterable<Text> values, Context context)
     throws IOException, InterruptedException {

  ArrayList<String> names = new ArrayList<String>();
  ArrayList<String> messages = new ArrayList<String>();

  // Sort each tagged value into the list for its source file.
  for (Text val : values) {
    String[] parts = val.toString().split("\t", 2);
    if (parts[0].equals("A")) { names.add(parts[1]); }
    else { messages.add(parts[1]); }
  }

  // Emit the join: each (name, message) pair sharing this user id.
  for (String name : names) {
    for (String message : messages) {
      context.write(key, new Text(name + "\t" + message));
    }
  }

 } // reduce

} // reducer
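
// For example, if key "42" arrives with values { "A\talice", "B\thi there" },
// the reducer emits ("42", "alice\thi there"), one line per (name, message) pair.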


//  MapReduce Driver


  public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();
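      // Tell KeyValueTextInputFormat to split each users.in line at the
      // first comma: the key becomes the user id, the value the name.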
      conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

     Job  job = Job.getInstance(conf);  
     job.setJarByClass(multiInMR.class);  

   //  Get Multiple Inputs set up.
       MultipleInputs.addInputPath(job, new Path("./test/", "users.in"),
                          KeyValueTextInputFormat.class, UserMapper.class );
       MultipleInputs.addInputPath(job, new Path("./test/", "messages.in"),
                          TextInputFormat.class, MessageMapper.class ); 
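
       // Note: both mappers must emit the same key/value types (Text, Text),
       // since their outputs are shuffled to the same JoinReducer.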
 
       FileOutputFormat.setOutputPath(job, new Path("./test/","mout")); // output directory; must not already exist when the job starts

      job.setReducerClass(JoinReducer.class);
      job.setOutputKeyClass(Text.class); // specify the output class (what reduce() emits) for key
      job.setOutputValueClass(Text.class); // specify the output class (what reduce() emits) for value

   // step 6: Set up other job parameters at will
      job.setJobName("Reduce Side Join");

   // step 7:  ?

   // step 8: profit
      System.exit(job.waitForCompletion(true) ? 0:1);


  } // main()


} // multiInMR
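
// A sample run, assuming the job is packaged as multiInMR.jar:
//
//   hadoop jar multiInMR.jar multiInMR
//
// With users.in containing "42,alice" and messages.in containing "42,hi there",
// the job writes the tab-separated line "42\talice\thi there" to
// ./test/mout/part-r-00000.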