### Lab 8-2 helper code
#
# Creates (or reuses) a Spark session and loads each KATZENJAMMER CSV file
# into its own DataFrame, using an explicit schema per table.

# imports
import pyspark
# SparkSession is a class inside the pyspark.sql package, not a submodule,
# so it has to be brought in with the `from ... import ...` form.
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
import pandas as pd

# setup
spark = SparkSession.builder.getOrCreate()

# KATZENJAMMER dataset files: table name -> CSV path
filenames = {
    "albums": "/data/katzenjammer/Albums.csv",
    "band": "/data/katzenjammer/Band.csv",
    "instruments": "/data/katzenjammer/Instruments.csv",
    "performance": "/data/katzenjammer/Performance.csv",
    "songs": "/data/katzenjammer/Songs.csv",
    "tracklists": "/data/katzenjammer/Tracklists.csv",
    "vocals": "/data/katzenjammer/Vocals.csv",
}

# Column definitions per table: table name -> ordered list of StructFields
attributeLists = {
    "albums": [
        tp.StructField("AId", tp.IntegerType()),
        tp.StructField("Title", tp.StringType()),
        tp.StructField("Year", tp.IntegerType()),
        tp.StructField("Label", tp.StringType()),
        tp.StructField("Type", tp.StringType()),
    ],
    "band": [
        tp.StructField("Id", tp.IntegerType()),
        tp.StructField("Firstname", tp.StringType()),
        tp.StructField("Lastname", tp.StringType()),
    ],
    "instruments": [
        tp.StructField("SongId", tp.IntegerType()),
        tp.StructField("BandmateId", tp.IntegerType()),
        tp.StructField("Instrument", tp.StringType()),
    ],
    "performance": [
        tp.StructField("SongId", tp.IntegerType()),
        tp.StructField("BandMate", tp.IntegerType()),
        tp.StructField("StagePosition", tp.StringType()),
    ],
    "songs": [
        tp.StructField("SongId", tp.IntegerType()),
        tp.StructField("Title", tp.StringType()),
    ],
    "tracklists": [
        tp.StructField("AlbumId", tp.IntegerType()),
        tp.StructField("Position", tp.IntegerType()),
        tp.StructField("SongId", tp.IntegerType()),
    ],
    "vocals": [
        tp.StructField("SongId", tp.IntegerType()),
        tp.StructField("Bandmate", tp.IntegerType()),
        tp.StructField("Type", tp.StringType()),
    ],
}

# One StructType per table, built from the field lists above
schemas = {
    table: tp.StructType(fields) for table, fields in attributeLists.items()
}


def _load_csv(table):
    """Read one dataset table into a DataFrame.

    Args:
        table: Key present in both ``filenames`` and ``schemas``.

    Returns:
        The DataFrame produced by Spark's CSV reader for that table, read
        with the table's explicit schema, ``header`` enabled, and ``'`` as
        the quote character.

    Raises:
        KeyError: If ``table`` is not a known table name.
    """
    return (
        spark.read.format("csv")
        .schema(schemas[table])
        .option("header", True)
        # Every table is read with the same single-quote setting so that
        # all seven loads behave consistently.
        .option("quote", "'")
        .load(filenames[table])
    )


# Data Frames
albumsDF = _load_csv("albums")
bandDF = _load_csv("band")
instrumentsDF = _load_csv("instruments")
performanceDF = _load_csv("performance")
songsDF = _load_csv("songs")
tracklistsDF = _load_csv("tracklists")
vocalsDF = _load_csv("vocals")