Flink 处理函数

it2025-02-10  8

文章目录

一.简介二.函数三.时间服务和计时器3.1 简介3.2 示例 四.副输出/侧输出(SideOutput)4.1 简介4.2 示例

一.简介

时间信息和Watermark对很多流式应用至关重要,无法用DataStream API转换来访问它们。

DataStream API提供了一组相对底层的转换——处理函数。

除了基本功能,它们还可以访问记录的时间戳和水位线,并支持注册在将来某个特点时间出发计时器。处理函数的副输出功能还允许将记录发送到多个输出流中。

二.函数

Flink提供了8种不同处理函数:

ProcessFunction:dataStreamKeyedProcessFunction:用于KeyedStream,keyBy之后的流处理CoProcessFunction:用于connect连接的流ProcessJoinFunction:用于join流操作BroadcastProcessFunction:用于广播KeyedBroadcastProcessFunction:keyBy之后的广播ProcessWindowFunction:窗口增量聚合ProcessAllWindowFunction:全窗口聚合

通用方法

processElement(v:IN,ctx:Context,out:Collector[Out])会针对流中的每条记录调用一次。

可以跟以前MapFunction一样,Collector发送出去。

Context可以访问时间戳,当前记录键值以及TimeService,支持将结果发送到副输出。

onTimer(timestamp:Long,ctx:OnTimerContext,out:Collector[OUT]) 是一个回调函数,会在之前注册的计数器触发时调用。

timestamp 参数给出了所触发计时器的时间戳,Collector可用来发出记录。

OnTimerContext能够提供和processElement()方法中Context对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。

三.时间服务和计时器

3.1 简介

Context和OnTimerContext对象中TimerService

currentProcessingTime():Long 返回当前的处理时间。currentWatermark():Long 返回当前水位线时间戳。registerProcessingTimeTimer(timestamp:Long):Unit针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。registerEventTimeTimer(timestamp:Long):Unit 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。deleteProcessingTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。deleteEventTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。计时器触发时会调用onTimer()回调函数,系统对于processElement()和onTimer()两个方法调用同步,防止并发。

每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。

3.2 示例

//某个传感器的温度在1秒的处理时间内持续上升警告 object KeyedProcessFunctionTemperatureTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val stream = env.socketTextStream("192.168.200.116",9999) import org.apache.flink.api.scala._ val dataDstream = stream.map(data=>{ val arr = data.split(",") Record(arr(0),arr(1).trim.toLong,arr(2).trim.toDouble) }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Record](Time.seconds(1)){ override def extractTimestamp(element: Record): Long = { element.timestamp * 1000 } }) val resultDStrem = dataDstream.keyBy(_.id).process(new TempIncreaseAlertFunction()) dataDstream.print("data") resultDStrem.print("result") env.execute("KeyedProcessFunctionTemperatureTest") } /** * 如果某传感器的温度在1秒(处理时间)持续增加 * 则发出警告 */ class TempIncreaseAlertFunction extends KeyedProcessFunction[String, Record, String] { import org.apache.flink.api.scala._ //定义一个值状态,保存上一个设备温度值 lazy val lastTemp = getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastTemp",Types.of[Double])) //保存注册的定时器的时间戳 lazy val currentTimer = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer",Types.of[Long])) override def processElement(value: Record, ctx: KeyedProcessFunction[String, Record, String]#Context, out: Collector[String]): Unit = { //获取前一个温度 val prevTemp = lastTemp.value() //更新最近一次温度 lastTemp.update(value.temperature) //获取上一个定时器的时间戳 val curTimerTimestamp = currentTimer.value() if(prevTemp == 0.0 || value.temperature < prevTemp) { //温度下降,删除当前计时器 ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp) currentTimer.clear() }else if(value.temperature > prevTemp && curTimerTimestamp == 0){ //温度上升,没有设置计时器 //以当前时间 +1秒设置处理时间计时器 val timerTs = ctx.timerService().currentProcessingTime() + 1000 ctx.timerService().registerProcessingTimeTimer(timerTs) //更新当前计时器 currentTimer.update(timerTs) } } override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, Record, String]#OnTimerContext, out: Collector[String]): Unit = { out.collect("设备 id 为: " + ctx.getCurrentKey + "的设备温度值已经连续 1s 上升了。") currentTimer.clear() } } } case class Record(id:String,timestamp:Long,temperature:Double) extends Serializable

输入

1,1603279030,10 1,1603279030,11 2,1603279030,10 2,1603279030,11 1,1603279030,12

输出

data> Record(1,1603279030,10.0) data> Record(1,1603279030,11.0) result> 设备 id 为: 1的设备温度值已经连续 1s 上升了。 data> Record(2,1603279030,10.0) data> Record(2,1603279030,11.0) result> 设备 id 为: 2的设备温度值已经连续 1s 上升了。 data> Record(1,1603279030,12.0) result> 设备 id 为: 1的设备温度值已经连续 1s 上升了。

四.副输出/侧输出(SideOutput)

4.1 简介

大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有split算子可以将一条流拆分成多条类型相同的流。

处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。

4.2 示例

object SideOutputTest2 { import org.apache.flink.api.scala._ val webTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("Web端埋点数据") val mobileTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("移动端埋点数据") val csTerminal: OutputTag[MdMsg] = new OutputTag[MdMsg]("CS端埋点数据") def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) env.setParallelism(1) env.setStateBackend(new MemoryStateBackend(100, false)) val socketData: DataStream[String] = env.socketTextStream("localhost", 9999) socketData.print("input data") val outputStream: DataStream[MdMsg] = socketData.map(line => { val str: Array[String] = line.split(",") MdMsg(str(0), str(1), str(2).toLong) }) .process(new ProcessFunction[MdMsg,MdMsg]{ override def processElement(value: MdMsg, ctx: ProcessFunction[MdMsg, MdMsg]#Context, out: Collector[MdMsg]): Unit = { // web if (value.mdType == "web") { ctx.output(webTerminal, value) // mobile } else if (value.mdType == "mobile") { ctx.output(mobileTerminal, value) // cs } else if (value.mdType == "cs") { ctx.output(csTerminal, value) // others } else { out.collect(value) } } }) // Web端埋点数据流处理逻辑 outputStream.getSideOutput(webTerminal).print("web") // Mobile端埋点数据流处理逻辑 outputStream.getSideOutput(mobileTerminal).print("mobile") // CS端埋点数据流处理逻辑 outputStream.getSideOutput(csTerminal).print("cs") env.execute() } case class MdMsg(mdType:String, url:String, Time:Long) }

输入

web,http:web,10 mobile,http:mobile,10 cs,http:cs,10

输出

input data> web,http:web,10 web> MdMsg(web,http:web,10) input data> mobile,http:mobile,10 mobile> MdMsg(mobile,http:mobile,10) input data> cs,http:cs,10 cs> MdMsg(cs,http:cs,10)

公众号

名称:大数据计算 微信号:bigdata_limeng

最新回复(0)