// CSC 369: Distributed Computing

// Alex Dekhtyar

// Java Hadoop Template
// Map-side join using a distributed cache file

// Section 1: Imports

// Data containers for Map() and Reduce() functions
// You would import the data types needed for your keys and values

import java.util.ArrayList;
import java.util.ListIterator;

import org.apache.hadoop.io.IntWritable;   // Hadoop's serialized int wrapper class
import org.apache.hadoop.io.LongWritable;  // Hadoop's serialized long wrapper class
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;          // Hadoop's serialized String wrapper class

import org.apache.hadoop.mapreduce.Mapper;  // Mapper class to be extended by our Map function
import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function
import org.apache.hadoop.mapreduce.Job;     // the MapReduce job class that is used as the driver

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;         // class for "pointing" at input file(s)
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;         // class for standard text input
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;          // class for registering multiple inputs/mappers
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; // key-value input files
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;        // N lines of input per map task
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;       // class for "pointing" at output file

import org.apache.hadoop.conf.Configuration; // Hadoop's configuration object
import org.apache.hadoop.fs.Path;            // Hadoop's implementation of directory path/filename

import java.net.URI;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.File;
import java.util.Set;
import java.util.Iterator;
import java.util.HashMap;

// Exception handling
import java.io.IOException;

public class dCacheDemo {

  public static class JoinMapper
      // Replace the four type labels here with actual Java class names
      extends Mapper<LongWritable, Text, LongWritable, Text> {

    // in-memory join table, loaded from the distributed cache file in setup()
    HashMap<String, String> users = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      // retrieve distributed cache file info
      try {
        URI[] cacheFiles = context.getCacheFiles();
        URI fileURI = cacheFiles[0];
        Path filePath = new Path(fileURI);

        // USE getName() to extract the file name!!!!!!
        // The cache file is symlinked into the task's working directory
        // under its base name, so it can be opened as a local file.
        File f = new File(filePath.getName());
        BufferedReader file = new BufferedReader(new FileReader(f));

        String line = "";
        while ((line = file.readLine()) != null) {
          String[] record = line.split(",");
          String key = record[0];
          String value = record[1];
          users.put(key, value);
        }
        file.close();
      } catch (Exception e) {
        // surface setup problems in the job output for debugging
        context.write(new LongWritable(999), new Text(e.toString()));
      }
    } // setup

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String v = value.toString();
      String[] record = v.split(",");
      String user = record[0];
      String message = record[1];
      String name = "";

      // map-side join with the distributed cache content
      if (users.containsKey(user)) {
        name = users.get(user);
      } else {
        name = "ANONYMOUS";
      }

      String output = user + "," + message + "," + name;
      context.write(key, new Text(output));
    } // map

  } // mapper

  public static class JoinReducer
      // Replace the four type labels here with actual Java class names
      extends Reducer<LongWritable, Text, Text, NullWritable> {

    @Override
    public void reduce(LongWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      NullWritable nll = NullWritable.get();
      // identity-style reducer: emit each joined record as the output key
      for (Text val : values) {
        context.write(val, nll);
      }
    }

  } // reducer

  public static void main(String[] args) throws Exception {

    // Step 0: let's get the configuration
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.input.lineinputformat.linespermap", 6); // set the split size

    // step 1: get a new MapReduce Job object
    Job job = Job.getInstance(conf); // job = new Job() is now deprecated

    // step 2: register the MapReduce class
    job.setJarByClass(dCacheDemo.class);

    // step 3: set the distributed cache file and the input/output paths
    URI dcacheFile = new URI("/data/users.in");
    System.out.println(dcacheFile.toString());
    job.addCacheFile(dcacheFile);

    FileInputFormat.addInputPath(job, new Path("/data/", "messages.in"));
    FileOutputFormat.setOutputPath(job, new Path("./test/", "dcache-out")); // put what you need as the output path

    job.setInputFormatClass(NLineInputFormat.class);

    // step 4: register the mapper and the reducer
    job.setMapperClass(JoinMapper.class);
    job.setReducerClass(JoinReducer.class);

    // step 5: set up output information
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);           // specify the output class (what reduce() emits) for key
    job.setOutputValueClass(NullWritable.class); // specify the output class (what reduce() emits) for value

    // step 6: set up other job parameters at will
    job.setJobName("Map Side Join With Distributed Cache");

    // step 7: ?
    // step 8: profit
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  } // main()

} // dCacheDemo
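// ------------------------------------------------------------------
// Usage sketch -- not part of the original template; the sample data,
// jar name, and exact commands below are illustrative assumptions.
//
// Assumed cache file /data/users.in ("userId,name" per line):
//   1,alice
//   2,bob
// Assumed input /data/messages.in ("userId,message" per line):
//   1,hello
//   3,who is there
// Expected output lines (message joined with the cached user name,
// or ANONYMOUS when the id is not in the cache):
//   1,hello,alice
//   3,who is there,ANONYMOUS
//
// One possible way to build and run, following the Hadoop MapReduce
// tutorial's single-file compile recipe:
//   hadoop com.sun.tools.javac.Main dCacheDemo.java
//   jar cf dcache.jar dCacheDemo*.class
//   hadoop jar dcache.jar dCacheDemo
//   hdfs dfs -cat test/dcache-out/part-r-00000
// ------------------------------------------------------------------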