Setup
Install Scala 2.11 and sbt:
$ brew install scala sbt
Incidentally, I have been using emr-5.0.0, which ships Hadoop 2.7 and Spark 2.0. If you intend to run on EMR, compile your jar with the same version of Scala that was used to compile Spark. This applies to both the local Spark library and the Spark installed on the EMR cluster.
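As a minimal sketch of pinning those versions, a build.sbt along the following lines should keep the jar on Scala 2.11. The project name and the exact patch versions (2.11.8 and 2.0.0) are assumptions, so match them to whatever your cluster actually runs.

```scala
// build.sbt (hypothetical project; adjust the versions to match your cluster)
name := "spark-notes"
version := "0.1.0"

// Spark 2.0 is built against Scala 2.11 by default.
scalaVersion := "2.11.8"

// %% appends the Scala binary version (_2.11) to the artifact name.
// "provided" keeps Spark out of the jar, since the cluster supplies it.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" % "provided"
```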
Download Spark 2.0. If you downloaded the source distribution rather than a prebuilt one, compile it with:
$ ./build/mvn -DskipTests clean package
The bin directory contains two useful scripts: bin/spark-shell and bin/spark-submit. You might want to put these in your search path. Symlinks aren't good enough, apparently because the scripts locate the Spark installation relative to their own path. Instead, I put these two wrapper scripts in my search path:
#!/bin/bash
cd /Users/clark/Local/src/spark-2.0.0
exec ./bin/spark-shell "$@"
and
#!/bin/bash
cd /Users/clark/Local/src/spark-2.0.0
exec ./bin/spark-submit "$@"
Resilient Distributed Datasets (RDDs)
Read a file on the local file system into an RDD:
import org.apache.spark.SparkContext
// In spark-shell, sc is already defined, so skip the next line there.
val sc = new SparkContext()
val lines = sc.textFile("/PATH/TO/file.txt")
lines is of type org.apache.spark.rdd.RDD[String]. Each line in the source file becomes a String in the RDD.
An RDD provides first and take methods for getting the first record and the first N records.
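A small sketch of both methods follows. The four-element RDD is made-up data, and the local master setting is an assumption for running outside a cluster; in spark-shell, getOrCreate() simply reuses the existing session.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("first-take").getOrCreate()
val sc = spark.sparkContext

// Hypothetical data standing in for the lines of a file.
val lines = sc.parallelize(Seq("alpha", "beta", "gamma", "delta"), 2)

// first() returns a single record.
val head: String = lines.first()

// take(n) returns the first n records as a local Array on the driver.
val firstThree: Array[String] = lines.take(3)
```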
Here is an example of using Spark to process an /etc/passwd file:
val lines = sc.textFile("/etc/passwd")
val validLines = lines.filter((line) => !line.startsWith("#"))
val data = validLines.map((line) => line.split(':'))
data.map((row) => row.mkString("\t")).saveAsTextFile("/tmp/passwd.d")
After reading in the file, the comments are removed. Then the data is transformed to a dataset of type RDD[Array[String]].
Finally, the data is converted back to RDD[String] so it can be saved. saveAsTextFile writes a directory, /tmp/passwd.d, containing one part file per partition. In those files, the fields are tab delimited instead of colon delimited.
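To see what the per-line functions do, here is the same filter, split and mkString logic applied to a single string in plain Scala, with no Spark involved. The sample record is made up. One detail worth knowing: split drops trailing empty fields unless a negative limit is passed.

```scala
// A made-up record in /etc/passwd format.
val line = "alice:x:1001:1001:Alice Example:/home/alice:/bin/bash"

// The filter step keeps lines that are not comments.
val isComment: Boolean = line.startsWith("#")

// The map step splits each line into its colon-delimited fields.
val row: Array[String] = line.split(':')

// The final step joins the fields back together with tabs.
val out: String = row.mkString("\t")

// Caveat: trailing empty fields are dropped by default.
val dropped = "a:b:".split(':').length   // trailing empty field is lost
val kept = "a:b:".split(":", -1).length  // a negative limit keeps it
```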
An Array can be converted to an RDD:
val rdd = sc.parallelize(Array("foo\tbar", "baz\tquux", "wombat\twumpus"))
Using cache()
How to read or write a CSV
Datasets
Data Frames
In Spark 2.0, a DataFrame is a Dataset of Row.
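A short sketch of what that means in code: a DataFrame can be assigned to a Dataset[Row] variable without a cast, because DataFrame is a type alias. The column names and values below are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("df-is-dataset").getOrCreate()

// Hypothetical data: (name, sentiment) pairs.
val df: DataFrame = spark.createDataFrame(Seq(("a", 1L), ("b", -2L))).toDF("name", "sentiment")

// Compiles without a cast because DataFrame is a type alias for Dataset[Row].
val ds: Dataset[Row] = df

val columnNames: Seq[String] = ds.columns.toSeq
val rowCount: Long = ds.count()
```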
Load the data into a DataFrame:
$ spark-shell
scala> val df = spark.read.json("/Users/clark/samplehose.en.2016-04-14-03.json.gz")
Display the schema that Spark inferred from the JSON:
scala> df.printSchema()
root
 |-- _augmentations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- actor: struct (nullable = true)
 |    |-- activityCount: long (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- followerCount: long (nullable = true)
 |    |-- followingCount: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- image: struct (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- objectType: string (nullable = true)
...
How many records have positive sentiment:
scala> df.filter(df("meta.sentiment") > 0).count()
res9: Long = 114
Distribution of sentiment values (show() prints only the first 20 rows by default):
scala> df.groupBy("meta.sentiment").count().show()
sentiment  count
0          301
-1         33
null       5
-2         15
1          28
-3         30
2          6
-4         157
3          4
-5         13
4          3
5          7
-6         38
6          53
-7         4
7          7
-9         22
8          2
-10        3
10         3
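The same two queries can be tried without the original file. The sketch below builds a DataFrame from four made-up JSON records that have only a meta.sentiment field; everything else about the real data is left out. It uses the read.json overload that takes an RDD[String], which exists in Spark 2.0 but is deprecated in later releases.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("sentiment").getOrCreate()
val sc = spark.sparkContext

// Hypothetical records; only the nested meta.sentiment field is modeled.
val json = sc.parallelize(Seq(
  """{"meta": {"sentiment": 1}}""",
  """{"meta": {"sentiment": 1}}""",
  """{"meta": {"sentiment": -2}}""",
  """{"meta": {"sentiment": 0}}"""
))
val df = spark.read.json(json)

// Count the records with positive sentiment.
val positive: Long = df.filter(df("meta.sentiment") > 0).count()

// Distribution of sentiment values, collected into a local Map.
val distribution: Map[Long, Long] = df.groupBy("meta.sentiment").count()
  .collect()
  .map(r => (r.getLong(0), r.getLong(1)))
  .toMap
```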
A DataFrame can also be created from a Dataset or an RDD, and registered for SQL queries with createOrReplaceTempView.
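A minimal sketch of that path, assuming an RDD of tuples as the starting point. The records, the column names and the view name are all hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("temp-view").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (name, shell) records standing in for parsed /etc/passwd rows.
val rdd = sc.parallelize(Seq(("root", "/bin/sh"), ("alice", "/bin/bash"), ("bob", "/bin/bash")))

// createDataFrame accepts an RDD of tuples; toDF names the columns.
val accounts = spark.createDataFrame(rdd).toDF("name", "shell")

// Register the DataFrame under a name that SQL queries can refer to.
accounts.createOrReplaceTempView("accounts")

val bashUsers: Array[String] = spark
  .sql("SELECT name FROM accounts WHERE shell = '/bin/bash' ORDER BY name")
  .collect()
  .map(_.getString(0))
```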
Map/Reduce Equivalents
I've been disappointed at times with the underlying implementation of some of the RDD methods. For example, the takeSample method seems to try to send the entire RDD to a single process, causing a failure if the RDD is large.
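One possible workaround, sketched under the assumption that an approximate sample size is acceptable, is to use sample(), which returns another RDD, and then pull only a bounded number of records to the driver. This is not a verified fix for the failure described above. The input data, fraction and seed are made up.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("sample").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input: 100,000 integers spread over 8 partitions.
val big = sc.parallelize(1 to 100000, 8)

// sample() is a transformation: the result is still a distributed RDD.
// The fraction is a probability per record, so the sample size is approximate.
val sampled = big.sample(withReplacement = false, fraction = 0.01, seed = 42L)

// Bring at most 10 sampled records back to the driver.
val few: Array[Int] = sampled.take(10)
```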
If you know how to implement a job as a Map/Reduce job in an efficient way, it can be replicated in Spark using map() and reduceByKey(), with some provisos. The map() lambda should return (key, value) pairs.
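As an illustration of the mapper and reducer roles, here is a sketch that counts accounts per login shell. The three input records are made up, and each holds only a name and a shell.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("map-reduce").getOrCreate()
val sc = spark.sparkContext

// Hypothetical "name:shell" records.
val lines = sc.parallelize(Seq("root:/bin/sh", "alice:/bin/bash", "bob:/bin/bash"))

// Mapper: emit exactly one (key, value) pair per input record.
val pairs = lines.map { line =>
  val fields = line.split(':')
  (fields(1), 1)
}

// Reducer: combine the values that share a key.
val counts: Map[String, Int] = pairs.reduceByKey(_ + _).collect().toMap
```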
If you need a mapper which does not return exactly one output pair per input, use flatMap() instead of map().
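Word count is the usual example of such a mapper: one input line can produce zero, one or many pairs. A sketch with made-up input, including an empty line that produces no output:

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("flatmap").getOrCreate()
val sc = spark.sparkContext

// Hypothetical input lines; the empty line yields no pairs at all.
val docs = sc.parallelize(Seq("to be or not to be", "", "be quick"))

// Mapper: emit one (word, 1) pair per word, so the output count varies per line.
val words = docs.flatMap { line =>
  line.split(" ").filter(_.nonEmpty).map(word => (word, 1))
}

// Reducer: sum the counts for each word.
val counts: Map[String, Int] = words.reduceByKey(_ + _).collect().toMap
```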
If the reducer needs to emit a different key than the one it received, use groupByKey() followed by map() instead of reduceByKey().
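A sketch of a reducer that changes the key: the input is keyed by shell, and the output is keyed by the number of accounts using that shell. The records are made up. Note that groupByKey() gathers all values for a key in one place, so it can be memory hungry for keys with many values.

```scala
import org.apache.spark.sql.SparkSession

// getOrCreate() reuses the session that spark-shell already provides.
val spark = SparkSession.builder().master("local[*]").appName("groupbykey").getOrCreate()
val sc = spark.sparkContext

// Hypothetical (shell, name) pairs.
val pairs = sc.parallelize(Seq(("/bin/bash", "alice"), ("/bin/sh", "root"), ("/bin/bash", "bob")))

// Gather every value for a key: RDD[(String, Iterable[String])].
val grouped = pairs.groupByKey()

// "Reducer": sees all values for a key and is free to emit a different key.
val shellByUserCount: Map[Int, String] = grouped
  .map { case (shell, names) => (names.size, shell) }
  .collect()
  .toMap
```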