Open VS Code >> File >> Open Folder >> select an empty folder
Create a build.sbt file following the code below
VS Code menu >> Terminal >> New Terminal >> Command Prompt >> run sbt
Create the folder and file src/main/scala/<package>/<filename>.scala
If using Windows, download winutils.exe to <your-path>/winutils/hadoop-3.3.1/bin/winutils.exe
Set the environment variable HADOOP_HOME=<your-path>/winutils/hadoop-3.3.1
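As an alternative sketch (an assumption, not part of the original setup), the same winutils location can be supplied programmatically before the SparkContext is created, instead of a global environment variable:
// Hypothetical alternative for Windows: point Hadoop at the winutils folder
// from code, before any Spark/Hadoop classes are initialised.
System.setProperty("hadoop.home.dir", "<your-path>/winutils/hadoop-3.3.1")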
build.sbt
name := "demo"
version := "1.0"
scalaVersion := "2.13.12"
val sparkVersion = "3.2.0"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
)
/src/main/scala/demo/main.scala
package demo
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Logger, Level}
object demoSpark {
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // suppress Spark logging below ERROR level
val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val as = List(
(101, ("R", "AG")),
(102, ("B", "DT")),
(103, ("G", "DTV")),
(104, ("S", "DT"))
)
val ls = List(
(101, "Bern"),
(101, "Thun"),
(102, "Lausanne"),
(102, "Geneve"),
(102, "Nyon"),
(103, "Zurich"),
(103, "St-Gallen"),
(103, "Chur")
)
def main(args: Array[String]): Unit = {
println("Hello world")
val abos = sc.parallelize(as)
val locations = sc.parallelize(ls)
val trackedCustomers = abos.join(locations)
trackedCustomers.collect().foreach(println)
// Result:
// Hello world
// (101,((R,AG),Bern))
// (101,((R,AG),Thun))
// (102,((B,DT),Lausanne))
// (102,((B,DT),Geneve))
// (102,((B,DT),Nyon))
// (103,((G,DTV),Zurich))
// (103,((G,DTV),St-Gallen))
// (103,((G,DTV),Chur))
}
}
Transformations and Actions
Transformations
Return new RDDs as results. They are lazy: their result RDD is not immediately computed
val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)   // create an RDD from a local collection
val lengthsRdd = wordsRdd.map(_.length)    // transformation: lazy, nothing computed yet
val totalChars = lengthsRdd.reduce(_ + _)  // action: eager, triggers the computation
Transformation | Definition | Description
map | map[B](f: A => B): RDD[B] | Apply a function to each element in the RDD and return an RDD of the results
flatMap | flatMap[B](f: A => TraversableOnce[B]): RDD[B] | Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned
filter | filter(pred: A => Boolean): RDD[A] | Apply a predicate function to each element in the RDD and return an RDD of the elements that pass the predicate
distinct | distinct(): RDD[A] | Return an RDD with duplicates removed
union | union(other: RDD[T]): RDD[T] | Return an RDD containing the elements of both RDDs
intersection | intersection(other: RDD[T]): RDD[T] | Return an RDD containing only the elements found in both RDDs
subtract | subtract(other: RDD[T]): RDD[T] | Return an RDD with the contents of the other RDD removed
cartesian | cartesian[U](other: RDD[U]): RDD[(T, U)] | Cartesian product with the other RDD
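A minimal sketch (word list made up for illustration, reusing the sc from the demo program above) showing a few of these transformations chained together; all of them stay lazy until an action such as collect is called:
val words = sc.parallelize(List("spark", "scala", "rdd", "spark"))
val lengths = words.map(_.length)                       // RDD[Int], lazy
val chars = words.flatMap(_.toList)                     // RDD[Char], lazy
val longWords = words.filter(_.length > 3).distinct()   // still lazy
println(longWords.union(words).collect().toList)        // action triggers evaluation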
Actions
Compute a result based on an RDD, which is either returned to the driver program or saved to an external storage system (e.g. HDFS). They are eager: their result is immediately computed
Action | Definition | Description
collect | collect(): Array[T] | Return all elements from the RDD
count | count(): Long | Return the number of elements in the RDD
take | take(num: Int): Array[T] | Return the first num elements of the RDD
reduce | reduce(op: (A, A) => A): A | Combine the elements in the RDD using the op function and return the result
foreach | foreach(f: T => Unit): Unit | Apply a function to each element in the RDD
takeSample | takeSample(withRepl: Boolean, num: Int): Array[T] | Return an array with a random sample of num elements of the dataset, with or without replacement
takeOrdered | takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] | Return the first num elements of the RDD using either their natural order or a custom comparator
saveAsTextFile | saveAsTextFile(path: String): Unit | Write the elements of the dataset as a text file in the local filesystem or HDFS
saveAsSequenceFile | saveAsSequenceFile(path: String): Unit | Write the elements of the dataset as a Hadoop SequenceFile in the local filesystem or HDFS
val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2016-12") && lg.contains("error"))
.count()
val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)
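A further sketch (small in-memory RDD and a hypothetical output path, again assuming sc) for a few of the actions not used above:
val nums = sc.parallelize(1 to 100)
println(nums.count())                                        // 100
println(nums.reduce(_ + _))                                  // 5050
println(nums.takeSample(withReplacement = false, 5).toList)  // 5 random elements
println(nums.takeOrdered(3).toList)                          // List(1, 2, 3)
nums.saveAsTextFile("out/nums")                              // writes part files under out/nums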
Evaluation in Spark
Iteration and Big Data Processing
Logistic Regression
w ← w − α · Σ_{i=1}^{N} g(w; x_i, y_i)
val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
val gradient = points.map { p =>
(1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
}.reduce(_ + _)
w -= alpha * gradient
}
Caching and Persistence
By default, RDDs are recomputed each time you run an action on them. Spark allows you to control what is cached in memory
val points = sc.textFile(...).map(parsePoint).persist()
Cache
Shorthand for persisting with the default storage level (MEMORY_ONLY)
Persist
Persistence can be customized with this method
Storage Level | Space Used | CPU Time
MEMORY_ONLY (default) | High | Low
MEMORY_ONLY_SER | Low | High
MEMORY_AND_DISK | High | Medium
MEMORY_AND_DISK_SER | Low | High
DISK_ONLY | Low | High
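A short sketch of both methods on a throwaway RDD (assuming sc); note that an RDD's storage level cannot be changed once it has been set:
import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000)
data.persist(StorageLevel.MEMORY_AND_DISK)  // keep in memory, spill partitions to disk if needed
println(data.count())                       // first action materialises the cache
println(data.count())                       // second action reuses the cached partitions
// data.cache() would be shorthand for persist(StorageLevel.MEMORY_ONLY)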
Cluster Topology
The driver program runs the Spark application, which creates a SparkContext upon start-up
The SparkContext connects to a cluster manager (e.g. Mesos/YARN) which allocates resources
Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application
Next, the driver program sends your application code to the executors
Finally, the SparkContext sends tasks for the executors to run
Reduction Operations
Pair RDDs
In Spark, distributed key-value pairs are Pair RDDs that allow you to act on each key in parallel or regroup data across the network
Creating a Pair RDD
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
groupByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
reduceByKey
case class Event(organizer: String, name: String, budget: Int)
val eventsRdd = sc.parallelize(...)
.map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _)
budgetsRdd.collect().foreach(println)
case class Visitor(ip: String, timestamp: String, duration: String)
val visitors: RDD[Visitor] = ...
val visits = visitors.map(v => (v.ip, v.duration))
val numUniqueVisits = visits.keys.distinct().count()
Partitioning and Shuffling
Shuffling
Moving data from one node to another so it can be grouped with its key. This can be an enormous performance hit
Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.
groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
.groupByKey()
.map(p => (p._1, (p._2.size, p._2.sum)))
.collect()
// Command took 15.48s.
reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val purchasesRdd = sc.textFile(...)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
// Command took 4.65s.
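As a follow-up usage sketch (self-contained, with a hypothetical Purchase case class), the (count, sum) pairs produced by reduceByKey can be turned into per-customer averages with mapValues, which adds no further shuffle:
case class Purchase(customerId: Int, price: Double)  // hypothetical record type

val purchases = sc.parallelize(List(
  Purchase(100, 20.0), Purchase(100, 10.0), Purchase(200, 5.0)
))
val avgPerCustomer = purchases.map(p => (p.customerId, (1, p.price)))
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  .mapValues { case (count, total) => total / count }  // narrow transformation, no shuffle
avgPerCustomer.collect().foreach(println)
// e.g. (100,15.0) and (200,5.0)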
Partitioning
Kinds of partitioning in Spark:
Hash partitioning (used by groupByKey)
Range partitioning (used by sortByKey)
Hash partitioning
Hash partitioning attempts to spread data evenly across partitions based on the key
// p = k.hashCode() % numPartitions
val partitioned = purchasesRdd.map(p => (p.customerId, p.price))
.partitionBy(new HashPartitioner(100))
.persist()
Range partitioning
Tuples with keys in the same range appear on the same machine
// set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
Optimization with partitioners
Partitioning can bring substantial performance gains, especially in the face of shuffles (it ensures that data is not transmitted over the network to other machines)
Optimization using range partitioning
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
val purchasesPerMonth = partitioned.mapValues(price => (1, price)) // mapValues keeps the partitioner
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
// Command took 1.79s.
Avoiding a Network Shuffle By Partitioning
reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally
join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally, as sketched below
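A hedged sketch of the second point (hypothetical data, local mode): partitioning both pair RDDs with the same HashPartitioner and persisting them lets the subsequent join run without shuffling either side:
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8)

val names = sc.parallelize(List((101, "Alice"), (102, "Bob")))
  .partitionBy(partitioner).persist()
val totals = sc.parallelize(List((101, 30.0), (102, 5.0)))
  .partitionBy(partitioner).persist()

// co-partitioned inputs: the join is computed locally, with no network shuffle
val joined = names.join(totals)
joined.collect().foreach(println)  // e.g. (101,(Alice,30.0)) and (102,(Bob,5.0))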
Wide and Narrow Dependencies
Transformations can have two kinds of dependencies
Narrow Dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD.
map, mapValues, flatMap, filter
mapPartitions, mapPartitionsWithIndex
union
join (with co-partitioned inputs)
Wide Dependencies: Each partition of the parent RDD may be depended on by multiple child partitions.
cogroup
groupWith
join (with inputs not co-partitioned)
leftOuterJoin
rightOuterJoin
groupByKey
reduceByKey
combineByKey
distinct
intersection
repartition
coalesce
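A small sketch (assuming sc) of how to check which kind of dependency a transformation produced: narrow dependencies show up as OneToOneDependency, wide ones as ShuffleDependency:
val base = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))

val mapped = base.mapValues(_ + 1)   // narrow dependency
val grouped = base.groupByKey()      // wide dependency

println(mapped.dependencies)   // e.g. List(org.apache.spark.OneToOneDependency@...)
println(grouped.dependencies)  // e.g. List(org.apache.spark.ShuffleDependency@...)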
Lineage and Fault Tolerance
Lineage graphs are the key to fault tolerance in Spark (lost partitions can be recomputed from their lineage)
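A minimal sketch (assuming sc) of inspecting a lineage graph with toDebugString; this dependency information is what Spark replays to recompute lost partitions:
val lineageDemo = sc.parallelize(1 to 10)
  .map(x => (x % 2, x))
  .groupByKey()

println(lineageDemo.toDebugString)
// e.g.
// (8) ShuffledRDD[2] at groupByKey ...
//  +-(8) MapPartitionsRDD[1] at map ...
//     |  ParallelCollectionRDD[0] at parallelize ...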