Spark

Create Project

  1. Open VS Code >> File >> Open Folder >> Select an empty folder

  2. Create a build.sbt file with the code below

  3. VS Code menu >> Terminal >> New Terminal >> Command Prompt >> run sbt

  4. Create the folder and file src/main/scala/<package>/<filename>.scala

  5. If using Windows, download winutils.exe to <your-path>/winutils/hadoop-3.3.1/bin/winutils.exe

  6. Set the environment variable HADOOP_HOME=<your-path>/winutils/hadoop-3.3.1

build.sbt
name := "demo"
version := "1.0"

scalaVersion := "2.13.12"
val sparkVersion = "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
)
src/main/scala/demo/main.scala
package demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Logger, Level}

object demoSpark {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // suppress Spark INFO/WARN log output

  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)
  val as = List(
    (101, ("R", "AG")),
    (102, ("B", "DT")),
    (103, ("G", "DTV")),
    (104, ("S", "DT"))
  )

  val ls = List(
    (101, "Bern"),
    (101, "Thun"),
    (102, "Lausanne"),
    (102, "Geneve"),
    (102, "Nyon"),
    (103, "Zurich"),
    (103, "St-Gallen"),
    (103, "Chur")
  )

  def main(args: Array[String]): Unit = {
    println("Hello world")


    val abos = sc.parallelize(as)
    val locations = sc.parallelize(ls)
    val trackedCustomers = abos.join(locations)
    trackedCustomers.collect().foreach(println)

    // Result:
    // Hello world
    // (101,((R,AG),Bern))
    // (101,((R,AG),Thun))
    // (102,((B,DT),Lausanne))
    // (102,((B,DT),Geneve))
    // (102,((B,DT),Nyon))
    // (103,((G,DTV),Zurich))
    // (103,((G,DTV),St-Gallen))
    // (103,((G,DTV),Chur))
  }
}

Transformations and Actions

Transformations

Return new RDDs as results. They are lazy: their result RDD is not immediately computed

val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)
val lengthsRdd = wordsRdd.map(_.length)
val totalChars = lengthsRdd.reduce(_ + _)
Transformation | Definition | Description

map

map[B](f: A => B): RDD[B]

Apply function to each element in the RDD and return an RDD of the result

flatMap

flatMap[B](f: A => TraversableOnce[B]): RDD[B]

Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned

filter

filter(pred: A => Boolean): RDD[A]

Apply predicate function to each element in the RDD and return an RDD of elements that have passed the predicate condition

distinct

distinct(): RDD[T]

Return an RDD with duplicates removed

union

union(other: RDD[T]): RDD[T]

Return an RDD containing elements from both RDDs

intersection

intersection(other: RDD[T]): RDD[T]

Return an RDD containing only elements found in both RDDs

subtract

subtract(other: RDD[T]): RDD[T]

Return an RDD with the contents of the other RDD removed

cartesian

cartesian[U](other: RDD[U]): RDD[(T, U)]

Cartesian product with the other RDD
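
For example, a minimal sketch chaining several of these transformations (assuming sc is an existing SparkContext, as in the demo above; nothing is computed until an action is called):

val wordsRdd = sc.parallelize(List("spark is fast", "spark is lazy"))
val words = wordsRdd.flatMap(_.split(" "))       // one element per word
val longWords = words.filter(_.length > 2)       // keep words longer than 2 characters
val uniqueWords = longWords.distinct()           // remove duplicates
val pairs = uniqueWords.cartesian(uniqueWords)   // every (word, word) combination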

Actions

Compute a result based on an RDD, which is either returned to the driver program or saved to an external storage system (e.g. HDFS). They are eager: their result is immediately computed

Action | Definition | Description

collect

collect(): Array[T]

Return all elements from RDD

count

count(): Long

Return the number of elements in the RDD

take

take(num: Int): Array[T]

Return the first num elements of the RDD

reduce

reduce(op: (A, A) => A): A

Combine the elements in the RDD together using op function and return result

foreach

foreach(f: T => Unit): Unit

Apply function to each element in the RDD

takeSample

takeSample(withRepl: Boolean, num: Int): Array[T]

Return an array with a random sample of num elements of the dataset, with or without replacement

takeOrdered

takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Return the first num elements of the RDD using either their natural order or a custom comparator

saveAsTextFile

saveAsTextFile(path: String): Unit

Write the elements of the dataset as a text file in the local filesystem or HDFS

saveAsSequenceFile

saveAsSequenceFile(path: String): Unit

Write the elements of the dataset as a Hadoop SequenceFile in the local filesystem or HDFS

val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2016-12") && lg.contains("error"))
                                   .count()

val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)

Evaluation in Spark

Iteration and Big Data Processing

Logistic Regression

w \leftarrow w - \alpha \sum_{i=1}^{N} g(w; x_i, y_i)
val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
    val gradient = points.map { p =>
        (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
    }.reduce(_ + _)
    w -= alpha * gradient
}

Caching and Persistence

By default, RDDs are recomputed each time you run an action on them. Spark allows you to control what is cached in memory

val points = sc.textFile(...).map(parsePoint).persist()

Cache

Shorthand for the default storage level (MEMORY_ONLY)

Persist

Persistence can be customized with this method

Storage Level         | Space Used | CPU time
MEMORY_ONLY (default) | High       | Low
MEMORY_ONLY_SER       | Low        | High
MEMORY_AND_DISK       | High       | Medium
MEMORY_AND_DISK_SER   | Low        | High
DISK_ONLY             | Low        | High
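
A minimal sketch of choosing a storage level (reusing the parsePoint pipeline from above; StorageLevel is Spark's public API):

import org.apache.spark.storage.StorageLevel

// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
val inMemory = sc.textFile(...).map(parsePoint).cache()

// persist(level) picks a different storage level, e.g. spill partitions
// to disk when they do not fit in memory in deserialized form
val onDiskToo = sc.textFile(...).map(parsePoint).persist(StorageLevel.MEMORY_AND_DISK)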

Cluster Topology

  1. The driver program runs the Spark application, which creates a SparkContext upon start-up

  2. The SparkContext connects to a cluster manager (e.g. Mesos/YARN) which allocates resources

  3. Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application

  4. Next, the driver program sends your application code to the executors

  5. Finally, the SparkContext sends tasks to the executors to run

Reduction Operations

Pair RDDs

In Spark, distributed key-value pairs are Pair RDDs, which allow you to act on each key in parallel or regroup data across the network

Creating a Pair RDD
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
groupByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
reduceByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
                  .map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
mapValues
val intermediate = eventsRdd.mapValues(b => (b, 1)) // (budget, events)
                            .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
val avgBudgets = intermediate.mapValues {
    case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
keys
case class Visitor(ip: String, timestamp: String, duration: String)
val visits: RDD[(String, String)] = sc.textFile(...)                 // parse each line into a Visitor (elided)
                                      .map(v => (v.ip, v.duration))  // Pair RDD keyed by IP
val numUniqueVisits = visits.keys.distinct().count()

Partitioning and Shuffling

Shuffling

Moving data from one node to another so it is grouped with its key. This can be an enormous performance hit

Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
                                .groupByKey()
                                .map(p => (p._1, (p._2.size, p._2.sum)))
                                .collect()
// Command took 15.48s.

reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
                                .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                .collect()
// Command took 4.65s.

Partitioning

Kinds of partitioning in Spark

  • Hash partitioning (groupByKey)

  • Range partitioning (sortByKey)

Hash partitioning

Hash partitioning attempts to spread data evenly across partitions based on the key

import org.apache.spark.HashPartitioner

// p = k.hashCode() % numPartitions
val partitioned = purchasesRdd.map(p => (p.customerId, p.price))
                              .partitionBy(new HashPartitioner(100))
                              .persist()

Range partitioning

Tuples with keys in the same range appear on the same machine

import org.apache.spark.RangePartitioner

// set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

Optimization with partitioners

Partitioning can bring substantial performance gains, especially in the face of shuffles (it ensures that data is not transmitted over the network to other machines)

Optimization using range partitioning

val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

val purchasesPerMonth = partitioned.map(p => (p._1, (1, p._2)))
                                .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
                                .collect()
// Command took 1.79s.

Avoiding a Network Shuffle By Partitioning

  1. reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally

  2. join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally
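
A minimal sketch of point 2 (userData, events, and their parse functions are hypothetical; both sides share the same HashPartitioner, so the join needs no shuffle):

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(100)

// both pair RDDs use the same partitioner and are persisted, so matching
// keys already live in the same partitions on the same executors
val userData = sc.textFile(...).map(parseUser)    // RDD[(Int, UserInfo)]
                 .partitionBy(partitioner).persist()
val events   = sc.textFile(...).map(parseEvent)   // RDD[(Int, Event)]
                 .partitionBy(partitioner).persist()

// computed locally on each partition; no data is moved across the network
val joined = userData.join(events)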

Wide and Narrow Dependencies

Transformations can have two kinds of dependencies

  1. Narrow Dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD.

    1. map, mapValues, flatMap, filter

    2. mapPartitions, mapPartitionsWithIndex

    3. union

    4. join (with co-partitioned inputs)

  2. Wide Dependencies: Each partition of the parent RDD may be depended on by multiple child partitions

    1. cogroup

    2. groupWith

    3. join (with inputs not co-partitioned)

    4. leftOuterJoin

    5. rightOuterJoin

    6. groupByKey

    7. reduceByKey

    8. combineByKey

    9. distinct

    10. intersection

    11. repartition

    12. coalesce
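
One way to see which kind of dependency a transformation introduces is to inspect an RDD's dependencies (a sketch; the printed class names are illustrative):

val pairs = sc.parallelize(List((1, "a"), (2, "b"), (1, "c")))

val mapped = pairs.mapValues(_.toUpperCase)   // narrow dependency
println(mapped.dependencies)                  // e.g. List(org.apache.spark.OneToOneDependency@...)

val grouped = pairs.groupByKey()              // wide dependency (shuffle)
println(grouped.dependencies)                 // e.g. List(org.apache.spark.ShuffleDependency@...)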

Lineage and Fault Tolerance

Lineage graphs are the key to fault tolerance in Spark: lost partitions can be recomputed from their lineage

toDebugString

Return a description of this RDD and its recursive dependencies for debugging
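
A minimal usage sketch (the exact output format varies between Spark versions):

val rdd = sc.parallelize(1 to 100)
            .map(_ * 2)
            .filter(_ % 3 == 0)
println(rdd.toDebugString)   // prints each RDD in the lineage, e.g. MapPartitionsRDD <- ParallelCollectionRDD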

SQL, DataFrames, and Datasets

SparkSession

import org.apache.spark.sql.SparkSession
val spark = SparkSession
   .builder()
   .appName("My App")
   // .config("spark.some.config.option", "some-value")
   .getOrCreate()
Creating DataFrames from an existing RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schemaString = "name age"
val fields = schemaString.split(" ")
   .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
   .map(_.split(","))
   .map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)

// val employeeDF = sc.parallelize(...).toDF   (requires import spark.implicits._)

DataFrame

import org.apache.spark.sql.types._

// methods on DataFrameNaFunctions, accessed via df.na
drop()                                  // drop rows containing any null or NaN value
drop("all")                             // drop rows only if all values are null or NaN
drop(Array("id", "name"))               // consider only the listed columns
fill(0)                                 // replace null/NaN in numeric columns with 0
fill(Map("minBalance" -> 0))            // per-column replacement values
replace(Array("id"), Map(1234 -> 8923)) // replace the value 1234 with 8923 in column "id"
