PySpark


It’s Electric!

What is PySpark?

PySpark exposes the Apache Spark programming model to Python through a feature-rich API. Use the ease of Python scripting for your next parallel cluster-computing task in machine learning, SQL, graph analytics, or streaming.

What is cluster computing?

Cluster computing is a method of parallel computing in which a large data set is split into smaller pieces that are processed on separate cluster nodes. A master node controls the worker nodes: it distributes the data to them and collects the results after processing.
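The split, process, and combine pattern can be sketched on a single machine in plain Python. This is only an illustration of the idea, not how Spark is implemented: the helper names split_into_chunks and process_chunk are made up for this example, and a thread pool stands in for the worker nodes.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, n_chunks):
    """Split a non-empty list into at most n_chunks roughly equal pieces."""
    size = -(-len(data) // n_chunks)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def process_chunk(chunk):
    """Work done independently on one piece (here: a sum of squares)."""
    return sum(x * x for x in chunk)


data = list(range(1, 11))
chunks = split_into_chunks(data, 3)

# Each worker handles one chunk; the coordinator combines the partial results.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_results = list(pool.map(process_chunk, chunks))

total = sum(partial_results)
print(chunks)
print(partial_results, total)
```

Spark applies the same pattern, but the chunks live on different machines and the framework handles distribution and failures for you.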

Why is it useful?

It’s an optimized solution for data sets and computations that are too large or too computationally expensive for a single machine or a small setup.

Spark Jobs?

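Spark plans each job as a directed acyclic graph (DAG) of processing steps. Transformations only add steps to the graph; the work runs when an action requests a result.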

Resilient Distributed Datasets

RDDs are the core data structure of the Spark architecture and a fundamental reason for using Spark. An RDD is split into partitions across the cluster, which allows operations to run in parallel and speeds up execution.

Getting Started

Spark Context

How do we connect to a cluster? We first create an instance of SparkContext after importing the module.

from pyspark import SparkContext

sc = SparkContext()

Spark Session

We can now create an interface to our connection by creating a new SparkSession, which we use to make calls to our Spark cluster.

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Wrap the existing SparkContext in a session
spark = SparkSession(sc)

Alternatively, we could err on the side of caution and use .getOrCreate(), which returns the existing session if there is one and creates a new session otherwise, as seen below.

spark = SparkSession.builder.getOrCreate()

Querying Data

Building Queries

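Construct a query as a string using standard SQL syntax (SELECT comes before FROM), and save it in a variable for convenience. Here my_table is a placeholder for the name of a table or view registered with Spark.

query = "SELECT * FROM my_table"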

Requesting a Query

Pass the query to spark.sql() and save the results.

table_query = spark.sql(query)

Viewing Query Results

Append .show() to the query result to print its first rows.

table_query.show()

Create DataFrames

Convert Query Results to a Pandas DataFrame

Append .toPandas() to a query result to convert it to a Pandas DataFrame. This collects every row onto the local machine, so the result must fit in memory.

df = table_query.toPandas()

Spark DataFrames

Convert a Pandas DataFrame to a Spark DataFrame

Pass a Pandas DataFrame to spark.createDataFrame() as an argument to generate a Spark DataFrame, which lives on the cluster, from your initial DataFrame.

spark_df = spark.createDataFrame(df)

Tables

Create temporary table

To initialize a temporary table, chain .createOrReplaceTempView("name") to a Spark DataFrame, where "name" is the name the table will be registered under.

spark_df.createOrReplaceTempView("temp")

Loading a Table as a DataFrame

Retrieve a registered table as a Spark DataFrame by calling the .table() method of your SparkSession with the table's name.

table = spark.table("temp")

Add a New Column

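You can add a new column to an existing table with .withColumn(), which takes two arguments: the new column’s name and a column expression that computes its values, as shown below. Spark DataFrames are immutable, so .withColumn() returns a new DataFrame; reassign it to keep the result.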

table = table.withColumn("new_Column", table.oldColumn + 1)

Viewing Available Tables

You can take a peek at the tables you have available with the .listTables() method of spark.catalog.

print(spark.catalog.listTables())

Convert CSV File to Spark DataFrame

Set Target File

Create a string with the location and name of your file.

file_path = "filepath/filename.csv"

Read the File Data

Pass the file path to .read.csv() with an optional parameter to use headers from the CSV file.

fileContents = spark.read.csv(file_path, header=True)

Display The Data

View your data by using .show()

fileContents.show()
Written on September 1, 2018