SparkStreaming优雅关闭剖析

it2023-02-09  46

简介

在前面的文章中,总结了SparkStreaming入门级的文章,了解到SparkStreaming是一种微批处理的"实时"流技术,在实际场景中,当我们使用SparkStreaming开发好功能并通过测试之后部署到生产环境,那么之后就会7*24不间断执行的,除非出现异常退出。当然SparkStreaming提供了checkpoint和WAL机制能够保证我们的程序再次启动时候不会出现数据丢失的情况。但是需求并不是一成不变的,相信读者们都经历过需求不断迭代的情况,当我们需要迭代逻辑的时候,那么我们如何停止线上正在运行的程序呢?本文将为读者们详细介绍一些关于SparkStreaming优雅关闭的手段。接下来我们将针对以下几个问题进行展开讲解:

为什么需要优雅关闭?什么时候触发关闭?采用什么策略关闭?

1.为什么需要优雅关闭

基于前面提到的,当我们的场景需要保证数据准确,不允许数据丢失,那么这个时候我们就得考虑优雅关闭了。说到关闭,那么非优雅关闭就是通过kill -9 processId的方式或者yarn -kill applicationId的方式进行暴力关闭,为什么说这种方式是属于暴力关闭呢?由于Spark Streaming是基于micro-batch机制工作的,按照间隔时间生成RDD,如果在间隔期间执行了暴力关闭,那么就会导致这段时间的数据丢失,虽然提供了checkpoin机制,可以使程序启动的时候进行恢复,但是当出现程序发生变更的场景,必须要删除掉checkpoint,因此这里就会有丢失的风险。

因此我们需要优雅关闭,将剩余未处理的数据或者正在处理的数据能够全部执行完成后,这样才不会出现数据丢失的情况。

2.什么时候触发关闭

既然我们知道了需要优雅关闭,那么就需要知道什么会触发关闭,这样才能有针对性的策略实现优雅关闭。

首先我们先来了解一下整体流程:

首先StreamContext在做初始化的时候,会增加Shutdown hook方法 ,放入到一个钩子队列中,并设置优先级为51当程序jvm退出时,会启动一个线程从钩子队列中按照优先级取出执行,然后就会执行Shutdown钩子方法当执行Shutdown钩子方法时,首先会将receiver进行关闭,即不再接收数据然后停止生成BatchRDD等待task全部完成,停止Executor最后释放所有资源,即整个关闭流程结束

接下来看源码的具体实现

StreamingContext.scala:调用start方法会调用ShutdownHookManager注册stopOnShutdown函数

def start(): Unit = synchronized { state match { case INITIALIZED => startSite.set(DStream.getCreationSite()) ...... /** * StreamContext启动时会增加Shutdown钩子函数,优先级为51 */ shutdownHookRef = ShutdownHookManager.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown()) .... case ACTIVE => logWarning("StreamingContext has already been started") case STOPPED => throw new IllegalStateException("StreamingContext has already been stopped") } }

ShutdownHookManager.scala:在增加钩子函数的时候底层调用了SparkShutdownHookManager内部类

def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = { shutdownHooks.add(priority, hook) } private lazy val shutdownHooks = { val manager = new SparkShutdownHookManager() manager.install() manager } private [util] class SparkShutdownHookManager { def install(): Unit = { val hookTask = new Runnable() { override def run(): Unit = runAll() } org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook( hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30) } /** * jvm退出的时候会开启一个线程按照优先级逐个调用钩子函数 */ def runAll(): Unit = { shuttingDown = true var nextHook: SparkShutdownHook = null while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) { Try(Utils.logUncaughtExceptions(nextHook.run())) } } def add(priority: Int, hook: () => Unit): AnyRef = { hooks.synchronized { if (shuttingDown) { throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.") } val hookRef = new SparkShutdownHook(priority, hook) hooks.add(hookRef) hookRef } } } private class SparkShutdownHook(private val priority: Int, hook: () => Unit) extends Comparable[SparkShutdownHook] { //这里真正调用注册的函数 def run(): Unit = hook() }

那么接下来看下真正执行关闭的逻辑,即StreamingContext#stopOnShutdown方法

private def stopOnShutdown(): Unit = { val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false) stop(stopSparkContext = false, stopGracefully = stopGracefully) } def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { synchronized { state match { case ACTIVE => //调度相关的关闭 Utils.tryLogNonFatalError { scheduler.stop(stopGracefully) } //监控 Utils.tryLogNonFatalError { env.metricsSystem.removeSource(streamingSource) } //ui Utils.tryLogNonFatalError { uiTab.foreach(_.detach()) } Utils.tryLogNonFatalError { unregisterProgressListener() } StreamingContext.setActiveContext(null) //设置状态为停止 state = STOPPED } } if (shutdownHookRefToRemove != null) { ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove) } // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). if (stopSparkContext) sc.stop() }

可以看到这里有一个spark.streaming.stopGracefullyOnShutdown参数来传给底层的stop方法,即调用Jobscheduler#stop方法

JobScheduler#stop

def stop(processAllReceivedData: Boolean): Unit = synchronized { //1.首先停止接收数据 if (receiverTracker != null) { receiverTracker.stop(processAllReceivedData) } if (executorAllocationManager != null) { executorAllocationManager.foreach(_.stop()) } //2.停止生成BatchRdd,处理剩余的数据 jobGenerator.stop(processAllReceivedData) //3.停止Exectuor jobExecutor.shutdown() val terminated = if (processAllReceivedData) { jobExecutor.awaitTermination(1, TimeUnit.HOURS) // just a very large period of time } else { jobExecutor.awaitTermination(2, TimeUnit.SECONDS) } if (!terminated) { jobExecutor.shutdownNow() } // Stop everything else listenerBus.stop() eventLoop.stop() eventLoop = null logInfo("Stopped JobScheduler") }

3.采用什么策略关闭?

3.1 配置策略

根据刚才梳理的触发关闭流程中,其实可以通过配置spark.streaming.stopGracefullyOnShutdown=true来实现优雅关闭,但是需要发送 SIGTERM 信号给driver端,这里有两种方案

方案一,具体步骤如下:

通过Spark UI找到driver所在节点。

登录driver节点,执行 ps -ef |grep java |grep ApplicationMaster命令找到对应的pid

执行**kill -SIGTERM ** 发送SIGTERM信号

当spark driver收到该信号时,在日志中会有以下信息

ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook INFO streaming.StreamingContext: StreamingContext stopped successfully INFO spark.SparkContext: Invoking stop() from shutdown hook INFO spark.SparkContext: Successfully stopped SparkContext INFO util.ShutdownHookManager: Shutdown hook called

注意:

这里有一个坑,默认情况下在yarn模式下,spark.yarn.maxAppAttempts参数值和yarn.resourcemanager.am.max-attempts是同一个值,即为2。当通过Kill命令杀掉AM时,Yarn会自动重新启动一个AM,因此需要再发送一次Kill命令。当然也可以通过spark-submit命令提交的时候指定spark.yarn.maxAppAttempts=1这个配置参数;但这里也会有容灾风险,比如出现网络问题的时候,这里就无法自动重启了,程序就会以失败而告终。

方案二:通过**yarn application -kill < applicationid >**命令来kill掉job(不建议使用)

该命令会发送SIGTERM信号给container,同时也会立即发送 SIGKILL 命令。虽然可以通过yarn.nodemanager.sleep-delay-before-sigkill.ms参数来调整SIGTERM和SIGKILL之间的间隔,但是好像没什么作用。具体日志信息如下:

ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM INFO streaming.StreamingContext: Invoking stop(stopGracefully=true) from shutdown hook

3.2 标记策略

该种策略通过借助于三方系统来标记状态, 一种方法是将标记HDFS文件,如果标记文件存在,则调用scc.stop(true,true);或者是借助于redis的key是否存在等方式

val checkIntervalMillis = 60000 var isStopped = false while (! isStopped) { isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis) checkShutdownMarker if (!isStopped && stopFlag) { ssc.stop(true, true) } } def checkShutdownMarker = { if (!stopFlag) { val fs = FileSystem.get(new Configuration()) stopFlag = fs.exists(new Path(shutdownMarker)) }

3.3 服务策略

即提供一个restful服务,暴露出一个接口提供关闭功能。

def httpServer(port:Int,ssc:StreamingContext)={ val server = new Server(port) val context = new ContextHandler() context.setContextPath("/shutdown") context.setHandler( new CloseStreamHandler(ssc) ) server.setHandler(context) server.start() } class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler { override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={ ssc.stop(true,true) response.setContentType("text/html; charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); val out = response.getWriter(); baseRequest.setHandled(true); } }
最新回复(0)