定义 state 有两种方法：声明为 lazy val（延迟初始化），或先声明为 var，再在 open() 中赋值。
ValueState
第一种方法：使用 lazy val，在首次访问时才通过 getRuntimeContext 获取 state
/**
 * ValueState example, variant 1: the state handle is a lazily initialised member.
 *
 * The handle is described by a `ValueStateDescriptor[Int]` named "my-state" and is
 * obtained from `getRuntimeContext` the first time `myState` is accessed.
 */
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  // `lazy` defers the runtime-context lookup until first access.
  // NOTE(review): presumably needed because the runtime context is not yet
  // available while the constructor runs — confirm against the Flink docs.
  lazy val myState: ValueState[Int] = {
    val descriptor = new ValueStateDescriptor[Int]("my-state", classOf[Int])
    getRuntimeContext.getState(descriptor)
  }

  /**
   * Demonstrates the two basic `ValueState` operations: read, then write.
   *
   * @param value the incoming element (not used by this demo)
   * @param ctx   the function context (not used by this demo)
   * @param out   the output collector (nothing is emitted by this demo)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Read the stored value; the result is intentionally discarded here.
    myState.value()
    // Overwrite the stored value with 1.
    myState.update(1)
  }
}
第二种方法：先声明为 var，再在 open() 中通过 getRuntimeContext 赋值
/**
 * ValueState example, variant 2: the state handle is a `var` assigned in `open`.
 *
 * The field starts out uninitialised and is assigned from `getRuntimeContext`
 * when `open` is called, using a `ValueStateDescriptor[Int]` named "my-state".
 */
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  // Default-initialised (`_`); the real handle is assigned in `open`.
  var myState: ValueState[Int] = _

  /**
   * Looks up the "my-state" handle and stores it in `myState`.
   *
   * @param parameters configuration passed to `open` (not used here)
   */
  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[Int]("my-state", classOf[Int])
    myState = getRuntimeContext.getState(descriptor)
  }

  /**
   * Demonstrates the two basic `ValueState` operations: read, then write.
   *
   * @param value the incoming element (not used by this demo)
   * @param ctx   the function context (not used by this demo)
   * @param out   the output collector (nothing is emitted by this demo)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Read the stored value; the result is intentionally discarded here.
    myState.value()
    // Overwrite the stored value with 1.
    myState.update(1)
  }
}
ListState
/**
 * ListState example: a lazily initialised `ListState[String]` named "my-list".
 */
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  // Looked up through the runtime context on first access.
  lazy val myListState: ListState[String] = {
    val descriptor = new ListStateDescriptor[String]("my-list", classOf[String])
    getRuntimeContext.getListState(descriptor)
  }

  /**
   * Demonstrates appending to the list state and then replacing its contents.
   *
   * @param value the incoming element (not used by this demo)
   * @param ctx   the function context (not used by this demo)
   * @param out   the output collector (nothing is emitted by this demo)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Append a single element.
    myListState.add("hello")

    // Replace the stored list with a new, empty Java list.
    val replacement = new java.util.ArrayList[String]()
    myListState.update(replacement)
  }
}
MapState
/**
 * MapState example: a lazily initialised `MapState[String, Double]` named "my-map".
 */
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  // Looked up through the runtime context on first access.
  lazy val myMapState: MapState[String, Double] = {
    val descriptor =
      new MapStateDescriptor[String, Double]("my-map", classOf[String], classOf[Double])
    getRuntimeContext.getMapState(descriptor)
  }

  /**
   * Demonstrates writing an entry to the map state and reading it back.
   *
   * @param value the incoming element (not used by this demo)
   * @param ctx   the function context (not used by this demo)
   * @param out   the output collector (nothing is emitted by this demo)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Store 10.0 under the key "sensor_1".
    myMapState.put("sensor_1", 10.0)
    // Look the same key up again; the result is intentionally discarded here.
    myMapState.get("sensor_1")
  }
}
ReducingState
/**
 * ReducingState example: a lazily initialised `ReducingState[SensorReading]`
 * named "my-reduce".
 *
 * The reduce function combines two readings into one that keeps the first
 * reading's id, the larger of the two timestamps and the smaller of the two
 * temperatures.
 */
class MyProcessor extends KeyedProcessFunction[String, SensorReading, Int] {

  // Looked up through the runtime context on first access.
  lazy val myReducingState: ReducingState[SensorReading] = {
    val reducer = new ReduceFunction[SensorReading] {
      override def reduce(value1: SensorReading, value2: SensorReading): SensorReading = {
        val maxTimestamp = value1.timestamp.max(value2.timestamp)
        val minTemperature = value1.temperature.min(value2.temperature)
        SensorReading(value1.id, maxTimestamp, minTemperature)
      }
    }
    val descriptor =
      new ReducingStateDescriptor[SensorReading]("my-reduce", reducer, classOf[SensorReading])
    getRuntimeContext.getReducingState(descriptor)
  }

  /**
   * Demonstrates adding the incoming element to the reducing state and then
   * clearing the state.
   *
   * @param value the incoming element, added to the reducing state
   * @param ctx   the function context (not used by this demo)
   * @param out   the output collector (nothing is emitted by this demo)
   */
  override def processElement(
      value: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, Int]#Context,
      out: Collector[Int]
  ): Unit = {
    // Add the incoming reading to the reducing state.
    myReducingState.add(value)
    // Remove whatever is currently stored.
    myReducingState.clear()
  }
}