文章目录
一.简介二.函数三.时间服务和计时器3.1 简介3.2 示例
四.副输出/侧输出(SideOutput)4.1 简介4.2 示例
一.简介
时间信息和Watermark对很多流式应用至关重要,无法用DataStream API转换来访问它们。
DataStream API提供了一组相对底层的转换——处理函数。
除了基本功能,它们还可以访问记录的时间戳和水位线,并支持注册在将来某个特点时间出发计时器。处理函数的副输出功能还允许将记录发送到多个输出流中。
二.函数
Flink提供了8种不同处理函数:
ProcessFunction:dataStreamKeyedProcessFunction:用于KeyedStream,keyBy之后的流处理CoProcessFunction:用于connect连接的流ProcessJoinFunction:用于join流操作BroadcastProcessFunction:用于广播KeyedBroadcastProcessFunction:keyBy之后的广播ProcessWindowFunction:窗口增量聚合ProcessAllWindowFunction:全窗口聚合
通用方法
processElement(v:IN,ctx:Context,out:Collector[Out])会针对流中的每条记录调用一次。
可以跟以前MapFunction一样,Collector发送出去。
Context可以访问时间戳,当前记录键值以及TimeService,支持将结果发送到副输出。
onTimer(timestamp:Long,ctx:OnTimerContext,out:Collector[OUT]) 是一个回调函数,会在之前注册的计数器触发时调用。
timestamp 参数给出了所触发计时器的时间戳,Collector可用来发出记录。
OnTimerContext能够提供和processElement()方法中Context对象相同的服务,它还会返回触发计时器的时间域(处理时间/事件时间)。
三.时间服务和计时器
3.1 简介
Context和OnTimerContext对象中TimerService
currentProcessingTime():Long 返回当前的处理时间。currentWatermark():Long 返回当前水位线时间戳。registerProcessingTimeTimer(timestamp:Long):Unit针对当前键值注册一个处理时间计时器,当执行机器处理时间达到给定的时间戳,该计时器就会触发。registerEventTimeTimer(timestamp:Long):Unit 针对当前键值注册一个事件时间计时器,当更新后水位线时间戳大于或等于计时器时间戳时,它就会触发。deleteProcessingTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过的处理时间计时器。如果该计时器不存在,则方法不会有任何作用。deleteEventTimeTimer(timestamp:Long):Unit 针对当前键值删除一个注册过事件时间计时器,如果该计时器不存在,则方法不会有任何作用。计时器触发时会调用onTimer()回调函数,系统对于processElement()和onTimer()两个方法调用同步,防止并发。
每个键值和时间戳只能注册一个计时器,每个键值可以有多个计时器,但具体到每个时间戳就只能有一个。
3.2 示例
object KeyedProcessFunctionTemperatureTest
{
def main
(args
: Array
[String]): Unit = {
val env
= StreamExecutionEnvironment
.getExecutionEnvironment
env
.setParallelism
(1)
env
.setStreamTimeCharacteristic
(TimeCharacteristic
.EventTime
)
val stream
= env
.socketTextStream
("192.168.200.116",9999)
import org
.apache
.flink
.api
.scala
._
val dataDstream
= stream
.map
(data
=>{
val arr
= data
.split
(",")
Record
(arr
(0),arr
(1).trim
.toLong
,arr
(2).trim
.toDouble
)
}).assignTimestampsAndWatermarks
(new BoundedOutOfOrdernessTimestampExtractor
[Record
](Time
.seconds
(1)){
override def extractTimestamp
(element
: Record
): Long = {
element
.timestamp
* 1000
}
})
val resultDStrem
= dataDstream
.keyBy
(_
.id
).process
(new TempIncreaseAlertFunction
())
dataDstream
.print
("data")
resultDStrem
.print
("result")
env
.execute
("KeyedProcessFunctionTemperatureTest")
}
class TempIncreaseAlertFunction
extends KeyedProcessFunction
[String, Record
, String] {
import org
.apache
.flink
.api
.scala
._
lazy val lastTemp
= getRuntimeContext
.getState
(new ValueStateDescriptor
[Double]("lastTemp",Types
.of
[Double]))
lazy val currentTimer
= getRuntimeContext
.getState
(new ValueStateDescriptor
[Long]("timer",Types
.of
[Long]))
override def processElement
(value
: Record
, ctx
: KeyedProcessFunction
[String, Record
, String]#Context
, out
: Collector
[String]): Unit = {
val prevTemp
= lastTemp
.value
()
lastTemp
.update
(value
.temperature
)
val curTimerTimestamp
= currentTimer
.value
()
if(prevTemp
== 0.0 || value
.temperature
< prevTemp
) {
ctx
.timerService
().deleteProcessingTimeTimer
(curTimerTimestamp
)
currentTimer
.clear
()
}else if(value
.temperature
> prevTemp
&& curTimerTimestamp
== 0){
val timerTs
= ctx
.timerService
().currentProcessingTime
() + 1000
ctx
.timerService
().registerProcessingTimeTimer
(timerTs
)
currentTimer
.update
(timerTs
)
}
}
override def onTimer
(timestamp
: Long, ctx
: KeyedProcessFunction
[String, Record
, String]#OnTimerContext
, out
: Collector
[String]): Unit = {
out
.collect
("设备 id 为: " + ctx
.getCurrentKey
+ "的设备温度值已经连续 1s 上升了。")
currentTimer
.clear
()
}
}
}
case class Record
(id
:String,timestamp
:Long,temperature
:Double) extends Serializable
输入
1,1603279030,10
1,1603279030,11
2,1603279030,10 2,1603279030,11
1,1603279030,12
输出
data> Record(1,1603279030,10.0)
data> Record(1,1603279030,11.0)
result> 设备 id 为: 1的设备温度值已经连续 1s 上升了。
data> Record(2,1603279030,10.0)
data> Record(2,1603279030,11.0)
result> 设备 id 为: 2的设备温度值已经连续 1s 上升了。
data> Record(1,1603279030,12.0)
result> 设备 id 为: 1的设备温度值已经连续 1s 上升了。
四.副输出/侧输出(SideOutput)
4.1 简介
大多数DataStream API 算子都只有一个输出,即只能生成一条某个数据类型的结果流。只有split算子可以将一条流拆分成多条类型相同的流。
处理函数提供的副输出功能允许从同一函数发出多条数据流,它们类型可以不同。
4.2 示例
object SideOutputTest2
{
import org
.apache
.flink
.api
.scala
._
val webTerminal
: OutputTag
[MdMsg
] = new OutputTag
[MdMsg
]("Web端埋点数据")
val mobileTerminal
: OutputTag
[MdMsg
] = new OutputTag
[MdMsg
]("移动端埋点数据")
val csTerminal
: OutputTag
[MdMsg
] = new OutputTag
[MdMsg
]("CS端埋点数据")
def main
(args
: Array
[String]): Unit = {
val env
: StreamExecutionEnvironment
= StreamExecutionEnvironment
.getExecutionEnvironment
env
.setStreamTimeCharacteristic
(TimeCharacteristic
.ProcessingTime
)
env
.setParallelism
(1)
env
.setStateBackend
(new MemoryStateBackend
(100, false))
val socketData
: DataStream
[String] = env
.socketTextStream
("localhost", 9999)
socketData
.print
("input data")
val outputStream
: DataStream
[MdMsg
] = socketData
.map
(line
=> {
val str
: Array
[String] = line
.split
(",")
MdMsg
(str
(0), str
(1), str
(2).toLong
)
})
.process
(new ProcessFunction
[MdMsg
,MdMsg
]{
override def processElement
(value
: MdMsg
, ctx
: ProcessFunction
[MdMsg
, MdMsg
]#Context
, out
: Collector
[MdMsg
]): Unit = {
if (value
.mdType
== "web") {
ctx
.output
(webTerminal
, value
)
} else if (value
.mdType
== "mobile") {
ctx
.output
(mobileTerminal
, value
)
} else if (value
.mdType
== "cs") {
ctx
.output
(csTerminal
, value
)
} else {
out
.collect
(value
)
}
}
})
outputStream
.getSideOutput
(webTerminal
).print
("web")
outputStream
.getSideOutput
(mobileTerminal
).print
("mobile")
outputStream
.getSideOutput
(csTerminal
).print
("cs")
env
.execute
()
}
case class MdMsg
(mdType
:String, url
:String, Time
:Long)
}
输入
web,http:web,10
mobile,http:mobile,10
cs,http:cs,10
输出
input data> web,http:web,10
web> MdMsg(web,http:web,10)
input data> mobile,http:mobile,10
mobile> MdMsg(mobile,http:mobile,10)
input data> cs,http:cs,10
cs> MdMsg(cs,http:cs,10)
公众号
名称:大数据计算 微信号:bigdata_limeng