Spark分布式计算原理

it2025-07-15  11

RDD依赖以及DAG原理

1.RDD的转换

val lines=sc.textFile("/data/words.txt") //① val count=lines.flatMap(line=>line.split(" ")) //② .map(word=>(word,1)) //③ .reduceByKey(_+_) //④ count.collect //⑤

(1)首先从 HDFS 中读取文件,产生一个 HadoopRDD,然后进行RDD 转换,转换结果为 MapPartitionsRDD。 lines 实际上是一个MapPartitionsRDD,其父 RDD 是 HadoopRDD。 (2)flatMap 操作将 lines 中的所有行,以空格切分 ,然后返回一个单词列表,以每个单词为元素的列表保存到新的 MapPartitionsRDD。 (3)将第二行生成的 MapPartitionsRDD 再次经过 map 操作将每个单词 word 转化为(word,1)的二元组,返回一个新的 MapPartitionsRDD 包含这些元组。 (4)reduceByKey 操作会生成一个 ShuffledRDD。 (5)collect 动作将提交 Job 开始执行,到此 Application 结束

2.RDD的依赖关系 ,根据子 RDD 依赖父 RDD 的分区的不同,将这种关系划分为两种:窄依赖和宽依赖 (1)窄依赖 ①定义: 窄依赖指的是每一个父 RDD 的分区最多被子 RDD 的一个分区使用 ②map和filter 它们只是将各个分区的数据根据转换的规则进行转化 ③union 只是将多个 RDD 合并成一个,父 RDD 的分区不会有任务的变化。 ④join 如果每个分区仅仅和已知的、特定的分区进行 join,那么这个依赖关系是窄依赖。 (2)宽依赖 宽依赖指的是多个子 RDD 的分区会依赖同一个父 RDD 的分区。 ①join 对于需要父 RDD 的所有分区进行 join 的转换,这类 join 的依赖就是宽依赖了。 ②groudByKey 子 RDD 的所有分区会依赖父 RDD 的所有分区,子 RDD 的分区是父 RDD 的所有分区 Shuffle 的结果,因此这两个 RDD 是不能通过一个计算任务来完成的。 (3)窄依赖的实现 ①OneToOneDependency:一对一依赖 map、filter 操作。 ②RangeDependency:范围依赖 如 union 操作。 union 操作返回 UnionRDD,UnionRDD 是把多个 RDD 合并成一个 RDD。

(4)窄依赖的实现: ShuffleDependency

(5)宽窄依赖的对比 ①宽依赖对应shuffle操作,需要在运行时将同一个父RDD的分区传入到不同的子RDD分区中,不同的分区可能位于不同的节点,就可能涉及多个节点间数据传输 ②当RDD分区丢失时,Spark会对数据进行重新计算,对于窄依赖只需重新计算一次子RDD的父RDD分区 (6)Lineage(血统、遗传) ①RDD最重要的特性之一,保存了RDD的依赖关系 ②RDD实现了基于Lineage的容错机制 3.DAG工作原理 (1)DAG生成 根据RDD之间的依赖关系,形成一个DAG(有向无环图) (2)阶段划分 DAGScheduler将DAG划分为多个Stage ①划分依据:是否发生宽依赖(Shuffle) ②划分规则:从后往前,遇到宽依赖切割为新的Stage ③每个Stage由一组并行的Task组成

(3)任务调度 ①DAGScheduler 负责分析用户提交的应用,并根据计算任务的依赖关系建立 DAG,且将 DAG 划分为不同的 Stage,每个 Stage 可并发执行一组 Task。 ②TaskScheduler DAGScheduler 将划分完成的 Task提交到 TaskScheduler,TaskScheduler 通过 Cluster Manager 在集群中的某个Worker 的 Executor 上启动任务。 ③SchedulerBackend 每个 TaskScheduler 对应一个 SchedulerBackend,作用是分配当前可用的资源,具体就是向当前等待分配计算资源的 Task 分配计算资源(Executor),并在分配的 Executor 上启动 Task,完成计算的调度过程。

(4)spark任务 ①ShuffleMapTask 任务所在 Stage 不是最后一个 Stage,即 ShuffleMapState。对于非最后的 Stage,会根据每个 Stage 的分区数量来生成 ShuffleMapTask。ShuffleMapTask 会根据下游 Task 的分区数量和 Shuffle 策略来生成一系列文件。 ②ResultTask 任务所在 Stage 是最后一个 Stage,即 ResultStage。对于最后一个 Stage,会根据生成结果的分区来生成与分区数量相同的 ResultTask,然后ResultTask 将计算结果汇报到 Driver 端。

SparkSuffle过程

在分区之间重新分配数据 1.父RDD中同一分区中的数据按照算子要求重新进入子RDD的不同分区中 2.中间结果写入磁盘 3.由子RDD拉取数据,而不是由父RDD推送 4.默认情况下,Shuffle不会改变分区数量

RDD优化

1.RDD持久化 (1)RDD缓存机制 缓存数据至内存/磁盘,可大幅度提升Spark应用性能。 ①cache=persist(MEMORY) ②persist (2)缓存策略StorageLevel ①MEMORY_ONLY(默认) ②MEMORY_AND_DISK ③DISK_ONLY (3)缓存应用场景 ①从文件加载数据之后,因为重新获取文件成本较高 ②经过较多的算子变换之后,重新计算成本较高 ③单个非常消耗资源的算子之后

(4)注意事项 ①cache()或persist()后不能再有其他算子 ②cache()或persist()遇到Action算子完成后才生效

(5)检查点(类似于快照)

sc.setCheckpointDir("hdfs:/checkpoint0918") val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4))) rdd.checkpoint rdd.collect //生成快照 rdd.isCheckpointed rdd.getCheckpointFile

(6)检查点与缓存的区别 ①检查点会删除RDD lineage,而缓存不会 ②SparkContext被销毁后,检查点数据不会被删除

2.共享变量 (1)广播变量 允许开发者将一个只读变量(Driver端)缓存到每个节点(Executor)上,而不是每个任务传递一个副本

val broadcastVar=sc.broadcast(Array(1,2,3)) //定义广播变量 broadcastVar.value //访问方式

注意: ①Driver端变量在每个Executor每个Task保存一个变量副本 ②Driver端广播变量在每个Executor只保存一个变量副本 (2)累加器 只允许added操作,常用于实现计数

//初始值是0 val accum = sc.accumulator(0,"My Accumulator") sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x) accum.value

3.RDD分区设计 (1)分区大小限制为2G (2)分区太少 ①不利于并发 ②更容易受数据倾斜影响 ③groupBy, reduceByKey, sortByKey等内存压力增大 (3)分区过多 ①Shuffle开销越大 ②创建任务开销越大 (4)优化设置 ①每个分区大约128MB ②如果分区小于但接近2000,则设置为大于2000

4.数据倾斜 (1)概念: 指分区中的数据分配不均匀,数据集中在少数分区中 ①严重影响性能 ②通常发生在groupBy,join等之后

(2)解决方案: 使用新的Hash值(如对key加盐)重新分区

Spark装载数据

1.装载CSV数据源 (1)文件预览 (2)使用SparkContext

val lines = sc.textFile("file:///home/kgc/data/users.csv") val fields = lines.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).map(l => l.split(",")) val fields = lines.filter(l=>l.startsWith("user_id")==false).map(l=>l.split(",")) //移除首行,效果与上一行相同

(3)使用SparkSession

val df = spark.read.format("csv").option("header", "true").load("file:///home/kgc/data/users.csv")

2.装载Json数据源 (1)使用SparkContext

val lines = sc.textFile("file:///home/kgc/data/users.json") //scala内置的JSON库 import scala.util.parsing.json.JSON val result=lines.map(l=>JSON.parseFull(l))

(2)使用SparkSession

val df = spark.read.format("json").load("file:///home/kgc/data/users.json")
最新回复(0)