Spark#
A fast and general compute engine (originally for Hadoop data).
Often paired with Hadoop for its distributed filesystem (HDFS), cluster resource management and parallel processing.
Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL (extract, transform, load), machine learning, stream processing, and graph computation.
Also integrates well with several databases and other data sources.
Installation of Spark and its dependencies is explained in the Installation chapter.
# Set environment variables for PySpark (system and version dependent!)
# if not already set persistently (e.g., in .bashrc or .bash_profile or Windows environment variables)
import os
# Set the Java home path to the one you are using ((un)comment and edit as needed):
# os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/zulu-18.jdk/Contents/Home" (Liland's old Mac)
# os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_281" # or similar on Windows (Liland's Windows)
os.environ["JAVA_HOME"] = "/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home" # Liland's Mac
# If you are using environments in Python, you can set the environment variables like the alternative below.
# The default Python environment is used if the variables are set to "python" (edit if needed):
os.environ["PYSPARK_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
# On Windows you need to specify where the Hadoop drivers are located (uncomment and edit if needed):
# os.environ["HADOOP_HOME"] = "C:/Hadoop/hadoop-3.3.1"
# Set the Hadoop version to the one you are using, e.g., none:
os.environ["PYSPARK_HADOOP_VERSION"] = "without"
Spark and Cassandra#
Cassandra is one of the databases that work well with Spark.
Both use the same type of distributed processing.
Both replicate data for fault tolerance in the same way.
Spark can be deployed on the same nodes as Cassandra for:
local manipulation of data (short travel distances), and
combination of results to a central hub (MapReduce; illustrated below, after the session is created).
Requires drivers from DataStax.
These are automatically downloaded and applied with the following configuration.
A SparkSession instantiates Spark, applies configurations and connects to a data source.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder.appName("SparkCassandraApp")
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1")
    .config("spark.cassandra.connection.host", "localhost")
    .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
    .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
    .config("spark.cassandra.connection.port", "9042")
    .getOrCreate()
)
# Some warnings are to be expected.
# If running this cell does not give any output after ~30 seconds, there is likely an error in the configuration (JAVA_HOME, HADOOP_HOME, etc.).
:: loading settings :: url = jar:file:/Users/kristian/miniforge3/envs/IND320_2024/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/kristian/.ivy2/cache
The jars for the packages stored in: /Users/kristian/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5f44c7b6-773f-4f66-8ca5-9b13137aba0f;1.0
found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
(15 transitive dependencies also resolved from central)
:: resolution report :: resolve 1979ms :: artifacts dl 102ms
0 artifacts copied, 16 already retrieved (0kB/60ms)
24/12/04 21:21:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
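With the session running, the MapReduce pattern mentioned earlier can be illustrated with a classic word count. This is a minimal sketch; the word list is made up for the example:
# Map each word to a (word, 1) pair, then reduce per key to count occurrences
words = spark.sparkContext.parallelize(["spark", "cassandra", "spark", "hadoop"])
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
print(counts.collect())  # e.g., [('spark', 2), ('cassandra', 1), ('hadoop', 1)]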
Accessing tables#
Note: The following sets of commands assume that the Cassandra notebook has been run first to set up the relevant keyspace and tables.
# .load() loads data from a Cassandra table as a Spark DataFrame
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="my_first_table", keyspace="my_first_keyspace")\
    .load().show()
+---+----------+-------+
|ind| company| model|
+---+----------+-------+
| 1| Tesla|Model S|
| 2| Tesla|Model 3|
| 3| Polestar| 3|
| 4|Volkswagen| ID.4|
+---+----------+-------+
Database views#
Useful for “setting the scene” before simpler data extraction.
The example below simply attaches to the correct keyspace and table.
The view could also be a selection from that table, to be queried further (see the sketch after the next cell).
# Create a view for simpler SQL queries
spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="table_with_uuid", keyspace="my_first_keyspace")\
    .load().createOrReplaceTempView("my_first_table_view")
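As noted above, a view can also be created from a selection, so that later queries only see the narrowed data. A minimal sketch; the filter and the view name are made up for the example:
# Hypothetical filtered view: only Tesla rows are visible through it
tesla_only = spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="table_with_uuid", keyspace="my_first_keyspace")\
    .load().filter("company = 'Tesla'")
tesla_only.createOrReplaceTempView("tesla_view")
spark.sql("select * from tesla_view").show()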
Spark DataFrame#
Similar to a Pandas DataFrame, but can be distributed over compute nodes.
Offers various functions such as filters, statistical calculations, groupBy, Pandas functions (mapInPandas), joins, etc.
Can be exported to Pandas and JSON.
Reads many formats, including SQL, JSON, Excel, …
# Read CSV file into Spark DataFrame
planets = spark.read.csv("../../data/planets.csv", header=True, inferSchema=True)
planets.show()
+-------+---------+---------+
| planet| distance| diameter|
+-------+---------+---------+
|Mercury| 0.387 AU| 4878 km|
| Venus| 0.723 AU| 12104 km|
| Earth| 1.000 AU| 12756 km|
| Mars| 1.524 AU| 6787 km|
|Jupiter| 5.203 AU|142796 km|
| Saturn| 9.546 AU|120660 km|
| Uranus|19.218 AU| 51118 km|
|Neptune|30.069 AU| 48600 km|
+-------+---------+---------+
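The distance and diameter columns above are strings with units. Below is a sketch of cleaning them with DataFrame functions; the new column names are made up, and the unit suffixes are assumed to match the CSV exactly:
from pyspark.sql import functions as F

# Strip the unit suffixes and cast the remaining numbers to doubles
planets_numeric = planets\
    .withColumn("distance_au", F.regexp_replace("distance", " AU", "").cast("double"))\
    .withColumn("diameter_km", F.regexp_replace("diameter", " km", "").cast("double"))
planets_numeric.select("planet", "distance_au", "diameter_km").show()
For arbitrary row-wise Pandas logic, mapInPandas from the list above is the heavier alternative.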
# Select only the Tesla company
# (the SQL query returns a DataFrame, which is then filtered)
spark.sql("select * from my_first_table_view").filter("company = 'Tesla'").show()
+--------------------+-------+-------+-------+
| id|company| model| price|
+--------------------+-------+-------+-------+
|54956df0-b27d-11e...| Tesla|Model S|20000.0|
|5496cd80-b27d-11e...| Tesla|Model S|21000.0|
+--------------------+-------+-------+-------+
# Equivalent to the above but in pure SQL
spark.sql("select * from my_first_table_view where company = 'Tesla'").show()
+--------------------+-------+-------+-------+
| id|company| model| price|
+--------------------+-------+-------+-------+
|54956df0-b27d-11e...| Tesla|Model S|20000.0|
|5496cd80-b27d-11e...| Tesla|Model S|21000.0|
+--------------------+-------+-------+-------+
# Select all data from the view and convert it to Pandas DataFrame
spark.sql("select * from my_first_table_view").toPandas()
|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | 5496cd80-b27d-11ef-bd96-8f81262f2f14 | Tesla | Model S | 21000.0 |
| 1 | 54956df0-b27d-11ef-bd96-8f81262f2f14 | Tesla | Model S | 20000.0 |
| 2 | 549742b0-b27d-11ef-bd96-8f81262f2f14 | Oldsmobile | Model 6C | 135000.0 |
# Load the view as a DataFrame and filter on the Tesla company
df = spark.sql("select * from my_first_table_view")
df.filter(df.company == 'Tesla').toPandas() # Equivalent to "company = 'Tesla'"
|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | 54956df0-b27d-11ef-bd96-8f81262f2f14 | Tesla | Model S | 20000.0 |
| 1 | 5496cd80-b27d-11ef-bd96-8f81262f2f14 | Tesla | Model S | 21000.0 |
# Filter also on price > 20000
df.filter((df.company == 'Tesla') & (df.price > 20000)).toPandas()
|   | id | company | model | price |
|---|----|---------|-------|-------|
| 0 | 5496cd80-b27d-11ef-bd96-8f81262f2f14 | Tesla | Model S | 21000.0 |
Aggregation, grouping and filtering#
These can be combined in many ways.
Operations are applied starting from the left.
The order is therefore important (see the sketch after the next cell).
# Aggregate prices by company and sort by company name
df.groupBy("company").agg({"price": "avg"}).orderBy('company').toPandas()
|   | company | avg(price) |
|---|---------|------------|
| 0 | Oldsmobile | 135000.0 |
| 1 | Tesla | 20500.0 |
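The same building blocks can be chained, and the order decides the result: filtering before grouping means only the matching rows enter the aggregate, whereas filtering after aggregation would act on the aggregated values instead. A sketch using the cars view from above:
# Filter first, then group and aggregate: only cars above 20000 enter the average
df.filter(df.price > 20000).groupBy("company").agg({"price": "avg"}).orderBy("company").show()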
Write data to Cassandra#
One can append to or overwrite data in existing database tables.
PySpark is picky regarding data formats.
Reading data from the existing table and extracting its formatting (schema) is possible, as sketched below.
PySpark is case-sensitive, while Cassandra by default is not.
See an example of case sensitivity issues in the Cassandra notebook.
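A sketch of the schema extraction mentioned above; reading the existing table once yields the column names and types that new data can be matched against:
# Read the existing table to extract its schema (column names and types)
existing = spark.read.format("org.apache.spark.sql.cassandra")\
    .options(table="my_first_table", keyspace="my_first_keyspace").load()
print(existing.schema)
# The schema could then be reused below, e.g., spark.createDataFrame(newCars, schema=existing.schema)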
# Create two new cars in a Pandas DataFrame
import pandas as pd
newCars = pd.DataFrame([[459, 'Ford', 'Escort'], [460, 'Ford', 'Transit']], columns=['ind', 'company', 'model'])
newCars
|   | ind | company | model |
|---|-----|---------|-------|
| 0 | 459 | Ford | Escort |
| 1 | 460 | Ford | Transit |
# Convert the Pandas DataFrame to Spark DataFrame and save it to Cassandra (append mode)
spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").mode("append").save()
# Check if the new cars are in the table
spark.read.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").load()\
.createOrReplaceTempView("my_first_table_view2")
spark.sql("select * from my_first_table_view2").toPandas()
|   | ind | company | model |
|---|-----|---------|-------|
| 0 | 3 | Polestar | 3 |
| 1 | 1 | Tesla | Model S |
| 2 | 2 | Tesla | Model 3 |
| 3 | 460 | Ford | Transit |
| 4 | 459 | Ford | Escort |
| 5 | 4 | Volkswagen | ID.4 |
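To overwrite instead of append, the DataStax connector requires explicit confirmation before truncating the table; to my knowledge this is done with the confirm.truncate option (left commented out here to keep the demo table intact, and worth verifying for your connector version):
# Overwrite deletes all existing rows first, hence the explicit confirmation
# spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra")\
#     .options(table="my_first_table", keyspace="my_first_keyspace")\
#     .option("confirm.truncate", "true").mode("overwrite").save()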
Exercise#
Create a table matching the structure of the planets data.
Insert the planets data into the table using Spark.
Read the data back from Cassandra through Spark and print it.
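A possible starting point (a hint, not a full solution): the Cassandra catalog configured in the SparkSession above should allow creating the table directly through Spark SQL DDL. The statement below is a sketch whose exact syntax should be verified against your connector version:
# Hypothetical DDL via the configured catalog (uncomment and adapt)
# spark.sql("""
#     CREATE TABLE mycatalog.my_first_keyspace.planets (
#         planet STRING, distance STRING, diameter STRING)
#     USING cassandra PARTITIONED BY (planet)
# """)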
# Stop Spark session
try:
spark.stop()
except ConnectionRefusedError:
print("Spark session already stopped.")