PySpark
Set up Spark
Download Spark from the official website: https://spark.apache.org/downloads.html
Set up environment variables

HADOOP_HOME=C:\Winutils
JAVA_HOME=C:\Program Files\Java\jdk1.8.0_202
SPARK_HOME=C:\Users\<NAME>\Documents\spark-3.5.0-bin-hadoop3
Append the bin directories to Path:
Path=%Path%;%HADOOP_HOME%\bin;%JAVA_HOME%\bin;%SPARK_HOME%\bin
Connect to Spark
spark-shell
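For Python, the pyspark shell can be launched the same way; a minimal smoke test (assuming the environment variables above are set) is:
pyspark
>>> spark.version            # prints the Spark version, e.g. '3.5.0'
>>> spark.range(10).count()  # runs a small job to confirm the setup works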
Spark Architecture

Concepts
Lazy Evaluation
An evaluation strategy that delays computing an expression until its value is needed; transformations only build up a plan, and nothing runs until an action is called (see the sketch below)
Transformations
Narrow: each output partition depends on a single input partition, so the work is done within that partition, e.g. map(), filter(), union()
Wide: require data to be combined from multiple partitions (a shuffle), e.g. groupByKey(), reduceByKey(), join()
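A minimal sketch of both ideas (the bucket column is made up for illustration): the filter is a narrow transformation, the groupBy is a wide one, and nothing executes until the show() action.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("TransformDemo").getOrCreate()
df = spark.range(1000000).withColumn("bucket", F.col("id") % 10)
narrow = df.filter(F.col("id") > 100)      # narrow: evaluated per partition, no shuffle
wide = narrow.groupBy("bucket").count()    # wide: requires a shuffle across partitions
# Nothing has run yet (lazy evaluation); the action below triggers the job
wide.show()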
Catalyst Optimizer

Analysis
Logical optimization
Physical planning
Code generation
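These phases can be inspected with explain(); a minimal, self-contained sketch:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("ExplainDemo").getOrCreate()
df = spark.range(100).withColumn("month", F.col("id") % 12)
# extended=True prints the parsed and analyzed logical plans, the Catalyst-optimized
# logical plan, and the physical plan chosen for execution
df.filter(F.col("month") > 6).explain(extended=True)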
Tungsten Binary Format (Unsafe Row)
The Tungsten binary format (UnsafeRow) is a compact binary row layout that reduces JVM object overhead; it is the format used when exchanging data in the shuffle step
A data shuffle creates a stage boundary
When a job is scheduled, Spark works backwards from the action through the lineage of transformations, and stages whose shuffle output already exists can be skipped
Caching works much like shuffle files: cached partitions let Spark skip recomputing the upstream lineage (see the sketch below)
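A minimal caching sketch (the input path and column names are hypothetical):
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("CacheDemo").getOrCreate()
df = spark.read.parquet("/data/events")            # hypothetical input path
ok = df.filter(F.col("status") == "OK").cache()    # mark the result for caching
ok.count()                                         # first action materializes the cache
ok.groupBy("status").count().show()                # reuses cached partitions instead of re-reading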
Application

Job: a sequence of transformations that runs when an action is called, e.g. count(), first(), collect(), save(), read(), write(), take()
Stage: a job is divided into stages at shuffle boundaries
Task: the smallest unit of work. Each stage is divided into tasks that run in parallel on executors, each operating over a single partition of data (see the sketch below)
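A minimal sketch tying the terms together: show() starts one job, the shuffle introduced by groupBy splits it into two stages, and each stage runs one task per partition of its data.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("JobStageTaskDemo").getOrCreate()
df = spark.range(1000).repartition(4)        # 4 partitions -> 4 tasks in the first stage
df.rdd.getNumPartitions()                    # returns 4
df.groupBy(F.col("id") % 10).count().show()  # 1 job, 2 stages (boundary at the shuffle)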

Memory

Types of Memory
Driver Memory: the memory assigned to the driver process used for creating execution plans, tracking data, and gathering results.
Executor Memory: the amount of memory allocated to each executor process
Reserved Memory: 300 MB reserved by the system; this cannot be changed. If executor memory is less than 1.5 times the reserved memory (450 MB), the executor will fail to start
User Memory: stores user-defined data structures and UDFs
Spark Memory: holds intermediate state during task execution as well as cached/persisted data; split into Storage and Execution memory
Storage Memory: used for cached data and broadcast variables
Execution Memory: used by Spark for objects created during task execution
Calculation
Reserved Memory = 300 MB
User Memory = (Java heap - Reserved Memory) * (1 - spark.memory.fraction)
Spark Memory = (Java heap - Reserved Memory) * spark.memory.fraction
Storage Memory = Spark Memory * spark.memory.storageFraction
Execution Memory = Spark Memory * (1 - spark.memory.storageFraction)
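A worked sketch for a 4 GB executor heap using the default fractions (spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5):
heap_mb = 4 * 1024                  # spark.executor.memory = 4g
reserved_mb = 300                   # fixed reserved memory
usable_mb = heap_mb - reserved_mb   # 3796 MB
user_mb = usable_mb * (1 - 0.6)     # ~1518 MB for user data structures / UDFs
spark_mb = usable_mb * 0.6          # ~2278 MB of unified Spark memory
storage_mb = spark_mb * 0.5         # ~1139 MB for cache and broadcast variables
execution_mb = spark_mb * 0.5       # ~1139 MB for shuffles, joins, sorts, aggregations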
Coding
Quick start
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F
sc = SparkContext.getOrCreate()
sc.addPyFile(path)  # ship an extra .py / .zip dependency to the executors
spark = SparkSession.builder.appName("MyApp").getOrCreate()
df = spark.sql("SELECT * FROM bd.tbl_name")
df = df.withColumn("month", F.month('data_dt'))
df = df.filter(F.col("month") > 6)
df.write.partitionBy('data_dt').saveAsTable(tbl_name, mode="overwrite")
Data Type
from pyspark.sql.types import StringType, DateType
col_name = 'data_dt'
if isinstance(df.schema[col_name].dataType, StringType):
    pass  # handle string-typed column here
Functions
concat_ws
current_timestamp
lit
col
to_timestamp
date_format
add_months
year
month
quarter
when
substring
split
regexp_replace
lpad
rpad
sha2
to_date
last_day
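A short sketch using a few of these (the DataFrame and column names are hypothetical):
from pyspark.sql import functions as F
df = df.withColumn("load_ts", F.current_timestamp())
df = df.withColumn("data_month", F.date_format(F.col("data_dt"), "yyyyMM"))
df = df.withColumn("cust_name", F.concat_ws(" ", F.col("first_nm"), F.col("last_nm")))
df = df.withColumn("acct_no", F.lpad(F.col("acct_no"), 10, "0"))
df = df.withColumn("id_hash", F.sha2(F.col("citizen_id").cast("string"), 256))
df = df.withColumn("month_end", F.last_day(F.to_date(F.col("data_dt"), "yyyy-MM-dd")))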
Transform
df.withColumnRenamed(old_name, new_name)
df.withColumn(new_name, fun_name())
df.filter(col().isNotNull())
col().cast("timestamp")
when(col() > 2000, 100).otherwise(col())
df = df1.join(df2, on='id', how='inner')
Write
Insert into
overwrite (static): overwrites the entire table
overwrite (dynamic): overwrites only the partitions present in the incoming data (see the sketch below)
df.write.insertInto(tbl_name, overwrite=True)
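A minimal dynamic-overwrite sketch (table and column names are hypothetical); the config switches insertInto from replacing the whole table to replacing only the touched partitions:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# Only the partitions present in df (e.g. specific data_dt values) are overwritten;
# all other partitions of the target table are left untouched
df.write.insertInto("db_nm.tbl_name", overwrite=True)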
saveAsTable
overwrite table
df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")
Create if not exists table
# newer Spark versions also expose spark.catalog.tableExists(tbl_name)
if spark._jsparkSession.catalog().tableExists(tbl_name):
    df.write.insertInto(tbl_name, overwrite=True)
else:
    df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")
Window Analytics
from pyspark.sql.window import Window
df = spark.sql("SELECT * FROM db_nm.tbl_nm")
window_spec = Window.partitionBy("period").orderBy(F.asc("data_dt"))
df = df.withColumn("row_num", F.row_number().over(window_spec))
Configuration
Dynamic Partition
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
Memory
spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "384m")
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.executor.memoryOverhead", "512m")