# Spark
- A fast and general compute engine (originally for [Hadoop](https://hadoop.apache.org/) data).
    - Often paired with Hadoop for its distributed filesystem (HDFS), cluster resource management and parallel processing.
- Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL (extract, transform, load), machine learning, stream processing, and graph computation.
- Spark also connects to external databases (such as Cassandra, used below) and other data sources.
- Installation of Spark and its dependencies is explained in the [Installation chapter](../../7_Appendix/Installation.ipynb).

In [None]:
# Set environment variables for PySpark (system and version dependent!) 
# if not already set persistently (e.g., in .bashrc or .bash_profile or Windows environment variables)
import os
# Set the Java home path to the one you are using ((un)comment and edit as needed):
# os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/zulu-18.jdk/Contents/Home" # Liland's old Mac
# os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk1.8.0_281" # or similar on Windows (Liland's Windows)
os.environ["JAVA_HOME"] = "/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home" # Liland's Mac

# If you are using environments in Python, you can set the environment variables like the alternative below.
# The default Python environment is used if the variables are set to "python" (edit if needed):
os.environ["PYSPARK_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python" # or similar to "/Users/kristian/miniforge3/envs/tf_M1/bin/python"

# On Windows you need to specify where the Hadoop drivers are located (uncomment and edit if needed):
# os.environ["HADOOP_HOME"] = "C:/Hadoop/hadoop-3.3.1"

# Set the Hadoop version to the one you are using, e.g., none:
os.environ["PYSPARK_HADOOP_VERSION"] = "without"
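
Since the paths above are system dependent, it can help to check them before starting Spark. The following is a minimal sketch, not part of the PySpark API: the helper name `check_spark_environment` is hypothetical, and it only tests that the variables point to existing directories, not that a working Java or Hadoop installation is found there.

```python
import os
from pathlib import Path


def check_spark_environment(required=("JAVA_HOME",), optional=("HADOOP_HOME",)):
    """Return a list of problems found; an empty list means nothing obvious is wrong."""
    problems = []
    # Required variables must be set and point to an existing directory
    for name in required:
        value = os.environ.get(name)
        if not value:
            problems.append(f"{name} is not set")
        elif not Path(value).is_dir():
            problems.append(f"{name} points to a missing directory: {value}")
    # Optional variables are only checked if they are set at all
    for name in optional:
        value = os.environ.get(name)
        if value and not Path(value).is_dir():
            problems.append(f"{name} points to a missing directory: {value}")
    return problems


for problem in check_spark_environment():
    print("Warning:", problem)
```

No output means that the directories exist. It does not guarantee that the Java version is compatible with the installed Spark version.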

## Spark and Cassandra
- Cassandra is one of the databases that work well with Spark.
    - Same type of distributed processing.
    - Same way of replicating for fault tolerance.
- Spark can be deployed on the same nodes as Cassandra for:
    - local (short traveled) data manipulation, and
    - combination of results to a central hub ([MapReduce](https://en.wikipedia.org/wiki/MapReduce)).
- Requires the Spark Cassandra Connector from DataStax.
    - Automatically downloaded and applied with the following configuration.
- A [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html) instantiates Spark, applies configurations and connects to a data source.
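
The idea of local data manipulation followed by a combination of results at a central hub can be sketched in plain Python. This is only an illustration of the MapReduce pattern, not how Spark or Cassandra implement it. The data and the function names (`map_partition`, `combine`) are made up for the example.

```python
from functools import reduce

# Hypothetical data: each inner list stands for the rows stored on one node
node_partitions = [
    [("Tesla", 50000), ("Ford", 20000)],
    [("Tesla", 70000), ("Volvo", 40000)],
    [("Ford", 30000)],
]


def map_partition(rows):
    """Local step on one node: (sum, count) of prices per company."""
    partial = {}
    for company, price in rows:
        total, count = partial.get(company, (0, 0))
        partial[company] = (total + price, count + 1)
    return partial


def combine(left, right):
    """Central step: merge two partial results into one."""
    merged = dict(left)
    for company, (total, count) in right.items():
        prev_total, prev_count = merged.get(company, (0, 0))
        merged[company] = (prev_total + total, prev_count + count)
    return merged


# Each node reduces its own rows; only the small partial results are sent on
partials = [map_partition(rows) for rows in node_partitions]
totals = reduce(combine, partials, {})
average_price = {company: total / count for company, (total, count) in totals.items()}
print(average_price)
```

Sums and counts are sent instead of local averages because averages of averages would be wrong when the nodes hold different numbers of rows.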

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkCassandraApp').\
    config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.5.1').\
    config('spark.cassandra.connection.host', 'localhost').\
    config('spark.sql.extensions', 'com.datastax.spark.connector.CassandraSparkExtensions').\
    config('spark.sql.catalog.mycatalog', 'com.datastax.spark.connector.datasource.CassandraCatalog').\
    config('spark.cassandra.connection.port', '9042').getOrCreate()
# Some warnings are to be expected.
# If running this cell does not give any output after ~30 seconds, there is likely an error in the configuration (JAVA_HOME, HADOOP_HOME, etc.).

## Accessing tables
**Note: The following sets of commands assume that the [Cassandra notebook](./3_Cassandra.ipynb) has been run first to set up the relevant keyspace and tables.**

In [None]:
# .load() reads the data from a Cassandra table into a Spark DataFrame.
spark.read.format("org.apache.spark.sql.cassandra").options(table="my_first_table", keyspace="my_first_keyspace").load().show()

### Database views
- Useful for "setting the scene" before a more simplified data extraction.
- The below example simply attaches to the correct keyspace and table.
    - The _view_ could also be a selection into that table to query further.

In [None]:
# Create view for simpler SQL queries
spark.read.format("org.apache.spark.sql.cassandra").options(table="table_with_uuid", keyspace="my_first_keyspace").load().createOrReplaceTempView("my_first_table_view")
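
A view that is a selection into the table, as mentioned in the list above, could be registered with a small helper like the one sketched here. The function name `register_cassandra_view` and the view name `tesla_view` are hypothetical. The helper only chains the same `read`, `filter` and `createOrReplaceTempView` calls used elsewhere in this notebook, and expects an existing `SparkSession` as its first argument.

```python
CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"


def register_cassandra_view(spark, keyspace, table, view_name, condition=None):
    """Load a Cassandra table, optionally filter it, and register it as a temp view."""
    df = (
        spark.read.format(CASSANDRA_FORMAT)
        .options(table=table, keyspace=keyspace)
        .load()
    )
    # The optional condition is an SQL expression string, e.g. "company = 'Tesla'"
    if condition is not None:
        df = df.filter(condition)
    df.createOrReplaceTempView(view_name)
    return df
```

With the session created earlier, `register_cassandra_view(spark, "my_first_keyspace", "table_with_uuid", "tesla_view", "company = 'Tesla'")` would make `select * from tesla_view` return only the Tesla rows.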

### [Spark DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
- Related to a Pandas data frame, but can be distributed over compute nodes.
- Various functions like filters, statistical calculations, groupBy, Pandas functions (mapInPandas), joins, etc.
- Export to Pandas and JSON.
- Reads many formats, including CSV, JSON, Parquet and SQL sources (Excel files typically go through Pandas or an extra package).

In [None]:
# Read CSV file into Spark DataFrame
planets = spark.read.csv("../../data/planets.csv", header=True, inferSchema=True)
planets.show()

In [None]:
# Select only Tesla company:
# spark.sql() returns a DataFrame, which is then filtered with .filter()
spark.sql("select * from my_first_table_view").filter("company = 'Tesla'").show()

In [None]:
# Equivalent to the above but in pure SQL
spark.sql("select * from my_first_table_view where company = 'Tesla'").show()

In [None]:
# Select all data from the view and convert it to Pandas DataFrame
spark.sql("select * from my_first_table_view").toPandas()

In [None]:
# View data as a table and select only Tesla company
df = spark.sql("select * from my_first_table_view")
df.filter(df.company == 'Tesla').toPandas() # Equivalent to "company = 'Tesla'"


In [None]:
# Filter also on price > 20000
df.filter((df.company == 'Tesla') & (df.price > 20000)).toPandas()

### Aggregation, grouping and filtering
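- These operations can be combined in many ways.
- Chained methods are applied from left to right.
- The order matters: filtering before an aggregation generally gives a different result than filtering after it.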
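
Why the order matters can be seen in a small plain-Python analogue that runs without a Spark session. The rows and the helper `average_by_company` are made up for the illustration. The same reasoning applies to chained `filter` and `groupBy` calls on a Spark DataFrame.

```python
from collections import defaultdict

# Hypothetical rows standing in for the (company, price) columns of a table
rows = [("Tesla", 50000), ("Tesla", 15000), ("Ford", 18000), ("Ford", 30000)]


def average_by_company(pairs):
    """Group prices by company and return the mean price per company."""
    grouped = defaultdict(list)
    for company, price in pairs:
        grouped[company].append(price)
    return {company: sum(prices) / len(prices) for company, prices in grouped.items()}


# Filter the rows first, then aggregate: cheap cars never enter the average
filter_then_aggregate = average_by_company([(c, p) for c, p in rows if p > 20000])

# Aggregate first, then filter: the threshold is applied to the averages
aggregate_then_filter = {
    c: avg for c, avg in average_by_company(rows).items() if avg > 20000
}

print(filter_then_aggregate)
print(aggregate_then_filter)
```

In PySpark, the first variant would correspond to something like `df.filter(df.price > 20000).groupBy("company").agg({"price": "avg"})`. In the second variant, the filter is applied to the aggregated column instead.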

In [None]:
# Aggregate prices by company and sort by company name
df.groupBy("company").agg({"price": "avg"}).orderBy('company').toPandas()

## Write data to Cassandra
- One can append or overwrite data in existing database tables.
- PySpark is picky regarding data formats.
    - Reading data from the existing table and extracting formatting is possible.
- PySpark is case sensitive, while Cassandra is not by default.
    - See the example of case sensitivity issues in the [Cassandra notebook](./3_Cassandra.ipynb).
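
One way to avoid case mismatches is to lower-case the column names before writing. This sketch assumes that the Cassandra table was created with unquoted identifiers, which Cassandra stores in lower case. The helper name `to_cassandra_names` is hypothetical.

```python
def to_cassandra_names(columns):
    """Lower-case column names and fail if two names collide after lower-casing."""
    lowered = [str(name).lower() for name in columns]
    duplicates = sorted({name for name in lowered if lowered.count(name) > 1})
    if duplicates:
        raise ValueError(f"Column names collide after lower-casing: {duplicates}")
    return lowered


print(to_cassandra_names(["Ind", "Company", "model"]))
```

For a Pandas DataFrame such as `newCars` below, `newCars.columns = to_cassandra_names(newCars.columns)` would apply the renaming before the conversion to a Spark DataFrame.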

In [None]:
# Create two new cars in a Pandas DataFrame
import pandas as pd
newCars = pd.DataFrame([[459, 'Ford', 'Escort'], [460, 'Ford', 'Transit']], columns=['ind', 'company', 'model'])
newCars

In [None]:
# Convert the Pandas DataFrame to Spark DataFrame and save it to Cassandra (append mode)
spark.createDataFrame(newCars).write.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").mode("append").save()
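
If the inferred Spark types do not match the Cassandra table, the types can be taken from the existing table, as mentioned in the list above. The following is a sketch under the assumption that the Pandas DataFrame contains every column of the target table under the same names. The function name `append_matching_table_types` is hypothetical, and the cast can still fail or give nulls for incompatible values.

```python
CASSANDRA_FORMAT = "org.apache.spark.sql.cassandra"


def append_matching_table_types(spark, pandas_df, keyspace, table):
    """Cast new rows to the column types of an existing Cassandra table and append them."""
    # Read the existing table only to get its schema (column names and types)
    target = (
        spark.read.format(CASSANDRA_FORMAT)
        .options(table=table, keyspace=keyspace)
        .load()
    )
    sdf = spark.createDataFrame(pandas_df)
    # Select the table's columns in the table's order, cast to the table's types
    aligned = sdf.select(
        [sdf[field.name].cast(field.dataType) for field in target.schema.fields]
    )
    (
        aligned.write.format(CASSANDRA_FORMAT)
        .options(table=table, keyspace=keyspace)
        .mode("append")
        .save()
    )
    return aligned
```

With the session and data from this notebook, the call would look like `append_matching_table_types(spark, newCars, "my_first_keyspace", "my_first_table")`.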

In [None]:
# Check if the new cars are in the table
spark.read.format("org.apache.spark.sql.cassandra")\
.options(table="my_first_table", keyspace="my_first_keyspace").load()\
.createOrReplaceTempView("my_first_table_view2")

spark.sql("select * from my_first_table_view2").toPandas()

## Exercise
- Create a table matching the structure of the planets data.
- Insert the planets data into the table using Spark.
- Read the data back from Cassandra through Spark and print it.

In [None]:
# Stop Spark session
try:
    spark.stop()
except ConnectionRefusedError:
    print("Spark session already stopped.")

```{seealso} Resources
:class: tip
- [PySpark Tutorial For Beginners (sparkbyexamples.com)](https://sparkbyexamples.com/pyspark-tutorial/)
- [PySpark documentation](https://spark.apache.org/docs/latest/api/python/index.html)
    - [PySpark DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html)
    - [PySpark SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html)
- [YouTube: PySpark Tutorial: Spark SQL & DataFrame Basics](https://youtu.be/3-pnWVWyH-s?si=5AfOao23gqgh19en) (17m:12s)
```