# PySpark

## Setup Spark

1. Download Spark from the official website: <https://spark.apache.org/downloads.html>
2. Set up the environment variables

<figure><img src="/files/EdrLizcZBJUddpH8QMcd" alt=""><figcaption><p>Download Spark</p></figcaption></figure>

{% code title="Setup Environment" %}

```
HADOOP_HOME=C:\Winutils
JAVA_HOME=C:\Program Files\Java\jdk1.8.0_202
SPARK_HOME=C:\Users\<NAME>\Documents\spark-3.5.0-bin-hadoop3
Path=%Path%;%HADOOP_HOME%\bin;%JAVA_HOME%\bin;%SPARK_HOME%\bin
```

{% endcode %}
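
The same variables can also be set from Python before creating a session (useful in notebooks). A minimal sketch, assuming the install paths from the snippet above:

```python
import os

# Assumes the install paths shown above; adjust for your machine
os.environ["HADOOP_HOME"] = r"C:\Winutils"
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk1.8.0_202"
os.environ["SPARK_HOME"] = r"C:\Users\<NAME>\Documents\spark-3.5.0-bin-hadoop3"

# Prepend each tool's bin directory to PATH
for var in ("HADOOP_HOME", "JAVA_HOME", "SPARK_HOME"):
    os.environ["PATH"] = os.path.join(os.environ[var], "bin") + os.pathsep + os.environ["PATH"]
```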

***

## Connect Spark

Verify the installation by launching the Spark shell:
```
spark-shell
```
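
To confirm PySpark itself works, a minimal smoke test (the app name is arbitrary):

```python
from pyspark.sql import SparkSession

# Local session using all cores; "smoke-test" is just an illustrative app name
spark = SparkSession.builder.master("local[*]").appName("smoke-test").getOrCreate()
print(spark.version)   # should print 3.5.0 for the install above
spark.range(5).show()  # trivial job to confirm execution works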

## Spark Architecture

<figure><img src="/files/iaY58D5PXibDCr1FnWZZ" alt=""><figcaption><p>Spark Architectire</p></figcaption></figure>

## Concept

### Lazy Evaluation

An evaluation strategy that delays computing an expression until its value is needed. In Spark, transformations only build up an execution plan; nothing runs until an action asks for a result, as in the sketch below.
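
A minimal sketch: nothing below executes until the action on the last line.

```python
rdd = spark.sparkContext.parallelize(range(10))
doubled = rdd.map(lambda x: x * 2)     # transformation: nothing executes yet
big = doubled.filter(lambda x: x > 5)  # still nothing
print(big.count())                     # action: triggers the whole computation
```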

### Transformations

* **Narrow:** each output partition depends on a single input partition, so the work stays within one partition, e.g. map(), filter(), union()
* **Wide:** data from multiple partitions must be combined (a shuffle), e.g. groupByKey(), reduceByKey(), join() (see the sketch below)
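
A minimal sketch of the difference; the DataFrame and its `category`/`amount` columns are hypothetical:

```python
from pyspark.sql import functions as F

narrow = df.filter(F.col("amount") > 0)             # narrow: runs per partition
wide = df.groupBy("category").agg(F.sum("amount"))  # wide: requires a shuffle

wide.explain()  # the physical plan shows an Exchange (shuffle) operator
```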

### Catalyst Optimizer

<figure><img src="/files/viRx1EX2FK9Gt1eBMWKb" alt=""><figcaption></figcaption></figure>

* Analysis
* Logical optimization
* Physical planning
* Code generation
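
These phases can be inspected for any query with `explain`; `mode="extended"` prints the parsed, analyzed, and optimized logical plans plus the physical plan:

```python
from pyspark.sql import functions as F

df = spark.range(10).withColumn("double", F.col("id") * 2).filter("double > 5")
df.explain(mode="extended")  # parsed -> analyzed -> optimized logical -> physical plan
```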

### Tungsten Binary Format (Unsafe Row)

Tungsten binary format (UnsafeRow) stores rows in a compact binary layout, an optimization that avoids serialization overhead when data is shared, e.g. in the shuffle step.

* A data shuffle creates a **stage** boundary
* Spark plans a job **backwards** from the final result through its lineage, and can skip stages whose shuffle files already exist
* **Cached** data is reused in much the same way as existing shuffle files

### Application

<figure><img src="/files/BRPlrfl982aPaxi2fbpE" alt=""><figcaption><p>Job, Stage, and Task</p></figcaption></figure>

* **Job:** the work triggered by an action over a sequence of transformations, e.g. count(), first(), collect(), save(), read(), write(), take()
* **Stage:** a job is divided into stages at shuffle boundaries
* **Task:** the smallest unit of work. Each stage is divided into tasks that execute in parallel on executors, each operating over a single partition of data (see the sketch below)
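
A minimal sketch of how one action maps onto jobs, stages, and tasks:

```python
rdd = spark.sparkContext.parallelize(range(100), 4)  # 4 partitions -> 4 tasks per stage
pairs = rdd.map(lambda x: (x % 3, x))                # narrow: stays in the same stage
totals = pairs.reduceByKey(lambda a, b: a + b)       # wide: shuffle creates a stage boundary
result = totals.collect()                            # action: one job with two stages
```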

<figure><img src="/files/eJkgZxZOm1qd8m2sMHM9" alt=""><figcaption><p>Stage and Shuffle</p></figcaption></figure>

### Memory

<figure><img src="/files/J1dzmUFGqzEW8sDwK7j1" alt=""><figcaption></figcaption></figure>

#### Type of Memory

* **Driver Memory:** the memory assigned to the driver process, used for creating execution plans, tracking data, and gathering results.
* **Executor Memory:** the amount of memory allocated to each executor process
  * **Reserved Memory:** a fixed 300 MB reserved by the system that cannot be changed. If executor memory is less than 1.5 times the reserved memory (450 MB), the executor fails to start
  * **User Memory:** stores user-defined data structures and UDFs
  * **Spark Memory:** stores intermediate state during task execution and cached/persisted data
    * **Storage Memory:** used for cached data and broadcast variables
    * **Execution Memory:** used by Spark for objects created during task execution

#### Calculation

<mark style="color:green;">Reserved Memory</mark> = 300 MB

User Memory = (Java heap - <mark style="color:green;">Reserved Memory</mark>) \* (1 - <mark style="color:blue;">spark.memory.fraction</mark>)

<mark style="color:orange;">Spark Memory</mark> = (Java heap - <mark style="color:green;">Reserved Memory</mark>) \* <mark style="color:blue;">spark.memory.fraction</mark>

Storage Memory = <mark style="color:orange;">Spark Memory</mark> \* <mark style="color:purple;">spark.memory.storageFraction</mark>

Execution Memory = <mark style="color:orange;">Spark Memory</mark> \* (1 - <mark style="color:purple;">spark.memory.storageFraction</mark>)
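
A worked example, assuming a 4 GB executor heap and the default settings (`spark.memory.fraction = 0.6`, `spark.memory.storageFraction = 0.5`):

```python
heap = 4 * 1024         # executor Java heap in MB (assumed 4 GB)
reserved = 300          # fixed reserved memory in MB
fraction = 0.6          # spark.memory.fraction (default)
storage_fraction = 0.5  # spark.memory.storageFraction (default)

usable = heap - reserved                                  # 3796 MB
user_memory = usable * (1 - fraction)                     # ~1518 MB
spark_memory = usable * fraction                          # ~2278 MB
storage_memory = spark_memory * storage_fraction          # ~1139 MB
execution_memory = spark_memory * (1 - storage_fraction)  # ~1139 MB
```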

## Coding

### Quick start

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F

sc = SparkContext.getOrCreate()
sc.addPyFile(path)  # ship an extra .py/.zip dependency to the executors
spark = SparkSession.builder.appName("MyApp").getOrCreate()

df = spark.sql("SELECT * FROM db.tbl_name")

df = df.withColumn("month", F.month("data_dt"))
df = df.filter(F.col("month") > 6)

df.write.partitionBy("data_dt").saveAsTable(tbl_name, mode="overwrite")
```

### Data Type

```python
from pyspark.sql.types import StringType, DateType

col_name = 'data_dt'
if isinstance(df.schema[col_name].dataType, StringType):
    pass
```

### Functions

```
concat_ws
current_timestamp
lit
col
to_timestamp
date_format
add_months
year
month
quarter
when
substring
split
regexp_replace
lpad
rpad
sha2
to_date
last_day
```
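
A short sketch using a few of these; the column names (`first`, `last`, `data_dt`, `code`) are hypothetical:

```python
from pyspark.sql import functions as F

df = df.withColumn("full_name", F.concat_ws(" ", F.col("first"), F.col("last")))
df = df.withColumn("load_ts", F.current_timestamp())
df = df.withColumn("period", F.date_format(F.col("data_dt"), "yyyyMM"))
df = df.withColumn("month_end", F.last_day(F.col("data_dt")))
df = df.withColumn("code", F.lpad(F.col("code"), 8, "0"))
df = df.withColumn("code_hash", F.sha2(F.col("code"), 256))
```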

### Transform

```python
from pyspark.sql.functions import col, when

df = df.withColumnRenamed(old_name, new_name)
df = df.withColumn(new_name, some_expr)  # some_expr is any Column expression

df = df.filter(col(col_name).isNotNull())
df = df.withColumn(col_name, col(col_name).cast("timestamp"))
df = df.withColumn(col_name, when(col(col_name) > 2000, 100).otherwise(col(col_name)))

df = df1.join(df2, on='id', how='inner')
```

### Write

#### Insert into

* **overwrite (static):** overwrites the entire table
* **overwrite (dynamic):** overwrites only the partitions present in the incoming data

```python
df.write.insertInto(tbl_name, overwrite=True)
```
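
Note that `insertInto` resolves columns by position, not by name. A minimal sketch that enables dynamic overwrite and aligns the column order first (the `select` is an illustrative precaution, not required by the API):

```python
# Overwrite only the touched partitions instead of the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# insertInto matches columns by position, so align them with the table schema
df.select(*spark.table(tbl_name).columns).write.insertInto(tbl_name, overwrite=True)
```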

#### saveAsTable

Overwrites the entire table:

```python
df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")
```

#### Create table if not exists

```python
# Public API since Spark 3.3: spark.catalog.tableExists(tbl_name)
if spark._jsparkSession.catalog().tableExists(tbl_name):
    df.write.insertInto(tbl_name, overwrite=True)
else:
    df.write.partitionBy(key).saveAsTable(tbl_name, mode="overwrite")
```

## Window Analytics

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.sql("SELECT * FROM db_nm.tbl_nm")
window_spec = Window.partitionBy("period").orderBy(F.asc("data_dt"))
df = df.withColumn("row_num", F.row_number().over(window_spec))
```
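
For example, keeping only the first row per partition (a common dedup pattern), using the `row_num` column from the snippet above:

```python
first_rows = df.filter(F.col("row_num") == 1).drop("row_num")
```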

## Configuration

### Dynamic Partition

In `dynamic` mode, overwrite operations replace only the partitions present in the incoming data instead of truncating the whole table:
```python
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
```

### Memory

```python
spark.conf.set("spark.driver.memory", "2g")
spark.conf.set("spark.driver.memoryOverhead", "384m")
spark.conf.set("spark.executor.memory", "4g")
spark.conf.set("spark.executor.memoryOverhead", "512m")
```

## References

* <https://spark.apache.org/docs/latest/configuration.html>
* <https://towardsdatascience.com/3-reasons-why-sparks-lazy-evaluation-is-useful-ed06e27360c4>
* <https://stackoverflow.com/questions/42263270/what-is-the-concept-of-application-job-stage-and-task-in-spark>
* <https://medium.com/@diehardankush/what-are-job-stage-and-task-in-apache-spark-2fc0d326c15f>
* <https://medium.com/analytics-vidhya/apache-spark-memory-management-49682ded3d42>
* <https://www.linkedin.com/pulse/roles-driver-memory-executor-spark-impact-big-data-processing-dip/>
* <https://www.databricks.com/glossary/catalyst-optimizer>

