// CSC 369: Distributed Computing

// Alex Dekhtyar

// Graph Scan

import org.apache.hadoop.io.LongWritable;   // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.Text;           // Hadoop's serialized String wrapper class
import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;  // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file
import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename
import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object

import java.io.IOException;

// This graph scan computes the difference between
// the number of outgoing and incoming edges for each node in the graph

public class TwitterTest {

  public static class TMapper
      extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String v = value.toString();

      // skip blank lines and '#' comment lines
      if (!v.isEmpty() && v.charAt(0) != '#') {
        String[] record = v.split("\\s+");
        if (record.length == 2) {
          long from = Long.parseLong(record[0]);
          long to   = Long.parseLong(record[1]);

          // +1 for the source of the edge, -1 for its destination
          context.write(new LongWritable(from), new LongWritable(1));
          context.write(new LongWritable(to),   new LongWritable(-1));
        }
      }
    } // map

  } // TMapper

  public static class TReducer
      extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {

      // sum the +1/-1 contributions: out-degree minus in-degree
      long cn = 0;
      for (LongWritable val : values) {
        cn = cn + val.get();
      } // for

      context.write(key, new LongWritable(cn));
    } // reduce

  } // TReducer

  // MapReduce Driver
  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    // force some splits: each map task handles at most 50,000 input lines
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 50000);

    Job job = Job.getInstance(conf);
    job.setJarByClass(TwitterTest.class);

    NLineInputFormat.addInputPath(job, new Path("/data/", args[0]));
    FileOutputFormat.setOutputPath(job, new Path("./test/", "tout"));

    job.setInputFormatClass(NLineInputFormat.class);

    job.setMapperClass(TMapper.class);
    job.setReducerClass(TReducer.class);

    // Set number of reducers
    job.setNumReduceTasks(5);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(LongWritable.class);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(LongWritable.class);

    job.setJobName("Twitter Graph Test");

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  } // main()

} // TwitterTest
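
// Usage notes (a sketch, not part of the original handout). The mapper expects
// one whitespace-separated edge per line ("from to"), with lines starting with
// '#' treated as comments. A possible invocation, assuming the compiled job jar
// is named TwitterTest.jar and an edge-list file, e.g. twitter-edges.txt (both
// names are hypothetical), already sits under /data/ in HDFS:
//
//   hadoop jar TwitterTest.jar TwitterTest twitter-edges.txt
//
// With five reducers, results land in ./test/tout/part-r-00000 through
// part-r-00004, one tab-separated "node id, out-degree minus in-degree"
// pair per line.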