## CSC 369
## Winter 2019
## Alex Dekhtyar

## Simple example of using Hadoop files

## Join messages.in and users.in on the key (userId)

import pyspark     ## importing pyspark package


## Get Spark Context Object (our spark connection object)

sc = pyspark.SparkContext()

## load messages.in file into an RDD
## we use the new Hadoop API, hence the newAPIHadoopFile() method call
## Parameters:
##           1. path (on HDFS) to the file
##           2. fully qualified Hadoop class name of the input format
##           3. fully qualified Hadoop class name of the key type
##           4. fully qualified Hadoop class name of the value type
##           5. configuration options, passed as a dictionary

## Because we are reading a KeyValueTextInputFormat file, we set the key-value separator (a comma here) in the configuration

messages = sc.newAPIHadoopFile("/data/messages.in", "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat",
                         "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text",
                          conf= {"mapreduce.input.keyvaluelinerecordreader.key.value.separator":","})
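
## Each input line is split on the first separator into a (key, value) pair of strings.
## A quick sanity check (sketch only; the contents of messages.in are assumed for illustration):
##
##     messages.take(1)     # e.g. [('17', 'Hello there')] for an input line "17,Hello there"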

## load users.in file into an RDD
## same order of parameters 

users = sc.newAPIHadoopFile("/data/users.in", "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat",
                         "org.apache.hadoop.io.Text", "org.apache.hadoop.io.Text",
                          conf= {"mapreduce.input.keyvaluelinerecordreader.key.value.separator":","})

##
## Join the two RDDs. 
##
## This computes an inner join: a message with a userId
## that is not in the users RDD will be removed from the resulting RDD


rdd = messages.join(users)
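
## Each record of the joined RDD has the form (userId, (messageValue, userValue)).
## For example (the values below are assumed for illustration), if messages.in contains
## the line "17,Hello there" and users.in contains "17,alex", the joined RDD contains:
##
##     ('17', ('Hello there', 'alex'))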

##
## Save the resulting RDD as a collection of text files in the ~/test/sparkJoin directory on HDFS
## 
 
rdd.saveAsTextFile("test/sparkJoin")                             

###
##
#  if rerunning the program, do not forget to remove the ~/test/sparkJoin directory first:
#  saveAsTextFile() fails if the output directory already exists
##
###
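
##
## For example (assuming the relative output path above resolves to the HDFS home directory),
## from the command line:
##
##     hdfs dfs -rm -r test/sparkJoin
##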