PySpark

Setup Spark

  1. Download Spark from the official website: https://spark.apache.org/downloads.html

  2. Set up the environment

Setup Environment
HADOOP_HOME=C:\Winutils
JAVA_HOME=C:\Program Files\Java\jdk1.8.0_202
SPARK_HOME=C:\Users\<NAME>\Documents\spark-3.5.0-bin-hadoop3
Path=%Path%;%HADOOP_HOME%\bin;%JAVA_HOME%\bin;%SPARK_HOME%\bin

Connect Spark

spark-shell (Scala shell) or pyspark (Python shell)

Spark Architecture


Concept

Lazy Evaluation

An evaluation strategy that delays computing an expression until its value is actually needed. In Spark, transformations only build an execution plan; nothing runs until an action is called.
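A minimal sketch of this behavior (the column name "even" is made up for illustration):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("lazy-demo").getOrCreate()

df = spark.range(1_000_000)                       # transformation: nothing is computed yet
df = df.withColumn("even", F.col("id") % 2 == 0)  # still only building the plan
df = df.filter(F.col("even"))                     # still lazy

print(df.count())                                 # action: the whole plan finally executes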

Transformations

  • Narrow: can be computed within a single partition, e.g. map(), filter(), union()

  • Wide: requires combining data from multiple partitions (a shuffle), e.g. groupByKey(), reduceByKey(), join() (see the sketch after this list)
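A short sketch contrasting the two on an RDD, assuming sc is an existing SparkContext (the data is made up):

rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)], numSlices=2)

narrow = rdd.map(lambda kv: (kv[0], kv[1] * 10))   # narrow: stays within each partition
wide = narrow.reduceByKey(lambda x, y: x + y)      # wide: shuffles data across partitions

print(wide.collect())                              # e.g. [('a', 40), ('b', 20)]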

Catalyst Optimizer

  • Analysis

  • Logical optimization

  • Physical planning

  • Code generation
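The plans produced by these phases can be inspected with explain(); a small illustrative example (assuming spark and F as in the quick start below):

df = spark.range(100).filter(F.col("id") > 10)

# extended=True prints the parsed, analyzed, and optimized logical plans
# plus the physical plan generated by Catalyst.
df.explain(extended=True)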

Tungsten Binary Format (Unsafe Row)

The Tungsten binary format (UnsafeRow) is an optimization for sharing data in the shuffle step: rows are kept in a compact binary representation instead of JVM objects.

  • A data shuffle creates a stage boundary

  • Spark plans a job backwards from the action through the lineage, splitting it into stages at those boundaries

  • Cached/persisted data works like shuffle files: later jobs can read it instead of recomputing earlier stages

Application

Job, Stage, and Task
  • Job: a sequence of transformations triggered by an action, e.g. count(), first(), collect(), save(), read(), write(), take()

  • Stage: a job is divided into stages at shuffle boundaries

  • Task: the smallest unit of work. Each stage is divided into tasks that execute in parallel on executors, each operating on a single partition of data (see the sketch below)
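A small illustrative example (the bucket column is made up; spark and F as in the quick start below); the resulting job, its stages, and their tasks show up in the Spark UI:

df = spark.range(1_000_000).withColumn("bucket", F.col("id") % 10)

counts = df.groupBy("bucket").count()   # the groupBy shuffles, so the job typically gets two stages
counts.collect()                        # the action submits the job

print(df.rdd.getNumPartitions())        # each stage runs one task per partition of its data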

Stage and Shuffle

Memory

Type of Memory

  • Driver Memory: the memory assigned to the driver process, used for creating execution plans, tracking data, and gathering results

  • Executor Memory: the amount of memory allocated to each executor process

    • Reserved Memory: 300 MB reserved by the system; it cannot be changed. If executor memory is less than 1.5 times the reserved memory (450 MB), the executor fails to start

    • User Memory: stores user-defined data structures and UDFs

    • Spark Memory: responsible for intermediate state during task execution and cached/persisted data

      • Storage Memory: used for cached data and broadcast variables

      • Execution Memory: used by Spark for objects created during task execution

Calculation

Reserved Memory = 300 MB

User Memory = (Java heap - Reserved Memory) * (1 - spark.memory.fraction)

Spark Memory = (Java heap - Reserved Memory) * spark.memory.fraction

Storage Memory = Spark Memory * spark.memory.storageFraction

Execution Memory = Spark Memory * (1 - spark.memory.storageFraction)
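A worked example, assuming a 4096 MB executor heap and the default settings spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5:

Java heap        = 4096 MB
Reserved Memory  = 300 MB
User Memory      = (4096 - 300) * 0.4 = 1518.4 MB
Spark Memory     = (4096 - 300) * 0.6 = 2277.6 MB
Storage Memory   = 2277.6 * 0.5 = 1138.8 MB
Execution Memory = 2277.6 * 0.5 = 1138.8 MB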

Coding

Quick start

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F

sc = SparkContext.getOrCreate()
sc.addPyFile(path)  # ship an extra .py/.zip/.egg dependency to the executors
spark = SparkSession.builder.appName("MyApp").getOrCreate()

df = spark.sql("SELECT * FROM db.tbl_name")

df = df.withColumn("month", F.month('data_dt'))
df = df.filter(F.col("month") > 6)

df.write.partitionBy('data_dt').saveAsTable(tbl_name, mode="overwrite")

Data Type

from pyspark.sql.types import StringType, DateType

col_name = 'data_dt'
# If the column is still a string, cast it to a proper date type
if isinstance(df.schema[col_name].dataType, StringType):
    df = df.withColumn(col_name, F.col(col_name).cast(DateType()))

Functions

concat_ws
current_timestamp
lit
col
to_timestamp
date_format
add_months
year
month
quarter
when
substring
split
regexp_replace
lpad
rpad
sha2
to_date
last_day
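A few of these combined in one illustrative snippet (all column names are made up; df and F as in the quick start):

df = df.withColumn("full_name", F.concat_ws(" ", F.col("first_nm"), F.col("last_nm")))
df = df.withColumn("load_ts", F.current_timestamp())
df = df.withColumn("data_dt", F.to_date(F.col("data_dt"), "yyyy-MM-dd"))
df = df.withColumn("period", F.date_format(F.col("data_dt"), "yyyyMM"))
df = df.withColumn("cust_id", F.lpad(F.col("cust_id"), 10, "0"))
df = df.withColumn("cust_id_hash", F.sha2(F.col("cust_id"), 256))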

Transform


df.withColumnRenamed(old_name, new_name)
df.withColumn(new_name, fun_name())

df.filter(col().isNotNull())
col().cast("timestamp")
when(col() > 2000, 100).otherwise(col())

df = df1.join(df2, on='id', how='inner')

Write

Insert into

  • overwrite (static): overwrite table

  • overwrite (dynamic): overwrite by partition

df.write.insertInto(tbl_name, overwrite=True)
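For partition-level overwrite, dynamic mode has to be enabled first (see Configuration below); a short sketch, assuming tbl_name already exists and is partitioned:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.insertInto(tbl_name, overwrite=True)  # only the partitions present in df are replaced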

saveAsTable

overwrite table

df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")

Create if not exists table

if spark._jsparkSession.catalog().tableExists(tbl_name):
    df.write.insertInto(tbl_name, overwrite=True)
else:
    df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")
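Since Spark 3.3 the same check is available through the public Catalog API, which avoids the internal _jsparkSession handle:

if spark.catalog.tableExists(tbl_name):
    df.write.insertInto(tbl_name, overwrite=True)
else:
    df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")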

Window Analytics

from pyspark.sql.window import Window

df = spark.sql("SELECT * FROM db_nm.tbl_nm")
window_spec = Window.partitionBy("period").orderBy(F.asc("data_dt"))
df = df.withColumn("row_num", F.row_number().over(window_spec))

Configuration

Dynamic Partition

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

Memory

spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "384m")
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.executor.memoryOverhead", "512m")
