Spark#

  • A fast and general compute engine (originally for Hadoop data).

    • Often paired with Hadoop for its distributed filesystem (HDFS), cluster resource management and parallel processing.

  • Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL (extract, transform, load), Machine Learning, stream processing, and graph computation.

  • Also integrates well with several databases and other data sources.

  • Installation of Spark and its dependencies is explained in the Installation chapter.

# Set environment variables for PySpark (system and version dependent!) 
# if not already set persistently (e.g., in .bashrc or .bash_profile or Windows environment variables)
import os
# Set the Java home path to the one you are using ((un)comment and edit as needed):
# os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/zulu-18.jdk/Contents/Home" (Liland's old Mac)
# os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_281" # or similar on Windows (Liland's Windows)
os.environ["JAVA_HOME"] = "/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home" # Liland's Mac

# If you are using Python environments, you can point these variables at a specific interpreter, as below.
# The default Python environment is used if the variables are set to "python" (edit if needed):
os.environ["PYSPARK_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"

# On Windows you need to specify where the Hadoop drivers are located (uncomment and edit if needed):
# os.environ["HADOOP_HOME"] = "C:/Hadoop/hadoop-3.3.1"

# Set the Hadoop version to the one you are using; "without" means no bundled Hadoop:
os.environ["PYSPARK_HADOOP_VERSION"] = "without"

Spark and Cassandra#

  • Cassandra is one of the databases that work well with Spark.

    • Same type of distributed processing.

    • Same way of replicating for fault tolerance.

  • Spark can be deployed on the same nodes as Cassandra for:

    • local data manipulation (data stays close to where it is stored), and

    • combining results at a central hub (MapReduce style).

  • Requires drivers from DataStax.

    • Automatically downloaded and applied with the following configuration.

  • A SparkSession instantiates Spark, applies configurations and connects to a data source.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()
# Some warnings are to be expected.
# If running this cell does not give any output after ~30 seconds, there is likely an error in the configuration (JAVA_HOME, HADOOP_HOME, etc.).
:: loading settings :: url = jar:file:/Users/kristian/miniforge3/envs/IND320_2024/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/kristian/.ivy2/cache
The jars for the packages stored in: /Users/kristian/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5f44c7b6-773f-4f66-8ca5-9b13137aba0f;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 in central
	found org.apache.cassandra#java-driver-core-shaded;4.18.1 in central
	found com.datastax.oss#native-protocol;1.5.1 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found org.apache.cassandra#java-driver-mapper-runtime;4.18.1 in central
	found org.apache.cassandra#java-driver-query-builder;4.18.1 in central
	found org.apache.commons#commons-lang3;3.10 in central
	found com.thoughtworks.paranamer#paranamer;2.8 in central
	found org.scala-lang#scala-reflect;2.12.19 in central
:: resolution report :: resolve 1979ms :: artifacts dl 102ms
	:: modules in use:
	com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 from central in [default]
	com.datastax.oss#native-protocol;1.5.1 from central in [default]
	com.datastax.spark#spark-cassandra-connector-driver_2.12;3.5.1 from central in [default]
	com.datastax.spark#spark-cassandra-connector_2.12;3.5.1 from central in [default]
	com.thoughtworks.paranamer#paranamer;2.8 from central in [default]
	com.typesafe#config;1.4.1 from central in [default]
	io.dropwizard.metrics#metrics-core;4.1.18 from central in [default]
	org.apache.cassandra#java-driver-core-shaded;4.18.1 from central in [default]
	org.apache.cassandra#java-driver-mapper-runtime;4.18.1 from central in [default]
	org.apache.cassandra#java-driver-query-builder;4.18.1 from central in [default]
	org.apache.commons#commons-lang3;3.10 from central in [default]
	org.hdrhistogram#HdrHistogram;2.1.12 from central in [default]
	org.reactivestreams#reactive-streams;1.0.3 from central in [default]
	org.scala-lang#scala-reflect;2.12.19 from central in [default]
	org.scala-lang.modules#scala-collection-compat_2.12;2.11.0 from central in [default]
	org.slf4j#slf4j-api;1.7.26 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-5f44c7b6-773f-4f66-8ca5-9b13137aba0f
	confs: [default]
	0 artifacts copied, 16 already retrieved (0kB/60ms)
24/12/04 21:21:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
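
If the console output is too chatty, the log level can be adjusted as the startup message above suggests:

# Reduce log verbosity (optional); the level set here replaces the default "WARN"
spark.sparkContext.setLogLevel("ERROR")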

Accessing tables#

Note: The following sets of commands assume that the Cassandra notebook has been run first to set up the relevant keyspace and tables.

# .load() is used to load data from Cassandra table as a Spark DataFrame.
spark.read.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").load().show()
+---+----------+-------+
|ind|   company|  model|
+---+----------+-------+
|  1|     Tesla|Model S|
|  2|     Tesla|Model 3|
|  3|  Polestar|      3|
|  4|Volkswagen|   ID.4|
+---+----------+-------+

Database views#

  • Useful for “setting the scene” before simpler data extraction.

  • The example below simply attaches to the correct keyspace and table.

    • The view could also be a selection from the table, to be queried further (see the sketch after the code below).

# Create view for simpler SQL queries
spark.read.format("org.apache.spark.sql.cassandra").options(table="table_with_uuid", keyspace="my_first_keyspace").load().createOrReplaceTempView("my_first_table_view")

Spark DataFrame#

  • Related to a Pandas data frame, but can be distributed over compute nodes.

  • Various functions, such as filters, statistical calculations, groupBy, Pandas functions (mapInPandas, sketched after this list), joins, etc.

  • Export to Pandas and JSON.

  • Reads many formats, including CSV, JSON, Parquet, and SQL sources (via JDBC); Excel requires an extra package.
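
As an example of the Pandas integration, mapInPandas applies a Pandas function to each partition of a Spark DataFrame (requires the pyarrow package). A minimal, self-contained sketch; the demo data and 25% tax are made up:

# mapInPandas: the function receives an iterator of Pandas DataFrames
# (one per partition) and yields transformed Pandas DataFrames.
demo = spark.createDataFrame([("Tesla", 20000.0), ("Ford", 15000.0)], ["company", "price"])

def add_tax(iterator):
    for pdf in iterator:
        pdf["price_with_tax"] = pdf["price"] * 1.25
        yield pdf

demo.mapInPandas(add_tax, schema="company string, price double, price_with_tax double").show()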

# Read CSV file into Spark DataFrame
planets = spark.read.csv("../../data/planets.csv", header=True, inferSchema=True)
planets.show()
+-------+---------+---------+
| planet| distance| diameter|
+-------+---------+---------+
|Mercury| 0.387 AU|  4878 km|
|  Venus| 0.723 AU| 12104 km|
|  Earth| 1.000 AU| 12756 km|
|   Mars| 1.524 AU|  6787 km|
|Jupiter| 5.203 AU|142796 km|
| Saturn| 9.546 AU|120660 km|
| Uranus|19.218 AU| 51118 km|
|Neptune|30.069 AU| 48600 km|
+-------+---------+---------+
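
Note that distance and diameter were read as strings because of the embedded units. A hedged sketch of converting them to numeric columns (the new column names are made up):

# Strip the units and cast to numeric columns (illustrative)
from pyspark.sql import functions as F
planets.withColumn("distance_au", F.regexp_replace("distance", " AU", "").cast("double")) \
       .withColumn("diameter_km", F.regexp_replace("diameter", " km", "").cast("int")) \
       .show()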
# Select only the company Tesla
# spark.sql() returns a Spark DataFrame, which can be filtered directly:
spark.sql("select * from my_first_table_view").filter("company = 'Tesla'").show()
+--------------------+-------+-------+-------+
|                  id|company|  model|  price|
+--------------------+-------+-------+-------+
|54956df0-b27d-11e...|  Tesla|Model S|20000.0|
|5496cd80-b27d-11e...|  Tesla|Model S|21000.0|
+--------------------+-------+-------+-------+
# Equivalent to the above but in pure SQL
spark.sql("select * from my_first_table_view where company = 'Tesla'").show()
+--------------------+-------+-------+-------+
|                  id|company|  model|  price|
+--------------------+-------+-------+-------+
|54956df0-b27d-11e...|  Tesla|Model S|20000.0|
|5496cd80-b27d-11e...|  Tesla|Model S|21000.0|
+--------------------+-------+-------+-------+
# Select all data from the view and convert it to Pandas DataFrame
spark.sql("select * from my_first_table_view").toPandas()
                                     id     company     model     price
0  5496cd80-b27d-11ef-bd96-8f81262f2f14       Tesla   Model S   21000.0
1  54956df0-b27d-11ef-bd96-8f81262f2f14       Tesla   Model S   20000.0
2  549742b0-b27d-11ef-bd96-8f81262f2f14  Oldsmobile  Model 6C  135000.0
# View data as a table and select only Tesla company
df = spark.sql("select * from my_first_table_view")
df.filter(df.company == 'Tesla').toPandas() # Equivalent to "company = 'Tesla'"
                                     id  company    model    price
0  54956df0-b27d-11ef-bd96-8f81262f2f14    Tesla  Model S  20000.0
1  5496cd80-b27d-11ef-bd96-8f81262f2f14    Tesla  Model S  21000.0
# Filter also on price > 20000
df.filter((df.company == 'Tesla') & (df.price > 20000)).toPandas()
                                     id  company    model    price
0  5496cd80-b27d-11ef-bd96-8f81262f2f14    Tesla  Model S  21000.0

Aggregation, grouping and filtering#

  • These can be combined in many ways.

  • Operations are applied from left to right.

  • Order is important, e.g., filtering before aggregation differs from filtering on the aggregated result (see the sketch after the example below).

# Aggregate prices by company and sort by company name
df.groupBy("company").agg({"price": "avg"}).orderBy('company').toPandas()
      company  avg(price)
0  Oldsmobile    135000.0
1       Tesla     20500.0
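
To see why order matters, compare filtering before aggregation with filtering on the aggregated value (the latter corresponds to SQL's HAVING). The alias avg_price is introduced here for readability:

from pyspark.sql import functions as F
# Filter rows first, then aggregate:
df.filter(df.price > 20000).groupBy("company").agg(F.avg("price").alias("avg_price")).show()
# Aggregate first, then filter on the aggregated value:
df.groupBy("company").agg(F.avg("price").alias("avg_price")).filter("avg_price > 20000").show()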

Write data to Cassandra#

  • One can append or overwrite data in existing database tables.

  • PySpark is picky regarding data formats.

    • One can read the existing table and reuse its schema (see the sketch further below).

  • PySpark is case sensitive, while Cassandra is not by default.

# Create two new cars in a Pandas DataFrame
import pandas as pd
newCars = pd.DataFrame([[459, 'Ford', 'Escort'], [460, 'Ford', 'Transit']], columns=['ind', 'company', 'model'])
newCars
   ind  company    model
0  459     Ford   Escort
1  460     Ford  Transit
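
As noted above, one way to satisfy PySpark's strict typing is to read the existing table and reuse its schema when converting (a sketch; not strictly needed here, since the columns already match):

# Optional: read the existing table's schema and apply it to the new data
existing = spark.read.format("org.apache.spark.sql.cassandra") \
    .options(table="my_first_table", keyspace="my_first_keyspace").load()
newCarsSpark = spark.createDataFrame(newCars, schema=existing.schema)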
# Convert the Pandas DataFrame to Spark DataFrame and save it to Cassandra (append mode)
spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").mode("append").save()
# Check if the new cars are in the table
spark.read.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").load()\
.createOrReplaceTempView("my_first_table_view2")

spark.sql("select * from my_first_table_view2").toPandas()
   ind     company    model
0    3    Polestar        3
1    1       Tesla  Model S
2    2       Tesla  Model 3
3  460        Ford  Transit
4  459        Ford   Escort
5    4  Volkswagen     ID.4
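
Overwriting is also possible. In overwrite mode the Cassandra connector truncates the table before writing and therefore requires explicit confirmation (a sketch; run with care):

# Overwrite truncates the target table first, so the connector
# demands that confirm.truncate is set explicitly.
spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra") \
    .options(table="my_first_table", keyspace="my_first_keyspace") \
    .option("confirm.truncate", "true") \
    .mode("overwrite").save()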

Exercise#

  • Create a table matching the structure of the planets data.

  • Insert the planets data into the table using Spark.

  • Read the data back from Cassandra through Spark and print it.
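
A hedged hint for the first step, using the Cassandra catalog (mycatalog) configured in the SparkSession above; the table and column names are only suggestions:

# One possible way to create a matching table through the configured catalog
spark.sql("""
    CREATE TABLE IF NOT EXISTS mycatalog.my_first_keyspace.planets
    (planet STRING, distance STRING, diameter STRING)
    USING cassandra
    PARTITIONED BY (planet)
""")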

# Stop Spark session
try:
    spark.stop()
except ConnectionRefusedError:
    print("Spark session already stopped.")