Create Project
Open VS Code >> File >> Open Folder >> Select an empty folder
Create a build.sbt file with the code below
VS Code menu >> Terminal >> New Terminal >> Command Prompt >> run sbt
Create the folder and file src/main/scala/<package>/<filename>.scala
If using Windows, download winutils to <your-path>/winutils/hadoop-3.3.1/bin/winutils.exe
Set the environment variable HADOOP_HOME=<your-path>/winutils/hadoop-3.3.1
name := "demo"
version := "1.0"
scalaVersion := "2.13.12"

val sparkVersion = "3.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)
/src/main/scala/demo/main.scala
package demo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.log4j.{Logger, Level}

object demoSpark {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) // silence Spark's verbose logging

  val conf: SparkConf = new SparkConf().setAppName("Demo").setMaster("local[*]")
  val sc: SparkContext = new SparkContext(conf)

  // (customerId, (name, subscription))
  val as = List(
    (101, ("R", "AG")),
    (102, ("B", "DT")),
    (103, ("G", "DTV")),
    (104, ("S", "DT"))
  )

  // (customerId, city)
  val ls = List(
    (101, "Bern"),
    (101, "Thun"),
    (102, "Lausanne"),
    (102, "Geneve"),
    (102, "Nyon"),
    (103, "Zurich"),
    (103, "St-Gallen"),
    (103, "Chur")
  )

  def main(args: Array[String]): Unit = {
    println("Hello world")
    val abos = sc.parallelize(as)
    val locations = sc.parallelize(ls)
    val trackedCustomers = abos.join(locations) // inner join on customerId
    trackedCustomers.collect().foreach(println)
    // Result:
    // Hello world
    // (101,((R,AG),Bern))
    // (101,((R,AG),Thun))
    // (102,((B,DT),Lausanne))
    // (102,((B,DT),Geneve))
    // (102,((B,DT),Nyon))
    // (103,((G,DTV),Zurich))
    // (103,((G,DTV),St-Gallen))
    // (103,((G,DTV),Chur))
  }
}
Transformations and Actions
Transformations
Return new RDDs as results. They are lazy; their result RDD is not immediately computed
val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)
val lengthsRdd = wordsRdd.map(_.length)   // transformation: nothing is computed yet
val totalChars = lengthsRdd.reduce(_ + _) // action: triggers the computation
Actions
Compute a result based on an RDD; the result is either returned to the driver or saved to an external storage system (e.g. HDFS). They are eager; their result is immediately computed
val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs
  .filter(lg => lg.contains("2016-12") && lg.contains("error"))
  .count()
val firstLogsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).take(10)
Evaluation in Spark
Iteration and Big Data Processing
Logistic Regression
val points = sc.textFile(...).map(parsePoint)
var w = Vector.zeros(d)
for (i <- 1 to numIterations) {
  val gradient = points.map { p =>
    (1 / (1 + exp(-p.y * w.dot(p.x))) - 1) * p.y * p.x
  }.reduce(_ + _)
  w -= alpha * gradient
}
Caching and Persistence
By default, RDDs are recomputed each time you run an action on them. Spark allows you to control what is cached in memory
val points = sc.textFile(...).map(parsePoint).persist()
Cache
Shorthand for the default storage level (MEMORY_ONLY)
Persist
Persistence can be customized with this method by passing a StorageLevel, as sketched below
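A minimal sketch, assuming points is the RDD built in the snippet above; the only difference between the two calls is the storage level chosen:
import org.apache.spark.storage.StorageLevel

val cached = points.cache() // same as persist(StorageLevel.MEMORY_ONLY)
// Alternative (a storage level can be assigned only once per RDD):
// val persisted = points.persist(StorageLevel.MEMORY_AND_DISK) // spill to disk when memory is full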
Cluster Topology
The driver program runs the Spark application, which creates a SparkContext upon start-up
The SparkContext connects to a cluster manager (e.g. Mesos/YARN) which allocates resources
Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application
Next, the driver program sends your application code to the executors
Finally, the SparkContext sends tasks for the executors to run (see the sketch below)
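A minimal sketch of the driver side of this topology; the master URL is a hypothetical standalone cluster manager (it could equally be "yarn" or "local[*]"):
import org.apache.spark.{SparkConf, SparkContext}

// The driver program: creating the SparkContext connects to the cluster manager,
// which allocates executors on worker nodes for this application.
val conf = new SparkConf()
  .setAppName("TopologyDemo")
  .setMaster("spark://master-host:7077") // hypothetical standalone master

val sc = new SparkContext(conf)

// The SparkContext splits the job into tasks and sends them to the executors;
// only the final count travels back to the driver.
val evens = sc.parallelize(1 to 1000000).filter(_ % 2 == 0).count()
println(evens)

sc.stop()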
Reduction Operations
Pair RDDs
In Spark, distributed key-value pairs are Pair RDDs that allow you to act on each key in parallel or regroup data across the network
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
case class Event(organizer: String, name: String, budget: Int)

val eventsRdd = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))
val groupedRdd = eventsRdd.groupByKey() // RDD[(String, Iterable[Int])]
groupedRdd.collect().foreach(println)
case class Event(organizer: String, name: String, budget: Int)

val eventsRdd = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))
val budgetsRdd = eventsRdd.reduceByKey(_ + _) // total budget per organizer
budgetsRdd.collect().foreach(println)
val intermediate = eventsRdd.mapValues(b => (b, 1)) // (budget, number of events)
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
val avgBudgets = intermediate.mapValues {
  case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
case class Visitor(ip: String, timestamp: String, duration: String)

val visits = sc.textFile(...)    // parse each line into a Visitor (elided)
  .map(v => (v.ip, v.duration))  // Pair RDD keyed by IP address
val numUniqueVisits = visits.keys.distinct().count()
Partitioning and Shuffling
Shuffling
Moves data from one node to another so it can be grouped with its key. This can be an enormous performance hit
Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.
groupByKey
def groupByKey(): RDD[(K, Iterable[V])]
val purchasesRdd = sc.textFile(...) // parse lines into purchases (elided)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
  .groupByKey()
  .map(p => (p._1, (p._2.size, p._2.sum)))
  .collect()
// Command took 15.48s.
reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
val purchasesRdd = sc.textFile(...) // parse lines into purchases (elided)
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price))) // Pair RDD
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  .collect()
// Command took 4.65s.
Partitioning
Kinds of partitioning in Spark:
Hash partitioning (groupByKey)
Range partitioning (sortByKey)
Hash partitioning
Hash partitioning attempts to spread data evenly across partitions based on the key
// p = k.hashCode() % numPartitions
val partitioned = purchasesRdd.map(p => (p.customerId, p.price))
  .partitionBy(new HashPartitioner(100))
  .persist()
Range partitioning
Tuples with keys in the same range appear on the same machine
// set of ranges: [1, 200], [201, 400], [401, 600], [601, 800]
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
Optimization with partitioners
Partitioning can bring substantial performance gains, especially in the face of shuffles, by ensuring that data is not transmitted over the network to other machines
Optimization using range partitioning
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()

val purchasesPerMonth = partitioned.map(p => (p.customerId, (1, p.price)))
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  .collect()
// Command took 1.79s.
Avoiding a Network Shuffle By Partitioning
reduceByKey running on a pre-partitioned RDD will cause the values to be computed locally
join called on two RDDs that are pre-partitioned with the same partitioner and cached on the same machine will cause the join to be computed locally
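A minimal sketch of both points, assuming purchasesRdd as above and a hypothetical second RDD customersRdd; both sides share the same HashPartitioner and are persisted, so the join and the reduceByKey can be computed without moving data across the network:
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(100)

val purchases = purchasesRdd.map(p => (p.customerId, p.price))
  .partitionBy(partitioner).persist()
val customers = customersRdd.map(c => (c.customerId, c.name)) // hypothetical customer records
  .partitionBy(partitioner).persist()

val joined = purchases.join(customers)         // no shuffle: both sides use the same partitioner
val perCustomer = purchases.reduceByKey(_ + _) // no shuffle: values for a key are already co-located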
Wide and Narrow Dependencies
Transformations can have two kinds of dependencies
Narrow Dependencies: Each partition of the parent RDD is used by at most one partition of the child RDD.
map, mapValues, flatMap, filter
mapPartitions, mapPartitionsWithIndex
join (with co-partitioned inputs)
Wide Dependencies: Each partition of the parent RDD may be depended on by multiple child partitions (see the sketch after this list).
join (with inputs not co-partitioned)
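A minimal sketch (hypothetical tiny pair RDD) showing how the two kinds of dependencies can be inspected with the dependencies method on an RDD:
val pairs = sc.parallelize(List((1, "a"), (2, "b"), (1, "c")))

val mapped = pairs.mapValues(_.toUpperCase)
println(mapped.dependencies)  // OneToOneDependency: narrow, no shuffle needed

val grouped = pairs.groupByKey()
println(grouped.dependencies) // ShuffleDependency: wide, data moves across the network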
Lineage and Fault Tolerance
Lineage graphs are the key to fault tolerance in Spark (lost partitions can be recomputed from them)
toDebugString prints an RDD's lineage graph
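A minimal sketch of a small lineage (parallelize, then map, then filter) and what Spark keeps for recovery:
val lengths = sc.parallelize(List("spark", "rdd", "lineage"))
  .map(_.length)
  .filter(_ > 3)

// Prints the chain of RDDs this RDD was derived from; if a partition is lost,
// Spark recomputes it by replaying this lineage.
println(lengths.toDebugString)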
SQL, DataFrames, and Datasets
SparkSession
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("My App")
  // .config("spark.some.config.option", "some-value")
  .getOrCreate()
Creating DataFrames from an existing RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val schemaString = "name age"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

val peopleDF = spark.createDataFrame(rowRDD, schema)
// val employeeDF = sc.parallelize(...).toDF
DataFrame
import org.apache.spark.sql.types._
drop()
drop("all")
drop(Array("id", "name"))
fill(0)
fill(Map("minBalance" -> 0))
replace(Array("id"), Map(1234 -> 8923))
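These are methods for handling missing values, reached through a DataFrame's na field (DataFrameNaFunctions). A minimal sketch, assuming a hypothetical DataFrame df with columns "id", "name", and "minBalance":
// Drop rows containing null values.
val anyNull = df.na.drop()                    // drop rows where any column is null
val allNull = df.na.drop("all")               // drop rows where every column is null
val subset  = df.na.drop(Array("id", "name")) // only consider these columns

// Fill null values.
val filled    = df.na.fill(0)                      // nulls in numeric columns become 0
val filledCol = df.na.fill(Map("minBalance" -> 0)) // per-column replacement

// Replace specific values.
val replaced = df.na.replace("id", Map(1234 -> 8923)) // 1234 becomes 8923 in column "id"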