PySpark

Spark 2.0 was released in July 2016. Changes from the release notes:

  • Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset have been unified, i.e. DataFrame is just a type alias for Dataset of Row. In Python and R, given the lack of type safety, DataFrame is the main programming interface.
  • SparkSession: new entry point that replaces the old SQLContext and HiveContext for DataFrame and Dataset APIs. SQLContext and HiveContext are kept for backward compatibility.

Documentation

Setup

$ virtualenv --python=python2.7 ve
$ . ve/bin/activate
$ pip install pyspark

pyspark

pyspark is a Python module and a command line tool.

from pyspark.sql import SparkSession

# A SparkSession can be used to create DataFrames, register DataFrames as tables,
# execute SQL over tables, cache tables, and read parquet files.
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

# A SparkContext represents the connection to a Spark cluster,
# and can be used to create RDDs and broadcast variables on that cluster.
# The session already owns one; calling pyspark.SparkContext() again would raise an error.
sc = spark.sparkContext

The pyspark command line tool creates a SparkSession object named spark and a SparkContext object named sc for you, as if the code above had been executed at startup.

jobs

DataFrames

reading

The SparkSession object has a read attribute containing a DataFrameReader object. This object in turn has methods for creating a DataFrame object from files on the file system:

rows and columns

DataFrame

A DataFrame object represents a relational set of data. count returns the number of rows:

$ cat data.json 
{"name": "John", "age": 32}
{"name": "Mary", "age": 27}

$ pyspark
>>> df = spark.read.json('data.json')
>>> df.count()
2

>>> df.columns
['age', 'name']
>>> df.age
Column<age>
>>> df['age']
Column<age>
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.head(1)
[Row(age=32, name=u'John')]
>>> df.show()
+---+----+
|age|name|
+---+----+
| 32|John|
| 27|Mary|
+---+----+

collect

to a list of rows

from a list of rows with parallelize

filter

>>> df.filter("age > 30").collect()
[Row(age=32, name=u'John')]

>>> df.filter(df.age > 30).collect()
[Row(age=32, name=u'John')]

The first form takes a string, which Spark parses as a SQL expression. Column names appear in the string as bare identifiers.

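The second form is less obvious: Python does not evaluate df.age > 30 to a bool. The pyspark.sql.column.Column class overrides comparison and arithmetic operators so that they return new pyspark.sql.column.Column values describing the expression, which Spark then evaluates for each row.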
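The operator overloading can be illustrated without Spark. The class below is a toy stand-in, not the real pyspark.sql.column.Column; the name ToyColumn and its string-based representation are assumptions made only to show the mechanism.

```python
class ToyColumn(object):
    """Toy stand-in for a column expression; NOT the real PySpark class."""

    def __init__(self, expr):
        self.expr = expr  # textual form of the expression built so far

    def __gt__(self, other):
        # Build a new expression object instead of returning True or False.
        return ToyColumn("(%s > %r)" % (self.expr, other))

    def __add__(self, other):
        return ToyColumn("(%s + %r)" % (self.expr, other))

    def __repr__(self):
        return "ToyColumn<%s>" % self.expr


age = ToyColumn("age")
predicate = age > 30    # calls ToyColumn.__gt__; nothing is compared yet
incremented = age + 1   # calls ToyColumn.__add__

print(predicate)        # ToyColumn<(age > 30)>
print(incremented)      # ToyColumn<(age + 1)>
```

Because the result is an expression object rather than a bool, it can be handed to a method such as filter and evaluated later against the data.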

select

The select method takes as arguments strings or pyspark.sql.column.Column objects. If an argument is a string, it must refer to one of the columns:

>>> df.select('age').collect()
[Row(age=32), Row(age=27)]
>>> df.select(df.age).collect()
[Row(age=32), Row(age=27)]
>>> df.select('*').collect()
[Row(age=32, name=u'John'), Row(age=27, name=u'Mary')]
>>> df.select('*', df.age + 1).collect()
[Row(age=32, name=u'John', (age + 1)=33), Row(age=27, name=u'Mary', (age + 1)=28)]

The selectExpr method takes only strings as arguments. Each string is parsed as a SQL expression:

>>> df.selectExpr('*', 'age + 1').collect()
[Row(age=32, name=u'John', (age + 1)=33), Row(age=27, name=u'Mary', (age + 1)=28)]

groupBy

groupBy takes column names or pyspark.sql.column.Column objects as arguments and returns a GroupedData object. Aggregation methods on that object, such as avg, return a DataFrame:

$ cat data.json 
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}

$ pyspark
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').avg('age').collect()
[Row(sex=u'female', avg(age)=34.5), Row(sex=u'male', avg(age)=32.0)]

join

orderBy

also limit, sort

explode

The function pyspark.sql.functions.explode handles data that is not in first normal form. It produces one output row for each element of an array column:

$ cat data.json
{"name": "John", "age": 32, "children": ["Billy", "Sally"]}
{"name": "Mary", "age": 27, "children": ["Rocco"]}

$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.select('name', 'age', f.explode('children').alias('child')).collect()
[Row(name=u'John', age=32, child=u'Billy'), Row(name=u'John', age=32, child=u'Sally'), Row(name=u'Mary', age=27, child=u'Rocco')]

Here is an example of using split and explode to deal with data that is not in first normal form because a list is serialized as a string. The second argument to split is a regular expression, so a literal | must be escaped:

$ cat data.json
{"name": "John", "age": 32, "children": "Billy|Sally"}
{"name": "Mary", "age": 27, "children": "Rocco"}

>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.select('name', 'age', f.explode(f.split(df['children'], r'\|')).alias('child')).collect()
[Row(name=u'John', age=32, child=u'Billy'), Row(name=u'John', age=32, child=u'Sally'), Row(name=u'Mary', age=27, child=u'Rocco')]

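explode can also expand maps (Python dictionaries). Each entry becomes a row with a key column and a value column. An explicit schema is given here so that children is read as a map rather than inferred as a struct: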

$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}}

>>> import pyspark.sql.functions as f
>>> import pyspark.sql.types as t
>>> df = spark.read.json(
    'data.json',
    schema=t.StructType(
        [
            t.StructField('name', t.StringType()),
            t.StructField('age', t.IntegerType()),
            t.StructField('children', t.MapType(t.StringType(), t.IntegerType()))
        ]
    )
)
>>> df.collect()
[Row(name=u'John', age=32, children={u'Billy': 7, u'Sally': 3}), Row(name=u'Mary', age=27, children={u'Rocco': 2})]
>>> df.select('name', 'age', f.explode(df['children']).alias('child_name', 'child_age')).collect()
[Row(name=u'John', age=32, child_name=u'Billy', child_age=7), Row(name=u'John', age=32, child_name=u'Sally', child_age=3), Row(name=u'Mary', age=27, child_name=u'Rocco', child_age=2)]

The opposite of exploding can be done with the collect_list or the collect_set functions. The latter de-dupes the list:

$ cat data.json
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}
{"name": "John", "age": 67, "children": {"Ruth": 34}, "sex": "male"}

$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').agg(f.collect_list('name')).show()
+------+------------------+
|   sex|collect_list(name)|
+------+------------------+
|female|      [Mary, Lynn]|
|  male|      [John, John]|
+------+------------------+

>>> df.groupBy('sex').agg(f.collect_set('name')).show()
+------+-----------------+
|   sex|collect_set(name)|
+------+-----------------+
|female|     [Mary, Lynn]|
|  male|           [John]|
+------+-----------------+

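The create_map function builds a map column from a key column and a value column. The resulting maps can then be collected per group: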

$ cat data.json 
{"name": "John", "age": 32, "children": {"Billy": 7, "Sally": 3}, "sex": "male"}
{"name": "Mary", "age": 27, "children": {"Rocco": 2}, "sex": "female"}
{"name": "Lynn", "age": 42, "children": {}, "sex": "female"}
{"name": "Bill", "age": 67, "children": {"Ruth": 34}, "sex": "male"}

$ pyspark
>>> import pyspark.sql.functions as f
>>> df = spark.read.json('data.json')
>>> df.groupBy('sex').agg(f.collect_list(f.create_map('name', 'age'))).show()
+------+----------------------------+
|   sex|collect_list(map(name, age))|
+------+----------------------------+
|female|        [[Mary -> 27], [L...|
|  male|        [[John -> 32], [B...|
+------+----------------------------+

writing

The write attribute returns a DataFrameWriter object with these methods:

SQL

RDD

RDD Programming Guide

UDF

udf

PY4J

https://www.py4j.org/
