A detailed explanation of the execution steps and meaning of AggregateFunction in Flink


```java
package operator;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.sql.Timestamp;
import java.util.Date;

public class AggregateFunction_2 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Custom source: emits one element per second. Odd i emits (user1, 1),
        // even i emits (user<i>, i), so the stream looks like:
        // user1 1, user2 2, user1 1, user4 4, user1 1, user6 6, ...
        DataStream<Tuple3<String, Integer, Timestamp>> dataStream =
                env.addSource(new SourceFunction<Tuple3<String, Integer, Timestamp>>() {
                    boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple3<String, Integer, Timestamp>> ctx) throws Exception {
                        int i = 1;
                        while (running) {
                            Tuple3<String, Integer, Timestamp> t3;
                            Thread.sleep(1000);
                            if (i % 2 == 1) {
                                t3 = new Tuple3<>("user" + 1, 1, new Timestamp(new Date().getTime()));
                            } else {
                                t3 = new Tuple3<>("user" + i, i, new Timestamp(new Date().getTime()));
                            }
                            i = i + 1;
                            ctx.collect(t3);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        /*
        // Bounded alternative source:
        DataStream<Tuple3<String, Integer, Timestamp>> dataStream = env.fromElements(
                Tuple3.of("1", 333, new Timestamp(new Date().getTime())),
                Tuple3.of("2", 111, new Timestamp(new Date().getTime())),
                Tuple3.of("1", 222, new Timestamp(new Date().getTime())),
                Tuple3.of("2", 444, new Timestamp(new Date().getTime())),
                Tuple3.of("9", 444, new Timestamp(new Date().getTime())),
                Tuple3.of("6", 555, new Timestamp(new Date().getTime())),
                Tuple3.of("1", 555, new Timestamp(new Date().getTime())));
        */

        // Type parameters of AggregateFunction: IN (input), ACC (accumulator), OUT (output).
        DataStream<Tuple3<String, Integer, Timestamp>> data_aggregate = dataStream
                // .timeWindowAll(Time.seconds(2))
                .keyBy(0)       // group by the first field (the user id)
                .countWindow(2) // fire once a key has collected 2 elements
                // .sum(1)
                .aggregate(new AggregateFunction<Tuple3<String, Integer, Timestamp>,
                        Tuple3<String, Integer, Timestamp>,
                        Tuple3<String, Integer, Timestamp>>() {

                    // Creates a new accumulator, starting a new aggregation and
                    // initializing its state. Runs once, when the first element for a
                    // key arrives in the window; if the window already holds an element,
                    // a newly arriving one skips this and goes straight to add()/getResult().
                    // The accumulator is the intermediate value carried along: for
                    // user1 + user1, acc_1 = acc(initial) + first user1, and
                    // acc = acc_1 + second user1. All partial results live in the
                    // accumulator; it acts as a register.
                    @Override
                    public Tuple3<String, Integer, Timestamp> createAccumulator() {
                        System.out.println("------createAccumulator--------" + new Timestamp(new Date().getTime()));
                        return new Tuple3<>("", 0, new Timestamp(new Date().getTime()));
                    }

                    // The accumulation step; runs once per element. Defines how each
                    // incoming element is folded into the accumulator.
                    @Override
                    public Tuple3<String, Integer, Timestamp> add(Tuple3<String, Integer, Timestamp> value,
                                                                  Tuple3<String, Integer, Timestamp> accumulator) {
                        System.out.println("------add--------" + value);
                        accumulator.f0 = value.f0;  // the accumulator's first field takes the element's f0 (the key)
                        accumulator.f1 += value.f1; // the second field accumulates
                        return accumulator;
                    }

                    // Runs once, when the window has collected its 2 elements and the
                    // computation finishes; extracts the aggregated result from the accumulator.
                    @Override
                    public Tuple3<String, Integer, Timestamp> getResult(Tuple3<String, Integer, Timestamp> accumulator) {
                        System.out.println("------getResult--------" + accumulator);
                        return accumulator;
                    }

                    // Merges two accumulators into one with the combined state. Not
                    // triggered in this example (only mergeable windows, e.g. session
                    // windows, call it), but it should still return a valid merged
                    // accumulator rather than null.
                    @Override
                    public Tuple3<String, Integer, Timestamp> merge(Tuple3<String, Integer, Timestamp> a,
                                                                    Tuple3<String, Integer, Timestamp> b) {
                        System.out.println("------merge--------" + a);
                        return new Tuple3<>(a.f0, a.f1 + b.f1, a.f2);
                    }
                });

        data_aggregate.print();
        env.execute("execute");
    }
}
```

Output (the `--` lines explain each step):

```
------createAccumulator--------2020-10-20 20:52:43.177
------add--------(user1,1,2020-10-20 20:52:43.095)
  -- user1 arrives; after keyBy its group holds only this one element, so
     createAccumulator -> add run (add folds the element into the freshly initialized accumulator)
------createAccumulator--------2020-10-20 20:52:44.179
------add--------(user2,2,2020-10-20 20:52:44.103)
  -- user2 arrives; its group holds only this one element, so createAccumulator -> add run
------add--------(user1,1,2020-10-20 20:52:45.103)
  -- another user1 arrives; the group now holds two user1 elements, which meets the count,
     so add -> getResult run (add: second user1 + (first user1 + initial accumulator)).
     Because AggregateFunction aggregates incrementally, the window's data is cleared
     after the result is emitted.
------getResult--------(user1,2,2020-10-20 20:52:43.178)
(user1,2,2020-10-20 20:52:43.178)
------createAccumulator--------2020-10-20 20:52:46.189
------add--------(user4,4,2020-10-20 20:52:46.103)
  -- user4 arrives; its group holds only this one element, so createAccumulator -> add run
------createAccumulator--------2020-10-20 20:52:47.195
  -- note: the two earlier user1 elements were already emitted, and since the aggregation
     is incremental the group was cleared, so this user1 is again the first element of its group
------add--------(user1,1,2020-10-20 20:52:47.104)
------createAccumulator--------2020-10-20 20:52:48.2
------add--------(user6,6,2020-10-20 20:52:48.104)
  -- user6 arrives; its group holds only this one element, so createAccumulator -> add run
------add--------(user1,1,2020-10-20 20:52:49.104)
  -- another user1 arrives; the group now holds two elements, so add -> getResult emit the
     result, and the window's data is cleared again
------getResult--------(user1,2,2020-10-20 20:52:47.195)
(user1,2,2020-10-20 20:52:47.195)
------createAccumulator--------2020-10-20 20:52:50.109
------add--------(user8,8,2020-10-20 20:52:50.104)
------createAccumulator--------2020-10-20 20:52:51.114
------add--------(user1,1,2020-10-20 20:52:51.105)
------createAccumulator--------2020-10-20 20:52:52.119
------add--------(user10,10,2020-10-20 20:52:52.105)
------add--------(user1,1,2020-10-20 20:52:53.106)
------getResult--------(user1,2,2020-10-20 20:52:51.114)
(user1,2,2020-10-20 20:52:51.114)
```
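
The walkthrough above traces when each callback fires. To show the accumulator pattern in isolation, here is a minimal sketch of an AggregateFunction that computes a per-key average. It is not part of the example above: the class name AverageAggregate and the Tuple2<Long, Long> (sum, count) accumulator layout are illustrative choices.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// IN = Integer, ACC = Tuple2<sum, count>, OUT = Double.
// Same createAccumulator -> add -> getResult life cycle traced in the log above.
public class AverageAggregate implements AggregateFunction<Integer, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        // Runs once per key/window, when the first element arrives.
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Integer value, Tuple2<Long, Long> acc) {
        // Runs once per element: fold the element into the accumulator.
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        // Runs when the window fires: turn the accumulator into the output.
        return ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // Only invoked for mergeable window assigners (e.g. session windows).
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}
```

It plugs in the same way as the anonymous class in the example, e.g. `stream.keyBy(...).countWindow(2).aggregate(new AverageAggregate())`. Because only the accumulator is kept rather than the buffered elements, an AggregateFunction window holds constant state per key, which is exactly the incremental behavior the log demonstrates.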

 
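As noted in the merge() comment, the count window above never calls merge(). It is invoked only when the window assigner can merge windows, session windows being the standard case. A minimal sketch under that assumption (`numbers` is an assumed `DataStream<Integer>`; the key selector and the 5-second gap are illustrative):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Session windows are mergeable: when Flink decides two session windows of the
// same key belong together, it combines their accumulators via merge(). This is
// why the countWindow(2) example never prints the "------merge--------" line.
DataStream<Double> sessionAverages = numbers              // numbers: assumed DataStream<Integer>
        .keyBy(n -> n % 10)                               // illustrative key: last digit
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
        .aggregate(new AverageAggregate());               // the sketch class from above
```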
