## CSC 369
## Winter 2019
## Alex Dekhtyar

## Simple example of using Hadoop files
## Join messages.in and users.in on the key (userId)

import pyspark           ## import the pyspark package

## Get the Spark Context object (our Spark connection object)

sc = pyspark.SparkContext()

## Load the messages.in file into an RDD.
## We use the new Hadoop API, hence the newAPIHadoopFile() method call.
## Parameters:
## 1. path (on HDFS) to the file
## 2. Hadoop class (fully qualified name) for the file format
## 3. Hadoop class for the key type
## 4. Hadoop class for the value type
## 5. Configuration, passed as a dictionary
## Because we are reading a KeyValueTextInputFormat file, we set the key-value
## separator in the configuration. Note that KeyValueTextInputFormat produces
## Text keys and Text values.

messages = sc.newAPIHadoopFile("/data/messages.in",
                "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat",
                "org.apache.hadoop.io.Text",
                "org.apache.hadoop.io.Text",
                conf={"mapreduce.input.keyvaluelinerecordreader.key.value.separator": ","})

## Load the users.in file into an RDD (same order of parameters)

users = sc.newAPIHadoopFile("/data/users.in",
                "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat",
                "org.apache.hadoop.io.Text",
                "org.apache.hadoop.io.Text",
                conf={"mapreduce.input.keyvaluelinerecordreader.key.value.separator": ","})

##
## Join the two RDDs.
##
## This computes an inner join: a message with a userId that is not
## in the users RDD is dropped from the resulting RDD.

rdd = messages.join(users)

##
## Save the resulting RDD as a collection of text files in the ~/test/sparkJoin
## directory on HDFS (the relative path resolves against your HDFS home directory).
##

rdd.saveAsTextFile("test/sparkJoin")

###
##
#  If rerunning the program, do not forget to remove the ~/test/sparkJoin output
#  directory first: saveAsTextFile() fails if the output directory already exists.
##
###
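
##
## One way to clear the previous output before rerunning: a sketch that assumes
## the hdfs command-line client is available on the machine where the driver runs.
## Run it (or the equivalent shell command) BEFORE saveAsTextFile() is reached,
## since Spark refuses to write into an existing output directory:
##
##   import subprocess
##   subprocess.call(["hdfs", "dfs", "-rm", "-r", "-f", "test/sparkJoin"])
##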
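
##
## A minimal illustrative sketch of what join() produces, using small, made-up
## in-memory data. The userIds and strings below are invented sample values,
## not the actual contents of messages.in or users.in.

sampleMessages = sc.parallelize([("42", "Hello world"), ("7", "See you at noon")])
sampleUsers = sc.parallelize([("42", "alice")])       ## no user record for userId 7

## Each joined element is a pair (key, (messageValue, userValue)).
## The inner join drops userId 7, so this prints [('42', ('Hello world', 'alice'))]
print(sampleMessages.join(sampleUsers).collect())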