Spark

Create Project

  1. Open VS Code >> File >> Open Folder >> select an empty folder

  2. Create a build.sbt file with the content shown below

  3. VS Code menu >> Terminal >> New Terminal >> Command Prompt >> run sbt

  4. Create the folder structure and file src/main/scala/<package>/<filename>.scala

  5. If you are on Windows, download winutils to <your-path>/winutils/hadoop-3.3.1/bin/winutils.exe

  6. Set the environment variable HADOOP_HOME=<your-path>/winutils/hadoop-3.3.1

build.sbt
name := "demo"
version := "1.0"

scalaVersion := "2.13.12"
val sparkVersion = "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
)
/src/main/scala/demo/main.scala
package demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Logger, Level}

object demoSpark {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // remove spark log

  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)
  val as = List(
    (101, ("R", "AG")),
    (102, ("B", "DT")),
    (103, ("G", "DTV")),
    (104, ("S", "DT"))
  )

  val ls = List(
    (101, "Bern"),
    (101, "Thun"),
    (102, "Lausanne"),
    (102, "Geneve"),
    (102, "Nyon"),
    (103, "Zurich"),
    (103, "St-Gallen"),
    (103, "Chur")
  )

  def main(args: Array[String]): Unit = {
    println("Hello world")


    val abos = sc.parallelize(as)
    val locations = sc.parallelize(ls)
    val trackedCustomers = abos.join(locations)
    trackedCustomers.collect().foreach(println)

    // Result:
    // Hello world
    // (101,((R,AG),Bern))
    // (101,((R,AG),Thun))
    // (102,((B,DT),Lausanne))
    // (102,((B,DT),Geneve))
    // (102,((B,DT),Nyon))
    // (103,((G,DTV),Zurich))
    // (103,((G,DTV),St-Gallen))
    // (103,((G,DTV),Chur))
  }
}

Transformations and Actions

Transformations

Transformations return new RDDs as results. They are lazy: their result RDD is not immediately computed

val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)   // create an RDD from a local collection
val lengthsRdd = wordsRdd.map(_.length)    // transformation: lazy, nothing is computed yet
val totalChars = lengthsRdd.reduce(_ + _)  // action: triggers the actual computation

Actions

Actions compute a result based on an RDD; the result is either returned to the driver or saved to an external storage system (e.g. HDFS). They are eager: their result is immediately computed

val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2016-12") && lg.contains("error"))
                                   .count()

val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)

Evaluation in Spark

Iteration and Big Data Processing

Logistic Regression

val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
    val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= alpha * gradient
}
// note: points is re-evaluated in every iteration unless it is persisted (see next section)

Caching and Persistence

By default, RDDs are recomputed each time you run an action on them. Spark allows you to control what is cached in memory

val points = sc.textFile(...).map(parsePoint).persist()

Cache

Shorthand for persisting with the default storage level (memory only)

Persist

The persistence (storage) level can be customized with this method, as sketched below
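
A minimal sketch (input paths elided, parsePoint as above): cache() is shorthand for persist() with the default storage level, while persist(level) lets you choose a different one. Note that an RDD's storage level cannot be changed once it has been set.

import org.apache.spark.storage.StorageLevel

val cached = sc.textFile(...).cache()                    // shorthand for persist(StorageLevel.MEMORY_ONLY)
val spillable = sc.textFile(...)
                  .map(parsePoint)
                  .persist(StorageLevel.MEMORY_AND_DISK) // spill partitions to disk when memory is full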

Cluster Topology

  1. The driver program runs the Spark application, which creates a SparkContext upon start-up

  2. The SparkContext connects to a cluster manager (e.g. Mesos/YARN), which allocates resources; the cluster manager is selected via the master URL, as sketched after this list

  3. Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application

  4. Next, the driver program sends your application code to the executors

  5. Finally, the SparkContext sends tasks for the executors to run
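
A minimal sketch (the standalone master URL is illustrative): the master URL given to SparkConf determines which cluster manager the SparkContext connects to.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("Demo")
  .setMaster("local[*]")              // run locally, using all available cores
  // .setMaster("yarn")               // connect to a YARN cluster manager
  // .setMaster("spark://host:7077")  // connect to a standalone Spark master
val sc = new SparkContext(conf)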

Reduction Operations

Pair RDDs

In Spark, distributed key-value pairs are Pair RDDs. They allow you to act on each key in parallel and to regroup data across the network

Creating a Pair RDD
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
groupByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
reduceByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
mapValues
val intermediate = eventsRdd.mapValues(b => (b, 1)) // (budget, number of events)
                            .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
val avgBudgets = intermediate.mapValues {
    case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
keys
case class Visitor(ip: String, timestamp: String, duration: String)
val visits = sc.textFile(...)                  // parsed into Visitor records, then ...
               .map(v => (v.ip, v.duration))   // ... turned into a Pair RDD of (ip, duration)
val numUniqueVisits = visits.keys.distinct().count()

Partitioning and Shuffling

Shuffling

Shuffling moves data from one node to another so that it is grouped with its key. This can be an enormous performance hit

Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
                                    .groupByKey()
                                    .map(p => (p._1, (p._2.size, p._2.sum)))
                                    .collect()
// Command took 15.48s.

reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
                                    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                    .collect()
// Command took 4.65s.

Partitioning

Kinds of partitioning in Spark:

  • Hash partitioning (e.g. groupByKey)

  • Range partitioning (e.g. sortByKey)

Hash partitioning

Hash partitioning attempts to spread data evenly across partitions based on the keys' hash codes

import org.apache.spark.HashPartitioner

// partition p for key k: p = k.hashCode() % numPartitions
val partitioned = purchasesRdd.map(p => (p.customerId, p.price))
                              .partitionBy(new HashPartitioner(100))
                              .persist()

Range partitioning

Tuples with keys in the same range appear on the same partition/machine

import org.apache.spark.RangePartitioner

// set of ranges, e.g.: [1, 200], [201, 400], [401, 600], [601, 800]
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

Optimization with partitioners

Partitioning can bring substantial performance gains, especially in the face of shuffles, by ensuring that data is not transmitted over the network to other machines

Optimization using range partitioning

val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

val purchasesPerMonth = partitioned.map(p => (p._1, (1, p._2))) // p is already a (customerId, price) pair
                                   .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                   .collect()
// Command took 1.79s.

Avoiding a Network Shuffle By Partitioning

  1. reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally

  2. join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally, as sketched below
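
A minimal sketch (the data and number of partitions are illustrative): both Pair RDDs are partitioned with the same HashPartitioner and persisted, so the join can be computed without shuffling either side across the network.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8)

val customers = sc.parallelize(Seq((101, "R"), (102, "B"), (103, "G")))
                  .partitionBy(partitioner)
                  .persist()
val purchases = sc.parallelize(Seq((101, 40.0), (102, 10.5), (102, 24.0)))
                  .partitionBy(partitioner)
                  .persist()

// both sides share the same partitioner, so the join is computed locally per partition
val joined = customers.join(purchases)
joined.collect().foreach(println)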

Wide and Narrow Dependencies

Transformations can have two kinds of dependencies; you can inspect which kind a transformation created at runtime, as sketched after this list

  1. Narrow Dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD.

    1. map, mapValues, flatMap, filter

    2. mapPartitions, mapPartitionsWithIndex

    3. union

    4. join (with co-partitioned inputs)

  2. Wide Dependencies: Each partition of the parent RDD may be depended on by multiple child partitions

    1. cogroup

    2. groupWith

    3. join (with inputs not co-partitioned)

    4. leftOuterJoin

    5. rightOuterJoin

    6. groupByKey

    7. reduceByKey

    8. combineByKey

    9. distinct

    10. intersection

    11. repartition

    12. coalesce
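
A minimal sketch (the data is illustrative): the dependencies method shows which kind of dependency a transformation created; narrow ones appear as OneToOneDependency (or RangeDependency), wide ones as ShuffleDependency.

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val mapped = pairs.mapValues(_ + 1)      // narrow dependency
val reduced = mapped.reduceByKey(_ + _)  // wide dependency

println(mapped.dependencies)   // e.g. List(org.apache.spark.OneToOneDependency@...)
println(reduced.dependencies)  // e.g. List(org.apache.spark.ShuffleDependency@...)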

Lineage and Fault Tolerance

Lineage graphs are the key to fault tolerance in Spark: lost partitions can be recomputed by replaying the transformations recorded in their lineage

toDebugString

Returns a printable description of an RDD's lineage, as sketched below
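
A minimal sketch (the input path is elided and the printed output is illustrative): toDebugString shows the chain of RDDs Spark would replay to recompute a lost partition, with shuffle boundaries visible in the indentation.

val wordCounts = sc.textFile(...)
                   .flatMap(_.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
println(wordCounts.toDebugString)
// e.g.
// (2) ShuffledRDD[4] at reduceByKey ...
//  +-(2) MapPartitionsRDD[3] at map ...
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  MapPartitionsRDD[1] at textFile ...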

SQL, DataFrames, and Datasets

SparkSession

import org.apache.spark.sql.SparkSession
val spark = SparkSession
   .builder()
   .appName("My App")
   // .config("spark.some.config.option", "some-value")
   .getOrCreate()
Creating a DataFrame from an existing RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// peopleRDD is assumed to be an RDD[String] of lines such as "Michael, 29"
val schemaString = "name age"
val fields = schemaString.split(" ")
   .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
   .map(_.split(","))
   .map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)

// val employeeDF = sc.parallelize(...).toDF   // alternative: requires import spark.implicits._

DataFrame

Handling missing values: the following methods are DataFrameNaFunctions, accessed through df.na (see the sketch below)

drop()                                  // drop rows containing any null/NaN value
drop("all")                             // drop rows only if every value is null/NaN
drop(Array("id", "name"))               // consider only the listed columns
fill(0)                                 // replace nulls in numeric columns with 0
fill(Map("minBalance" -> 0))            // per-column replacement values
replace(Array("id"), Map(1234 -> 8923)) // replace the value 1234 with 8923 in column "id"
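
A minimal sketch (the column names and rows are illustrative): build a small DataFrame with missing values via toDF, then clean it with the df.na methods listed above.

import spark.implicits._

val accounts = Seq(
  (Some(1), Some("alice"), Some(100)),
  (Some(2), None,          None),
  (None,    Some("bob"),   Some(50))
).toDF("id", "name", "minBalance")

val complete = accounts.na.drop()                        // keep only rows without any null
val defaulted = accounts.na.fill(Map("minBalance" -> 0)) // default the missing balances
defaulted.show()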
