
Assignment 9: Spark

Due Dec 10, 2023, 11:59PM

Assignment 9 focuses on using Apache Spark for large-scale data analysis tasks. For this assignment, we will use relatively small datasets and we won't run anything in distributed mode; however, Spark can easily be used to run the same programs on much larger datasets.

Setup

Download files for Assignment 9 here.

Getting Started with Spark

This guide is basically a summary of the excellent tutorials that can be found at the Spark website.

Apache Spark is a relatively new cluster computing framework, developed originally at UC Berkeley. It significantly generalizes the 2-stage Map-Reduce paradigm (originally proposed by Google and popularized by the open-source Hadoop system); Spark is instead based on the abstraction of resilient distributed datasets (RDDs). An RDD is basically a distributed collection of items that can be created in a variety of ways. Spark provides a set of operations to transform one or more RDDs into an output RDD, and analysis tasks are written as chains of these operations.
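
For example, here is a minimal sketch of this style of programming (assuming a SparkContext named sc, as provided by the PySpark shell described below):

# Create an RDD from an in-memory Python list ...
nums = sc.parallelize([1, 2, 3, 4, 5])
# ... or from a text file (one element per line).
lines = sc.textFile("Dockerfile")
# Transformations are chained; nothing executes until an action such as collect() is called.
squaresOfEvens = nums.filter(lambda x: x % 2 == 0).map(lambda x: x * x)
print(squaresOfEvens.collect())   # [4, 16]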

Spark can be used with the Hadoop ecosystem, including the HDFS file system and the YARN resource manager.

Vagrant

This is a fine way to do this project, though Docker is a bit more streamlined if you already have docker locally.

As before, we have provided a VagrantFile in the assignment9 directory. You can try to use the included Spark distribution directly. If this does not work, you might be better off using the Docker approach.

This step is included in the VagrantFile, but if you get any error related to $SPARKHOME, you can set the variable for the current session with:
export SPARKHOME=/vagrant/spark-3.0.1-bin-hadoop2.7
and make the setting persistent with:
echo "export SPARKHOME=/vagrant/spark-3.0.1-bin-hadoop2.7" >> ~/.bashrc

We are ready to use Spark.

Docker

This is the recommended way to run the project if you have Apple silicon, and it gives you a chance to learn about containers at the same time. Docker Get Started describes setup. It probably works w/ Windows as well, but I have no direct experience with that.

Steps:

  • Install docker
  • Build your image: docker build -t assign9 .
  • Start a container based on that image, and attach to a bash shell in it: docker run -v "$(pwd)":/assign9 -it assign9 (use docker run -v `pwd`:/assign9 -it assign9 under tcsh).
    • You will drop right into /assign9, which is where the enclosing directory is mounted in the container.
    • Any changes you make in this container directory, or in the corresponding directory on your host machine, are reflected on the other side.
    • The container will shut down as soon as you exit the shell.
    • Clean exited containers via docker container prune -f.
    • Ignore the version of spark in the distro.

Spark and Python

Spark primarily supports three languages: Scala (Spark is written in Scala), Java, and Python. We will use Python here -- you can follow the instructions in the tutorial and quick start (http://spark.apache.org/docs/latest/quick-start.html) for the other languages. The equivalent Java code can be very verbose and hard to follow. The sections below show how to use the Python interface through the PySpark shell, which is essentially a standard Python shell.

PySpark Shell

You can also use the PySpark Shell directly.

  1. $SPARKHOME/bin/pyspark: This will start a Python shell (it will also output a bunch of stuff about what Spark is doing). The relevant variables are initialized in this python shell, but otherwise it is just a standard Python shell.

  2. >>> textFile = sc.textFile("Dockerfile"): This creates a new RDD, called textFile, by reading data from a local file. The sc.textFile command creates an RDD containing one entry per line in the file.

  3. You can see some information about the RDD by doing textFile.count(), textFile.first(), or textFile.take(5) (which returns a list containing the first 5 items from the RDD).

  4. We recommend you follow the rest of the commands in the quick start guide (http://spark.apache.org/docs/latest/quick-start.html). Here we will simply do the Word Count application.
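
Putting steps 2 and 3 together, a quick sanity check in the PySpark shell looks like this (the exact values depend on the contents of Dockerfile):

textFile = sc.textFile("Dockerfile")
textFile.count()    # number of lines in the file
textFile.first()    # the first line, as a string
textFile.take(5)    # a Python list containing the first 5 lines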

Word Count Application

The following command (in the pyspark shell) does a word count, i.e., it counts the number of times each word appears in the file Dockerfile. Use counts.take(5) to see the output.

>>> counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In more detail, from the docker container created as above:

root@d36910b1feb0:/assign9# $SPARKHOME/bin/pyspark
Python 3.10.12 (main, Jun 11 2023, 05:26:28) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/02 12:35:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.10.12 (main, Jun 11 2023 05:26:28)
Spark context Web UI available at http://d36910b1feb0:4040
Spark context available as 'sc' (master = local[*], app id = local-1701520517201).
SparkSession available as 'spark'.
>>> textFile = sc.textFile("Dockerfile")
>>> counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>> counts.take(5)
[('#', 9), ('Use', 1), ('as', 1), ('image', 1), ('', 35)]

Here is the same code without the use of lambda functions.

def split(line):
    # Split each line into words on spaces.
    return line.split(" ")

def generateone(word):
    # Pair each word with an initial count of 1.
    return (word, 1)

def sum(a, b):
    # Add up the per-word counts.
    return a + b

textFile.flatMap(split).map(generateone).reduceByKey(sum)

The flatMap splits each line into words, and the following map and reduceByKey do the counting (we will discuss this in class, but the Hadoop Map-Reduce Tutorial has an excellent and detailed description; look for the Walk-Through section).

The lambda representation is more compact and preferable, especially for small functions; for larger functions, it is better to separate out the definitions.

Running as an Application

Instead of using a shell, you can also write your code as a Python file and submit it to the Spark cluster. The assignment9 directory contains a Python file, wordcount.py, which runs the program in local mode. To run the program, do: $SPARKHOME/bin/spark-submit wordcount.py
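
For reference, a standalone PySpark word-count program looks roughly like the following sketch (the provided wordcount.py may differ in its details):

from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    # "local" master is assumed here, since the provided file runs in local mode.
    conf = SparkConf().setAppName("WordCount").setMaster("local")
    sc = SparkContext(conf=conf)
    counts = (sc.textFile("Dockerfile")
                .flatMap(lambda line: line.split(" "))
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
    for word, count in counts.collect():
        print(word, count)
    sc.stop()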

More...

We encourage you to look at the Spark Programming Guide and play with the other RDD manipulation commands. You should also try out the Scala and Java interfaces.

Assignment Details

We have provided a Python file, assignment.py, that initializes the following RDDs:

  • An RDD consisting of lines from a Shakespeare play (play.txt)
  • An RDD consisting of lines from a log file (NASA_logs_sample.txt)
  • An RDD consisting of 2-tuples indicating user-product ratings from Amazon Dataset (amazon-ratings.txt)
  • An RDD consisting of JSON documents pertaining to all the Nobel Laureates over the last few years (prize.json)

Your task is to fill out the six functions defined in functions.py (their names start with task). The amount of code you write will typically be small (several are one-liners), with the exception of the last one.

All tasks are worth a single point each.

  • Task 1: This function takes as input the amazonInputRDD and calculates the proportion of 1.0-rating reviews out of all the reviews made by each customer. The output should be an RDD where the key is the customer's user id and the value is the proportion as a decimal. This can be done using aggregateByKey or reduceByKey along with map.
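
To illustrate the pattern on made-up data (the actual format of amazonInputRDD may differ), computing a per-key proportion with reduceByKey and map looks roughly like this:

# Made-up (customer, rating) pairs; the real amazonInputRDD may be shaped differently.
toy = sc.parallelize([("u1", 1.0), ("u1", 5.0), ("u2", 1.0), ("u2", 1.0)])
proportions = (toy.map(lambda kv: (kv[0], (1 if kv[1] == 1.0 else 0, 1)))
                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                  .map(lambda kv: (kv[0], kv[1][0] / kv[1][1])))
proportions.collect()   # e.g. [('u1', 0.5), ('u2', 1.0)] (order may vary)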

  • Task 2: Write just the flatmap function (task2_flatmap) that takes in a parsed JSON document (from prize.json) and returns the surnames of the Nobel Laureates. In other words, the following command should create an RDD with all the surnames. We will use json.loads to parse the JSONs (this is already done). Make sure to look at what it returns so you know how to access the information inside the parsed JSONs (these are basically nested dictionaries). (https://docs.python.org/2/library/json.html)

     	task2_result = nobelRDD.map(json.loads).flatMap(task2_flatmap)
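
As a reminder of how json.loads behaves (the field names here are illustrative; inspect the actual documents in prize.json for the real structure):

import json

# json.loads turns a JSON string into nested Python dictionaries and lists.
doc = json.loads('{"year": "2019", "laureates": [{"surname": "A"}, {"surname": "B"}]}')
doc["year"]                                  # '2019'
[l["surname"] for l in doc["laureates"]]     # ['A', 'B']
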
  • Task 3: This function operates on the logsRDD. It takes as input a list of dates and returns an RDD with the "hosts" that were present in the log on all of those dates. The dates are provided as strings, in the same format in which they appear in the logs (e.g., '01/Jul/1995' and '02/Jul/1995'). The format of the log entries should be self-explanatory, but see the NASA Logs page for more details if you need them. Try to minimize the number of RDDs you end up creating.
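
As a rough illustration of the operations involved (this sketch assumes the host is the first whitespace-separated token and that the date string appears verbatim in the line -- verify against the actual file -- and it makes no attempt to minimize the number of RDDs):

# Assumption: the host is the first token on each line, and a date such as
# '01/Jul/1995' appears verbatim somewhere in the line.
def hosts_on(logsRDD, date):
    return (logsRDD.filter(lambda line: date in line)
                   .map(lambda line: line.split(" ")[0])
                   .distinct())

# Hosts present on both of two days; intersection() can be chained for more dates.
# common = hosts_on(logsRDD, '01/Jul/1995').intersection(hosts_on(logsRDD, '02/Jul/1995'))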

  • Task 4: On the logsRDD, for two given days (provided as input analogously to Task 3 above), use a 'cogroup' to create the following RDD: the key of the RDD will be a host, and the value will be a 2-tuple, where the first element is a list of all URLs fetched from that host on the first day, and the second element is the list of all URLs fetched from that host on the second day. Use filter to first create two RDDs from the input logsRDD.
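
If you have not used cogroup before, here is how it behaves on two small made-up RDDs:

day1 = sc.parallelize([("hostA", "/index.html"), ("hostA", "/img.gif"), ("hostB", "/x")])
day2 = sc.parallelize([("hostA", "/index.html")])
# cogroup pairs up the values for each key; the two halves come back as iterables.
grouped = day1.cogroup(day2).mapValues(lambda v: (list(v[0]), list(v[1])))
grouped.collect()
# e.g. [('hostA', (['/index.html', '/img.gif'], ['/index.html'])), ('hostB', (['/x'], []))]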

  • Task 5: NLP applications often need to preprocess the input data and can benefit a lot from cluster computing. Tokenization is the process of chopping up raw text into tokens. Bigrams are simply sequences of two consecutive words. For example, the previous sentence contains the following bigrams: "Bigrams are", "are simply", "simply sequences", "sequences of", etc. Your task here is to tokenize each line using punctuation and spaces, producing tokens that consist solely of alphanumeric characters (e.g., "Task 5: I'm easy." will be tokenized into ["Task", "5", "I", "m", "easy"]), and to count the appearances of each bigram of such tokens. The return value should be an RDD where the key is a bigram and the value is its count.
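
One possible way to tokenize a single line and form its bigrams (a sketch only; you still need to apply this across the RDD and count the bigrams):

import re

def tokenize(line):
    # Split on anything that is not alphanumeric and drop empty tokens.
    return [t for t in re.split(r'[^A-Za-z0-9]+', line) if t]

def bigrams(line):
    tokens = tokenize(line)
    return [(tokens[i], tokens[i + 1]) for i in range(len(tokens) - 1)]

bigrams("Task 5: I'm easy.")   # [('Task', '5'), ('5', 'I'), ('I', 'm'), ('m', 'easy')]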

  • Task 6: Define a character definition as a line in play.txt that starts and ends w/ a '*', and contains nothing but whitespace and upper-case letters in between. Define the attribution of a line in the file as "none" if no character definition has appeared before it, and as the most recent character definition otherwise. Create an RDD with all characters that have been defined, together w/ their attribution counts as values ("none" also appears, as in the example below, if any lines precede the first character definition). For example:

silly
*HOLIDAY*
nice
guy
*HARDEN*
jerk
*HOLIDAY*
Again, a good guy

should result in:

  • ("HARDEN", 1)
  • ("HOLIDAY", 3)
  • ("none", 1) though RDDs are unordered.

Sample results.txt File

You can use spark-submit to run the assignment.py file, but it would be easier to develop with pyspark (by copying the commands over).

results.txt shows the results of running assignment.py on our code using: $SPARKHOME/bin/spark-submit assignment.py

Submission

Submit the functions.py file on gradescope.