// CSC 369: Distributed Computing
// Alex Dekhtyar
// Graph Scan

import org.apache.hadoop.io.LongWritable; // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;        // Hadoop's serialized String wrapper class

import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function

import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used as the driver

import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; // input format that splits the input into fixed-size groups of lines
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;                // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object
import java.io.IOException;


// This graph scan computes the difference between 
// the number of outgoing and incoming edges for each node in the graph
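//
// For example, the (hypothetical) three-edge list
//
//    1 2
//    1 3
//    2 3
//
// produces 2 for node 1, 0 for node 2, and -2 for node 3.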

public class TwitterTest {

public static class TMapper 
     extends Mapper< LongWritable, Text, LongWritable, LongWritable > {

@Override
public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

   String v = value.toString();
   if (!v.isEmpty() && v.charAt(0) != '#') {       // skip blank lines and '#' comment lines
      String[] record = v.split("\\s+");
      if (record.length == 2) {
         long from = Long.parseLong(record[0]);
         long to = Long.parseLong(record[1]);

         context.write(new LongWritable(from), new LongWritable(1));   // 'from' gains an outgoing edge
         context.write(new LongWritable(to), new LongWritable(-1));    // 'to' gains an incoming edge
      }
   }

 } // map

} // TMapper
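// For an input edge "12 34", TMapper emits the pairs (12, 1) and (34, -1);
// TReducer then sums these +1/-1 contributions for each node id.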



public static class TReducer
      extends  Reducer< LongWritable, LongWritable, LongWritable, LongWritable> {

@Override
public void reduce( LongWritable key, Iterable<LongWritable> values, Context context)
     throws IOException, InterruptedException {

  long cn = 0;    // running total: out-degree minus in-degree for this node

  for (LongWritable val : values) {
      cn += val.get();
  } // for

  context.write(key, new LongWritable(cn));

 } // reduce

} // TReducer
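// With the default TextOutputFormat, each output line is a node id, a tab,
// and that node's out-degree minus in-degree.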


//  MapReduce Driver


  public static void main(String[] args) throws Exception {

      Configuration conf = new Configuration();

      // force multiple map tasks: NLineInputFormat creates one input split per 50,000 input lines
      conf.setInt("mapreduce.input.lineinputformat.linespermap", 50000);

      Job  job = Job.getInstance(conf);  
      job.setJarByClass(TwitterTest.class);  

      NLineInputFormat.addInputPath(job, new Path("/data/", args[0]));   // input: /data/<first command-line argument>
      FileOutputFormat.setOutputPath(job, new Path("./test/", "tout"));  // output directory (must not already exist)
      job.setInputFormatClass(NLineInputFormat.class);                   // keys are byte offsets, values are whole lines

      job.setMapperClass(TMapper.class);
      job.setReducerClass(TReducer.class);
 

      // Set number of reducers: keys are hash-partitioned across 5 reduce tasks,
      // so the output is split across 5 part-r-* files
      job.setNumReduceTasks(5);
 
      job.setMapOutputKeyClass(LongWritable.class);
      job.setMapOutputValueClass(LongWritable.class);
      job.setOutputKeyClass(LongWritable.class); 
      job.setOutputValueClass(LongWritable.class); 

      job.setJobName("Twitter Graph Test");
      System.exit(job.waitForCompletion(true) ? 0:1);


  } // main()


} // TwitterTest
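// A typical invocation (the jar name and input file name below are assumptions;
// the edge list is expected at /data/<input file> on the cluster's file system):
//
//    hadoop jar twittertest.jar TwitterTest twitter-edges.txt
//
// With 5 reduce tasks, the results appear in ./test/tout/part-r-00000 through part-r-00004.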