spark
auggie

Apache Spark source code reading

Spark getting-started tutorial

apache spark

Environment Setup

hadoop

Passwordless SSH login

  1. On macOS, enable Remote Login under System Preferences -> Sharing
  2. cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
  3. ssh localhost

hadoop

Hadoop configuration on Mac

brew install hadoop

Hadoop pseudo-distributed configuration:

  1. Add the following to /opt/homebrew/Cellar/hadoop/3.3.4/libexec/etc/hadoop/core-site.xml:
<configuration>

<property>
<name>hadoop.tmp.dir</name>
<value>file:/opt/homebrew/Cellar/hadoop/3.3.4/libexec/tmp</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:8020</value>
</property>

</configuration>
  2. Add the following to /opt/homebrew/Cellar/hadoop/3.3.4/libexec/etc/hadoop/hdfs-site.xml:
<configuration>

<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/opt/homebrew/Cellar/hadoop/3.3.4/libexec/tmp/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/opt/homebrew/Cellar/hadoop/3.3.4/libexec/tmp/dfs/data</value>
</property>

</configuration>
  3. Environment variables (in your shell profile):
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export HADOOP_HOME=/opt/homebrew/Cellar/hadoop/3.3.4/libexec
export HADOOP_COMMON_HOME=$HADOOP_HOME
export PATH=$JAVA_HOME/bin:$PATH:$HADOOP_HOME/bin:/opt/homebrew/Cellar/scala/bin

Running Hadoop

  1. Initialize HDFS: hdfs namenode -format

  2. start-dfs.sh

    JRE not found:

    (base) ➜  ~ start-dfs.sh
    Starting namenodes on [localhost]
    localhost: WARNING: /opt/homebrew/Cellar/hadoop/3.3.4/libexec/logs does not exist. Creating.
    Starting datanodes
    Starting secondary namenodes [Y3Y54Q72DR]
    Y3Y54Q72DR: ssh: Could not resolve hostname y3y54q72dr: nodename nor servname provided, or not known
    2022-11-09 16:17:23,985 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  3. Check whether the daemons are running with jps

    (base) ➜  ~ jps
    86881
    56226 Jps
    22195
    55683 NameNode
    39907 NailgunRunner
    55786 DataNode
    43003 Launcher
    23211 Application
    30365
  4. Check the NameNode web UI: http://localhost:9870/dfshealth.html#tab-overview

  5. Edit the YARN-related configuration. Add the following to /opt/homebrew/Cellar/hadoop/3.3.4/libexec/etc/hadoop/mapred-site.xml:

    <configuration>

    <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
    </property>

    </configuration>
  6. Edit /opt/homebrew/Cellar/hadoop/3.3.4/libexec/etc/hadoop/yarn-site.xml:

    <configuration>

    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
    </property>
    <property>
    <name>yarn.nodemanager.env-whitelist</name>
    <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>

    </configuration>
  7. Start YARN: start-yarn.sh

    (base) ➜  hadoop git:(stable) start-yarn.sh
    Starting resourcemanager
    Starting nodemanagers

    Open http://localhost:8088/cluster in a browser.

  8. Run the wordcount example that ships with Hadoop

    • hadoop fs -mkdir /input to create a directory
    • hadoop fs -ls / to list the root directory
    (base) ➜  hadoop git:(stable) hadoop fs -mkdir /input
    2022-11-09 16:27:09,642 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    (base) ➜ hadoop git:(stable) hadoop fs -ls /
    2022-11-09 16:27:30,749 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 1 items
    drwxr-xr-x - bytedance supergroup 0 2022-11-09 16:27 /input
  9. Run the job: hadoop jar /opt/homebrew/Cellar/hadoop/3.3.4/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.4.jar wordcount /input /output

Hive

Too much of a hassle (

Hive install

Hive install

export HIVE_HOME=/usr/local/hive
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:/usr/local/Hadoop/lib/*:.
export CLASSPATH=$CLASSPATH:/usr/local/hive/lib/*:.
HADOOP_HOME=/opt/homebrew/Cellar/hadoop/3.3.4/libexec

# Hive Configuration Directory can be controlled by:
export HIVE_CONF_DIR=/usr/local/hive/conf

# Folder containing extra libraries required for hive compilation/ execution can be controlled by:
export HIVE_AUX_JARS_PATH=/usr/local/hive/lib

hive-site.xml

<configuration>
<property>
<name>hive.exec.scratchdir</name>
<value>hdfs://master:8020/data/hive/temp</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>hdfs://master:8020/data/hive/warehouse</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>hdfs://master:8020/data/hive/log</value>
</property>

<!-- Disable Hive metastore schema version verification; otherwise Spark programs fail to start -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>

spark

csdn spark

xmu spark

  1. Install Scala. It must be the version required by your Spark release, otherwise Spark will complain.

  2. Download Spark from the Spark official site

  3. Install and configure Spark

    sudo tar -zxf ~/Downloads/spark-2.1.0-bin-without-hadoop.tgz -C /usr/local/
    cd /usr/local
    sudo mv ./spark-2.1.0-bin-without-hadoop/ ./spark
    sudo chown -R hadoop:hadoop ./spark # here "hadoop" is your username

    After installation, you also need to edit Spark's configuration file spark-env.sh:

    cd $SPARK_HOME
    cp ./conf/spark-env.sh.template ./conf/spark-env.sh

    Edit spark-env.sh (vim ./conf/spark-env.sh) and add the following line at the top:

    export SPARK_DIST_CLASSPATH=$(/opt/homebrew/bin/hadoop classpath)

    With this setting, Spark can store data in and read data from HDFS. Without it, Spark can only read and write local data and cannot access HDFS.

  4. Verify the installation

    bin/run-example SparkPi 2>&1 | grep "Pi is"

    (base) ➜ spark-3.3.1-bin-hadoop3 git:(stable) bin/run-example SparkPi 2>&1 | grep "Pi is"
    Pi is roughly 3.1459757298786495
  5. Start Spark

    localhost: namenode running as process 2896. Stop it first.

    Be careful not to run Hadoop's start-all.sh by mistake 😇

    (base) ➜  ~ cd $SPARK_HOME/sbin
    (base) ➜ sbin git:(stable) ./start-all.sh
    starting org.apache.spark.deploy.master.Master, logging to /opt/homebrew/Cellar/spark-3.3.1-bin-hadoop3/logs/spark-bytedance-org.apache.spark.deploy.master.Master-1-Y3Y54Q72DR.out
    localhost: starting org.apache.spark.deploy.worker.Worker, logging to /opt/homebrew/Cellar/spark-3.3.1-bin-hadoop3/logs/spark-bytedance-org.apache.spark.deploy.worker.Worker-1-Y3Y54Q72DR.out
    (base) ➜ sbin git:(stable) jps
    86881
    67523 Master
    22195
    67634 Worker
    67669 Jps
    66823 NameNode
    43003 Launcher
    23211 Application
    67197 ResourceManager
    30365
    66927 DataNode

    Spark local web UI

  6. Start the Spark shell

    spark-shell --master <master-url>

    * local: run Spark locally with one worker thread (no parallelism at all)
    * local[*]: run Spark locally with as many worker threads as logical CPU cores
    * local[K]: run Spark locally with K worker threads (ideally K matches the number of CPU cores)
    * spark://HOST:PORT: connect to the given Spark standalone master. The default port is 7077.
    * yarn-client: connect to a YARN cluster in client mode. The cluster location is taken from HADOOP_CONF_DIR.
    * yarn-cluster: connect to a YARN cluster in cluster mode. The cluster location is taken from HADOOP_CONF_DIR.
    * mesos://HOST:PORT: connect to the given Mesos cluster. The default port is 5050.
    cd $SPARK_HOME/bin
    ./spark-shell --master local
    Welcome to
    ____ __
    / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
    /___/ .__/\_,_/_/ /_/\_\ version 3.3.1
    /_/

    Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala>

Spark Standalone Application Programming

Pitfalls encountered

  • Path issues:

    By default Spark looks for files on HDFS. To read a local file, the path must start with file:, for example:

    val logFile = "file:/Users/bytedance/resource/fyy/log.log"
  • Class not found:

    • The class containing the main method must match the class name passed to --class.

    • Building with IDEA kept failing for reasons I never figured out; I went back to VS Code 😇

      The error looks like this:

      SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
      Error: Failed to load class SimpleApp.
      22/11/10 11:22:42 INFO ShutdownHookManager: Shutdown hook called
      22/11/10 11:22:42 INFO ShutdownHookManager: Deleting directory /private/var/folders/b1/0fd1b6hs7lz0fm_mh346lybm0000gn/T/spark-533682ae-17f9-4229-a3d0-a621fe441511
    import org.apache.spark.sql.SparkSession

    object SimpleApp {
    def main(args: Array[String]) {
    val logFile = "file:/Users/bytedance/resource/fyy/log.log" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
    }
    }
    name := "Simple App"

    version := "1.0"

    scalaVersion := "2.13.10"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.1"

Two ways to create a SparkContext

Configuring HDFS access with SparkContext

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("SimpleApp")
val sc = new SparkContext(conf)
val filename = "hdfs://localhost:8020/user/spark/word.txt"
val txt = sc.textFile(filename)
val res = txt.flatMap(line => line.split(" ")).filter(_.size > 0).map(word => (word -> 1))
.reduceByKey(_ + _).collect()
res.foreach(println)
}
}
import org.apache.spark.sql.SparkSession

object SimpleApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().config("spark.master", "local").getOrCreate()
val sc = spark.sparkContext
val filename = "hdfs://localhost:8020/user/spark/word.txt"
val txt = sc.textFile(filename)
val res = txt.flatMap(line => line.split(" ")).filter(_.size > 0).map(word => (word -> 1))
.reduceByKey(_ + _).collect()
res.foreach(println)
}
}

Output:

(base) ➜  sparkapp $SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local \
./target/scala-2.13/simple-app_2.13-1.0.jar 2>&1 | grep "Line"
Lines with a: 16, Lines with b: 15

wordCount

Some commonly used functions

File I/O:

  • Reading and writing local files
// Read a file. A local path must start with file:// to distinguish it from HDFS
val file = sc.textFile(filepath)

// Spark loads lazily; this triggers reading the first line of the file
file.first()

// write the RDD back out as text files
file.saveAsTextFile(filepath)


  • Reading and writing HDFS files

Common HDFS commands:

hdfs dfs -mkdir hdfspath

hdfs dfs -ls hdfspath

hdfs dfs -put localpath hdfspath

hdfs dfs -cat hdfsfile

hdfs dfs -get

hdfs getconf -confKey fs.default.name # get the local HDFS port

Reading an HDFS path from Spark:

val textFile = sc.textFile("hdfs://localhost:8020/user/spark/word.txt")

// the hdfs://host:port prefix can actually be omitted
val textFile = sc.textFile("/user/spark/word.txt")

// reads and writes work the same way as for local files

Word count:

textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b).foreach(println)

Setting up a Scala/Spark environment in VS Code

vscode scala

Configuring scalafmt in VS Code (gave up; IDEA is recommended)

vscode scalafmt

Add a plugins.sbt file under project/

// In project/plugins.sbt. Note, does not support sbt 0.13, only sbt 1.x.
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % SBT_PLUGIN_VERSION)

// idea
addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.1")

Add a .scalafmt.conf file

version = 2.7.5

Spark Execution Flow

Spark execution flow

spark-process

  1. A Spark application is submitted and the driver creates a SparkContext. The SparkContext registers with the cluster manager and requests resources to run executors.
  2. The cluster manager allocates resources and launches the executor processes; executor status is sent back to the cluster manager via heartbeats.
  3. RDD -> DAG -> task set (stage) -> executor
  4. Tasks run on executors and feed results back to the task scheduler and then the DAG scheduler; when the job finishes, the output is written and all resources are released.

An application creates one executor on each worker.

RDD

An RDD is a distributed collection of objects: essentially a read-only, partitioned collection of records. Each RDD can be split into multiple partitions, each partition being a fragment of the dataset. Different partitions of the same RDD can be stored on different nodes of the cluster, so computation can run in parallel across nodes.

  • Fault tolerance: achieved through the RDD DAG (lineage)

  • Intermediate results are persisted in memory

  • The stored data can be arbitrary Java objects

  • Narrow dependency: one-to-one or many-to-one

  • Wide dependency: one-to-many

narrow & wide

Stage Division

The DAG is parsed backwards: whenever a wide dependency is met the DAG is cut there, and whenever a narrow dependency is met the current RDD is added to the current stage. Packing narrow dependencies into the same stage enables pipelined computation; see the sketch below.

Stage division
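As a concrete illustration, here is a minimal sketch (run in spark-shell; the input path is a placeholder) of how a word count splits into stages: the narrow, pipelined operations stay in one stage and reduceByKey introduces the shuffle boundary.

// Stage 0: textFile -> flatMap -> map are narrow dependencies and are pipelined together
val counts = sc.textFile("file:///tmp/word.txt")   // placeholder path
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // ShuffleDependency: the DAG is cut here, Stage 1 starts after the shuffle

counts.collect()        // the action submits a job with two stages
counts.toDebugString    // prints the lineage; the ShuffledRDD marks the stage boundary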

Execution Process

  • RDDs are created; the SparkContext resolves their dependencies and builds a DAG
  • The DAGScheduler splits the DAG into multiple stages
  • Each stage contains multiple tasks; the task scheduler dispatches every task to an executor on some worker node

process

Creating RDDs

  • Load an external dataset: HDFS, local files, Kafka

    If a local filesystem path is used, the file must be reachable at the same path on all worker nodes: either copy it to every worker or use a network-mounted shared filesystem.

    val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")

    val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
  • Call SparkContext's parallelize method to create an RDD from an existing collection

    val list = (1 to 10).toList
    val array = (1 to 10).toArray
    val rdd = sc.parallelize(list)
    val rdd = sc.parallelize(array)

RDD Operations

Once an RDD is created, only two kinds of operations are applied to it (see the sketch after the lists below):

  • Transformation: RDD => new RDD
  • Action: compute over the dataset and return a result

transformation

  • filter(func)
  • map(func)
  • flatMap(func)
  • groupByKey()
  • reduceByKey(func)

action

  • count()
  • collect() returns all elements of the dataset as an array
  • first()
  • take(n)
  • reduce(func)
  • foreach(func)
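A minimal sketch (made-up data) showing that the transformations above are lazy and only the actions at the end trigger jobs:

val nums = sc.parallelize(1 to 100)

// transformations: only the lineage is recorded, nothing runs yet
val evens   = nums.filter(_ % 2 == 0)
val squares = evens.map(n => n * n)

// actions: each call triggers a job
squares.count()          // 50
squares.first()          // 4
squares.take(3)          // Array(4, 16, 36)
squares.reduce(_ + _)    // sum of the squares of the even numbers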

RDD Persistence

RDDs are evaluated lazily: every action triggers a computation from the very beginning.

In some situations we need to call several different actions, which means each action triggers a full recomputation from scratch.

The persistence mechanism avoids this repeated computation. persist() marks an RDD for persistence; it is only a mark because the RDD is not computed and persisted where the persist() statement appears, but only when the first subsequent action triggers the real computation. The persisted RDD is then kept in the memory of the compute nodes and reused by later actions.

scala> val mp = textFile.map(line => line.split(" ").size)
mp: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[48] at map at <console>:23

scala> mp.cache() // does not cache mp yet, because mp has not been computed
res32: mp.type = MapPartitionsRDD[48] at map at <console>:23

scala> mp.count() // computes mp and caches it
res33: Long = 2

scala> mp.collect() // reuses the cached mp
res34: Array[Int] = Array(7, 2)

To print an RDD in cluster mode, do not use rdd.foreach(println) or rdd.map(println); use rdd.collect().foreach(println) or rdd.take(100).foreach(println) instead.

Key-Value Pair RDDs

Create a key-value RDD with map:

val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))

transformation:

  • reduceByKey

  • groupByKey

  • sortByKey

  • mapValues(func): applies func to the value of every (k, v) pair

  • join:  (k, v1), (k, v2) => (k, (v1, v2))

  • keys

  • values

Demo: compute the average value per key

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))

rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => x._1 / x._2).collect()
  • API

    • Transformations => another RDD

      • map, filter, union
      • groupByKey, reduceByKey, repartition
    • Actions => end of the lineage

      • count, collect, saveAsTextFile
      • foreach
    • parallelize

  • Source

    • from storage

      • HDFS words = sc.textFile("hdfs://...")

      • local:

      • Kafka

    • from another RDD

      res = words.map(word => (word, 1)).reduceByKey((a, b) => a + b)

Submitting jobs:

  • spark-shell

  • Spark-submit

  • narrow(pipeline): e.g. map

  • wide(shuffle): e.g. reduceByKey

Shared Variables

When Spark runs a function in parallel as multiple tasks on different nodes, it makes a separate copy of every variable used by the function for each task.

Problem: sometimes a variable needs to be shared across tasks.

Solution: broadcast variables and accumulators.

Broadcast Variables

Serialization is the process of turning a Java object into a byte sequence.

Deserialization is the process of turning a byte sequence back into a Java object.

They are mainly used when two JVM processes communicate and need to transfer Java objects.

Benefits:

  • data can be persisted
  • serialization enables remote communication

Serialization & deserialization

A broadcast variable caches a read-only value on each machine instead of shipping a copy with every task on that machine.

Explicitly creating a broadcast variable is worthwhile when tasks across multiple stages need the same data.

val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar.value

After SparkContext.broadcast(v), functions running on the cluster should use the broadcast variable's value rather than the original v.
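A minimal sketch (made-up lookup table) of how a broadcast variable is used inside a transformation: tasks read countryNames.value instead of closing over the original Map, so the table is shipped once per executor instead of once per task.

// small read-only lookup table, broadcast once to every executor
val countryNames = sc.broadcast(Map("CN" -> "China", "US" -> "United States"))

val codes = sc.parallelize(Seq("CN", "US", "CN"))

// tasks use the broadcast value, not the original variable
val named = codes.map(code => countryNames.value.getOrElse(code, "unknown"))
named.collect()   // Array(China, United States, China)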

Accumulators

A numeric accumulator can be created with SparkContext.longAccumulator() or SparkContext.doubleAccumulator().

val accum = sc.longAccumulator("My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

accum.value

Spark SQL

Differences between DataFrame and RDD:

  • An RDD is a distributed collection of Java objects whose internal structure is opaque to the RDD.
  • A DataFrame is a distributed dataset built on top of RDDs (a distributed collection of Row objects) that carries detailed schema information.

Also:

DataFrames are evaluated lazily, with the same processing logic as RDDs.
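A minimal sketch of that laziness (the people.json path is a placeholder): building the filtered DataFrame only constructs a logical plan, and execution happens at the action.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DF Laziness").master("local").getOrCreate()
import spark.implicits._

// reading infers the schema; filter/select below only build a logical plan
val peopleDF = spark.read.json("file:///tmp/people.json")   // placeholder path
val adults = peopleDF.filter($"age" > 18).select("name")

// actions such as show() or count() trigger the actual execution
adults.show()
adults.count()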

Creating & Saving DataFrames

Creating

  • Read from a file
val peopleDF = spark.read.format("json").load("file:///people.json")
  • Create a DataFrame from an RDD
val filename = "hdfs://localhost:8020/user/spark/people.txt"

val peopleRDD = spark.sparkContext.textFile(filename)

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
  1. Read the source RDD and convert it to an RDD of Rows
  2. Create the schema
  3. Create the DataFrame

Saving

peopleDF.select("name", "age").write.format("csv").save("file:///newpeople.csv")

write.format() supports output formats such as json, parquet, jdbc, orc, libsvm, csv and text. To write plain text, use write.format("text"), but note this only works when select() contains a single column; with two columns, e.g. select("name", "age"), the result cannot be saved as a text file.
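A small sketch of the point above (output paths are placeholders): a single selected column can be written as plain text, while multiple columns need a format such as csv.

// works: one column can be written with the text format
peopleDF.select("name").write.format("text").save("file:///tmp/people_names")

// two columns cannot be written as text; use csv (or json/parquet) instead
peopleDF.select("name", "age").write.format("csv").save("file:///tmp/people_csv")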

Reading MySQL data

Read MySQL data over JDBC, from the shell or from a program

  • Install the MySQL JDBC connector

  • Point the shell at the JDBC jar when starting it

    spark-shell \
    --jars $SPARK_HOME/jars/mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar \
    --driver-class-path $SPARK_HOME/jars/mysql-connector-j-8.0.31/mysql-connector-j-8.0.31.jar
  • Read the MySQL data

    scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("driver","com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "123").load()
    jdbcDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

    scala> jdbcDF.show()
    +---+--------+------+---+
    | id| name|gender|age|
    +---+--------+------+---+
    | 1| Xueqian| F| 23|
    | 2|Weiliang| M| 24|
    +---+--------+------+---+
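Writing a DataFrame back into MySQL goes through the same JDBC data source. A minimal sketch (the target table student_copy, the credentials, and the com.mysql.cj.jdbc.Driver class name are assumptions for illustration):

import java.util.Properties

val props = new Properties()
props.put("user", "root")          // placeholder credentials
props.put("password", "123")
props.put("driver", "com.mysql.cj.jdbc.Driver")

// append the rows of jdbcDF into another table of the same database
jdbcDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student_copy", props)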

Spark Streaming

The input stream is split into time slices (on the order of seconds) and then processed as batches by the Spark engine.

A DStream (Discretized Stream) represents a continuous stream of data.
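A minimal Spark Streaming word count sketch (assumes a text source on localhost:9999, e.g. fed with nc -lk 9999, and 1-second batches):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
val ssc = new StreamingContext(conf, Seconds(1))   // 1-second micro-batches

val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()             // start receiving and processing
ssc.awaitTermination()  // block until stopped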

Data partitioners · how RDDs are stored

Spark Source Code Reading

RDD

  • a list of partitions
  • a list of dependencies
  • a set of functions, e.g. map

RDD Partitions

The partition interface; extending Serializable makes partitions serializable.

trait Partition extends Serializable {
/**
* Get the partition's index within its parent RDD
*/
def index: Int

// A better default implementation of HashCode
override def hashCode(): Int = index

override def equals(other: Any): Boolean = super.equals(other)
}

RDD:

  • An RDD holds a list of partitions that the outside world can only access through the partitions method; subclasses must override getPartitions
@transient private var partitions_ : Array[Partition] = null

/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getPartitions: Array[Partition]

/**
* Get the array of partitions of this RDD, taking into account whether the
* RDD is checkpointed or not.
*/
final def partitions: Array[Partition] = {
checkpointRDD.map(_.partitions).getOrElse {
if (partitions_ == null) {
partitions_ = getPartitions
}
partitions_
}
}
How the number of partitions is decided

Ideally the number of partitions should equal the number of cores in the cluster.

Number of partitions of an RDD produced by a transformation:

  • narrow dependency: determined by the number of partitions of the parent RDD
  • shuffle dependency: determined by the partitioner of the child RDD

The parallelize method uses the defaultParallelism parameter to decide the number of partitions.

def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

The defaultParallelism value depends on the deployment mode; for example, SingleCoreMockBackend has a single core.

private[spark] class SingleCoreMockBackend(
conf: SparkConf,
taskScheduler: TaskSchedulerImpl) extends MockBackend(conf, taskScheduler) {

val cores = 1

override def defaultParallelism(): Int = conf.getInt("spark.default.parallelism", cores)
}

For the textFile method the minimum is 2:

def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
Number of records per partition

Ideally the different partitions of an RDD should hold roughly the same number of records.

  • narrow dependency: depends on how data was distributed in the correspondingly numbered partitions of the parent RDD
  • shuffle dependency: depends on the chosen partitioner; a hash partitioner cannot guarantee an even spread across partitions, while a range partitioner can (see the sketch below)
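A small sketch of the hash vs range difference (made-up data, chosen so that all keys collide under hashCode % 4):

import org.apache.spark.{HashPartitioner, RangePartitioner}

// every key is a multiple of 4, so hashCode % 4 sends them all to partition 0
val pairs = sc.parallelize((1 to 100).map(i => (i * 4, "x")))

val hashed = pairs.partitionBy(new HashPartitioner(4))
val ranged = pairs.partitionBy(new RangePartitioner(4, pairs))

// records per partition
hashed.glom().map(_.length).collect()   // e.g. Array(100, 0, 0, 0)
ranged.glom().map(_.length).collect()   // roughly Array(25, 25, 25, 25): keys split by sampled ranges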
private object ParallelCollectionRDD {
/**
* Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
* is an inclusive Range, we use inclusive range for the last slice.
*/
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
} else {
new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[T] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices.toSeq
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
}

In short, it builds slices of (nearly) equal length.

RDD Dependencies

Externally, this recorded information is usually called the lineage; internally it is recorded as Dependency objects between RDDs.

Dependencies are stored in the child RDD; each Dependency holds a reference to its parent RDD.

@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}

Kinds of dependencies

A dependency describes the relationship between two RDDs. If one transformation involves several parent dependencies, narrow and wide dependencies can exist at the same time.

narrow

Narrow dependencies are implemented by NarrowDependency.

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/**
* Get the parent partitions for a child partition.
* Finds which partition of the parent RDD a child partition comes from; returns only a single element
* @param partitionId a partition of the child RDD
* @return the partitions of the parent RDD that the child partition depends upon
*/
def getParents(partitionId: Int): Seq[Int]

override def rdd: RDD[T] = _rdd
}

Narrow dependencies are further divided into one-to-one dependencies and range dependencies.

One-to-one dependency

A one-to-one dependency means the partition numbers of the child RDD match those of the parent RDD exactly.

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int) = List(partitionId)
}

Range dependency

A range dependency arises when a child RDD has multiple parent RDDs (e.g. union); the partition mapping is still one-to-one, shifted by a range offset:

/**
* :: DeveloperApi ::
* Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
* @param rdd the parent RDD
* @param inStart the start of the range in the parent RDD
* @param outStart the start of the range in the child RDD
* @param length the length of the range
*/
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int) = {
// if the partition lies inside this child range, return the corresponding parent partition
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
// otherwise Nil
Nil
}
}
}
shuffle
/**
* :: DeveloperApi ::
* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
* the RDD is transient since we don't need it on the executor side.
*
* @param _rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
@transient _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
val keyOrdering: Option[Ordering[K]] = None,
val aggregator: Option[Aggregator[K, V, C]] = None,
val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {

override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

RDD Persistence

One reason Spark is so fast is that datasets can be persisted or cached in memory between operations.

Caching is the key to Spark's iterative algorithms and fast interactive queries.

If a node holding persisted data fails, Spark recomputes the lost partitions the next time the cached data is needed. If node failures must not slow execution down, the data can also be replicated to multiple nodes.

Two persistence operations (see the sketch below):

  • persist(StorageLevel)
  • cache, which is equivalent to persist with MEMORY_ONLY
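A minimal usage sketch (the input path is a placeholder): persist only marks the RDD, the first action fills the cache, and unpersist releases it.

import org.apache.spark.storage.StorageLevel

val words = sc.textFile("file:///tmp/word.txt").flatMap(_.split(" "))   // placeholder path

words.persist(StorageLevel.MEMORY_AND_DISK)   // cache() would be MEMORY_ONLY

words.count()              // first action: computes words and fills the cache
words.distinct().count()   // reuses the cached partitions

words.unpersist()          // release the cached blocks when no longer needed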
How RDDs are cached

See [[内存管理]] for the pros and cons of on-heap vs off-heap memory.

By default persist() caches the data as deserialized objects in the JVM heap (MEMORY_ONLY).

Neither method caches anything when it is called; only when a later action is triggered is the RDD cached in the memory of the compute nodes for reuse.

Append _2 to a storage level to keep two replicas of the persisted data, e.g. DISK_ONLY_2.

storage level

object StorageLevel {  
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
persist, cache, unpersist

persist and cache

// cache() is equivalent to persist(), which is persist(StorageLevel.MEMORY_ONLY), i.e. cache in memory only
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

// a storage level is made up of 5 parameters
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))

def persist(newLevel: StorageLevel): this.type = {
// isLocallyCheckpointed checks whether this RDD is already marked for local checkpointing (not caching)
if (isLocallyCheckpointed) {
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}

private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
// TODO: Handle changes of StorageLevel
if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
throw SparkCoreErrors.cannotChangeStorageLevelError()
}
// If this is the first time this RDD is marked for persisting, register it
// with the SparkContext for cleanups and accounting. Do this only once.
if (storageLevel == StorageLevel.NONE) {
sc.cleaner.foreach(_.registerRDDForCleanup(this))
sc.persistRDD(this)
}
// update the RDD's storage level
storageLevel = newLevel
this
}

/**
* Register an RDD to be persisted in memory and/or disk storage
*/
private[spark] def persistRDD(rdd: RDD[_]): Unit = {
// this only marks the RDD; the actual storage is triggered later, in iterator
persistentRdds(rdd.id) = rdd
}

unpersist

def unpersist(blocking: Boolean = false): this.type = {
logInfo(s"Removing RDD $id from persistence list")
sc.unpersistRDD(id, blocking)
storageLevel = StorageLevel.NONE
this
}

private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) {
// tell the BlockManager to remove all blocks belonging to this RDD
env.blockManager.master.removeRdd(rddId, blocking)
// remove it from the map
persistentRdds.remove(rddId)
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

Listing the currently persisted RDDs:

scala> val rdds = sc.getPersistentRDDs
rdds: scala.collection.Map[Int,org.apache.spark.rdd.RDD[_]] = Map(5 -> ShuffledRDD[5] at partitionBy at <console>:28)

scala> rdds.foreach(println)
(5,ShuffledRDD[5] at partitionBy at <console>:28)
iterator

Scala currying

How RDD caching is implemented

iterator is where storage is actually triggered.

  1. iterator

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
    // if a storage level is set, go through the cache
    getOrCompute(split, context)
    } else {
    // compute, or read from the checkpoint
    computeOrReadCheckpoint(split, context)
    }
    }
  2. getOrCompute

    • read the data from the cache or a checkpoint
    • otherwise compute it and store it
    private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
    val blockId = RDDBlockId(id, partition.index)
    var readCachedBlock = true
    SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    // defines how to compute the partition if it is not already stored
    readCachedBlock = false
    computeOrReadCheckpoint(partition, context)
    }) match {
    // Block hit: the data was obtained from storage or was just computed and stored
    case Left(blockResult) =>
    ...
    // Need to compute the block: only reached when there was not enough space to store it
    case Right(iter) =>
    new InterruptibleIterator(context, iter)
    }
    }

    private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    {
    if (isCheckpointedAndMaterialized) {
    // if checkpointed, read from the checkpointed parent
    firstParent[T].iterator(split, context)
    } else {
    // otherwise compute
    compute(split, context)
    }
    }

    /***
    * :: DeveloperApi ::
    * Implemented by subclasses to compute a given partition.
    */
    @DeveloperApi
    def compute(split: Partition, context: TaskContext): Iterator[T]
  3. getOrElseUpdate

    • on a hit, read from storage
    • on a miss, compute and store
    def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
    // hit in local or remote storage: return the result
    get[T](blockId)(classTag) match {
    case Some(block) =>
    return Left(block)
    case _ =>
    // Need to compute the block.
    }
    // Initially we hold no locks on this block.
    doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
    // SUCCESS the block already existed or was successfully stored
    Left(blockResult)
    case Some(iter) =>
    // FAILED
    // The put failed, likely because the data was too large to fit in memory and could not be
    // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
    // that they can decide what to do with the values (e.g. process them without caching).
    Right(iter)
    }
    }
  4. doPutIterator

    1. Prefer memory
      1. serialized level: memoryStore.putIteratorAsBytes
      2. deserialized level: memoryStore.putIteratorAsValues
      3. if memory is not enough and disk is allowed: diskStore.put
    2. Otherwise use disk: diskStore.put
    private def doPutIterator[T](
    blockId: BlockId,
    iterator: () => Iterator[T],
    level: StorageLevel,
    classTag: ClassTag[T],
    tellMaster: Boolean = true,
    keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
    doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
    val startTimeNs = System.nanoTime()
    var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
    // Size of the block in bytes
    var size = 0L
    if (level.useMemory) {
    // prefer memory
    // Put it in memory first, even if it also has useDisk set to true;
    // We will drop it to disk later if the memory store can't hold it.
    if (level.deserialized) {
    // deserialized level: iterator() computes the partition first, then the values are stored
    memoryStore.putIteratorAsValues(blockId, iterator(), level.memoryMode, classTag) match {
    case Right(s) =>
    size = s
    case Left(iter) =>
    // Not enough space to unroll this block; drop to disk if applicable
    if (level.useDisk) {
    // fall back to disk
    diskStore.put(blockId) { channel =>
    val out = Channels.newOutputStream(channel)
    serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
    }
    size = diskStore.getSize(blockId)
    } else {
    iteratorFromFailedMemoryStorePut = Some(iter)
    }
    }
    } else { // !level.deserialized
    // serialized level: iterator() computes the partition first, then the bytes are stored
    memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
    case Right(s) =>
    size = s
    case Left(partiallySerializedValues) =>
    // Not enough space to unroll this block; drop to disk if applicable
    if (level.useDisk) {
    // fall back to disk
    logWarning(s"Persisting block $blockId to disk instead.")
    diskStore.put(blockId) { channel =>
    val out = Channels.newOutputStream(channel)
    partiallySerializedValues.finishWritingToStream(out)
    }
    size = diskStore.getSize(blockId)
    } else {
    iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
    }
    }
    }

    } else if (level.useDisk) {
    // disk-only storage level
    diskStore.put(blockId) { channel =>
    val out = Channels.newOutputStream(channel)
    serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
    }
    size = diskStore.getSize(blockId)
    }
    }
    }


  5. The memory store ultimately calls putIterator

    private def putIterator[T](
    blockId: BlockId,
    values: Iterator[T],
    classTag: ClassTag[T],
    memoryMode: MemoryMode,
    valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")

    // Number of elements unrolled so far
    var elementsUnrolled = 0
    // Whether there is still enough memory for us to continue unrolling this block
    var keepUnrolling = true
    // Initial per-task memory to request for unrolling blocks (bytes); default 1 MB
    val initialMemoryThreshold = unrollMemoryThreshold
    // How often (in elements) to check whether we need to request more memory; default 16
    val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
    // Memory currently reserved by this task for this particular unrolling operation
    var memoryThreshold = initialMemoryThreshold
    // Memory to request as a multiple of the current vector size; default growth factor 1.5
    val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
    // Keep track of unroll memory used by this particular block / putIterator() operation
    var unrollMemoryUsedByThisBlock = 0L

    // Request enough memory to begin unrolling
    keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)

    if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
    s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
    } else {
    unrollMemoryUsedByThisBlock += initialMemoryThreshold
    }

    // Unroll this block safely, checking whether we have exceeded our threshold periodically
    while (values.hasNext && keepUnrolling) {
    // append the current element to the values holder
    valuesHolder.storeValue(values.next())
    // at every check period, re-check how much memory is being used
    if (elementsUnrolled % memoryCheckPeriod == 0) {
    // estimate the current size of the values holder
    val currentSize = valuesHolder.estimatedSize()
    // If the size has exceeded the threshold, request more memory
    if (currentSize >= memoryThreshold) {
    val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
    keepUnrolling =
    reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
    if (keepUnrolling) {
    unrollMemoryUsedByThisBlock += amountToRequest
    }
    // New threshold is currentSize * memoryGrowthFactor
    memoryThreshold += amountToRequest
    }
    }
    elementsUnrolled += 1
    }

    // Make sure that we have enough memory to store the block. By this point, it is possible that
    // the block's actual memory usage has exceeded the unroll memory by a small amount, so we
    // perform one final call to attempt to allocate additional memory if necessary.
    if (keepUnrolling) {
    val entry = entryBuilder.build()
    // Synchronize so that the transfer into the entries map is atomic
    entries.synchronized {
    entries.put(blockId, entry)
    }

    logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
    Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
    Right(entry.size)
    } else {
    // We ran out of space while unrolling the values for this block
    logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
    Left(unrollMemoryUsedByThisBlock)
    }
    }
checkpoint

cache and persist keep data in memory or on disk and preserve the lineage, so if a node crashes the data can still be recomputed.

checkpoint persists the data directly to HDFS; because HDFS is highly reliable, the lineage before the checkpoint can be dropped.

checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This allows long lineages to be truncated and the data to be saved reliably in HDFS.

// set the directory where checkpoints are saved
sc.setCheckpointDir("ckp")
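A minimal usage sketch (paths are placeholders): like persist, checkpoint only marks the RDD, and the next action materializes it and writes the checkpoint files.

sc.setCheckpointDir("hdfs://localhost:8020/user/spark/ckp")   // placeholder directory

val counts = sc.textFile("/user/spark/word.txt")
  .flatMap(_.split(" "))
  .map(w => (w, 1))
  .reduceByKey(_ + _)

counts.cache()        // optional: avoids computing the RDD twice (once for the job, once for the checkpoint)
counts.checkpoint()   // mark for checkpointing; lazy, nothing is written yet
counts.count()        // the action materializes the RDD and writes it to the checkpoint directory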

Environment Bootstrapping

Tracing the execution flow of spark-submit:

$SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local \ # 5.3: decides which app class to build (e.g. STANDALONE_CLUSTER_SUBMIT_CLASS)
./target/scala-2.13/simple-app_2.13-1.0.jar
  1. The spark-submit script runs java org.apache.spark.deploy.SparkSubmit.

  2. A SparkSubmit object is created and its doSubmit method is called.

  3. Command-line arguments are parsed; the default action is SUBMIT: action = Option(action).getOrElse(SUBMIT)

  4. The submit method is called, which in turn calls runMain.

  5. Inside runMain:

  6. The command-line arguments are parsed => master == 'local' => LOCAL

  7. childMainClass is resolved:

    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"
    private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
    private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
    private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
  8. The app is created from mainClass via reflection (reflection: dynamically loading and constructing an object from its class name), and its start method is called to run the user code.

    mainClass = Utils.classForName(childMainClass)
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
    new JavaMainApplication(mainClass)
    }
    app.start(childArgs.toArray, sparkConf)
  9. The driver first creates the SparkContext object.

  10. After the SparkContext is initialized, it waits for the system resources to be ready by calling a post-start hook.

    // Post init
    _taskScheduler.postStartHook()

    // Invoked after system has successfully initialized (typically in spark context).
    // Yarn uses this to bootstrap allocation of resources based on preferred locations,
    // wait for executor registrations, etc.
    def postStartHook(): Unit = { }
  11. Once created, execution continues with the user code, reading/writing files and operating on RDDs.

spark-process

standalone process

sparkcontext

Preface

In Spark, application submission cannot happen without the Spark driver, and driver initialization revolves around initializing the SparkContext. The SparkContext is the engine of a Spark program: only with it can the program run. Within spark-core, SparkContext is central; it provides many capabilities, such as creating RDDs and broadcast variables, so studying its components and startup flow helps dissect the architecture of the whole Spark kernel.

Overview of SparkContext components

The SparkContext contains several parts that are important to the whole framework:

  • SparkEnv: Spark's runtime environment. Executors depend on it to run their assigned tasks; the driver has one too so that local-mode jobs can also run.
  • SparkUI: the monitoring page for Spark jobs, implemented purely in the backend with no front-end framework. It is used to monitor and tune the current Spark job: executor JVM info, the stage and task breakdown of every job, and the data processed per task, which helps spot data skew.
  • DAGScheduler: the DAG scheduler, a key component of the job scheduling system. It creates jobs, splits them into stages based on RDD dependencies, submits the stages, and turns the job into a directed acyclic graph.
  • TaskScheduler: the task scheduler, another key component of the job scheduling system. It dispatches the tasks created by the DAGScheduler to executors according to the scheduling algorithm; the DAGScheduler is its upstream scheduler.
  • SparkStatusTracker: provides monitoring of jobs and stages.
  • ConsoleProgressBar: uses the information from SparkStatusTracker to print task progress to the terminal as log lines.
  • HeartbeatReceiver: the heartbeat receiver. All executors periodically send heartbeats to it; it tracks which executors are alive and keeps the TaskScheduler informed so that tasks are dispatched to suitable executors.
  • ContextCleaner: the context cleaner, which asynchronously cleans up RDDs, ShuffleDependencies and Broadcasts that have gone out of application scope.
  • LiveListenerBus: the event bus inside SparkContext. It receives events from the various components and asynchronously matches them to the corresponding callbacks.
  • ShutdownHookManager: manages shutdown hooks used for cleanup work such as releasing resources.
  • AppStatusStore: stores application status data; introduced in 2.3.0.
  • EventLoggingListener (optional): a listener that persists events to storage, controlled by spark.eventLog.enabled.
  • ExecutorAllocationManager (optional): the dynamic executor allocation manager, which adjusts the number of executors to the workload; controlled by spark.dynamicAllocation.enabled and spark.dynamicAllocation.testing.

Task Scheduling

process

An action triggers job submission into the DAGScheduler's eventProcessLoop

  1. Calling an RDD's collect method triggers an action, which leads to a job submission.

    runJob with param rdd func partitions

def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
  2. The job is submitted through the dagScheduler:

    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

    It then submits the job via submitJob and waits for the job to finish.

  3. submitJob posts the job to the DAGScheduler's event loop, where it waits to be scheduled.

    private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

    eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))

The DAGScheduler splits the job into stages and submits tasks to the TaskScheduler

  1. The DAGScheduler's eventProcessLoop handles events in onReceive, which calls dagScheduler.handleJobSubmitted to process the job submitted above.

    /**
    * The main event loop of the DAG scheduler.
    */
    override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
    doOnReceive(event)
    } finally {
    timerContext.stop()
    }
    }

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    }
  2. handleJobSubmitted divides the job into stages and submits all of them, including parent and ancestor stages, recursively (depth-first):

    // 1. Create the ResultStage
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

    // 1.1 First collect all shuffle dependencies of the RDD that triggered the action,
    //     use them to create the RDD's direct parent stages (not the parents' parents),
    //     and finally create the ResultStage

    // shuffle dependencies are collected with a BFS
    val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
    val parents = getOrCreateParentStages(shuffleDeps, jobId)
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
    callSite, resourceProfile.id)


    // 2. Submit the stage; finalStage here is the ResultStage
    submitStage(finalStage)

    The getShuffleDependenciesAndResourceProfiles method (a BFS):

    private[scheduler] def getShuffleDependenciesAndResourceProfiles(
    rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
    // all shuffle-dependency parents of this rdd, and the resource profiles seen along the way
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val resourceProfiles = new HashSet[ResourceProfile]
    // memoization of visited RDDs
    val visited = new HashSet[RDD[_]]
    // queue
    val waitingForVisit = new ListBuffer[RDD[_]]
    waitingForVisit += rdd
    while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
    visited += toVisit
    Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
    toVisit.dependencies.foreach {
    // a ShuffleDependency becomes a parent; the stage will be cut here
    case shuffleDep: ShuffleDependency[_, _, _] =>
    parents += shuffleDep
    // a narrow dependency is pushed onto the queue and the BFS continues
    case dependency =>
    waitingForVisit.prepend(dependency.rdd)
    }
    }
    }
    (parents, resourceProfiles)
    }
    /** Submits stage, but first recursively submits any missing parents. */
    // submits the ResultStage and its ancestor ShuffleMapStages recursively via submitStage
    private def submitStage(stage: Stage): Unit = {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
    logDebug(s"submitStage($stage (name=${stage.name};" +
    s"jobs=${stage.jobIds.toSeq.sorted.mkString(",")}))")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
    val missing = getMissingParentStages(stage).sortBy(_.id)
    logDebug("missing: " + missing)
    if (missing.isEmpty) {
    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
    // if there is no missing parent stage, submit the current stage
    submitMissingTasks(stage, jobId.get)
    } else {
    for (parent <- missing) {
    // otherwise submit the parent stages first
    submitStage(parent)
    }
    // and park the current stage in waitingStages
    waitingStages += stage
    }
    }
    } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
    }
    }


    // submitMissingTasks creates the tasks for the stage according to its type; one task is created per partition of the RDD
    val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
    case stage: ShuffleMapStage =>
    stage.pendingPartitions.clear()
    partitionsToCompute.map { id =>
    val locs = taskIdToLocations(id)
    val part = partitions(id)
    stage.pendingPartitions += id
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
    taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
    Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
    }

    case stage: ResultStage =>
    partitionsToCompute.map { id =>
    val p: Int = stage.partitions(id)
    val part = partitions(p)
    val locs = taskIdToLocations(id)
    new ResultTask(stage.id, stage.latestInfo.attemptNumber,
    taskBinary, part, locs, id, properties, serializedTaskMetrics,
    Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
    stage.rdd.isBarrier())
    }
    }
    } catch {
    case NonFatal(e) =>
    abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
    }

    // finally the TaskSet is submitted to the taskScheduler
    taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties,
    stage.resourceProfileId))

The TaskScheduler dispatches the TaskSet to executors through the scheduler backend

The TaskScheduler wraps the TaskSet in a TaskSetManager and hands it to the schedulableBuilder.

// 1. Create a TaskSetManager from the TaskSet
val manager = createTaskSetManager(taskSet, maxTaskFailures)
// 2. Submit the TaskSetManager to the SchedulableBuilder
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
// 3. Ask the scheduler backend to process the tasks
backend.reviveOffers()

// 3.1 LocalSchedulerBackend sends a ReviveOffers message to localEndpoint
private var localEndpoint: RpcEndpointRef = null
override def reviveOffers(): Unit = {
localEndpoint.send(ReviveOffers)
}

// 3.2 when localEndpoint receives the message, receive pattern-matches it and calls the matching handler
override def receive: PartialFunction[Any, Unit] = {
case ReviveOffers =>
reviveOffers()

case KillTask(taskId, interruptThread, reason) =>
executor.killTask(taskId, interruptThread, reason)
}

def reviveOffers(): Unit = {
// local mode doesn't support extra resources like GPUs right now
val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
Some(rpcEnv.address.hostPort)))
// first let the scheduler place the tasks on the most suitable offers
for (task <- scheduler.resourceOffers(offers, true).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
// then ask the executor to launch each task
executor.launchTask(executorBackend, task)
}
}

// 3.2.1 task scheduling is governed by the scheduler's scheduling algorithm
val sortedTaskSets = rootPool.getSortedTaskSetQueue

val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)

// taskSetSchedulingAlgorithm can be FIFO or Fair

SchedulableBuilder is Spark's scheduling-pool builder; it offers two scheduling modes:

  • FIFOSchedulableBuilder
  • FairSchedulableBuilder
private[spark] trait SchedulableBuilder {
def rootPool: Pool

def buildPools(): Unit

def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
}

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
extends SchedulableBuilder with Logging

private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
extends SchedulableBuilder with Logging

The executor then runs each task on a thread:

def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val taskId = taskDescription.taskId
val tr = createTaskRunner(context, taskDescription)
runningTasks.put(taskId, tr)
val killMark = killMarks.get(taskId)
if (killMark != null) {
tr.kill(killMark._1, killMark._2)
killMarks.remove(taskId)
}
// take a thread from the thread pool to run the task
threadPool.execute(tr)
if (decommissioned) {
log.error(s"Launching a task while in decommissioned state.")
}
}

shuffle

Shuffle always spills to disk.

  • so reducing the amount of data written to disk matters

Shuffle principles and evolution

Map-side pre-aggregation

With map-side pre-aggregation, each node ends up with only one record per key locally, because records with the same key are combined. When other nodes later pull the data for a key, the amount of data to fetch is much smaller, which reduces both disk I/O and network transfer (see the sketch after the list).

  • reduceByKey and aggregateByKey pre-aggregate the records with the same key on each node using the user-supplied function.
  • groupByKey does no pre-aggregation: the full data is shuffled across the cluster, so its performance is comparatively poor.
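A small sketch of the difference (made-up data): both produce the same result, but reduceByKey combines locally before the shuffle while groupByKey ships every record.

val words = sc.parallelize(Seq("a", "b", "a", "a", "b")).map(w => (w, 1))

// map-side pre-aggregation: each node first sums its local (key, 1) records,
// so only partial sums cross the network
val viaReduce = words.reduceByKey(_ + _)

// no pre-aggregation: every (key, 1) record is shuffled, then summed
val viaGroup = words.groupByKey().mapValues(_.sum)

viaReduce.collect()   // Array((a,3), (b,2))
viaGroup.collect()    // same result, more shuffle traffic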

Source code

  1. When the DAGScheduler generates tasks, the task type depends on the stage type:

    stage match {
    case stage: ShuffleMapStage =>
    stage.pendingPartitions.clear()
    partitionsToCompute.map { id =>
    // ShuffleMapStage => ShuffleMapTask
    new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
    taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
    Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
    }

    case stage: ResultStage =>
    partitionsToCompute.map { id =>
    // ResultStage => ResultTask
    new ResultTask(stage.id, stage.latestInfo.attemptNumber,
    taskBinary, part, locs, id, properties, serializedTaskMetrics,
    Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
    stage.rdd.isBarrier())
    }
    }
  2. (map side) At the end of runTask, a ShuffleMapTask writes its data to disk; different shuffle writers produce different files.

    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)

    // the shuffleHandle is then used to obtain the writer that writes the files
    writer = manager.getWriter[Any, Any](
    dep.shuffleHandle,
    mapId,
    context,
    createMetricsReporter(context))
    writer.write(
    rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  3. (reduce side) The ResultTask reads the files produced by the map side inside its runTask:

    // the runTask method of ResultTask
    override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTimeNs = System.nanoTime()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
    }

    // ShuffledRDD's compute method reads the files produced by the map side
    override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    val metrics = context.taskMetrics().createTempShuffleReadMetrics()
    SparkEnv.get.shuffleManager.getReader(
    dep.shuffleHandle, split.index, split.index + 1, context, metrics)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
    }