大数据之Flink(9) | 状态管理

it2025-09-02  2

对于定义state有两种方法

ValueState

第一种方法

class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ lazy val myState:ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("my-state",classOf[Int])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int] #Context, out: Collector[Int]): Unit = { //取出状态 更新状态 myState.value() myState.update(1) } }

第二种方法

class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ var myState:ValueState[Int] = _ override def open(parameters: Configuration): Unit = { myState = getRuntimeContext.getState(new ValueStateDescriptor[Int]("my-state",classOf[Int])) } override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int] #Context, out: Collector[Int]): Unit = { //取出状态 更新状态 myState.value() myState.update(1) } }

ListState

class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ lazy val myListState:ListState[String] = getRuntimeContext.getListState(new ListStateDescriptor[String] ("my-list",classOf[String])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int] #Context, out: Collector[Int]): Unit ={ myListState.add("hello") myListState.update(new java.util.ArrayList[String]()) } }

MapState

class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ lazy val myMapState:MapState[String,Double] = getRuntimeContext.getMapState(new MapStateDescriptor[String,Double] ("my-map",classOf[String],classOf[Double])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int] #Context, out: Collector[Int]): Unit ={ myMapState.put("sensor_1",10.0) myMapState.get("sensor_1") } }

ReducingState

class MyProcessor extends KeyedProcessFunction[String,SensorReading,Int]{ lazy val myReducingState:ReducingState[SensorReading] = getRuntimeContext.getReducingState(new ReducingStateDescriptor[SensorReading]("my-reduce", new ReduceFunction[SensorReading] { override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = SensorReading(value1.id,value1.timestamp.max(value2.timestamp),value1.temperature.min(value2.temperature)) }, classOf[SensorReading])) override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, Int] #Context, out: Collector[Int]): Unit ={ myReducingState.add(value) myReducingState.clear() } }
最新回复(0)