### Lab 8-2 helper code

# imports

import pyspark

from pyspark.sql import SparkSession

import pyspark.sql.types as tp
import pandas as pd


# setup (SparkSession is already imported above)

spark = SparkSession.builder.getOrCreate()
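
# Optional sanity check (a suggested addition, not part of the original lab code):
# confirm the session is live and reduce log noise before loading any data.
print("Spark version:", spark.version)
spark.sparkContext.setLogLevel("WARN")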




# KATZENJAMMER dataset files

filenames = {"albums":"/data/katzenjammer/Albums.csv",
             "band": "/data/katzenjammer/Band.csv",
             "instruments": "/data/katzenjammer/Instruments.csv",
             "performance": "/data/katzenjammer/Performance.csv",
             "songs": "/data/katzenjammer/Songs.csv",
             "tracklists": "/data/katzenjammer/Tracklists.csv",
             "vocals": "/data/katzenjammer/Vocals.csv"}

attributeLists = {"albums": [tp.StructField("AId", tp.IntegerType()),
                             tp.StructField("Title", tp.StringType()),
                             tp.StructField("Year", tp.IntegerType()),
                             tp.StructField("Label", tp.StringType()),
                             tp.StructField("Type", tp.StringType())],
                  "band": [tp.StructField("Id", tp.IntegerType()),
                           tp.StructField("Firstname", tp.StringType()),
                           tp.StructField("Lastname", tp.StringType())],
                  "instruments": [tp.StructField("SongId", tp.IntegerType()),
                                  tp.StructField("BandmateId", tp.IntegerType()),
                                  tp.StructField("Instrument", tp.StringType())],
                  "performance": [tp.StructField("SongId", tp.IntegerType()),
                                  tp.StructField("BandMate", tp.IntegerType()),
                                  tp.StructField("StagePosition", tp.StringType())],
                  "songs": [tp.StructField("SongId", tp.IntegerType()),
                            tp.StructField("Title", tp.StringType())],
                  "tracklists": [tp.StructField("AlbumId", tp.IntegerType()),
                                 tp.StructField("Position", tp.IntegerType()),
                                 tp.StructField("SongId", tp.IntegerType())],
                  "vocals": [tp.StructField("SongId", tp.IntegerType()),
                             tp.StructField("Bandmate", tp.IntegerType()),
                             tp.StructField("Type", tp.StringType())]
                  }

schemas = {key: tp.StructType(alist) for key, alist in attributeLists.items()}
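
# Quick check (optional, illustrative): each entry of schemas should be a StructType
# that mirrors the columns of the corresponding CSV file, e.g. the albums schema.
print(schemas["albums"].simpleString())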

# DataFrames: one per CSV file, each read with a header row and single-quote (') quoting

albumsDF = spark.read.format("csv").schema(schemas["albums"]).option("header",True).option("quote", "'").load(filenames["albums"])
bandDF = spark.read.format("csv").schema(schemas["band"]).option("header",True).option("quote","'").load(filenames["band"])
instrumentsDF = spark.read.format("csv").schema(schemas["instruments"]).option("header",True).option("quote","'").load(filenames["instruments"])
performanceDF = spark.read.format("csv").schema(schemas["performance"]).option("header",True).option("quote","'").load(filenames["performance"])
songsDF = spark.read.format("csv").schema(schemas["songs"]).option("header",True).option("quote","'").load(filenames["songs"])
tracklistsDF = spark.read.format("csv").schema(schemas["tracklists"]).option("header",True).option("quote","'").load(filenames["tracklists"])
vocalsDF = spark.read.format("csv").schema(schemas["vocals"]).option("header",True).option("quote","'").load(filenames["vocals"])
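
# Optional sanity check (illustrative; assumes the CSV files exist under /data/katzenjammer):
# show a few rows of each DataFrame to confirm headers and quoting were parsed correctly.
for name, df in [("albums", albumsDF), ("band", bandDF), ("instruments", instrumentsDF),
                 ("performance", performanceDF), ("songs", songsDF),
                 ("tracklists", tracklistsDF), ("vocals", vocalsDF)]:
    print(name)
    df.show(3, truncate=False)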