PySpark
It’s Electric!
What is PySpark?
PySpark exposes the Apache Spark programming model to Python through a feature-rich API. Use the ease of Python scripting for your next parallel computing cluster task in machine learning, SQL, graph analytics, and streaming.
What is cluster computing?
A method of parallel computing in which a large dataset is partitioned into smaller pieces and processed on cluster nodes. The worker nodes are controlled by a master node, which delegates data delivery to them and receives the results after processing.
Why is it useful?
It’s an optimized solution for very large datasets and processes that are too large and computationally expensive for a single machine or small setup.
What are Spark Jobs?
A directed acyclic graph (DAG) of procedural workflows that Spark builds from the operations you request and then executes across the cluster.
Resilient Distributed Datasets
The core data structure of the Spark architecture and a fundamental reason for using Spark. RDDs allow operations to be performed in parallel, which speeds up execution; a short example follows the SparkContext setup below.
Getting Started
Spark Context
How do we connect to a cluster? We first create an instance of SparkContext after importing it from the pyspark module.
from pyspark import SparkContext
sc = SparkContext()
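With sc available, the parallel RDD operations described earlier can be sketched as follows (a minimal example with placeholder data; map is a lazy transformation, reduce is an action that triggers execution):
numbers = sc.parallelize(range(10))         # distribute a local Python range as an RDD
squares = numbers.map(lambda x: x * x)      # transformation: lazy, only extends the DAG
total = squares.reduce(lambda a, b: a + b)  # action: runs the work in parallel on the cluster
print(total)                                # 285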
Spark Session
We can now create an interface to our connection by creating a new SparkSession
to make calls to our Spark cluster.
Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
spark = SparkSession(sc)
Alternatively, we could err on the side of caution and utilize .getOrCreate(),
which either creates a new session or returns
a pre-existing one, as seen below.
spark = SparkSession.builder.getOrCreate()
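The builder also accepts optional configuration before getOrCreate(); for example, naming the application (the name here is just a placeholder):
spark = SparkSession.builder.appName("my_app").getOrCreate()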
Querying Data
Building Queries
Construct a query as a string using standard SQL syntax and save it in a variable for convenience.
query = "SELECT * FROM table"
Requesting a Query
Pass the query to spark.sql()
and save the results.
table_query = spark.sql(query)
Viewing Query Results
Append .show()
to your query result to view the returned table.
table_query.show()
Create DataFrames
Convert Query Results to a DataFrame
Append .toPandas()
to a query result to convert it to a pandas DataFrame.
df = table_query.toPandas()
Spark DataFrames
Convert a pandas DataFrame to a Spark DataFrame
Pass a pandas DataFrame into spark.createDataFrame()
as an argument to generate a Spark DataFrame from your initial DataFrame.
spark_df = spark.createDataFrame(df)
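spark.createDataFrame() also accepts plain Python data, which is handy for quick tests; a small sketch with made-up rows and column names:
spark_df = spark.createDataFrame([("Alice", 34), ("Bob", 45)], ["name", "age"])
spark_df.show()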
Tables
Create temporary table
To register a temporary table, chain .createOrReplaceTempView("title")
to a Spark DataFrame.
spark_df.createOrReplaceTempView("temp")
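Once registered, the temporary view can be queried by name through spark.sql(); for example:
spark.sql("SELECT * FROM temp").show()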
Creating a New Table
Generate a new DataFrame from a registered table by calling the .table()
method on your SparkSession with the table's name.
table = spark.table("title")
Add a New Column
You can add a new column to an existing table with .withColumn(), which takes two parameters: the new column’s name and a Column expression defining its values, as shown below.
table = table.withColumn("new_Column", table.oldColumn + 1)
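The second argument must be a Column expression; the helpers in pyspark.sql.functions can build one, e.g. a constant-valued column (the column name here is illustrative):
from pyspark.sql import functions as F
table = table.withColumn("constant_Column", F.lit(0))  # every row gets the literal value 0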
Viewing Available Tables
You can take a peek at the tables you have available with the .listTables()
method.
print(spark.catalog.listTables())
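.listTables() returns a list of Table objects, so you can pull out just the names if that is all you need:
print([t.name for t in spark.catalog.listTables()])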
Convert a CSV File to a Spark DataFrame
Set Target File
Create a string with the location and name of your file.
file_path = "filepath/filename.csv"
Read the File Data
Pass the file path to .read.csv()
with an optional parameter to use headers from the CSV file.
fileContents = spark.read.csv(file_path, header=True)
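By default every CSV column is read as a string; if you want Spark to infer the column types as well, read.csv() also accepts an inferSchema flag:
fileContents = spark.read.csv(file_path, header=True, inferSchema=True)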
Display The Data
View your data by using .show()
fileContents.show()