// CSC 369: Distributed Computing // Alex Dekhtyar // KeyValueFileTextInputFormat test import java.io.IOException; import org.apache.hadoop.io.IntWritable; // Hadoop's serialized int wrapper class import org.apache.hadoop.io.LongWritable; // Hadoop's serialized int wrapper class import org.apache.hadoop.io.Text; // Hadoop's serialized String wrapper class import org.apache.hadoop.mapreduce.Mapper; // Mapper class to be extended by our Map function import org.apache.hadoop.mapreduce.Reducer; // Reducer class to be extended by our Reduce function import org.apache.hadoop.mapreduce.Job; // the MapReduce job class that is used a the driver import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // class for "pointing" at input file(s) import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // class for "pointing" at output file import org.apache.hadoop.fs.Path; // Hadoop's implementation of directory path/filename // Import KeyValueTextInputFormat import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat; import org.apache.hadoop.conf.Configuration; public class KeyValueTest { // Using Text for both keys and values public static class KVMapper extends Mapper< Text, Text, Text, Text > { @Override public void map(Text key, Text value, Context context) throws IOException, InterruptedException { context.write(key, value); } // map } // Mapper Class public static class KVReducer extends Reducer< Text, Text, Text, Text> { @Override public void reduce( Text key, Iterable values, Context context) throws IOException, InterruptedException { String name = ""; for (Text val : values) { name = val.toString(); } // for context.write(key, new Text(name)); } // reduce } // reducer // MapReduce Driver public static void main(String[] args) throws Exception { // Need configuration to set up the key-value separator Configuration conf = new Configuration(); conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",","); Job job = Job.getInstance(conf); job.setJarByClass(KeyValueTest.class); KeyValueTextInputFormat.addInputPath(job, new Path("test/", "data")); FileOutputFormat.setOutputPath(job, new Path("./test/","kv-output")); // Override default input format job.setInputFormatClass(KeyValueTextInputFormat.class); //----------------------------------------------------- job.setMapperClass(KVMapper.class); job.setReducerClass(KVReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setJobName("Key Value Input File Test"); System.exit(job.waitForCompletion(true) ? 0:1); } // main() } // MyMapReduceDriver