Big Data with Flink (8) | Side Outputs (SideOutput)

2025-02-27

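Most DataStream API operators produce a single output, that is, one stream of one data type. The split operator is an exception: it can divide one stream into several, but all of the resulting streams still share the same data type. The side outputs feature of process functions can produce multiple streams, and these streams may have different data types. A side output is defined as an OutputTag[X] object, where X is the data type of the output stream. A process function can emit an event to one or more side outputs through its Context object.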
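To make the point about differently typed side outputs concrete, here is a minimal sketch, assuming Flink 1.x with the Scala DataStream API on the classpath. The tag ids ("low", "alerts"), the thresholds, the sample values, and the names SideOutputTags, TwoTagProcessor, and TwoTagSketch are all hypothetical and chosen only for illustration.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical tags: each one names a side output and fixes its element type.
object SideOutputTags {
  val low = new OutputTag[Double]("low")        // side stream of Double
  val alerts = new OutputTag[String]("alerts")  // side stream of String
}

// Hypothetical process function: the main output keeps readings above 30.0,
// everything else goes to the "low" side output, and readings above 40.0
// additionally produce a text alert on the "alerts" side output.
class TwoTagProcessor extends ProcessFunction[Double, Double] {
  override def processElement(value: Double,
                              ctx: ProcessFunction[Double, Double]#Context,
                              out: Collector[Double]): Unit = {
    if (value > 30.0) out.collect(value)
    else ctx.output(SideOutputTags.low, value)

    if (value > 40.0) ctx.output(SideOutputTags.alerts, s"very hot: $value")
  }
}

object TwoTagSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Made-up sample readings in place of a real source
    val mainStream = env
      .fromElements(25.0, 31.5, 40.2)
      .process(new TwoTagProcessor)

    // Side outputs are retrieved from the operator's result stream by tag
    mainStream.print("main")
    mainStream.getSideOutput(SideOutputTags.low).print("low")
    mainStream.getSideOutput(SideOutputTags.alerts).print("alerts")

    env.execute("two side outputs sketch")
  }
}
```

A single element may be sent to the main output and to a side output at the same time, as the alert branch does here. Keeping the tags in one shared object avoids repeating the id string and type at every use site.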

Requirement: send readings with a temperature above 30 to one stream and all other readings to a second stream.

Code implementation:

    import learning.SensorReading
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object SideOutputTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val inputStream = env.socketTextStream("hadoop12", 9999)

        // Parse each comma-separated line into a SensorReading
        val dataStream = inputStream
          .map(data => {
            val dataArray = data.split(",")
            SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
          })

        // Use the side output of a ProcessFunction to split the stream
        val highTempStream = dataStream
          .process(new SplitTempProcessor(30))

        val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String, Double, Long)]("low-temp"))

        // Print both streams
        highTempStream.print("high")
        lowTempStream.print("low")

        env.execute("side output job")
      }
    }

    // Custom ProcessFunction that separates high- and low-temperature readings
    class SplitTempProcessor(threshold: Int) extends ProcessFunction[SensorReading, SensorReading] {
      override def processElement(value: SensorReading,
                                  ctx: ProcessFunction[SensorReading, SensorReading]#Context,
                                  out: Collector[SensorReading]): Unit = {
        // Above the threshold: emit to the main stream.
        // Otherwise (at or below the threshold): emit to the side output.
        if (value.temperature > threshold) {
          out.collect(value)
        } else {
          ctx.output(new OutputTag[(String, Double, Long)]("low-temp"), (value.id, value.temperature, value.timestamp))
        }
      }
    }
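The job imports learning.SensorReading without showing its definition. The sketch below assumes a case class whose fields are inferred from how the job constructs and reads it, and mirrors the routing rule as a plain function so the threshold behavior can be checked without a running Flink job. RoutingSketch, parse, and route are hypothetical helpers, not part of Flink or of the original code, and the sample lines are made up.

```scala
// Assumed shape of learning.SensorReading, inferred from the job:
// built from (String, Long, Double) and read via id, timestamp, temperature.
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object RoutingSketch {
  // Hypothetical helper: parses one "id,timestamp,temperature" line,
  // the same way the job's map step does.
  def parse(line: String): SensorReading = {
    val fields = line.split(",")
    SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
  }

  // Hypothetical helper mirroring SplitTempProcessor's decision:
  // Right = main stream element, Left = tuple sent to the "low-temp" side output.
  def route(reading: SensorReading, threshold: Int): Either[(String, Double, Long), SensorReading] =
    if (reading.temperature > threshold) Right(reading)
    else Left((reading.id, reading.temperature, reading.timestamp))

  def main(args: Array[String]): Unit = {
    // Made-up input lines for illustration
    val lines = Seq("sensor_1,1547718199,35.8", "sensor_1,1547718201,30.0")
    lines.map(parse).map(route(_, 30)).foreach(println)
  }
}
```

Because the comparison is a strict greater-than, a reading exactly at the threshold is routed to the side output rather than the main stream.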

Result
