Spark 2.0 was released in July 2016. Changes from the release notes:
- Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset[Row]. In Python and R, given the lack of type safety, DataFrame is the main programming interface.
- SparkSession: new entry point that replaces the old SQLContext and HiveContext for DataFrame and Dataset APIs. SQLContext and HiveContext are kept for backward compatibility.
Documentation
Setup
$ virtualenv --python=python2.7 ve
$ . ve/bin/activate
$ pip install pyspark
PySpark
pyspark
pyspark is a Python module and a command line tool.
import pyspark
from pyspark.sql import SparkSession
# A SparkSession can be used to create DataFrames, register DataFrames as tables,
# execute SQL over tables, cache tables, and read parquet files.
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
# A SparkContext represents the connection to a Spark cluster,
# and can be used to create RDDs and broadcast variables on that cluster.
# getOrCreate already started one, so reuse it rather than calling pyspark.SparkContext().
sc = spark.sparkContext
The pyspark command line tool creates a SparkSession object named spark and a SparkContext object named sc for you, as if the above code were executed at startup.
jobs
DataFrames
reading
The SparkSession object has a read attribute containing a DataFrameReader object. This object in turn has methods for creating a DataFrame object from files on the file system:
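For example (the file names below are placeholders; json, csv, parquet, and text are some of the DataFrameReader methods):
# each method returns a DataFrame
df = spark.read.json('data.json')                              # newline-delimited JSON
df = spark.read.csv('data.csv', header=True, inferSchema=True) # CSV with a header row
df = spark.read.parquet('data.parquet')                        # parquet file or directory
df = spark.read.text('data.txt')                               # one string column named "value"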
rows and columns
A DataFrame object represents a relational set of data. count returns the number of rows:
$ cat data.json
{"name": "John", "age": 32}
{"name": "Mary", "age": 27}
$ pyspark
>>> df = spark.read.json('data.json')
>>> df.count()
2
>>> df.columns
['age', 'name']
>>> df.age
Column<age>
>>> df['age']
Column<age>
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
>>> df.head(1)
[Row(age=32, name=u'John')]
>>> df.show()
+---+----+
|age|name|
+---+----+
| 32|John|
| 27|Mary|
+---+----+
collect
collect converts a DataFrame to a list of rows
a DataFrame can be created from a list of rows, or from an RDD built with parallelize
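A minimal sketch of both directions (the extra rows are illustrative):
from pyspark.sql import Row
# collect pulls every row back to the driver as a list of Row objects
rows = df.collect()
# createDataFrame goes the other way, from a list of rows ...
df2 = spark.createDataFrame([Row(name='Ann', age=31), Row(name='Bob', age=25)])
# ... or from an RDD built with sc.parallelize
df3 = spark.createDataFrame(sc.parallelize(rows))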
filter
>>> df.filter("age > 30").collect()
[Row(age=32, name=u'John')]
>>> df.filter(df.age > 30).collect()
[Row(age=32, name=u'John')]
The first form takes a string containing a SQL expression, which is evaluated; column names can be used as identifiers in it.
The second form is a bit surprising: the pyspark.sql.column.Column class overrides comparison and arithmetic operators to return pyspark.sql.column.Column values, so df.age > 30 is itself a Column expression that filter accepts.
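Because the overridden operators return Column objects, conditions can be combined with & and | (a sketch; the parentheses are required because of Python operator precedence):
df.filter((df.age > 30) & (df.name == 'John')).collect()
df.filter((df.age > 30) | (df.age < 10)).collect()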
select
The select method takes as arguments strings or pyspark.sql.column.Column objects. If an argument is a string, it must be '*' or refer to one of the columns:
>>> df.select('age').collect()
[Row(age=32), Row(age=27)]
>>> df.select(df.age).collect()
[Row(age=32), Row(age=27)]
>>> df.select('*').collect()
[Row(age=32, name=u'John'), Row(age=27, name=u'Mary')]
>>> df.select('*', df.age + 1).collect()
[Row(age=32, name=u'John', (age + 1)=33), Row(age=27, name=u'Mary', (age + 1)=28)]
The selectExpr method takes SQL expression strings as arguments and evaluates them:
>>> df.selectExpr('*', 'age + 1').collect()
[Row(age=32, name=u'John', (age + 1)=33), Row(age=27, name=u'Mary', (age + 1)=28)]
groupBy
groupBy takes as arguments pyspark.sql.column.Column objects, or column names as strings:
$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}
$ pyspark
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').avg('age').collect()
[Row(sex=u'female', avg(age)=34.5), Row(sex=u'male', avg(age)=32.0)]
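Multiple aggregates can be computed at once with agg; a sketch on the same data (the aliases are illustrative):
import pyspark.sql.functions as f
df.groupBy('sex').agg(
    f.avg('age').alias('avg_age'),
    f.max('age').alias('max_age'),
    f.count('name').alias('n')
).show()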
join
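A minimal sketch of an equi-join against a hypothetical second DataFrame keyed by name:
# other is a made-up DataFrame with columns name and city
other = spark.createDataFrame([('John', 'Oslo'), ('Mary', 'Lima')], ['name', 'city'])
# join on the shared name column; how can be 'inner' (the default), 'left', 'right', or 'outer'
df.join(other, on='name', how='inner').show()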
orderBy
also limit and sort; sort is an alias for orderBy
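A sketch of ordering and limiting rows:
df.orderBy('age').show()          # ascending by default
df.orderBy(df.age.desc()).show()  # descending, using a Column expression
df.sort('age').limit(2).show()    # limit caps the number of rows returned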
explode
The function pyspark.sql.functions.explode can be used to deal with data that isn't in 1st normal form:
$ cat data.json
{"name": "John", "age": 32, "children": ["Billy", "Sally"]}
{"name": "Mary", "age": 27, "children": ["Rocco"]}
$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.select('name', 'age', f.explode('children').alias('child')).collect()
[Row(name=u'John', age=32, child=u'Billy'), Row(name=u'John', age=32, child=u'Sally'), Row(name=u'Mary', age=27, child=u'Rocco')]
Here is an example of using split and explode to deal with data not in 1st normal form when it is serialized as a string. The 2nd argument to split is a regular expression:
$ cat data.json
{"name": "John", "age": 32, "children": "Billy|Sally"}
{"name": "Mary", "age": 27, "children": "Rocco"}
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.select('name', 'age', f.explode(f.split(df['children'], r'\|')).alias('child')).collect()
[Row(name=u'John', age=32, child=u'Billy'), Row(name=u'John', age=32, child=u'Sally'), Row(name=u'Mary', age=27, child=u'Rocco')]
explode can also deal with map columns. The JSON reader infers a JSON object as a struct by default, so an explicit schema with a MapType is supplied here:
$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}}
>>> import pyspark.sql.functions as f
>>> import pyspark.sql.types as t
>>> df = spark.read.json(
...     'data.json',
...     schema=t.StructType(
...         [
...             t.StructField('name', t.StringType()),
...             t.StructField('age', t.IntegerType()),
...             t.StructField('children', t.MapType(t.StringType(), t.IntegerType()))
...         ]
...     )
... )
>>> df.collect()
[Row(name=u'John', age=32, children={u'Billy': 7, u'Sally': 3}), Row(name=u'Mary', age=27, children={u'Rocco': 2})]
>>> df.select('name', 'age', f.explode(df['children']).alias('child_name', 'child_age')).collect()
[Row(name=u'John', age=32, child_name=u'Billy', child_age=7), Row(name=u'John', age=32, child_name=u'Sally', child_age=3), Row(name=u'Mary', age=27, child_name=u'Rocco', child_age=2)]
The opposite of exploding can be done with the collect_list or the collect_set functions. The latter de-dupes the list:
$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}
{"name": "John", "age": 67, "children": {"Ruth": 34}, "sex": "male"}
$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').agg(f.collect_list('name')).show()
+------+------------------+
| sex|collect_list(name)|
+------+------------------+
|female| [Mary, Lynn]|
| male| [John, John]|
+------+------------------+
>>> df.groupBy('sex').agg(f.collect_set('name')).show()
+------+-----------------+
| sex|collect_set(name)|
+------+-----------------+
|female| [Mary, Lynn]|
| male| [John]|
+------+-----------------+
One can also build a per-row map with create_map and collect those maps into a list:
$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}
{"name": "Bill", "age": 67, "children": {"Ruth": 34}, "sex": "male"}
$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').agg(f.collect_list(f.create_map('name', 'age'))).show()
+------+----------------------------+
| sex|collect_list(map(name, age))|
+------+----------------------------+
|female| [[Mary -> 27], [L...|
| male| [[John -> 32], [B...|
+------+----------------------------+
writing
The write attribute returns a DataFrameWriter object with methods such as json, csv, parquet, saveAsTable, and save, plus mode and format to control how the data is written.
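A minimal sketch (the output paths are placeholders):
# each call writes the DataFrame out in the named format
df.write.json('out_json')
df.write.csv('out_csv', header=True)
df.write.parquet('out_parquet')
# mode controls what happens when the target already exists:
# 'error' (the default), 'overwrite', 'append', or 'ignore'
df.write.mode('overwrite').parquet('out_parquet')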