NB: the functionalities described below were tested on the Databricks environment. To run the following code on Google Colab, see the last section, Pyspark on Google Colab.

Pandas to Spark

We can create a Spark dataframe starting from a Pandas dataframe using the function spark.createDataFrame:

import pyspark
import pandas as pd
import numpy as np

DATA_LEN = 5

df_pandas_foo_first = pd.DataFrame(data=dict(
  A = np.random.choice(['hey', 'oh'], size=DATA_LEN),
  B = np.random.random(size=DATA_LEN)
))

df_pandas_foo_second = pd.DataFrame(data=dict(
  A = np.random.choice(['hey', 'oh'], size=DATA_LEN),
  C = np.random.uniform(5, 10, size=DATA_LEN),
  D = np.random.choice(['one', 'two', 'five'], size=DATA_LEN)
))

df_foo_first = spark.createDataFrame(df_pandas_foo_first)
df_foo_second = spark.createDataFrame(df_pandas_foo_second)

We can join the two dataframes via the .join function and display the resulting dataframe using the .display method:

df_foo = df_foo_first.join(df_foo_second, how='left', on=['A'])
df_foo.display()
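Note that .display() is specific to Databricks notebooks; in a plain PySpark session (for instance on Colab) a similar inspection can be done with the standard .show method:

df_foo.show(truncate=False)  # print the joined rows as plain text instead of the Databricks widget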

IO operations

csv

By default a .csv write creates multiple .csv files to facilitate parallel operations. One can force the creation of a single output file using the coalesce(1) method.

df_foo.coalesce(1).write.csv(DATA_PATH, header=True)  ## coalesce(1) to save single csv
df_foo = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
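As a variant, if we prefer to avoid the extra pass over the data that inferSchema implies, we can provide an explicit schema; the field names below simply mirror the columns of the example dataframe:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

foo_schema = StructType([
    StructField("A", StringType(), True),   # join key
    StructField("B", DoubleType(), True),
    StructField("C", DoubleType(), True),
    StructField("D", StringType(), True),
])
df_foo = spark.read.csv(DATA_PATH, header=True, schema=foo_schema)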

parquet

df_foo.write.format("parquet").save(DATA_PATH)
df_foo = spark.read.format("parquet").load(DATA_PATH)
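The parquet writer also accepts a save mode and partition columns; a minimal sketch, reusing the columns of the example dataframe:

df_foo.write.format("parquet").mode("overwrite").partitionBy("A").save(DATA_PATH)  # one folder per value of A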

delta table

Delta tables are richer data structures which provide features such as high query performance, a flexible data architecture and history rollback capabilities. We can easily create a delta table with pyspark:

df_foo.write.format("delta").mode("append").partitionBy("A","D").save(DATA_PATH)

On the Databricks platform we can handle delta table files as if they were SQL tables, with additional functionalities:

query_merge = f"""
  MERGE INTO delta.`{DATA_PATH}` AS target
  ...
  """
spark.sql(query_merge)

query_delete = f"""
  DELETE FROM delta.`{DATA_PATH}`
  WHERE A = "oh"
  """
spark.sql(query_delete)
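As an illustration only, a complete MERGE upserting rows from a hypothetical foo_updates_view (not defined above) on the key column A could look like this:

query_merge_example = f"""
  MERGE INTO delta.`{DATA_PATH}` AS target
  USING foo_updates_view AS source  -- hypothetical view holding the new rows
  ON target.A = source.A
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
  """
spark.sql(query_merge_example)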

We can check the history and roll back the data to a previous version via the following instructions:

%sql
DESCRIBE HISTORY delta.`{DATA_PATH}`
%sql
RESTORE TABLE delta.`{DATA_PATH}` TO version as of 0
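A previous version can also be loaded directly from Python through Delta's time travel reader option versionAsOf:

df_foo_v0 = spark.read.format("delta").option("versionAsOf", 0).load(DATA_PATH)  # version numbers as listed by DESCRIBE HISTORY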

Temporary views and SQL operations

In order to perform SQL operations on a Spark dataframe (not yet saved as a table) one can create a temporary view using the createOrReplaceTempView function:

df_foo.createOrReplaceTempView('foo_view')
df_foo_first.createOrReplaceTempView('foo_first_view')
df_foo_second.createOrReplaceTempView('foo_second_view')

This allows us to use the view as a regular SQL table:

%sql
describe table foo_view

On Google Colab we can run the same SQL queries using the pyspark library:

spark.sql("""
    DESCRIBE TABLE foo_view
    """).show()

SELECT FROM WHERE syntax

%sql
SELECT *  
FROM foo_view
WHERE C >= 8
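The same filter can be expressed with the DataFrame API; a quick equivalent using pyspark.sql.functions:

import pyspark.sql.functions as F

df_foo.filter(F.col("C") >= 8).show()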

Groupby operations

%sql
SELECT A, mean(C)
FROM foo_view
GROUP BY A

Ordering the output:

%sql
SELECT A, mean(C) as C_mean,  count(*) as num, max(B) as B_max
FROM foo_view
GROUP BY A
ORDER BY C_mean desc
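A rough DataFrame API equivalent of the grouped and ordered query above:

import pyspark.sql.functions as F

(df_foo
 .groupBy("A")
 .agg(F.mean("C").alias("C_mean"),
      F.count("*").alias("num"),
      F.max("B").alias("B_max"))
 .orderBy(F.desc("C_mean"))
 .show())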

Join operations

%sql
SELECT foo_first.A, foo_first.B_max, foo_second.C_mean, num

FROM (
  SELECT A, max(B) as B_max
  FROM foo_first_view
  GROUP BY A
) foo_first

INNER JOIN (
  SELECT A, count(*) as num, mean(C) as C_mean
  FROM foo_second_view
  GROUP BY A
) foo_second

ON (foo_first.A = foo_second.A)
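The same result can be sketched with the DataFrame API, aggregating the two dataframes separately and joining them on A:

import pyspark.sql.functions as F

foo_first_agg = df_foo_first.groupBy("A").agg(F.max("B").alias("B_max"))
foo_second_agg = df_foo_second.groupBy("A").agg(F.count("*").alias("num"),
                                                F.mean("C").alias("C_mean"))
foo_first_agg.join(foo_second_agg, on="A", how="inner").show()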

Pyspark on Google Colab

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz #change version
!tar xf spark-3.0.0-bin-hadoop3.2.tgz #match version
!pip -q install findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2" #match version
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages io.delta:delta-core_2.12:0.7.0 --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog pyspark-shell'
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('foo_session').getOrCreate()
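As an optional sanity check, we can build a tiny dataframe from a list of tuples and show it to confirm the session works:

df_check = spark.createDataFrame([(1, "hey"), (2, "oh")], ["id", "A"])
df_check.show()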