
PySpark


Setup Spark

  1. Download Spark from the official website

  2. Setup environment

Setup Environment
HADOOP_HOME=C:\Winutils
JAVA_HOME=C:\Program Files\Java\jdk1.8.0_202
SPARK_HOME=C:\Users\<NAME>\Documents\spark-3.5.0-bin-hadoop3

Add the bin folders to Path:
%HADOOP_HOME%\bin
%JAVA_HOME%\bin
%SPARK_HOME%\bin

Connect Spark

spark-shell
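
Since these notes focus on PySpark, the Python shell can be started the same way once %SPARK_HOME%\bin is on the Path (it creates a SparkSession named spark automatically):

pyspark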

Spark Architecture

Concept

Lazy Evaluation

An evaluation strategy that delays evaluating an expression until its value is actually needed. In Spark, transformations only build up an execution plan; nothing runs until an action is called.
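
A minimal sketch of this behaviour (the app name is arbitrary): the filter() below only extends the plan, and the whole plan executes when count() is called.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("LazyEvalDemo").getOrCreate()

df = spark.range(1_000_000)                 # transformation source: no job runs yet
filtered = df.filter(F.col("id") % 2 == 0)  # still lazy: only the plan grows
print(filtered.count())                     # action: Spark executes the plan here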

Transformations

  • Narrow: can be performed within a single partition, with no data movement, e.g. map(), filter(), union()

  • Wide: require data to be combined (shuffled) across partitions, e.g. groupByKey(), reduceByKey(), join(); see the sketch after this list
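
A hedged sketch (column names are made up): the filter stays within each partition, while the groupBy aggregation forces a shuffle, which shows up as an Exchange in the physical plan.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("TransformDemo").getOrCreate()

df = spark.range(100).withColumn("bucket", F.col("id") % 5)

narrow = df.filter(F.col("id") > 10)                       # narrow: per-partition work, no shuffle
wide = df.groupBy("bucket").agg(F.count("id").alias("n"))  # wide: rows must move between partitions

narrow.explain()  # no Exchange operator
wide.explain()    # physical plan contains an Exchange (shuffle)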

Catalyst Optimizer

  • Analysis

  • Logical optimization

  • Physical planning

  • Code generation
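
These phases can be inspected from PySpark with explain(True), which prints the parsed and analyzed logical plans, the optimized logical plan, and the physical plan (the column names here are made up).

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("CatalystDemo").getOrCreate()

df = spark.range(10).withColumn("doubled", F.col("id") * 2)

# extended mode shows each Catalyst phase: parsed -> analyzed -> optimized -> physical
df.filter(F.col("doubled") > 5).explain(True)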

Tungsten Binary Format (Unsafe Row)

The Tungsten binary format (UnsafeRow) is an optimization for how data is shared between stages, e.g. in the shuffle step.

  • A data shuffle creates a stage boundary

  • Spark can skip re-executing earlier stages by reusing the shuffle files they already wrote

  • Caching works in a similar way: persisted data is materialized like shuffle files and reused instead of being recomputed

Application

  • Job: the work triggered by an action over a sequence of transformations, e.g. count(), first(), collect(), save(), read(), write(), take()

  • Stage: a job is divided into stages at shuffle boundaries

  • Task: the smallest unit of work. Each stage is divided into tasks that run in parallel on the executors; each task operates over a single partition of data

Memory

Type of Memory

  • Driver Memory: the memory assigned to the driver process, used for creating execution plans, tracking data, and gathering results

  • Executor Memory: the amount of memory allocated to each executor process

    • Reserved Memory: 300 MB reserved by the system; this cannot be changed. If the executor memory is less than 1.5 times the reserved memory (450 MB), the executor will fail to start

    • User Memory: stores user-defined data structures and UDFs

    • Spark Memory: responsible for storing intermediate state during task execution and cached/persisted data

      • Storage Memory: used for cached data and broadcast variables

      • Execution Memory: used by Spark for objects created during task execution (shuffles, joins, aggregations)

Calculation

Reserved Memory = 300 MB

User Memory = (Java heap - Reserved Memory) * (1 - spark.memory.fraction)

Spark Memory = (Java heap - Reserved Memory) * spark.memory.fraction

Storage Memory = Spark Memory * spark.memory.storageFraction

Execution Memory = Spark Memory * (1 - spark.memory.storageFraction)
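
A worked example of these formulas, using the default values spark.memory.fraction = 0.6 and spark.memory.storageFraction = 0.5; the 4 GB heap is just an assumed figure for illustration.

# Illustrative calculation only; plug in your own executor heap size
java_heap_mb = 4096        # assumed spark.executor.memory = 4g
reserved_mb = 300          # fixed by Spark
memory_fraction = 0.6      # spark.memory.fraction (default)
storage_fraction = 0.5     # spark.memory.storageFraction (default)

user_mb = (java_heap_mb - reserved_mb) * (1 - memory_fraction)   # ~1518 MB
spark_mb = (java_heap_mb - reserved_mb) * memory_fraction        # ~2278 MB
storage_mb = spark_mb * storage_fraction                         # ~1139 MB
execution_mb = spark_mb * (1 - storage_fraction)                 # ~1139 MB

print(user_mb, spark_mb, storage_mb, execution_mb)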

Coding

Quick start

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F

sc = SparkContext.getOrCreate()
sc.addPyFile(path)  # optional: ship an extra .py/.zip dependency (path) to the executors
spark = SparkSession.builder.appName("MyApp").getOrCreate()

df = spark.sql("SELECT * FROM db.tbl_name")

df = df.withColumn("month", F.month('data_dt'))
df = df.filter(F.col("month") > 6)

df.write.partitionBy('data_dt').saveAsTable(tbl_name, mode="overwrite")

Data Type

from pyspark.sql.types import StringType, DateType

col_name = 'data_dt'
if isinstance(df.schema[col_name].dataType, StringType):
    pass
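
As a follow-up sketch, the same check can drive a cast, e.g. converting a string date column to DateType; the sample row and the yyyy-MM-dd format are assumptions.

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("DataTypeDemo").getOrCreate()
df = spark.createDataFrame([('2024-01-31',)], ['data_dt'])  # hypothetical sample row

col_name = 'data_dt'
if isinstance(df.schema[col_name].dataType, StringType):
    # assumed source format yyyy-MM-dd; adjust to match the real data
    df = df.withColumn(col_name, F.to_date(F.col(col_name), 'yyyy-MM-dd'))

df.printSchema()  # data_dt is now DateType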

Functions

concat_ws
current_timestamp
lit
col
to_timestamp
date_format
add_months
year
month
quarter
when
substring
split
regexp_replace
lpad
rpad
sha2
to_date
last_day
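
A hedged sketch combining a few of the functions above; the column names, sample row, and date format are assumptions.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("FunctionsDemo").getOrCreate()
df = spark.createDataFrame([('A001', '2024-01-15')], ['cust_id', 'data_dt'])  # hypothetical

df = (
    df.withColumn('data_dt', F.to_date('data_dt', 'yyyy-MM-dd'))
      .withColumn('month_end', F.last_day('data_dt'))
      .withColumn('period', F.date_format('data_dt', 'yyyyMM'))
      .withColumn('key', F.concat_ws('|', 'cust_id', 'period'))
      .withColumn('key_hash', F.sha2(F.col('key'), 256))
      .withColumn('load_ts', F.current_timestamp())
)
df.show(truncate=False)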

Transform


df.withColumnRenamed(old_name, new_name)
df.withColumn(new_name, fun_name())  # fun_name() stands in for any column expression

# 'col_name' below is a placeholder column name
df.filter(F.col('col_name').isNotNull())
F.col('col_name').cast('timestamp')
F.when(F.col('col_name') > 2000, 100).otherwise(F.col('col_name'))

df = df1.join(df2, on='id', how='inner')

Write

insertInto

  • overwrite (static): overwrites the whole table

  • overwrite (dynamic): overwrites only the partitions present in the incoming data

df.write.insertInto(tbl_name, overwrite=True)

saveAsTable

overwrite table

df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")

Create the table if it does not exist

if spark._jsparkSession.catalog().tableExists(tbl_name):
    df.write.insertInto(tbl_name, overwrite=True)
else:
    df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")

Window Analytics

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.sql("SELECT * FROM db_nm.tbl_nm")
window_spec = Window.partitionBy("period").orderBy(F.asc("data_dt"))
df = df.withColumn("row_num", F.row_number().over(window_spec))

Configuration

Dynamic Partition

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

Memory

spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "384m")
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.executor.memoryOverhead", "512m")

References

https://spark.apache.org/docs/latest/configuration.html
https://towardsdatascience.com/3-reasons-why-sparks-lazy-evaluation-is-useful-ed06e27360c4
https://stackoverflow.com/questions/42263270/what-is-the-concept-of-application-job-stage-and-task-in-spark
https://medium.com/@diehardankush/what-are-job-stage-and-task-in-apache-spark-2fc0d326c15f
https://medium.com/analytics-vidhya/apache-spark-memory-management-49682ded3d42
https://www.linkedin.com/pulse/roles-driver-memory-executor-spark-impact-big-data-processing-dip/
https://www.databricks.com/glossary/catalyst-optimizer
https://spark.apache.org/downloads.html