MapOutputTracker是spark环境的主要组件之一,其功能有三方面,首先DAGScheduler使用MapOutputTrackerMaster来管理各个ShuffleMapTask的输出数据[MapStatus],另外根据各ShuffleMapTask结果的统计信息来进行尽可能的本地化Scheduler;其次ShuffleMapStage使用MapOutputTrackerMaster来判断是否要进行ShuffleMapTask的计算;最后ShuffleReduce任务用来获取从哪些executor获取需要的map输出数据,读取数据进行处理。
MapOutputTracker是Spark环境的主要组件之一,其功能是管理各个Shuffle输出的中间数据,主要有三个方面功能:
DAGScheduler使用MapOutputTrackerMaster来管理各个ShuffleMapTask的输出数据[MapStatus],另外根据各ShuffleMapTask结果的统计信息来进行尽可能的本地化Scheduler。 创建ShuffleMapTask时候注册当前Shuffle,需要Shuffle的唯一标识ShuffleId和Map的个数,通过函数createShuffleMapStage调用registerShuffle来完成注册;ShuffleMapTask计算完成时候,计算成功则更新MapStatus信息,计算失败则删除Stage管理信息,通过handleTaskCompletion调用registerMapOutput来完成计算结果的注册;在Executor/Worker删除时候,清理该Executor/Worker上面的所有的ShuffleId对应的Map结果信息,通过handleTaskCompletion调用removeOutputsOnHost或者removeOutputsOnExecutor进行; ShuffleMapStage使用MapOutputTrackerMaster来判断是否要进行ShuffleMapTask的计算。 ShuffleMapStage通过findMissingPartitions来查找还没计算好的map,进行计算; ShuffleReduce任务使用MapOutputTrackerWorker用来获取从哪些executor获取需要的map输出数据,读取数据进行处理。 ShuffleRDD在compute中,从MapOutputTrackerWorker中的getMapSizesByExecutorId函数获取当前Reduce需要的数据的位置信息,计算reduce计算。所以,可以看出来要想完成上述功能,我们需要Driver端有一个MapOutputTrackerMaster,来管理所有的ShuffleMapTask的结果,Executor有一个MapOutputTrackerWorker,用来向Driver申请获取相应的ShuffleId对应的Map结果信息,所以还需要Driver端有一个常驻的RPCEndPoint MapOutputTrackerMasterEndpoint,用来接收Executor的获取ShuffleId对应Map结果信息,然后转交给MapOutputTrackerMaster进行数据获取然后进行结果数据获取,回传给Executor。整体架构图如下:
可以看出来,MapOutputTracker在executor和driver端都存在:
MapOutputTrackerMaster和MapOutputTrackerMasterEndpoint存在于driver端。MapOutputTrackerMasterEndpoint是MapOutputTrackerMaster的RPC Endpoint,处理请求;MapOutputTrackerMaster负责管理所有shuffleMapTask的输出数据,每个shuffleMapTask执行完后会把执行结果[MapStatus]注册到MapOutputTrackerMaster;另外MapOutputTrackerMaster还会处理executor发送的GetMapOutputStatuses请求,并返回serializedMapStatus给executor端。MapOutputTrackerWorker存在于executor端,MapOutputTrackerWorker负责为Reduce任务提供需要的shuffleMapTask的输出数据信息[MapStatus],如果MapOutputTrackerWorker在本地没有找到请求的Shuffle的mapStatuses,则会向MapOutputTrackerMasterEndpoint发送GetMapOutputStatuses请求获取对应的mapStatuses。本文通过源码分析了各个组件的主要功能。
在Spark中,MapStatus用于表示ShuffleMapTask的运行结果,ShuffleStatus用于管理一个Shuffle对应的所有ShuffleMapTask的运行结果。
MapStatus用于表示ShuffleMapTask的运行结果, 包括map任务输出数据的location和size信息:
// org.apache.spark.scheduler.MapStatus private[spark] sealed trait MapStatus { /** 用于返回ShuffleMapTask运行的位置,即所在节点的BlockManager的身份标识BlockManagerId **/ def location: BlockManagerId /** 用于返回reduce任务需要拉取的Block的大小(单位为字节) */ def getSizeForBlock(reduceId: Int): Long }根据ShuffleTask各个Partition的长度是否大于2000,分别创建HighlyCompressedMapStatus和CompressedMapStatus:
对于较大的数据量使用高度压缩的HighlyCompressedMapStatus。一般的数据量则使用CompressedMapStatus。这块我们只做简单的了解,后续文章我们会详细介绍。
ShuffleStatus用于管理一个Shuffle对应的所有ShuffleMapTask的运行结果,ShuffleStatus对象只存在于MapOutputTrackerMaster中。ShuffleStatus中使用一个数组mapStatuses来保存所有的MapStatus,该数组的元素类型为MapStatus,数组下标是ShuffleMapTask的map id。由于executor获取时候,需要通过网络传输,使用的是序列化数据,为了加速访问,会对当前mapStatus的结果进行序列化缓存;另外有可能结果数据比较大<有参数spark.shuffle.mapOutput.minSizeForBroadcast控制,默认是512k>,就需要进行broadcast,使得executor可以从别的地方进行下载。
// org.apache.spark.ShuffleStatus val mapStatuses = new Array[MapStatus](numPartitions) private[this] var cachedSerializedMapStatus: Array[Byte] = _ private[this] var cachedSerializedBroadcast: Broadcast[Array[Byte]] = _为了保证线程安全性,ShuffleStatus使用了读写锁来进行读取和写入的同步和互斥控制。另外ShuffleStatus还提供了用于添加,移除,序列化,缓存和广播mapStatus的方法。
addMapOutput是某个ShuffleMapTask结束后,会将状态回传给Driver存储在Driver端,更新相应mapId对应的MapStatus信息;updateMapOutput用来更新某个ShuffleMapTask的结果信息;removeMapOutput用来移除某个ShuffleMapTask的结果信息;invalidateSerializedMapOutputStatusCache用于清空mapStatuses的缓存,包括移除对应的广播变量;serializedMapStatus`方法用于序列化并广播mapStatuses数组。都比较简单,通过加锁操作更新mapStatuses, cachedSerializedMapStatus或者cachedSerializedBroadcast信息。
MapOutputTracker的初始化是在SparkEnv中进行的,根据是否是Driver创建相应的Master和Worker,然后初始化mapOutputTracker里面的endpoint,如果是Driver需要新建,如果是Executor则只需要绑定Driver的引用即可,源码如下所示:
// 记录ShuffleMapTask的中间数据结果位置供shuffleRead时候使用,Master->Worker架构 // Master使用MapOutputTrackerMaster,worker使用MapOutputTrackerWorker val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf, broadcastManager, isLocal) } else { new MapOutputTrackerWorker(conf) } // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint // requires the MapOutputTracker itself // mapOutputTracker也有endPoint来进行通信,初始化endPoint mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint( MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))MapOutputTrackerMasterEndpoint是存在于Driver端的RPC Endpoint,负责处理获取GetMapOutputStatuses和StopMapOutputTracker两种请求,源码如下。
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => // 获取Map中间状态的请求 val hostPort = context.senderAddress.hostPort logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) // 会将消息投递到MapOutputTrackerMaster的mapOutputRequests队列,等待MessageLoop线程进行处理 val mapOutputStatuses = tracker.post(new GetMapOutputMessage(shuffleId, context)) case StopMapOutputTracker => // 停止跟踪Map中间状态的请求 logInfo("MapOutputTrackerMasterEndpoint stopped!") // 直接返回true context.reply(true) // 停止了当前的RpcEndpoint stop() }可以看出,GetMapOutputStatuses请求负责将需要获取的shuffleId和context放入MapOutPutTrackerMaster的请求队列中,等待Master的处理;StopMapOutputTracker只用简单的关闭EndPoint。
MapOutputTracker是抽象类,定义了endpointRef,在Driver端是MapOutputTrackerMasterEndpoint,在executor端是MapOutputTrackerMasterEndpoint的引用,用来进行消息通信,然后定义了消息传递的接口,我们来详细看下具体的实现类MapOutputTrackerMaster和MapOutputTrackerWorker。
MapOutputTrackerMaster是driver端用于管理所有shuffle的map任务输出数据的组件。
MapOutputTrackerMaster维护了一个最新最全的所有ShuffleMapTask结果:shuffleId -> shuffleStatuses的映射,以及一个使用阻塞队列来缓存GetMapOutputMessage的请求队列,MapOutputTrackerMasterEndpoint接收到ReduceTask请求后,会将请求放到MapOutputTrackerMaster的阻塞队列之中。
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala private val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]Master会有一个线程池,线程池中Job负责处理由Executor Reduce任务发出来的请求某个Shuffle任务的MapStatus消息,线程池大小由参数spark.shuffle.mapOutput.dispatcher.numThreads控制,默认为8。
private val threadpool: ThreadPoolExecutor = { // 获取线程数量 val numThreads = conf.getInt("spark.shuffle.mapOutput.dispatcher.numThreads", 8) // 创建固定线程数量的线程池 val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "map-output-dispatcher") // 向每个线程池中提交一个MessageLoop任务 for (i <- 0 until numThreads) { pool.execute(new MessageLoop) } pool }MessageLoop是处理请求ShuffleMapTask结果队列中的请求,它会循环地从mapOutputRequests队列中获取GetMapOutputMessage请求进行处理,处理完后会调用RpcCallContext的reply方法将序列化后的shuffleStatus返回给客户端。messageLoop会一直循环处理请求,直到获取到PoisonPill消息,而这个消息是MapOutputTrackerMaster的stop方法发出的。
/** Message loop used for dispatching messages. */ private class MessageLoop extends Runnable { override def run(): Unit = { try { while (true) { try { // 从mapOutputRequests中获取GetMapOutputMessage val data = mapOutputRequests.take() if (data == PoisonPill) { // 如果是毒药,就将其放回并结束当前MessageLoop任务. mapOutputRequests.offer(PoisonPill) return } // 获取RpcCallContext val context = data.context // 获取shuffleId val shuffleId = data.shuffleId val hostPort = context.senderAddress.hostPort logDebug("Handling request to send map output locations for shuffle " + shuffleId + " to " + hostPort) // 获取对应shuffleId所对应的序列化Map任务状态信息 val mapOutputStatuses = getSerializedMapOutputStatuses(shuffleId) // 将序列化的Map任务状态信息返回客户端 context.reply(mapOutputStatuses) } catch { case NonFatal(e) => logError(e.getMessage, e) } } } catch { case ie: InterruptedException => // exit } } }Master提供了一系列添加更新移除Shuffle和mapOutput的方法,调用时机分别如下:
registerShuffle, DAGScheduler在创建一个ShuffleMapStage时会顺便把这个stage对应的shuffle注册进来。registerMapOutput, 在一个shuffleMapTask任务完成后,会把map输出的信息注册进来。updateMapOutput,在更新Block信息时候会更新map的statusremoveOutputsOnHost,将某个host上的相关map输出信息全部移除,一般在主机丢失时调用此操作removeOutputsOnExecutor,同样地,将某个executor上的相关map输出信息全部移除,一般在executor丢失时调用此操作都比较简单,可以参考源码
MapOutputTrackerWorker主要用来让ShuffleReader获取单个Reducer的依赖数据信息,调用路径如下:
org.apache.spark.rdd.ShuffledRDD.compute -> SparkEnv.get.shuffleManager.getReader -> SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId -> MapOutputTrackerWorker.getMapSizesByExecutorId所以最主要是getMapSizesByExecutorId方法,该方法先获取ShuffleId对应的所有状态信息,然后通过MapOutputTracker.convertMapStatuses方法来进行选取需要的mapId以及reduceId需要的数据段信息,包装成ShuffleReader需要的数据迭代器返回。
override def getMapSizesByExecutorId( shuffleId: Int, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId") // 本地是否缓存,没有缓存从Driver端获取,目前已经计算好的所有MapTask结果的MapStatus val statuses = getStatuses(shuffleId, conf) try { val actualEndMapIndex = if (endMapIndex == Int.MaxValue) statuses.length else endMapIndex logDebug(s"Convert map statuses for shuffle $shuffleId, " + s"mappers $startMapIndex-$actualEndMapIndex, partitions $startPartition-$endPartition") // 只选取需要的,并且组织成ReduceTask读取需要的格式进行返回 MapOutputTracker.convertMapStatuses( shuffleId, startPartition, endPartition, statuses, startMapIndex, actualEndMapIndex) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: mapStatuses.clear() throw e } }我们继续看下getStatus方法,MapOutputTrackerWorker中会维护一个已经获取到的shuffleId的状态信息的缓存,如果缓存中没有,就需要通过endPoint<是driver端endPoint的引用>进行发送获取状态请求,MapOutputTrackerMasterEndpoint得到请求后放入MapOutputTrackerMaster队列,然后获取结果进行返回,得到shuffleId对应的状态信息,更新缓存,返回状态信息。
private def getStatuses(shuffleId: Int, conf: SparkConf): Array[MapStatus] = { val statuses = mapStatuses.get(shuffleId).orNull if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") val startTimeNs = System.nanoTime() fetchingLock.withLock(shuffleId) { var fetchedStatuses = mapStatuses.get(shuffleId).orNull if (fetchedStatuses == null) { logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint) val fetchedBytes = askTracker[Array[Byte]](GetMapOutputStatuses(shuffleId)) // 发送获取MapStatus消息 fetchedStatuses = MapOutputTracker.deserializeMapStatuses(fetchedBytes, conf) // 反序列化 logInfo("Got the output locations") mapStatuses.put(shuffleId, fetchedStatuses) } logDebug(s"Fetching map output statuses for shuffle $shuffleId took " + s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms") fetchedStatuses } } else { statuses } }