# Spark

## Create Project

1. Open VS Code >> File >> Open Folder >> select an empty folder
2. Create a `build.sbt` file following the code below
3. VS Code menu >> Terminal >> New Terminal >> Command Prompt >> run `sbt`
4. Create the folder and file `src/main/scala/<package>/<filename>.scala`
5. If using Windows, download [winutils](https://github.com/kontext-tech/winutils/blob/master/hadoop-3.3.1/bin/winutils.exe) to `<your-path>/winutils/hadoop-3.3.1/bin/winutils.exe`
6. Set the environment variable `HADOOP_HOME=<your-path>/winutils/hadoop-3.3.1`

{% code title="build.sbt" %}

```scala
name := "demo"
version := "1.0"

scalaVersion := "2.13.12"
val sparkVersion = "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
)
```

{% endcode %}

{% code title="/src/main/scala/demo/main.scala" %}

```scala
package demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Logger, Level}

object demoSpark {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // remove spark log

  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)
  val as = List(
    (101, ("R", "AG")),
    (102, ("B", "DT")),
    (103, ("G", "DTV")),
    (104, ("S", "DT"))
  )

  val ls = List(
    (101, "Bern"),
    (101, "Thun"),
    (102, "Lausanne"),
    (102, "Geneve"),
    (102, "Nyon"),
    (103, "Zurich"),
    (103, "St-Gallen"),
    (103, "Chur")
  )

  def main(args: Array[String]): Unit = {
    println("Hello world")


    val abos = sc.parallelize(as)
    val locations = sc.parallelize(ls)
    val trackedCustomers = abos.join(locations)
    trackedCustomers.collect().foreach(println)

    // Result:
    // Hello world
    // (101,((R,AG),Bern))
    // (101,((R,AG),Thun))
    // (102,((B,DT),Lausanne))
    // (102,((B,DT),Geneve))
    // (102,((B,DT),Nyon))
    // (103,((G,DTV),Zurich))
    // (103,((G,DTV),St-Gallen))
    // (103,((G,DTV),Chur))
  }
}
```

{% endcode %}

## Transformations and Actions

### Transformations

Return new RDDs as results. They are **lazy**: the result RDD is not immediately computed

```scala
val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)
val lengthsRdd = wordsRdd.map(_.length)
val totalChars = lengthsRdd.reduce(_ + _)
```

<table><thead><tr><th width="117">Transformation</th><th width="385">Definition</th><th>Description</th></tr></thead><tbody><tr><td>map</td><td>map[B](f: A => B): RDD[B]</td><td>Apply function to each element in the RDD and return an RDD of the result</td></tr><tr><td>flatMap</td><td>flatMap[B](f: A => TraversableOnce[B]): RDD[B]</td><td>Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned</td></tr><tr><td>filter</td><td>filter(pred: A => Boolean): RDD[A]</td><td>Apply predicate function to each element in the RDD and return an RDD of elements that have passed the predicate condition</td></tr><tr><td>distinct</td><td>distinct(): RDD[T]</td><td>Return an RDD with duplicates removed</td></tr><tr><td>union</td><td>union(other: RDD[T]): RDD[T]</td><td>Return an RDD containing elements from both RDDs</td></tr><tr><td>intersection</td><td>intersection(other: RDD[T]): RDD[T]</td><td>Return an RDD containing only elements found in both RDDs</td></tr><tr><td>subtract</td><td>subtract(other: RDD[T]): RDD[T]</td><td>Return an RDD with the contents of the other RDD removed</td></tr><tr><td>cartesian</td><td>cartesian[U](other: RDD[U]): RDD[(T, U)]</td><td>Cartesian product with the other RDD</td></tr></tbody></table>
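
A minimal sketch combining a few of these transformations, assuming a SparkContext `sc`; the data and variable names are illustrative:

```scala
// Illustrative input RDDs
val wordsRdd = sc.parallelize(List("spark", "scala", "spark", "rdd"))
val otherRdd = sc.parallelize(List("rdd", "hadoop"))

val upper    = wordsRdd.map(_.toUpperCase)     // transform each element
val chars    = wordsRdd.flatMap(_.toList)      // flatten the per-element collections
val longOnes = wordsRdd.filter(_.length > 4)   // keep elements passing the predicate
val unique   = wordsRdd.distinct()             // remove duplicates
val both     = wordsRdd.union(otherRdd)        // elements of either RDD
val common   = wordsRdd.intersection(otherRdd) // elements found in both RDDs

// Nothing has been computed yet: transformations are lazy until an action runs
println(unique.count())
```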

### Actions

Compute a result based on an RDD; the result is either returned to the driver or saved to an external storage system (e.g. HDFS). They are **eager**: their result is immediately computed

| Action             | Definition                                                   | Description                                                                                      |
| ------------------ | ------------------------------------------------------------ | ------------------------------------------------------------------------------------------------ |
| collect            | collect(): Array\[T]                                         | Return all elements from the RDD                                                                 |
| count              | count(): Long                                                | Return the number of elements in the RDD                                                         |
| take               | take(num: Int): Array\[T]                                    | Return the first num elements of the RDD                                                         |
| reduce             | reduce(op: (A, A) => A): A                                   | Combine the elements in the RDD using the op function and return the result                      |
| foreach            | foreach(f: T => Unit): Unit                                  | Apply function to each element in the RDD                                                        |
| takeSample         | takeSample(withRepl: Boolean, num: Int): Array\[T]           | Return an array with a random sample of num elements of the dataset, with or without replacement |
| takeOrdered        | takeOrdered(num: Int)(implicit ord: Ordering\[T]): Array\[T] | Return the first num elements of the RDD using either their natural order or a custom comparator |
| saveAsTextFile     | saveAsTextFile(path: String): Unit                           | Write the elements of the dataset as a text file in the local filesystem or HDFS                 |
| saveAsSequenceFile | saveAsSequenceFile(path: String): Unit                       | Write the elements of the dataset as a Hadoop SequenceFile in the local filesystem or HDFS       |

```scala
val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2016-12") && lg.contains("error"))
                                   .count()

val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)
```

## Evaluation in Spark

### Iteration and Big Data Processing

<figure><img src="/files/5B3fPXjwqGDn8dCwkkd5" alt=""><figcaption></figcaption></figure>

#### Logistic Regression

$$
w \leftarrow w - \alpha \sum_{i=1}^{N} g(w; x_i, y_i)
$$

```scala
val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
    val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.x * p.y
    }.reduce(_ + _)
    w -= alpha * gradient
}
```

### Caching and Persistence

By default, RDDs are recomputed each time you run an action on them. Spark allows you to control what is cached in memory

```scala
val points = sc.textFile(...).map(parsePoint).persist()
```

#### Cache

Shorthand for persisting with the default storage level (`MEMORY_ONLY`)

#### Persist

Persistence can be customized with this method

<table><thead><tr><th width="265">Storage Level</th><th>Space Used</th><th>CPU time</th></tr></thead><tbody><tr><td>MEMORY_ONLY (default)</td><td>High</td><td>Low</td></tr><tr><td>MEMORY_ONLY_SER</td><td>Low</td><td>High</td></tr><tr><td>MEMORY_AND_DISK</td><td>High</td><td>Medium</td></tr><tr><td>MEMORY_AND_DISK_SER</td><td>Low</td><td>High</td></tr><tr><td>DISK_ONLY</td><td>Low</td><td>High</td></tr></tbody></table>

## Cluster Topology

<figure><img src="/files/ZmtZu3dWS9XrL8dyVGVn" alt=""><figcaption></figcaption></figure>

1. The driver program runs the Spark application, which creates a **SparkContext** upon start-up
2. The **SparkContext** connects to a cluster manager (e.g. Mesos/YARN) which allocates resources
3. Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application
4. Next, the driver program sends your application code to the executors
5. Finally, the **SparkContext** sends tasks for the executors to run

## Reduction Operations

## Pair RDDs

In Spark, distributed key-value pairs are **Pair RDDs** that allow you to act on each key in parallel or regroup data across the network

{% code title="Creating a Pair RDD" %}

```scala
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
```

{% endcode %}

{% code title="groupByKey" %}

```scala
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
```

{% endcode %}

{% code title="reduceByKey" %}

```scala
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
```

{% endcode %}

{% code title="mapValue" %}

```scala
val intermediate = eventsRdd.mapValues(b => (b, 1)) // (budget, events)
                            .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
val avgBudgets = intermediate.mapValues {
    case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
```

{% endcode %}

{% code title="keys" %}

```scala
case class Visitor(ip: String, timestamp: String, duration: String)
// parseVisitor is a hypothetical function turning a line of text into a Visitor
val visitors: RDD[Visitor] = sc.textFile(...).map(parseVisitor)
val visits = visitors.map(v => (v.ip, v.duration)) // Pair RDD keyed by IP
val numUniqueVisits = visits.keys.distinct().count()
```

{% endcode %}

## Partitioning and Shuffling

### Shuffling

Moving data from one node to another so it can be grouped with its key. This can be an enormous performance hit

**Rule of thumb:** a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.

#### groupByKey

```scala
def groupByKey(): RDD[(K, Iterable[V])]
```

```scala
val purchasesRdd = sc.textFile(...) // assume each line is parsed into a purchase record
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
                                    .groupByKey()
                                    .map(p => (p._1, (p._2.size, p._2.sum)))
                                    .collect()
// Command took 15.48s.
```

<figure><img src="/files/pmAURR3WQpiPGuMxg1A7" alt=""><figcaption><p>groupByKey</p></figcaption></figure>

#### reduceByKey

```scala
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
```

```scala
val purchasesRdd = sc.textFile(...) // assume each line is parsed into a purchase record
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
                                    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                    .collect()
// Command took 4.65s.
```

<figure><img src="/files/Nu6fVxVrM27HLa54QciO" alt=""><figcaption><p>reduceByKey</p></figcaption></figure>

### Partitioning

#### Kind of partitioning in Spark

* Hash partitioning (e.g. groupByKey)
* Range partitioning (e.g. sortByKey)

#### Hash partitioning

Hash partitioning attempts to spread data evenly across partitions based on the key

```scala
// p = k.hashCode() % numPartitions
val partitioned = purchasesRdd.map(p => (p.customerId, p.price))
                              .partitionBy(new HashPartitioner(100))
                              .persist()
```

#### Range partitioning

Tuples with keys in the same range appear on the same machine

```scala
// set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
```

### Optimization with partitioners

Partitioning can bring substantial performance gains, especially in the face of shuffles, by ensuring that data is not transmitted over the network to other machines

#### Optimization using range partitioning

```scala
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

val purchasesPerMonth = partitioned.map(p => (p.customerId, (1, p.price)))
                                   .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                   .collect()
// Command took 1.79s.
```

#### Avoiding a Network Shuffle By Partitioning

1. reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally
2. join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally (see the sketch below)
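
A minimal sketch of the second case, assuming a SparkContext `sc`; the RDDs, keys, and partition count are illustrative:

```scala
import org.apache.spark.HashPartitioner

// Illustrative pair RDDs keyed by the same id
val usersRdd  = sc.parallelize(Seq((1, "Ada"), (2, "Bob")))
val eventsRdd = sc.parallelize(Seq((1, "login"), (2, "purchase"), (1, "logout")))

// Pre-partition both RDDs with the same partitioner and persist them
val partitioner = new HashPartitioner(8)
val users  = usersRdd.partitionBy(partitioner).persist()
val events = eventsRdd.partitionBy(partitioner).persist()

// Both sides are co-partitioned, so the join can be computed locally
// on each partition without shuffling data across the network
val joined = users.join(events)
joined.collect().foreach(println)
```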

### Wide and Narrow Dependencies

Transformations can have two kinds of dependencies

1. **Narrow Dependencies:** Each partition of the parent RDD is used by at most one partition of the child RDD.
   1. map, mapValues, flatMap, filter
   2. mapPartitions, mapPartitionsWithIndex
   3. union
   4. join (with co-partitioned inputs)
2. **Wide Dependencies:** Each partition of the parent RDD may be depended on by **multiple** child partitions
   1. cogroup
   2. groupWith
   3. join (with inputs not co-partitioned)
   4. leftOuterJoin
   5. rightOuterJoin
   6. groupByKey
   7. reduceByKey
   8. combineByKey
   9. distinct
   10. intersection
   11. repartition
   12. coalesce

<figure><img src="/files/KuPuYsBAkkeoV2CPblks" alt=""><figcaption><p>Wide &#x26; Narrow Dependencies</p></figcaption></figure>

#### Lineage and Fault Tolerance

Lineage graphs are the key to fault tolerance in Spark: a lost partition can be recomputed by replaying the transformations recorded in its lineage. You can inspect an RDD's lineage with `toDebugString`, which also shows where wide dependencies (shuffles) occur.
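
A small sketch, assuming a SparkContext `sc` and an illustrative input file; the indentation in the printed lineage marks shuffle boundaries:

```scala
// Illustrative word-count pipeline: reduceByKey introduces a wide dependency
val counts = sc.textFile("data.txt")
               .flatMap(_.split(" "))
               .map(word => (word, 1))
               .reduceByKey(_ + _)

// Print the lineage graph, including the ShuffledRDD created by reduceByKey
println(counts.toDebugString)
```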

## SQL, DataFrames, and Datasets

### SparkSession

```scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession
   .builder()
   .appName("My App")
   // .config("spark.some.config.option", "some-value")
   .getOrCreate()
```

{% code title="Creating DataFrames from existing RDD" %}

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val schemaString = "name age"
val fields = schemaString.split(" ")
   .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
   .map(_.split(","))
   .map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)

// val employeeDF = sc.parallelize(...).toDF
```

{% endcode %}

## DataFrame

Given a DataFrame `df`, these null-handling helpers live on `df.na` (the `DataFrameNaFunctions` API):

```scala
df.na.drop()                                  // drop rows containing any null value
df.na.drop("all")                             // drop rows only if every value is null
df.na.drop(Array("id", "name"))               // drop rows with nulls in the listed columns
df.na.fill(0)                                 // replace nulls in numeric columns with 0
df.na.fill(Map("minBalance" -> 0))            // replace nulls per column
df.na.replace(Array("id"), Map(1234 -> 8923)) // replace the value 1234 with 8923 in column id
```
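
A brief end-to-end sketch of these calls, assuming a local `SparkSession`; the data and column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NaDemo").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical data with missing values
val people = Seq(
  (Some(1), Some("Ann"), Some(100)),
  (None,    Some("Ben"), None),
  (Some(3), None,        Some(250))
).toDF("id", "name", "minBalance")

val cleaned = people
  .na.drop(Array("id"))            // drop rows where id is null
  .na.fill(Map("minBalance" -> 0)) // replace missing balances with 0

cleaned.show()
```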

## References

* Scala Version: <https://www.scala-lang.org/download/all.html>

