Spark

Setup

Install scala 2.11 and sbt:

$ brew install scala sbt

Incidentally, I have been using emr-5.0.0. The Hadoop and Spark versions are 2.7 and 2.0. If you intend to run on EMR, it is important that your jar should be compiled with the same version of Scala used to compile Spark, and this includes both the local Spark library and the Spark installed on the EMR cluster.

Download Spark 2.0. If a uncompiled version was downloaded, compile it with:

$ ./build/mvn -DskipTests clean package

The bin directory contains two useful scripts: bin/spark-shell and bin/spark-submit. You might want to put these in your search path. Symlinks aren't good enough. Here are the two scripts that I put in my search path:

#!/bin/bash

cd /Users/clark/Local/src/spark-2.0.0

exec ./bin/spark-shell "$@"

and

#!/bin/bash

cd /Users/clark/Local/src/spark-2.0.0

exec ./bin/spark-submit "$@"

Resilient Distributed Datasets (RDDs)

RDD API

Read a file on the local file system into an RDD:

import org.apache.spark.SparkContext

val sc = new SparkContext()
val lines = sc.textFile("/PATH/TO/file.txt")

lines is of type org.apache.spark.rdd.RDD[String]. Each line in the source file becomes a String in the RDD.

An RDD provides first and take methods for getting the first and the first N records.

Here is an example of using Spark to process and /etc/passwd file:

val lines = sc.textFile("/etc/passwd")
val validLines = lines.filter((line) => !line.startsWith("#"))
val data = validLines.map((line) => line.split(':'))
data.map((row) => row.mkString("\t")).saveAsTextFile("/tmp/passwd.d");

After reading in the file, the comments are removed. Then the data is transformed to a dataset of type RDD[Array[String]].

Finally the data is converted back to RDD[String] so it can be saved to the directory /tmp/passwd.d. In the output file, the fields are tab delimited instead of colon delimited.

An Array can be converted to an RDD:

val rdd = sc.parallelize(Array("foo\tbar", "baz\tquux", "wombat\twumpus"))

Using cache()

How to read or write a CSV

Datasets

Data Frames

In Spark 2.0, a DataFrame is a Dataset of Row.

Load the data into a Spark grid:

$ spark-shell
scala> val df = spark.read.json("/Users/clark/samplehose.en.2016-04-14-03.json.gz")

Display the JSON Schema:

scala> df.printSchema()
root
 |-- _augmentations: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- actor: struct (nullable = true)
 |    |-- activityCount: long (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- displayName: string (nullable = true)
 |    |-- followerCount: long (nullable = true)
 |    |-- followingCount: long (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- image: struct (nullable = true)
 |    |    |-- height: long (nullable = true)
 |    |    |-- objectType: string (nullable = true)
...

How many records have positive sentiment:

scala> df.filter(df("meta.sentiment") > 0).count()
res9: Long = 114

Distribution of sentiment values:

scala> df.groupBy("meta.sentiment").count().show()
sentiment count
0         301  
-1        33   
null      5    
-2        15   
1         28   
-3        30   
2         6    
-4        157  
3         4    
-5        13   
4         3    
5         7    
-6        38   
6         53   
-7        4    
7         7    
-9        22   
8         2    
-10       3    
10        3

create DataFrame from Dataset (or RDD?) createOrReplaceTempView

Map/Reduce Equivalents

I've been disappointed at times with the underlying implementation of some of the RDD methods. For example, the takeSample method seems to try to send the entire RDD to a single process, causing a failure if the RDD is large.

If you know how to implement a job as a Map/Reduce job in an efficient way, it can be replicated in Spark using map() and reduceByKey(), with some provisos. The map() lambda should return (key, value) pairs.

If you need a mapper which does not return exactly one output pair per input, use flatMap() instead of map().

If the reducer needs to change the key value in the output, use groupByKey() instead of reduceByKey().

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License