Apache Flink是一个面向分布式数据流处理和批量数据 处理的开源计算平台,可以对有限数据流和无限数据流进行有状态计算,即提供支持流处理和批处理两种类型的功能
批流统一
支持高吞吐、低延迟‘高性能的流处理
支持有状态计算的Exactly-Once语义
支持高度灵活的窗口操作,如基于事件时间,基于会话时间,基于处理时间等。
支持Backpressure功能的持续流模型
支持基于轻量级Snapshot实现的容错
支持迭代计算
支持程序自动优化:即避免特定情况下的shuffle、排序等操作,中间结果有必要缓存
在Spark生态体系中,其主要是为离线计算设计的,底层引擎都是Spark Core,而Spark Streaming只不过是一种特殊的批处理而已。
下载Flink二进制包
wget http://www.us.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop27-scala_2.11.tgz解压缩
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz mv flink-1.7.2-bin-hadoop27-scala_2.11 /opt/flink chown -R hdfs:hdfs /opt/flink环境变量配置
vi /etc/profile export FLINK_HOME=/opt/flink export PATH=$FLINK_HOME/bin:$PATH :wq source /etc/profile下载安装Ambari-Flink软件包
git clone https://github.com/abajwa-hw/ambari-flink-service.git mv ambari-flink-server /var/lib/ambari-server/resources/stacks/HDP/$VERSION/services/FLINK重启ambari-server
ambari-server restart安装flink
img
img
修改yarn参数
<property> <name>yarn.client.failover-proxy-provider</name> <value> org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider </value> </property>重启yarn
测试
flink run --jobmanager yarn-cluster \ -yn 1 \ -ytm 768 \ -yjm 768 \ /opt/flink/examples/batch/WordCount.jar \ --input hdfs://hdp1:8020/user/hdfs/demo/input/word \ --output hdfs://hdp3:8020/user/hdfs/demo/output/wc/flink sql client
sql-client.sh embedded错误解决
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties 解决方案:在flink的lib包里面添加如下的两个jar包Flink程序的基础构建模块是流(Streams)与转换(transformations)
每个数据流起始于一个或多个source,并终止于一个或多个sink
Source主要负责数据的读取
Transformation主要负责对数据的转换操作
Sink负责最终计算好的结果数据输出
Flink提供了不同的抽象级别以开发流式或批处理应用
最底层提供了有状态流。它将通过过程函数(Process Function)嵌入到DataStream API中。它允许用户可以自由处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而程序可以实现复杂的计算。
DataStream/DataSet API 是Flink提供的核心API。DataSet处理有界的数据集,DataStream处理有界或无界的数据流。用户可以通过各种方法(map/flatmap/window/keyBy/sum/max/min/avg/join)等将数据进行转换/计算
Table API:是以表为中心的声明式SQL,其中表可能会动态变化(在表达流数据时)。Table API提供了例如select、project、join、group-by、aggregate等操作,使用起来更加简洁。
Flink提供了最高层级的抽象是SQL.这一层抽象在语法与表达能力上与Table API类似,但是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行
从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用nc -l 8888启动一个socket用来发送数据
public class StreamingWordCount { public static void main(String[] args) throws Exception { //创建流式计算的ExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //调用Source,指定Socket地址和端口 DataStream<String> lines = env.socketTextStream("localhost", 10088); DataStream<Tuple2<String, Integer>> words = lines. flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = line.split(" "); for (String word : words) { collector.collect(Tuple2.of(word, 1)); } } }); //按照key分组并聚合 DataStream<Tuple2<String, Integer>> result = words.keyBy(0).sum(1); //将结果打印到控制台 result.print(); //执行 env.execute("StreamingWordCount"); } }参数说明:
-m:指定主机名后面的端口为JobManager的REST的端口,而不是RPC的端口,RPC通信端口是6123
-p:指定是并行度
-c:指定main方法的全类名
