## Assignment 4: Spark

Assignment 4 focuses on using Apache Spark for large-scale data analysis
tasks. For this assignment, we will use relatively small datasets and we won't
run anything in distributed mode; however, Spark can easily be used to run the
same programs on much larger datasets.

## Setup

Download the files for Assignment 4 [here](https://ceres.cs.umd.edu/424/assign/assignment4Dist.tgz?2).

## Getting Started with Spark

This guide is basically a summary of the excellent tutorials that can be found
at the [Spark website](http://spark.apache.org).

[Apache Spark](https://spark.apache.org) is a cluster computing
framework, developed originally at UC Berkeley. It significantly generalizes the
2-stage Map-Reduce paradigm (originally proposed by Google and popularized by
the open-source Hadoop system); Spark is instead based on the abstraction of
**resilient distributed datasets (RDDs)**. An RDD is basically a distributed
collection of items that can be created in a variety of ways. Spark provides a
set of operations to transform one or more RDDs into an output RDD, and analysis
tasks are written as chains of these operations.

Spark can be used with the Hadoop ecosystem, including the HDFS file system and
the YARN resource manager.


### Setup

Download the startup files [here](https://ceres.cs.umd.edu/424/assign/assignment4Dist.tgz?2) (the same archive linked above).

As before, use the Dockerfile to create and start an image:
- `docker build --rm -t 424 .`
- `docker run -it -v $(pwd):/424 424` (``docker run -it -v `pwd`:/424 424`` for tcsh)


## Spark and Python

Spark primarily supports three languages: Scala (Spark is written in Scala),
Java, and Python. We will use Python here -- you can follow the tutorial and
quick start guide (http://spark.apache.org/docs/latest/quick-start.html) for the
other languages. The equivalent Java code can be quite verbose and hard to
follow. Below we show how to use the Python interface through the standard
Python shell.

### PySpark Shell

You can use the PySpark shell directly.

1. `$SPARKHOME/bin/pyspark`: This starts a Python shell (it will also print
a fair amount of information about what Spark is doing). The relevant Spark
variables (such as the SparkContext `sc`) are already initialized in this shell,
but otherwise it is a standard Python shell.

2. `>>> textFile = sc.textFile("Dockerfile")`: This creates a new RDD, called
`textFile`, by reading data from a local file. The `sc.textFile` command creates
an RDD containing one entry per line in the file.

3. You can see some information about the RDD by doing `textFile.count()`,
   `textFile.first()`, or `textFile.take(5)` (which prints a list containing 5
   items from the RDD). A short example session is sketched just after this list.

4. We recommend you follow the rest of the commands in the quick start guide
(http://spark.apache.org/docs/latest/quick-start.html). Here we will simply do
the Word Count application.
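For concreteness, here is a short shell session along the lines of the commands above. It assumes the `Dockerfile` from the distro is in the working directory; the `filter` line is an extra illustration (not from the quick start), and the exact values will depend on your file.

```
>>> textFile = sc.textFile("Dockerfile")                  # one RDD entry per line
>>> textFile.count()                                      # number of lines in the file
>>> textFile.first()                                      # the first line
>>> textFile.filter(lambda line: "RUN" in line).count()   # how many lines contain "RUN"
>>> textFile.take(5)                                      # a list with the first 5 lines
```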

#### Word Count Application

The following command (in the PySpark shell) does a word count, i.e., it counts
the number of times each word appears in the file `Dockerfile`. Use
`counts.take(5)` to see the output.

`>>> counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)`

In more detail, from the docker container created as above:
```
root@c509f18fe2e3:/424# $SPARKHOME/bin/pyspark
Python 3.10.12 (main, Jul 29 2024, 16:56:48) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/23 17:17:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.2
      /_/

Using Python version 3.10.12 (main, Jul 29 2024 16:56:48)
Spark context Web UI available at http://c509f18fe2e3:4040
Spark context available as 'sc' (master = local[*], app id = local-1724433466996).
SparkSession available as 'spark'.
>>> textFile = sc.textFile("Dockerfile")
>>> textFile.take(5)
['# Use Ubuntu 22.04 as the base image', 'FROM ubuntu:22.04', '', '# Set the working directory', 'WORKDIR /424']
>>> counts = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
>>> counts.take(5)
[('#', 9), ('Use', 1), ('as', 1), ('image', 1), ('', 35)]
>>>
```

Here is the same code without the use of `lambda` functions.

```
def split(line):
    return line.split(" ")

def generateone(word):
    return (word, 1)

def sum(a, b):
    return a + b

counts = textFile.flatMap(split).map(generateone).reduceByKey(sum)

counts.take(5)
```

The `flatMap` splits each line into words, and the following `map` and
`reduceByKey` do the counting. We will discuss this in class, but the
[Hadoop Map-Reduce Tutorial](http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html#Source+Code)
has an excellent and detailed description (look for the Walk-Through section).

The `lambda` representation is more compact and preferable, especially for small
functions; for large functions, it is better to separate out the definitions.

### Running as an Application

Instead of using a shell, you can also write your code as a Python file and
*submit* it to the Spark cluster. The assignment distro contains a Python file,
`wordcount.py`, which runs the program in local mode. To run the program, do:
`$SPARKHOME/bin/spark-submit wordcount.py`. This creates a directory `output`,
containing a file that indicates success or failure, and another file that
contains the output (a sample run is shown below the sketch).
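To give a sense of what such a script looks like, here is a minimal sketch of a self-contained word-count application. It is only an illustration -- the provided `wordcount.py` may differ in its details -- and it assumes the input file is `Dockerfile` and the output directory is `output`.

```
# wordcount_sketch.py -- a hypothetical minimal version, not the provided wordcount.py
from pyspark import SparkContext

sc = SparkContext("local", "Simple App")      # run in local mode with an application name

textFile = sc.textFile("Dockerfile")          # one RDD entry per line
counts = (textFile.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b))

counts.saveAsTextFile("output")               # fails if the 'output' directory already exists
sc.stop()
```

A sample `spark-submit` run then looks like this: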
```
root@c509f18fe2e3:/424# $SPARKHOME/bin/spark-submit wordcount.py
24/08/23 17:28:04 INFO SparkContext: Running Spark version 3.5.2
24/08/23 17:28:04 INFO SparkContext: OS info Linux, 6.10.0-linuxkit, aarch64
24/08/23 17:28:04 INFO SparkContext: Java version 11.0.24
24/08/23 17:28:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/08/23 17:28:04 INFO ResourceUtils: ==============================================================
24/08/23 17:28:04 INFO ResourceUtils: No custom resources configured for spark.driver.
24/08/23 17:28:04 INFO ResourceUtils: ==============================================================
24/08/23 17:28:04 INFO SparkContext: Submitted application: Simple App
24/08/23 17:28:04 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
24/08/23 17:28:04 INFO ResourceProfile: Limiting resource is cpu
24/08/23 17:28:04 INFO ResourceProfileManager: Added ResourceProfile id: 0
24/08/23 17:28:04 INFO SecurityManager: Changing view acls to: root

  ....

root@c509f18fe2e3:/424# ls -l output
total 4
-rw-r--r-- 1 root root    0 Aug 23 17:25 _SUCCESS
-rw-r--r-- 1 root root 1477 Aug 23 17:25 part-00000
root@c509f18fe2e3:/424# cat output/part-00000
('', 35)
('"alias', 2)
('#', 9)
('&&', 5)
('-C', 1)
('-rf', 1)
('-xzf', 1)
('-y', 1)
('/', 1)
('/424', 1)
('/root/.bashrc', 2)

  ....

root@c509f18fe2e3:/424#
```

### More...

We encourage you to look at the [Spark Programming
Guide](https://spark.apache.org/docs/latest/programming-guide.html) and play
with the other RDD manipulation commands. You should also try out the Scala and
Java interfaces.
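As one example of the kind of experimentation you can do, the sketch below continues from the `counts` RDD above and lists the five most frequent words. `sortBy` and `takeOrdered` are standard RDD operations; the exact output depends on your `Dockerfile`.

```
# Five most frequent words, by sorting the (word, count) pairs on the count.
top5 = counts.sortBy(lambda pair: pair[1], ascending=False).take(5)
print(top5)

# Equivalent result without materializing a fully sorted RDD.
print(counts.takeOrdered(5, key=lambda pair: -pair[1]))
```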
## Assignment Details

We have provided a Python file, `assignment.py`, that initializes the following
RDDs:
* An RDD consisting of lines from a Shakespeare play (`play.txt`)
* An RDD consisting of lines from a log file (`NASA_logs_sample.txt`)
* An RDD consisting of 2-tuples indicating user-product ratings from an Amazon
  dataset (`amazon-ratings.txt`)
* An RDD consisting of JSON documents pertaining to all the Nobel Laureates over
  the last few years (`prize.json`)

Your task is to fill out the six functions defined in
`functions.py` (their names start with `task`). The amount of code you write
will typically be small (several are one-liners), with the exception of
the last one.

All tasks are worth a single point each.

- **Task 1**: This function takes as input the `amazonInputRDD` and calculates the
  proportion of 1.0 ratings out of all reviews made by each customer. The
  output should be an RDD where the key is the customer's user id, and the value
  is the proportion as a decimal. This can be completed by using `aggregateByKey`
  or `reduceByKey` along with `map`.

- **Task 2**: Write just the flatMap function (`task2_flatmap`) that takes in a parsed JSON document (from `prize.json`) and returns the surnames of the Nobel Laureates. In other words, the following command should create an RDD with all the surnames. We will use `json.loads` to parse the JSONs (this is already done). Make sure to look at what it returns so you know how to access the information inside the parsed JSONs (these are basically nested dictionaries). (https://docs.python.org/2/library/json.html)
```
	task2_result = nobelRDD.map(json.loads).flatMap(task2_flatmap)
```

- **Task 3**: This function operates on the `logsRDD`. It takes as input a list
  of *dates* and returns an RDD with the "hosts" that were present in the log on
  all of those dates. The dates are provided as strings, in the same format in
  which they appear in the logs (e.g., '01/Jul/1995' and '02/Jul/1995'). The
  format of the log entries should be self-explanatory, but here are more details
  if you need them: [NASA Logs](http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html).
  Try to minimize the number of RDDs you end up creating.

- **Task 4**: On the `logsRDD`, for two given days (provided as input analogously
  to Task 3 above), use a `cogroup` to create the following RDD: the key of the
  RDD will be a host, and the value will be a 2-tuple, where the first element is
  a list of all URLs fetched from that host on the first day, and the second
  element is the list of all URLs fetched from that host on the second day. Use
  `filter` to first create two RDDs from the input `logsRDD`.

- **Task 5**: NLP often needs to preprocess the input data and can
  benefit a lot from cluster
  computing. [Tokenization](https://en.wikipedia.org/wiki/Lexical_analysis#Tokenization)
  is the process of chopping up the raw
  text. [Bigrams](http://en.wikipedia.org/wiki/Bigram) are sequences
  of two consecutive words. For example, the previous sentence
  contains the following bigrams: "Bigrams are", "are sequences",
  "sequences of", "of two", etc. Your task here is to tokenize each
  line using punctuation and whitespace, keeping tokens that consist
  solely of alphanumeric characters (e.g., `Task
  5: I'm easy.` will be tokenized into `["Task", "5", "I", "m",
  "easy"]`), and to count the appearances of each *bigram* of such
  tokens. The return value should be an RDD where the key is a bigram,
  and the value is its count.

- **Task 6**: Define a *character definition* as a line in `play.txt` that starts
  and ends with a '*', and otherwise contains nothing but whitespace and
  upper-case letters. Define the *attribution* of a line in the file as "none"
  if no character definition has appeared before it, and as the last
  character definition otherwise. Create an RDD with all characters
  that have been defined, together with their attribution counts as
  values. For example:
```
silly
*HOLIDAY*
nice
guy
*HARDEN*
jerk
*HOLIDAY*
Again, a good guy
```
should result in:
- ("HARDEN", 1)
- ("HOLIDAY", 3)
- ("none", 1)
though RDDs are unordered.

### Sample results.txt File

You can use `spark-submit` to run the `assignment.py` file, but it will likely be
easier to develop with `pyspark` (by copying the commands over).

**results.txt** shows the results of running `assignment.py` with our solution code using:
`$SPARKHOME/bin/spark-submit assignment.py`

### Submission

Submit the `functions.py` file [on Gradescope](https://www.gradescope.com/courses/811728/assignments/4669995).