Spark的内存模型及Executor的Execution内存解析

it2023-07-30  74

1. Spark的内存模型

 Spark的Executor的内存管理是基于JVM的内存管理之上,Spark对JVM堆内(On-Heap)空间进行了 更为详细的分配,以便充分利用内存,同时Spark引入堆外内存(OffHeap)内存,可以直接在Worker 节点的系统内存中开辟空间,进一步优化内存使用。  Spark的堆内(On-Heap)空间是由–executor-memory或spark.executor.memory参数配置, Executor内运行的并发任务共享JVM堆内内存。而且该堆内内存是一种逻辑上的管理,因为对象的 释放都是由JVM完成。  Spark引入堆外内存(OffHeap)内存主要是为了提高Shuffle排序的效率,存储优化过的二进制数据。 从2.0之后Spark可以直接操作系统的堆外内存,减少不必要的开销。改参数默认不开启,通过 spark.memory.offHeap.ennable参数启用,并由spark.memory.offHeap.size参数设定堆外空间大 小。  默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:

Execution 内存:主要用于存放 Shuffle、Join、Sort、Aggregation 等计算过程中的临时数据Storage 内存:主要用于存储 spark 的 cache 数据,例如RDD的缓存、unroll数据;用户内存(User Memory):主要用于存储 RDD 转换操作所需要的数据,例如 RDD 依赖等信息。预留内存(Reserved Memory):系统预留内存,会用来存储Spark内部对象。

1.1 Execution 内存和 Storage 内存动态调整

 上面两张图中的 Execution 内存和 Storage 内存之间存在一条虚线,这是为什么呢?  在 Spark 1.5 之前,Execution 内存和 Storage 内存分配是静态的,换句话说就是如果 Execution 内存不足,即使 Storage 内存有很大空闲程序也是无法利用到的;反之亦然。这就导 致我们很难进行内存的调优工作,我们必须非常清楚地了解 Execution 和 Storage 两块区域的内存分布。而目前 Execution 内存和 Storage 内存可以互相共享的。也就是说,如果Execution 内存不足,而 Storage 内存有空闲,那么 Execution 可以从 Storage 中申请空间;反之亦然。所 以上图中的虚线代表 Execution 内存和 Storage 内存是可以随着运作动态调整的,这样可以有效 地利用内存资源。Execution 内存和 Storage 内存之间的动态调整可以概括如下: 具体的实现逻辑如下:

程序提交的时候我们都会设定基本的 Execution 内存和 Storage 内存区域(通过spark.memory.storageFraction 参数设置);在程序运行时,如果双方的空间都不足时,则存储到硬盘;将内存中的块存储到磁盘的策 略是按照 LRU规则进行的。若己方空间不足而对方空余时,可借用对方的空间;(存储空间不足是 指不足以放下一个完整的 Block);Execution 内存的空间被对方占用后,可让对方将占用的部分转存到硬盘,然后"归还"借 用的空间Storage 内存的空间被对方占用后,目前的实现是无法让对方"归还",因为需要考虑 Shuffle过程中的很多因素,实现起来较为复杂;而且 Shuffle 过程产生的文件在后面一定会被使用 到,而 Cache在内存的数据不一定在后面使用。

注意:上面说的借用对方的内存需要借用方和被借用方的内存类型都一样,都是堆内内存或者都是堆外内存,不存在堆内内存不够去借用堆外内存的空间。

1.2 Task 之间内存分布

 为了更好地使用使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的, Spark 内部维护了一个 HashMap 用于记录每个 Task 占用的内存。当 Task 需要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用情况, 如果没有,则将这个 Task 内存使用置为0,并且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。之后为这个 Task 申请 numBytes 内存,如果 Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,然后返回; 如果当前 Execution 内存区域无法申请到每个Task 最小可申请的内存,则当前 Task 被阻塞, 直到有其他任务释放了足够的执行内存,该任务才可以被唤醒。每个 Task 可以使用 Execution 内 存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。一个 Task 能够 运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可以使用全部的 Execution 内存。

 比如如果 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 可以申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

参考:Spark的内存模型

 经常使用Spark,也经常在提交spark任务,有没有想过,提交这些任务参数是什么意思,内存中又怎么解析的呢? 举个栗子🌰:

bin/spark-submit --class com.erainm.cluster \ --master yarn-cluster \ --driver-cores 2 \ --driver-memory 30G \ --conf spark.shuffle.service.ennabled=true \ --conf spark.memory.storageFraction=0.30 \ --conf spark.memory.fraction=0.7 \ --conf spark.default.parallelism=2800 \ --conf spark.sql.shuffle.partitions1=1400 \ --conf spark.yarn.executor.memeoryOverhead=4096 \ --executor-memory 30g \ --executor-cores 8 \ --num-executors 20 \ /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/examples/jars/spark-examples_2.11-2.2.0.jar \ 100

有下面两个问题,大家思考一下。

问题1: 这个任务一共需要多少的Cores和Memory? 答案: 162cores,630G

问题2: 程序运行时每个executor上的storage内存和execution内存多少? 答案: 6.3g,14.7g

–executor-memory MEMMemory per executor (e.g. 1000M, 2G) (Default: 1G).–num-executors NUMNumber of executors to launch (Default: 2).–executor-cores NUMNumber of cores per executor. (Default: 1 in YARN mode, or all available cores on the worker in standalone mode)

2. Execution内存解析:

 Spark在一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一 块是other内存。  上述spark-submit提交命令是spark2.x的。

 execution和storage是Spark Executor中内存的大户,other占用内存相对少很多,这里就不 说了。在spark-1.6.0以前的版本,execution和storage的内存分配是固定的,(称作静态内存模型), 使用的参数配置分别是  spark.shuffle.memoryFraction(execution内存占Executor总内存大小,default 0.2)和spark.storage.memoryFraction (storage内存占Executor内存大小,default 0.6),因为是1.6.0 以前这两块内存是互相隔离的,这就导致了Executor的内存利用率不高,而且需要根据Application 的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。

 在spark2.x及以后的版本,execution内存和storage内存可以相互借用,(称作动态内存模型),提高了内存的Spark中内存的使用率,同时也减少了OOM的情况。  spark.memory.storageFraction (default 0.5)  这个参数设置内存表示 Executor内存中 storage/(storage+execution),虽然spark-1.6.0+的版 本内存storage和execution的内存已经是可以互相借用的了,但是借用和赎回也是需要消耗性能 的,所以如果明知道程序中storage是多是少就可以调节一下这个参数。 spark.default.parallelism=2800

解析:

[概念解析]  spark中有partition的概念(和slice是同一个概念,在spark1.2中官网已经做出了说明),一般 每个partition对应一个task。在我的测试过程中,如果没有设置spark.default.parallelism参数,spark 计算出来的partition非常巨大,与我的cores非常不搭。我在两台机器上(8cores *2 +6g * 2)上, spark计算出来的partition达到2.8万个,也就是2.9万个tasks,每个task完成时间都是几毫秒或者零 点几毫秒,执行起来非常缓慢。在我尝试设置了 spark.default.parallelism 后,任务数减少到10, 执行一次计算过程从minute降到20second。 官网-configuration [调优]from: tuning

Level of Parallelism Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of “map” tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc), and for distributed “reduce” operations, such as groupByKey and reduceByKey, it uses the largest parent RDD’s number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.

参数3设置: spark.default.parallelism只有在处理RDD时才会起作用,对Spark SQL的无效。

#Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

spark.sql.shuffle.partitions则是对sparks SQL专用的设置

#Configures the number of partitions to use when shuffling data for joins or aggregations

参数4设置:  spark.yarn.executor.memoryOverhead,executor执行的时候,用的内存可能会超过 executor-memoy,所以会为executor额外预留一部分内存。  spark.yarn.executor.memoryOverhead代表了这部分内存。这个参数如果没有设置,会有一个自动 计算公式(位于ClientArguments.scala中),如果超过spark.yarn.executor.memoryOverhead内存, yarn会将应用kill掉

Spark中executor-memory参数详解 参考 :spark-调节executor堆外内存

 什么时候需要调节Executor的堆外内存大小? 当出现一下异常时: shuffle file cannot find,executor lost、task lost,out of memory 出现这种问题的现象大致有这么两种情况:

Executor挂掉了,对应的Executor上面的block manager也挂掉了,找不到对应的shuffle map output文件,Reducer端不能够拉取数据Executor并没有挂掉,而是在拉取数据的过程出现了问题。
最新回复(0)