// CSC 369: Distributed Computing // Alex Dekhtyar // Java Hadoop Template // Multiple input files // Section 1: Imports // Data containers for Map() and Reduce() functions // You would import the data types needed for your keys and values import java.util.ArrayList; import java.util.ListIterator; import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class import org.apache.hadoop.io.LongWritable; // Hadoop's serialized int wrapper class import org.apache.hadoop.io.Text; // Hadoop's serialized String wrapper class import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used a the driver import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s) import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; // class for standard text input import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; // class for "pointing" at input file(s) import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; // key-value input files import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object import org.apache.hadoop.fs.Path; // Hadoop's implementation of directory path/filename // Exception handling import java.io.IOException; public class multiInMR { // Mapper Class Template public static class UserMapper // Need to replace the four type labels there with actual Java class names extends Mapper< Text, Text, Text, Text > { // @Override // we are overriding Mapper's map() method // map methods takes three input parameters // first parameter: input key // second parameter: input value // third parameter: container for emitting output key-value pairs public void map(Text key, Text value, Context context) throws IOException, InterruptedException { String name = value.toString(); String out = "A\t"+name; context.write(key, new Text(out)); } // map } // mapper class public static class MessageMapper // Need to replace the four type labels there with actual Java class names extends Mapper< LongWritable, Text, Text, Text > { // @Override // we are overriding Mapper's map() method // map methods takes three input parameters // first parameter: input key // second parameter: input value // third parameter: container for emitting output key-value pairs public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String text[] = value.toString().split(","); if (text.length == 2) { String id = text[0]; String message = text[1]; String out = "B\t"+ message; context.write(new Text(id), new Text(out)); } } // map } // MyMapperClass // Reducer Class Template public static class JoinReducer // needs to replace the four type labels with actual Java class names extends Reducer< Text, Text, Text, Text> { // note: InValueType is a type of a single value Reducer will work with // the parameter to reduce() method will be Iterable - i.e. a list of these values //@Override // we are overriding the Reducer's reduce() method // reduce takes three input parameters // first parameter: input key // second parameter: a list of values associated with the key // third parameter: container for emitting output key-value pairs public void reduce( Text key, Iterable values, Context context) throws IOException, InterruptedException { ArrayList name = new ArrayList(); ArrayList messages = new ArrayList(); for (Text val : values) { context.write(key, val); } // String text[] = val.toString().split("\t"); // if (text.length ==2) { // String v = text[1]; // String label = text[0]; // // if (label.contains("A")) { // name.add(v.toString()); // } // if (label.contains("B")) { // messages.add(v.toString()); // } // } // } // for // ListIterator nameIterator = name.listIterator(); // while (nameIterator.hasNext()) { // String user = nameIterator.next(); // ListIterator msgIterator = messages.listIterator(); // // while (msgIterator.hasNext()) { // String msg = msgIterator.next(); // // context.write(new Text(user), new Text(msg)); // } // } // while (outer) } // reduce } // reducer // MapReduce Driver // we do everything here in main() public static void main(String[] args) throws Exception { // Step 0: let's get configuration Configuration conf = new Configuration(); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",","); // learn to process two-column CSV files. // step 1: get a new MapReduce Job object Job job = Job.getInstance(conf); // job = new Job() is now deprecated // step 2: register the MapReduce class job.setJarByClass(multiInMR.class); // step 3: Set Input and Output files MultipleInputs.addInputPath(job, new Path("./test/", "users.in"), KeyValueTextInputFormat.class, UserMapper.class ); // put what you need as input file MultipleInputs.addInputPath(job, new Path("./test/", "messages.in"), TextInputFormat.class, MessageMapper.class ); // put what you need as input file FileOutputFormat.setOutputPath(job, new Path("./test/","mout")); // put what you need as output file // step 4: Register reducer job.setReducerClass(JoinReducer.class); // step 5: Set up output information job.setOutputKeyClass(Text.class); // specify the output class (what reduce() emits) for key job.setOutputValueClass(Text.class); // specify the output class (what reduce() emits) for value // step 6: Set up other job parameters at will job.setJobName("Reduce Side Join"); // step 7: ? // step 8: profit System.exit(job.waitForCompletion(true) ? 0:1); } // main() } // MyMapReduceDriver