flink 简单的批处理样例

it2025-08-15  17

1. 第一个import是引入flink 批处理执行所需要的环境 2. 第二个import是引入flink 的隐式转换,如果没有这个,就会报错 3. flink 批处理是不需要env.execute(),这个要特别注意,不然会报各种奇怪的问题 4. 注意流式引入与批处理引入的包不一样 以下式流式处理的包 import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

flink 批处理样例

import org.apache.flink.api.scala.ExecutionEnvironment import org.apache.flink.api.scala._ object demo { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile("D:\\idea\\flink_demo_git\\learn_demo\\data\\aaa.txt") val counts = text.map(r=>r.split(" ")).map((r=>(r(0),r(1).toInt))) .groupBy(0) .sum(1) counts.print() // flink 批处理是不需要env.execute ,流式处理需要,切记 // env.execute("first flink demo") } }
最新回复(0)