idea使用maven快速构建flink骨架项目

it2025-10-04  3

Scala版Flink程序编写

本项目使用的Flink版本为最新版本,也就是1.11.2。现在提供maven项目的配置文件。

使用Intellij IDEA创建一个Maven新项目 勾选Create from archetype,然后点击Add Archetype按钮 GroupId中输入org.apache.flink,ArtifactId中输入flink-quickstart-scala,Version中输入1.11.2,然后点击OK 点击向右箭头,出现下拉列表,选中flink-quickstart-scala:1.11.2(同理构建java项目就是将scala改成java),点击Next Name中输入FlinkTest,GroupId中输入com.zhengkw,ArtifactId中输入FlinkTest,点击Next 最好使用IDEA默认的Maven工具:Bundled(Maven 3),点击Finish,等待一会儿,项目就创建好了 根据情况修改pom文件,因为我这边用的是scala2.12.12,所以我将pom做了一些修改

<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.11.2</flink.version> <scala.binary.version>2.12</scala.binary.version> <scala.version>2.12.12</scala.version> <log4j.version>2.12.1</log4j.version> </properties>

编写WordCount.scala程序

package com.imprexion import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} /** * @ClassName:WorldCount * @author: zhengkw * @description: * @date: 20/10/22上午 11:13 * @version:1.0 * @since: jdk 1.8 scala 2.11.8 */ object WorldCount { def main(args: Array[String]): Unit = { // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val text: DataStream[String] = env.readTextFile("E:\\IdeaWorkspace\\Flink-DataMoveForHive2Kafka\\src\\main\\resources\\helloworld.txt") val value = text.flatMap { r => r.split("\\s") } .map(w => WordWithCount(w, 1)) // 使用word字段进行分组操作,也就是shuffle .keyBy(0) // 做聚合操作,类似与reduce .sum(1).print() env.execute() } case class WordWithCount(word: String, count: Int) }

过程中遇到的问题

【Flink scala】No implicits found for parameter evidence$12 官网说明

1:A frequent reason if that the code that generates the TypeInformation has not been imported. Make sure to import the entire flink.api.scala package.

2:Another common cause are generic methods, which can be fixed as described in the following section.

原因:当前环境之下找到不到scala的包

添加:

import org.apache.flink.api.scala._ 即可

最新回复(0)