Getting Started

For instructions on obtaining access to the Vanderbilt Big Data Cluster, see this blog post.

Wordcount

Wordcount is the “hello world” of map-reduce jobs: the frequency of each word in a given set of documents, or corpus, is counted, and the list of unique words is then sorted by that frequency. The map step of this process consists of teasing out each individual word from a document and emitting a tuple, (word, 1). In the reduce step, tuples are grouped together according to their key, which in this case is the word, and the values are summed to get the total occurrences of each word. Finally, the records are sorted by occurrence count.
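
Before bringing Spark into it, the three steps can be sketched on ordinary Scala collections. This is only an illustration of the idea on a single machine, not Spark code; the object and method names (WordCountSketch, mapStep, reduceStep, wordCount) are made up for this example.

```scala
object WordCountSketch {
  // Map step: split a document into words and emit (word, 1) for each one.
  def mapStep(doc: String): Seq[(String, Int)] =
    doc.split("\\s+").toSeq.filter(_.nonEmpty).map(word => (word, 1))

  // Reduce step: group the tuples by word and sum the 1s in each group.
  def reduceStep(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, group) => (word, group.map(_._2).sum) }

  // Full pipeline: map, reduce, then sort by descending count
  // (ties are broken alphabetically so the result is deterministic).
  def wordCount(docs: Seq[String]): Seq[(String, Int)] =
    reduceStep(docs.flatMap(doc => mapStep(doc))).toSeq.sortBy { case (word, n) => (-n, word) }
}
```

For example, WordCountSketch.wordCount(Seq("a b a", "b a c")) yields (a,3), (b,2), (c,1).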

The Spark Shell

Spark is written in Scala, and Spark distributions provide their own Scala-Spark REPL (Read-Evaluate-Print Loop), a command-line environment for toying around with code snippets. That makes it a convenient place to start implementing wordcount.

Starting the REPL

Spark can run locally on a single machine with \( n \) worker threads, it can run as a standalone Spark cluster, and it can run on top of YARN. If you’re using Spark locally, then to initialize the REPL:

$SPARK_HOME/bin/spark-shell

If you’ve connected to the BigData cluster through SSH (via ssh bigdata.accre.vanderbilt.edu), you can start the REPL with:

spark-shell

or, to specify YARN as the master (the cluster manager that schedules the application), pass the --master option:

$SPARK_HOME/bin/spark-shell --master=yarn

At this point, you should see Spark’s splash screen and a Scala prompt:

[username@abd740 ~]$ spark-shell --master=yarn 
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc (master = yarn-client, app id = application_1486918126261_0072).
SQL context available as sqlContext.

These last two lines indicate the entry points for using the Spark API: the Spark context sc and the SQL context sqlContext. We won’t be using the SQL context in this tutorial, but we will invoke the Spark context, which we can examine by simply typing sc into the Scala REPL:

scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@49d98c3e

scala> 

Creating an RDD

We’re going to use the textFile method of the SparkContext sc to load a file from the /data directory of HDFS (note that we could also read in a local file) and store it as a Resilient Distributed Dataset (RDD):

scala> val lines = sc.textFile("hdfs:///data/Spark_README.md")
lines: org.apache.spark.rdd.RDD[String] = hdfs:///data/Spark_README.md MapPartitionsRDD[5] at textFile at <console>:27

The REPL indicates that lines is now a MapPartitionsRDD. In the grand scheme of things, mapping to different partitions is the key to the map-reduce paradigm: instead of sending data to a single node and processing it in chunks, the program is sent to the node(s) where the data resides on disk and is executed in parallel. Pretty cool, right?

Note that lines is declared as val, for value; in addition to being a strongly typed language, Scala emphasizes functional programming and immutable state. Thus, values can be thought of as immutable variables. Once a value is defined, it cannot be reassigned to refer to something else.
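
A small sketch of the difference between val and var, using plain Scala rather than Spark; the names in ValSketch are invented for illustration. The same pattern applies to RDDs: a transformation never changes the RDD it is called on but returns a new one.

```scala
object ValSketch {
  val xs = List(1, 2, 3)
  // xs = List(4, 5, 6)   // would not compile: reassignment to val
  val ys = 0 :: xs        // builds a new list; xs is left untouched

  var counter = 0
  counter += 1            // fine: counter is a var and may be reassigned
}
```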

Transformations

To demonstrate our mapping capabilities, we’ll first use the RDD method flatMap. When our RDD lines was created, Spark separated the input text at line endings so that each element of the RDD corresponds to a single line of text (this can easily be verified by executing lines.take(5), which returns the first 5 elements of the RDD). Now we want to take each line and break it up into words, and for this we can use an anonymous function line => line.split("\\s+"), which takes a string and splits it at runs of whitespace, returning the pieces as an array of strings. But since we only care about words in this problem, not lines, we’d rather have a single RDD of strings with each element corresponding to a word. This is precisely what flatMap does: it maps one string to potentially many strings and flattens the results into one RDD:

scala> val words = lines.flatMap(line => line.split("\\s+"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:29

scala> words take 5 foreach println
#
Apache
Spark

Spark
scala> 
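
To see what flatMap buys us over map, here is a rough local analogue on a plain Scala Seq. The three sample lines are a hypothetical stand-in for the start of the file, not its actual contents.

```scala
object FlatMapSketch {
  // Hypothetical stand-in for the first few lines of the RDD.
  val lines = Seq("# Apache Spark", "", "Spark is fast")

  // map keeps one output element per input line: a sequence of arrays.
  val nested: Seq[Array[String]] = lines.map(line => line.split("\\s+"))

  // flatMap concatenates the per-line arrays into one flat sequence of words.
  val words: Seq[String] = lines.flatMap(line => line.split("\\s+"))
}
```

Here nested has three elements (one array per line), while words has seven. Note that the empty line contributes a single empty string, which is where the blank entry in the output above comes from.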

Some of the elements are empty strings (splitting an empty line, or a line with leading whitespace, produces one), so we can go ahead and filter those out:

scala> words.filter(word => word.length > 0) take 5 foreach println
#
Apache
Spark
Spark
is

Method calls can be chained freely in Scala, so we can just add our filter method where we define words:

scala> val words = lines.flatMap(line => line.split("\\s+")).filter(word => word.length > 0)
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:29

scala> words take 5 foreach println
#
Apache
Spark
Spark
is

Now we want to count each word, and to do that, we will map each word to a tuple (word, 1), where the integer 1 signifies that this word has been encountered once at this particular location:

scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:31

scala> pairs take 5 foreach println
(#,1)
(Apache,1)
(Spark,1)
(Spark,1)
(is,1)

Now we’ve transformed our data into a format suitable for the reduce phase.

Reductions

The reduce phase of map-reduce consists of grouping, or aggregating, some data by a key and combining all the data associated with that key. In our example, the keys to group by are just the words themselves, and to get a total occurrence count for each word, we want to sum up all the values (1s) for a given key. To do this, we use the method reduceByKey:

scala> val wordCounts = pairs.reduceByKey((a, b) => a + b) 
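
As a mental model, reduceByKey behaves roughly like the following helper on a local collection. This is a sketch of the semantics only, not how Spark implements it: Spark combines values within each partition before shuffling data between nodes, which is why the function passed in should be associative and commutative. The name ReduceByKeySketch is made up for this example.

```scala
object ReduceByKeySketch {
  // Local, single-machine analogue of reduceByKey: group the pairs by key,
  // then combine each group's values with the supplied function.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    pairs.groupBy(_._1).map { case (key, group) => (key, group.map(_._2).reduce(f)) }
}
```

For example, ReduceByKeySketch.reduceByKey(Seq(("Spark", 1), ("is", 1), ("Spark", 1)))(_ + _) gives Map(Spark -> 2, is -> 1).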

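Then, if we want sorted results, we need this incantation. The method takeOrdered(10) returns the first 10 elements under a given ordering, and the ordering here compares the counts (the second element of each tuple) in reverse, so we get the 10 most frequent words: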

scala> val summary = wordCounts.takeOrdered(10)(Ordering[Int].reverse.on(_._2))
summary: Array[(String, Int)] = Array((the,21), (to,14), (Spark,13), (for,11), (and,10), (##,8), (a,8), (run,7), (can,6), (is,6))

scala> summary.foreach(println)
(the,21)
(to,14)
(Spark,13)
(for,11)
(and,10)
(##,8)
(a,8)
(run,7)
(can,6)
(is,6)
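
The ordering expression can be tried out on its own with a plain Scala Seq. The following is a local sketch of what takeOrdered does with it, using made-up names (TopNSketch, byCountDesc, topN); the real method works across partitions rather than sorting everything in one place.

```scala
object TopNSketch {
  // Compare (word, count) pairs by count only, largest count first.
  val byCountDesc: Ordering[(String, Int)] =
    Ordering[Int].reverse.on[(String, Int)](_._2)

  // Local analogue of takeOrdered(n)(ord): sort with the ordering, keep the first n.
  def topN(counts: Seq[(String, Int)], n: Int): Seq[(String, Int)] =
    counts.sorted(byCountDesc).take(n)
}
```

For example, TopNSketch.topN(Seq(("a", 8), ("the", 21), ("to", 14)), 2) gives (the,21), (to,14).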

Writing to file

The Scala function println does print to stdout, but on a multinode cluster this is not a particularly reliable way to put output somewhere that is easily reachable. Instead, we can use the RDD method saveAsTextFile to write our results to HDFS. As you might have noticed, our value summary is not an RDD but an ordinary Scala array, so we first convert it back into an RDD:

scala> val summaryRDD = sc.makeRDD(summary)
summaryRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at makeRDD at <console>:33

Then, to write it to file, we need to specify an output directory (either an absolute HDFS path or a path relative to our own HDFS home directory) that will hold our results in potentially many files named part-XXXXX:

scala> summaryRDD.saveAsTextFile("foo_directory")

Now, if we exit the Spark REPL, we can inquire about our HDFS files like so:

[username@abd740 ~]$ hadoop fs -ls foo_directory
Found 3 items
-rw-r--r--   3 usergroup usergroup          0 2017-02-14 13:04 foo_directory/_SUCCESS
-rw-r--r--   3 usergroup usergroup         46 2017-02-14 13:04 foo_directory/part-00000
-rw-r--r--   3 usergroup usergroup         36 2017-02-14 13:04 foo_directory/part-00001

and view the contents via:

[username@abd740 ~]$ hadoop fs -cat foo_directory/part-*
(the,21)
(to,14)
(Spark,13)
(for,11)
(and,10)
(##,8)
(a,8)
(run,7)
(can,6)
(is,6)
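
Each line of the part files is simply the string form of one element, which for a pair looks like (word,count). If the results need to be read back into Scala later, one possible approach is sketched below; PartFileSketch and its methods are hypothetical helpers, not part of Spark.

```scala
object PartFileSketch {
  // A pair's toString has the same shape as the lines shown above: "(word,count)".
  def render(pair: (String, Int)): String = pair.toString

  // Parse one such line back into a pair. Splitting at the last comma
  // keeps words that themselves contain commas intact.
  def parse(line: String): (String, Int) = {
    val inner = line.substring(1, line.length - 1) // drop the surrounding parentheses
    val cut = inner.lastIndexOf(',')
    (inner.substring(0, cut), inner.substring(cut + 1).toInt)
  }
}
```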

Conclusions

This is an admittedly simple example, but it demonstrates the core components of a map-reduce job and shows off the relative simplicity of writing distributed jobs in Spark. The hadoop fs commands should be pretty intuitive for *nix users.