集群模型和角色
JobManager:扮演集群管理者的角色,负责调度任务、协调 checkpoints、协调故障恢复、收集 Job 的状态信息,并管理 Flink 集群中的从节点 TaskManager。
TaskManager:实际负责执行计算的 Worker,在其上执行 Flink Job 的一组 Task;TaskManager 还是所在节点的管理员,负责把该节点上的服务器信息(比如内存、磁盘、任务运行情况等)向 JobManager 汇报。
Client:用户在提交编写好的 Flink 工程时,会先创建一个客户端再进行提交,这个客户端就是 Client。Client 会根据用户传入的参数选择使用 yarn per job 模式、stand-alone 模式还是 yarn-session 模式将 Flink 程序提交到集群。
资源和资源组
在 Flink 集群中,一个 TaskManager 就是一个 JVM 进程,并且会用独立的线程来执行 task。为了控制一个 TaskManager 能接收多少个 task,Flink 提出了 Task Slot 的概念。
假如一个 TaskManager 拥有 5 个 slot,那么该 TaskManager 的计算资源会被平均分为 5 份,不同的 task 在不同的 slot 中执行,避免资源竞争。需要注意的是,slot 仅仅用来做内存的隔离,对 CPU 不起作用。运行在同一个 JVM 中的 task 可以共享 TCP 连接,减少网络传输,在一定程度上提高了程序的运行效率,降低了资源消耗。
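每个 TaskManager 的 slot 数量可以在 flink-conf.yaml 中配置,下面是一个简单的示意(数值仅为示例):
# 每个 TaskManager 提供的 slot 数量
taskmanager.numberOfTaskSlots: 5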
分布式缓存
分布式缓存的目的是在分布式环境中让每一个 TaskManager 节点保存一份相同的数据或者文件,当前计算节点的 task 就像读取本地文件一样拉取这些缓存的数据。
步骤:
第一步:首先需要在 env 环境中注册一个文件,该文件可以来源于本地,也可以来源于 HDFS,并且为该文件取一个名字。
第二步:在使用分布式缓存时,可根据注册的名字直接获取。
注意事项:
缓存文件在运行期间最好是只读状态,保证数据的一致性;缓存的内容不宜过大,否则会影响 task 的执行效率,严重的情况下还会造成 OOM 。
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("/Users/wangzhiwu/WorkSpace/quickstart/distributedcache.txt", "distributedCache");
//1:注册一个文件,可以使用hdfs上的文件 也可以是本地文件进行测试
DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec", "Lined");
DataSet<String> result = data.map(new RichMapFunction<String, String>() {
private ArrayList<String> dataList = new ArrayList<String>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//2:使用该缓存文件
File myFile = getRuntimeContext().getDistributedCache().getFile("distributedCache");
List<String> lines = FileUtils.readLines(myFile);
for (String line : lines) {
this.dataList.add(line);
System.err.println("分布式缓存为:" + line);
}
}
@Override
public String map(String value) throws Exception {
//在这里就可以使用dataList
System.err.println("使用datalist:" + dataList + "-------" +value);
//业务逻辑
return dataList +":" + value;
}
});
result.printToErr();
}
故障恢复和重启策略
故障恢复
Flink 的故障恢复策略有 full 和 region 两种,对应的配置项是 jobmanager.execution.failover-strategy。
full
当我们配置的故障恢复策略为 full 时,集群中只要有一个 Task 发生故障,该 Job 的所有 Task 都会重启。
region
集群中某一个或几个 Task 发生了故障,只需要重启有问题的一部分即可。 Flink 在判断需要重启的 Region 时,采用了以下的判断逻辑:
发生错误的 Task 所在的 Region 需要重启;
如果当前 Region 依赖的数据出现损坏或者部分丢失,那么生产这些数据的 Region 也需要重启;
为了保证数据一致性,当前 Region 的下游 Region 也需要重启。
重启策略
固定延迟重启策略模式;失败率重启策略模式;无重启策略模式。
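这三种重启策略都可以通过 env.setRestartStrategy 在代码中配置,下面给出一个简单的示意(重启次数和间隔仅为示例值):
//固定延迟重启:失败后最多重启 3 次,每次间隔 10 秒
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
//失败率重启:5 分钟内失败次数不超过 3 次,每次重启间隔 10 秒
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
//不重启
env.setRestartStrategy(RestartStrategies.noRestart());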
Exactly-once
在 Flink 中两阶段提交的实现方法被封装到了 TwoPhaseCommitSinkFunction 这个抽象类中,我们只需要实现其中的beginTransaction、preCommit、commit、abort 四个方法就可以实现“精确一次”的处理语义,实现的方式我们可以在官网中查到:
beginTransaction,在开启事务之前,我们在目标文件系统的临时目录中创建一个临时文件,后面在处理数据时将数据写入此文件;
preCommit,在预提交阶段,刷写(flush)文件,然后关闭文件,之后就不能写入到文件了,我们还将为属于下一个检查点的任何后续写入启动新事务;
commit,在提交阶段,我们将预提交的文件原子性移动到真正的目标目录中,请注意,这会增加输出数据可见性的延迟;
abort,在中止阶段,我们删除临时文件。
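下面是一个继承 TwoPhaseCommitSinkFunction 的最小化骨架,把数据先写入本地临时文件、提交时再移动到目标目录,仅用来示意上述四个方法各自的职责(类名、路径等均为假设的示例,并非官方实现):
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.io.File;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.UUID;

public class FileExactlyOnceSink extends TwoPhaseCommitSinkFunction<String, FileExactlyOnceSink.FileTransaction, Void> {

    public FileExactlyOnceSink() {
        //父类构造方法需要事务对象和上下文的序列化器
        super(new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected FileTransaction beginTransaction() throws Exception {
        //beginTransaction:在临时目录中创建一个临时文件
        FileTransaction txn = new FileTransaction();
        txn.tmpPath = "/tmp/2pc-" + UUID.randomUUID();
        Files.createFile(Paths.get(txn.tmpPath));
        return txn;
    }

    @Override
    protected void invoke(FileTransaction txn, String value, Context context) throws Exception {
        //处理数据时将数据写入当前事务对应的临时文件
        Files.write(Paths.get(txn.tmpPath), (value + "\n").getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
    }

    @Override
    protected void preCommit(FileTransaction txn) throws Exception {
        //preCommit:刷写并关闭文件,之后不再写入(这个示例里每条数据都已直接落盘,无需额外操作)
    }

    @Override
    protected void commit(FileTransaction txn) {
        //commit:把预提交的临时文件原子性地移动到真正的目标目录
        new File("/tmp/output").mkdirs();
        new File(txn.tmpPath).renameTo(new File("/tmp/output/" + new File(txn.tmpPath).getName()));
    }

    @Override
    protected void abort(FileTransaction txn) {
        //abort:删除临时文件
        new File(txn.tmpPath).delete();
    }

    //事务对象:记录本次事务使用的临时文件路径
    public static class FileTransaction implements Serializable {
        public String tmpPath;
    }
}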
过程
整个过程可以总结为下面四个阶段:
一旦 Flink 开始做 checkpoint 操作,就会进入 pre-commit 阶段,同时 Flink JobManager 会将检查点 Barrier 注入数据流中;
当所有的 Barrier 在算子中成功传递一遍并完成快照后,pre-commit 阶段完成;
等所有的算子完成“预提交”,就会发起一个“提交”动作,但是任何一个“预提交”失败都会导致 Flink 回滚到最近的 checkpoint;
pre-commit 完成后,必须要确保 commit 也成功,这一点由 Sink Operator 和 Kafka Sink 共同来保证。
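要让两阶段提交真正生效,作业必须先开启 checkpoint 并把语义设置为 EXACTLY_ONCE,下面是一个简单的配置示意(间隔时间仅为示例):
//开启 checkpoint,语义设置为 EXACTLY_ONCE,两阶段提交依赖 checkpoint 机制
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
//两次 checkpoint 之间至少间隔 1 秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);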
窗口和时间
窗口
滚动窗口,窗口数据有固定的大小,窗口中的数据不会叠加;
滑动窗口,窗口数据有固定的大小,并且有生成间隔;
会话窗口,窗口数据没有固定的大小,根据用户传入的参数进行划分,窗口数据无叠加。
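以事件时间为例,三种窗口在 DataStream API 中的大致写法如下(假设 dataStream 是一个 Tuple 类型、可以按字段位置 keyBy 的流,仅作示意):
//滚动窗口:每 1 分钟一个窗口
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.minutes(1)));
//滑动窗口:窗口大小 1 分钟,每 30 秒滑动一次
dataStream.keyBy(0).window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)));
//会话窗口:两条数据的间隔超过 30 秒就划分到新的窗口
dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.seconds(30)));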
时间
事件时间(Event Time),即事件实际发生的时间;摄入时间(Ingestion Time),事件进入流处理框架的时间;处理时间(Processing Time),事件被处理的时间。
//设置时间属性为 EventTime
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置时间属性为 ProcessingTime
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
反压问题
Flink 的反压设计利用了网络传输和动态限流。Flink 任务由基本的“流”和“算子”构成,“流”中的数据在“算子”间进行计算和转换时,会被放入分布式的阻塞队列中。当消费者的阻塞队列满时,就会降低生产者的数据生产速度。
反压指标
outPoolUsage:发送端缓冲池的使用率
inPoolUsage:接收端缓冲池的使用率
floatingBuffersUsage:接收端 Floating Buffer 的使用率
exclusiveBuffersUsage:接收端 Exclusive Buffer 的使用率
反压处理
数据倾斜
数据倾斜是生产环境中影响任务运行最常见的因素,可以在 Flink 的后台管理页面看到每个 Task 处理数据量的大小。数据倾斜出现时,通常是因为简单地使用了 KeyBy 等分组聚合函数,需要用户对热点 Key 进行预处理,降低或者消除热点 Key 的影响。
GC
垃圾回收问题也是造成反压的因素之一。不合理地设置 TaskManager 的垃圾回收参数会导致严重的 GC 问题,我们可以通过 -XX:+PrintGCDetails 参数查看 GC 的日志。
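这类 JVM 参数可以在 flink-conf.yaml 中追加到 TaskManager 的启动参数里,下面是一个简单的示意(日志路径仅为示例):
# 为 TaskManager 追加 GC 日志相关的 JVM 参数
env.java.opts.taskmanager: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/taskmanager-gc.log"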
代码本身
开发者错误地使用 Flink 算子,没有深入了解算子的实现机制导致性能问题。我们可以通过查看运行机器节点的 CPU 和内存情况定位问题。
数据倾斜
原因
业务上有严重的数据热点,比如滴滴打车的订单数据中,北京、上海等几个城市的订单量远远超过其他地区;
技术上大量使用了 KeyBy、GroupBy 等操作,错误地使用了分组 Key,人为产生数据热点。
两阶段聚合解决 KeyBy 热点
解决方案
首先把分组的 key 打散,比如加随机后缀;
对打散后的数据进行聚合;
把打散的 key 还原为真正的 key;
二次 KeyBy 进行结果统计,然后输出。
DataStream<String> sourceStream = ...;
DataStream<RecordCount> resultStream = sourceStream
.map(message -> {
Record record = JSON.parseObject(message, Record.class);
String type = record.getType();
//给热点 key 加上随机后缀,打散数据
record.setType(type + "#" + new Random().nextInt(100));
return record;
})
//第一次聚合:按打散后的 key 分组
.keyBy(record -> record.getType())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new CountAggregate())
.map(count -> {
//把打散的 key 还原为真正的 key
String key = count.getKey().substring(0, count.getKey().indexOf("#"));
return new RecordCount(key, count.getCount());
})
//二次聚合
.keyBy(recordCount -> recordCount.getKey())
.process(new CountProcessFunction());
resultStream.addSink(...);
env.execute();
GroupBy + Aggregation 分组聚合热点问题
解决方案
// 原本的代码
select
date,
type,
sum(count) as pv
from table
group by
date,
type;
// 改进后的代码
// 将上面的 SQL 拆成内外两层,内层通过随机打散成 100 份的方式减少数据热点
select date,
type,
sum(pv) as pv
from(
select
date,
type,
sum(count) as pv
from table
group by
date,
type,
floor(rand()*100) --随机打散成100份
)
group by
date,
type;
Flink 消费 Kafka 上下游并行度不一致导致的数据倾斜
解决方案
dataStream
.setParallelism(2)
// 采用REBALANCE分区策略重分区
.rebalance() //.rescale()
.print()
.setParallelism(4);
Flink 维表关联
实时查询维表
public class Order {
private Integer cityId;
private String userName;
private String items;
//维表关联补充的城市名称
private String cityName;
public Order() {
}
public Order(Integer cityId, String userName, String items, String cityName) {
this.cityId = cityId;
this.userName = userName;
this.items = items;
this.cityName = cityName;
}
public Integer getCityId() {
return cityId;
}
public void setCityId(Integer cityId) {
this.cityId = cityId;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getItems() {
return items;
}
public void setItems(String items) {
this.items = items;
}
public String getCityName() {
return cityName;
}
public void setCityName(String cityName) {
this.cityName = cityName;
}
@Override
public String toString() {
return "Order{" +
"cityId=" + cityId +
", userName='" + userName + '\'' +
", items='" + items + '\'' +
", cityName='" + cityName + '\'' +
'}';
}
}
public class DimSync extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
private Connection conn = null;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
}
public Order map(String in) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(in);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//根据city_id 查询 city_name
PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
pst.setInt(1,cityId);
ResultSet resultSet = pst.executeQuery();
String cityName = null;
while (resultSet.next()){
cityName = resultSet.getString(1);
}
pst.close();
return new Order(cityId,userName,items,cityName);
}
public void close() throws Exception {
super.close();
conn.close();
}
}
预加载全量数据
public class WholeLoad extends RichMapFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
private ScheduledExecutorService executor;
private Map<String,String> cache;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//初始化缓存和定时线程池,先同步加载一次全量数据,之后每 5 分钟刷新一次
cache = new ConcurrentHashMap<>();
executor = Executors.newSingleThreadScheduledExecutor();
load();
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
e.printStackTrace();
}
}
}, 5, 5, TimeUnit.MINUTES);
}
@Override
public Order map(String value) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(value);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
String cityName = cache.get(String.valueOf(cityId));
return new Order(cityId,userName,items,cityName);
}
public void load() throws Exception {
Class.forName("com.mysql.jdbc.Driver");
Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
ResultSet rs = statement.executeQuery();
while (rs.next()) {
String cityId = rs.getString("city_id");
String cityName = rs.getString("city_name");
cache.put(cityId, cityName);
}
con.close();
}
}
LRU 缓存
<!-- 添加依赖 -->
<dependency>
<groupId>org.hbase</groupId>
<artifactId>asynchbase</artifactId>
<version>1.8.2</version>
</dependency>
public class LRU extends RichAsyncFunction<String,Order> {
private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
String table = "info";
Cache<String, String> cache = null;
private HBaseClient client = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//创建hbase客户端
client = new HBaseClient("127.0.0.1","7071");
cache = CacheBuilder.newBuilder()
//最多存储10000条
.maximumSize(10000)
//过期时间为1分钟
.expireAfterWrite(60, TimeUnit.SECONDS)
.build();
}
@Override
public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
JSONObject jsonObject = JSONObject.parseObject(input);
Integer cityId = jsonObject.getInteger("city_id");
String userName = jsonObject.getString("user_name");
String items = jsonObject.getString("items");
//读缓存
String cacheCityName = cache.getIfPresent(String.valueOf(cityId));
//命中缓存则直接返回,未命中再从 HBase 获取维度数据
if(cacheCityName != null){
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(cacheCityName);
resultFuture.complete(Collections.singleton(order));
}else {
client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
for (KeyValue kv : arg) {
String value = new String(kv.value());
Order order = new Order();
order.setCityId(cityId);
order.setItems(items);
order.setUserName(userName);
order.setCityName(value);
resultFuture.complete(Collections.singleton(order));
cache.put(String.valueOf(cityId), value);
}
return null;
});
}
}
}
Flink 去重
基于状态后端
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//我们设置 ValueState 的 TTL 的生命周期为24小时,到期自动清除状态
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//定义 ValueState 的描述符,并为其开启 TTL
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String f0 = value.f0;
//如果不存在则新增
if(counts.value() == null){
counts.update(1);
}else{
//如果存在则加1
counts.update(counts.value()+1);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
基于 HyperLogLog
HyperLogLog 是一种估计统计算法,被用来统计一个集合中不同数据的个数,也就是我们所说的去重统计。HyperLogLog 算法是用于基数统计的算法,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2 的 64 次方个不同元素的基数。HyperLogLog 适用于大数据量的统计,因为成本相对来说更低,最多也就占用 12 KB 内存。
<!-- 引入依赖 -->
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value 为访问记录 <商品sku, 用户id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
基于布隆过滤器(BloomFilter)
BloomFilter(布隆过滤器)类似于一个 HashSet,用于快速判断某个元素是否存在于集合中,其典型的应用场景就是能够快速判断一个 key 是否存在于某容器,不存在就直接返回。需要注意的是,和 HyperLogLog 一样,布隆过滤器不能保证 100% 精确。但是它的插入和查询效率都很高。
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter<CharSequence>> bloomState;
private transient ValueState<Long> countState;
@Override
public void open(Configuration parameters) throws Exception {
//初始化两个状态:布隆过滤器本身和去重后的计数
bloomState = getRuntimeContext().getState(new ValueStateDescriptor<>("bloomFilter", TypeInformation.of(new TypeHint<BloomFilter<CharSequence>>() {})));
countState = getRuntimeContext().getState(new ValueStateDescriptor<>("skuCount", Long.class));
}
@Override
public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
BloomFilter<CharSequence> bloomFilter = bloomState.value();
Long skuCount = countState.value();
if(bloomFilter == null){
//预计插入 1000 万条数据
bloomFilter = BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
}
if(skuCount == null){
skuCount = 0L;
}
//布隆过滤器判断不存在时才累加,实现去重计数
if(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + 1;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
基于 BitMap
Bit-Map 的基本思想是用一个 bit 位来标记某个元素对应的 Value,而 Key 即是该元素。由于采用了 Bit 为单位来存储数据,因此可以大大节省存储空间。BitMap 不仅可以减少存储,而且还可以做到完全准确。
<!-- 引入依赖 -->
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.8.0</version>
</dependency>
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}
@Override
public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public Long getResult(Roaring64NavigableMap accumulator) {
return accumulator.getLongCardinality();
}
@Override
public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
//合并两个 BitMap
a.or(b);
return a;
}
}
实战
环境准备
-- 启动 Zookeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
-- 启动Kafka Server
nohup bin/kafka-server-start.sh config/server.properties &
-- 创建名为 test 的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
配置 Kafka
基本配置
<!-- 添加 kafka 相关依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
// 随机生成一些数据
public class MyNoParalleSource implements SourceFunction<String> {
//private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 启动一个source
* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){
//图书的排行榜
List<String> books = new ArrayList<>();
books.add("Pyhton从入门到放弃");//10
books.add("Java从入门到放弃");//8
books.add("Php从入门到放弃");//5
books.add("C++从入门到放弃");//3
books.add("Scala从入门到放弃");
int i = new Random().nextInt(5);
ctx.collect(books.get(i));
//每2秒产生一条数据
Thread.sleep(2000);
}
}
//取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
}
}
// 向 kafka 发送数据
public class KafkaProducer {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 2.0 配置 KafkaProducer
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(
"127.0.0.1:9092", //broker 列表
"test", //topic
new SimpleStringSchema()); // 消息序列化
//写入 Kafka 时附加记录的事件时间戳
producer.setWriteTimestampToKafka(true);
text.addSink(producer);
env.execute();
}
}
// 程序成功启动后可以用以下命令查看是否写入成功
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
// 消费 kafka 中的数据并在控制台输出
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
//设置从最早的 offset 消费
consumer.setStartFromEarliest();
//还可以手动指定相应的 topic, partition,offset,然后从指定好的位置开始消费
//HashMap<KafkaTopicPartition, Long> map = new HashMap<>();
//map.put(new KafkaTopicPartition("test", 1), 10240L);
//假如partition有多个,可以指定每个partition的消费位置
//map.put(new KafkaTopicPartition("test", 2), 10560L);
//然后各个partition从指定位置消费
//consumer.setStartFromSpecificOffsets(map);
env.addSource(consumer).flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
System.out.println(value);
}
});
env.execute("start consumer...");
}
如果要消费多个 topic 怎么办?
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
// 如果你是0.8版本的Kafka,需要配置
//properties.setProperty("zookeeper.connect", "localhost:2181");
//设置消费组
properties.setProperty("group.id", "group_test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
ArrayList<String> topics = new ArrayList<>();
topics.add("test_A");
topics.add("test_B");
// 传入一个 list,完美解决了这个问题
FlinkKafkaConsumer<Tuple2<String, String>> consumer = new FlinkKafkaConsumer<>(topics, new SimpleStringSchema(), properties);
消息序列化
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {
//是否表示流的最后一条元素,设置为false,表示数据会源源不断地到来
@Override
public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
return false;
}
//这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息
@Override
public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return new ConsumerRecord<String, String>(
record.topic(),
record.partition(),
record.offset(),
new String(record.key()),
new String(record.value())
);
}
//指定数据的输入类型
@Override
public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>(){});
}
}
partition 和 topic 动态发现
// 每隔 10ms 会动态获取 Topic 的元数据,对于新增的 Partition 会自动从最早的位点开始消费数据。
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
// 使用正则表达式动态识别 topic
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(Pattern.compile("^test_([A-Za-z0-9]*)$"), new SimpleStringSchema(), properties);
Flink 消费 Kafka 设置 offset 的方法
/**
* Flink从指定的topic和parition中指定的offset开始
*/
Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
offsets.put(new KafkaTopicPartition("test", 0), 10000L);
offsets.put(new KafkaTopicPartition("test", 1), 20000L);
offsets.put(new KafkaTopicPartition("test", 2), 30000L);
consumer.setStartFromSpecificOffsets(offsets);
/**
* Flink从topic中最早的offset消费
*/
consumer.setStartFromEarliest();
/**
* Flink从topic中指定的时间点开始消费
*/
consumer.setStartFromTimestamp(1559801580000L);
/**
* Flink从topic中最新的数据开始消费
*/
consumer.setStartFromLatest();
/**
* Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
*/
consumer.setStartFromGroupOffsets();
FlinkKafkaConsumer 首先创建了 KafkaFetcher 对象,然后 KafkaFetcher 创建了 KafkaConsumerThread 和 Handover,KafkaConsumerThread 负责直接从 Kafka 中读取 msg,并交给 Handover,然后 Handover 将 msg 传递给 KafkaFetcher.emitRecord 将消息发出。
第 24 讲结尾的源码部分还需要再看看。
Flink 消费 kafka 结合水印及乱序
public class WindowWaterMark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
//设置为eventtime事件类型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置水印生成时间间隔100ms
env.getConfig().setAutoWatermarkInterval(100);
DataStream<String> dataStream = env
.socketTextStream("127.0.0.1", 9000)
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<String>() {
private Long currentTimeStamp = 0L;
//设置允许乱序时间
private Long maxOutOfOrderness = 5000L;
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimeStamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(String s, long l) {
String[] arr = s.split(",");
long timeStamp = Long.parseLong(arr[1]);
currentTimeStamp = Math.max(timeStamp, currentTimeStamp);
System.err.println(s + ",EventTime:" + timeStamp + ",watermark:" + (currentTimeStamp - maxOutOfOrderness));
return timeStamp;
}
});
dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(String s) throws Exception {
String[] split = s.split(",");
return new Tuple2<String, Long>(split[0], Long.parseLong(split[1]));
}
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new AggregateFunction<Tuple2<String,Long>, Object, Object>() {
...
})
.print();
env.execute("WaterMark Test Demo");
}
}
聚合函数
ReduceFunction
public class MyReduceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> input = env.fromElements(courses);
DataStream<Tuple2<String, Integer>> total = input.keyBy(0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
total.printToErr();
env.execute("ReduceFunction");
}
public static final Tuple2[] courses = new Tuple2[]{
Tuple2.of("张三",100),
Tuple2.of("李四",80),
Tuple2.of("张三",80),
Tuple2.of("李四",95),
Tuple2.of("张三",90),
Tuple2.of("李四",100),
};
}
AggregateFunction
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
ProcessWindowFunction
// 实现对窗口的分组统计功能
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());
public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {
@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}
// 将 ReduceFunction 和 ProcessWindowFunction 结合使用返回窗口中的最小事件以及窗口的开始时间
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
区别:ReduceFunction 和 AggregateFunction 属于增量聚合,ProcessWindowFunction 属于全量聚合。
累加器
IntCounter、LongCounter、DoubleCounter,以及自定义实现 Accumulator 或 SimpleAccumulator 接口的累加器。
// 计算 9000 端口中输入数据的个数
public class CounterTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("127.0.0.1", 9000, "\n");
dataStream.map(new RichMapFunction<String, String>() {
//定义累加器
private IntCounter intCounter = new IntCounter();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//注册累加器
getRuntimeContext().addAccumulator("counter", this.intCounter);
}
@Override
public String map(String s) throws Exception {
//累加
this.intCounter.add(1);
return s;
}
});
dataStream.print();
JobExecutionResult result = env.execute("counter");
//第四步:结束后输出总量;如果不需要结束后持久化,可以省去
Object accResult = result.getAccumulatorResult("counter");
System.out.println("累加器计算结果:" + accResult);
}
}
Flink Redis Sink 实现
自定义 Redis Sink
<!-- 添加依赖 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.2</version>
</dependency>
public class SelfRedisSink extends RichSinkFunction<Tuple2<String, String>> {
private transient Jedis jedis;
public void open(Configuration config) {
jedis = new Jedis("localhost", 6379);
}
public void invoke(Tuple2<String, String> value, Context context) throws Exception {
if (!jedis.isConnected()) {
jedis.connect();
}
jedis.set(value.f0, value.f1);
}
@Override
public void close() throws Exception {
jedis.close();
}
}
使用 Flink Redis Sink
<!-- 添加依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, String>> stream = env.fromElements("Flink","Spark","Storm").map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String s) throws Exception {
return new Tuple2<>(s, s);
}
});
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
stream.addSink(new RedisSink<>(conf, new RedisSink01()));
env.execute("redis sink01");
}
public class RedisSink01 implements RedisMapper<Tuple2<String, String>>{
/**
* 设置 Redis 数据类型
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}
/**
* 设置Key
*/
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
/**
* 设置value
*/
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
TOP N 热门商品实现
// 订单结构表
public class OrderDetail {
private Long userId; //下单用户id
private Long itemId; //商品id
private String citeName;//用户所在城市
private Double price;//订单金额
private Long timeStamp;//下单时间
}
// 使用 EventTime 作为时间依据,并设置 checkpoint 为 60 秒
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);
// 设置 kafka 消费者信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
//从最早开始消费
consumer.setStartFromEarliest();
DataStream<String> stream = env
.addSource(consumer);
// 设置乱序时间并生成水印
DataStream<OrderDetail> orderStream = stream.map(message -> JSON.parseObject(message, OrderDetail.class));
orderStream = orderStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<OrderDetail>() {
private Long currentTimeStamp = 0L;
//设置允许乱序时间
private Long maxOutOfOrderness = 5000L;
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentTimeStamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(OrderDetail element, long previousElementTimestamp) {
//用订单时间推进当前最大时间戳,从而推进水印
currentTimeStamp = Math.max(element.getTimeStamp(), currentTimeStamp);
return element.getTimeStamp();
}
});
// 所有用户中下单金额最多的前十位用户
DataStream<OrderDetail> reduce = orderStream
.keyBy((KeySelector<OrderDetail, Object>) value -> value.getUserId())
.window(SlidingProcessingTimeWindows.of(Time.seconds(600), Time.seconds(20)))
.reduce(new ReduceFunction<OrderDetail>() {
@Override
public OrderDetail reduce(OrderDetail value1, OrderDetail value2) throws Exception {
return new OrderDetail(
value1.getUserId(), value1.getItemId(), value1.getCiteName(), value1.getPrice() + value2.getPrice(), value1.getTimeStamp()
);
}
});
//每20秒计算一次
SingleOutputStreamOperator<Tuple2<Double, OrderDetail>> process = reduce.windowAll(TumblingEventTimeWindows.of(Time.seconds(20)))
.process(new ProcessAllWindowFunction<OrderDetail, Tuple2<Double, OrderDetail>, TimeWindow>() {
@Override
public void process(Context context, Iterable<OrderDetail> elements, Collector<Tuple2<Double, OrderDetail>> out) throws Exception {
TreeMap<Double, OrderDetail> treeMap = new TreeMap<Double, OrderDetail>(new Comparator<Double>() {
@Override
public int compare(Double x, Double y) {
return (x < y) ? -1 : 1;
}
});
Iterator<OrderDetail> iterator = elements.iterator();
while (iterator.hasNext()) {
OrderDetail detail = iterator.next();
treeMap.put(detail.getPrice(), detail);
//只保留金额最大的前 10 名,金额最小的会被移除
if (treeMap.size() > 10) {
treeMap.pollFirstEntry();
}
}
for (Map.Entry<Double, OrderDetail> entry : treeMap.entrySet()) {
out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
}
}
}
);
// 将结果存入 redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
process.addSink(new RedisSink<>(conf, new RedisMapper<Tuple2<Double, OrderDetail>>() {
private final String TOPN_PREFIX = "TOPN:";
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,TOPN_PREFIX);
}
@Override
public String getKeyFromData(Tuple2<Double, OrderDetail> data) {
return String.valueOf(data.f0);
}
@Override
public String getValueFromData(Tuple2<Double, OrderDetail> data) {
return String.valueOf(data.f1.toString());
}
}));
Flink 实时统计 PV、UV
Flume 概述
Client:客户端,用来运行 Flume Agent。
Event:Flume 中的数据单位,可以是一行日志、一条消息。
Agent:代表一个独立的 Flume 进程,包含三个组件:Source、Channel 和 Sink。
Source:数据的收集入口,用来获取 Event 并且传递给 Channel。
Channel:Event 的一个临时存储,是数据的临时通道,可以认为是一个队列。
Sink:从 Channel 中读取 Event,将 Event 中的数据传递给下游。
Flow:一个抽象概念,可以认为是一个从 Source 到达 Sink 的数据流向图。
搭建 Flume 环境
// 解压 Flume 安装包
tar zxf apache-flume-1.8.0-bin.tar.gz
// 配置 Flume
cd /usr/local/apache-flume-1.8.0-bin/conf
cp flume-env.sh.template flume-env.sh
// 修改 flume-env.sh
export JAVA_HOME=/usr/local/java/jdk1.8
FLUME_CLASSPATH="/usr/local/apache-flume-1.8.0-bin/"
// 创建 nc_logger.conf
vim nc_logger.conf
// 输入以下配置内容
# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 描述和配置 source 组件:r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 9000
# 描述和配置 sink 组件:k1
a1.sinks.k1.type = logger
# 描述和配置channel组件,此处使用是内存缓存的方式
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动 Flume Agent
// 启动 Flume Agent
bin/flume-ng agent \
-c conf \
-f conf/nc_logger.conf \
-n a1 -Dflume.root.logger=INFO,console
参数解释:
--conf(-c):用来指定配置文件夹路径;
--conf-file(-f):用来指定采集方案文件;
--name(-n):用来指定 agent 名字;
-Dflume.root.logger=INFO,console:开启 flume 日志输出到终端。
发送消息并观察
// 新开个终端,并输入以下命令
nc localhost 9000
Flume + Kafka 整合
// 配置日志端的 Flume
vim log_kafka.conf
// 输入以下配置
# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source的配置,监听日志文件中的新增数据
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/logs/access.log
#sink配置,使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumeagent03
a1.sinks.k1.port = 9000
#channel配置,使用文件做数据的临时缓存
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/temp/flume/checkpoint
a1.channels.c1.dataDirs = /home/temp/flume/data
#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
// 启动日志端的 Flume Agent
$ flume-ng agent \
-c conf \
-n a1 \
-f conf/log_kafka.conf >/dev/null 2>&1 &
// 启动 Kafka
nohup bin/kafka-server-start.sh config/server.properties &
// 创建接收 Flume 消息的 topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic log_kafka
// 配置 Kafka 端的 Flume
vim flume_kafka.conf
// 输入以下配置
# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source配置
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 9000
#sink配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_kafka
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
#channel配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
// 启动 Kafka 端的 Flume Agent
$ flume-ng agent \
-c conf \
-n a1 \
-f conf/flume_kafka.conf >/dev/null 2>&1 &
Kafka 日志数据清洗
清洗逻辑:只保留事件类型为“点击”(CLICK)、且满足 JSON 格式基本要求(以“{”开头、以“}”结尾)的消息,同时过滤掉消息体中 userId 或 action 为空的数据。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.enableCheckpointing(5000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
//设置消费组
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_kafka", new SimpleStringSchema(), properties);
env.addSource(consumer)
.filter(new UserActionFilter())
.flatMap(new MyFlatMapFunction())
.returns(TypeInformation.of(String.class))
.addSink(new FlinkKafkaProducer(
"127.0.0.1:9092",
"log_user_action",
new SimpleStringSchema()
));
// 定义消息格式
public class UserClick {
private String userId;
private Long timestamp;
private String action;
public UserClick() {
}
public UserClick(String userId, Long timestamp, String action) {
this.userId = userId;
this.timestamp = timestamp;
this.action = action;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
}
enum UserAction{
//点击
CLICK("CLICK"),
//购买
PURCHASE("PURCHASE"),
//其他
OTHER("OTHER");
private String action;
UserAction(String action) {
this.action = action;
}
}
// 筛选出用户动作为 CLICK 且为合法的 JSON 格式
public class UserActionFilter implements FilterFunction<String> {
@Override
public boolean filter(String input) throws Exception {
return input.contains("CLICK") && input.startsWith("{") && input.endsWith("}");
}
}
// 过滤掉 userId 为空或者 action 类型为空的数据
public class MyFlatMapFunction implements FlatMapFunction<String,String> {
@Override
public void flatMap(String input, Collector out) throws Exception {
JSONObject jsonObject = JSON.parseObject(input);
String user_id = jsonObject.getString("user_id");
String action = jsonObject.getString("action");
Long timestamp = jsonObject.getLong("timestamp");
//userId 和 action 都不为空才保留
if(!StringUtils.isEmpty(user_id) && !StringUtils.isEmpty(action)){
UserClick userClick = new UserClick();
userClick.setUserId(user_id);
userClick.setTimestamp(timestamp);
userClick.setAction(action);
out.collect(JSON.toJSONString(userClick));
}
}
}
// 也可以选择整合筛选逻辑到 ProcessFunction
public class UserActionProcessFunction extends ProcessFunction<String, String> {
@Override
public void processElement(String input, Context ctx, Collector<String> out) throws Exception {
if(! input.contains("CLICK") || input.startsWith("{") || input.endsWith("}")){
return;
}
JSONObject jsonObject = JSON.parseObject(input);
String user_id = jsonObject.getString("user_id");
String action = jsonObject.getString("action");
Long timestamp = jsonObject.getLong("timestamp");
if(!StringUtils.isEmpty(user_id) && !StringUtils.isEmpty(action)){
UserClick userClick = new UserClick();
userClick.setUserId(user_id);
userClick.setTimestamp(timestamp);
userClick.setAction(action);
out.collect(JSON.toJSONString(userClick));
}
}
}
Flink 和 Kafka 整合时间窗口设计
数据只有在清洗完毕之后才能发挥其真正的价值;另一方面,提前清洗也能减少下游系统的负担。
Flink 消费 Kafka 数据并反序列化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 检查点配置,如果要用到状态后端,那么必须配置
env.setStateBackend(new MemoryStateBackend(true));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_user_action", new SimpleStringSchema(), properties);
//设置从最早的offset消费
consumer.setStartFromEarliest();
DataStream<UserClick> dataStream = env
.addSource(consumer)
.name("log_user_action")
.map(message -> {
JSONObject record = JSON.parseObject(message);
return new UserClick(
record.getString("user_id"),
record.getLong("timestamp"),
record.getString("action")
);
})
.returns(TypeInformation.of(UserClick.class));
窗口设计
// 指定消息的乱序时间 30 秒
SingleOutputStreamOperator<UserClick> userClickSingleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClick>(Time.seconds(30)) {
@Override
public long extractTimestamp(UserClick element) {
return element.getTimestamp();
}
});
触发器设计
触发器概述:
EventTimeTrigger:通过对比 Watermark 和窗口的 EndTime 确定是否触发窗口计算,如果 Watermark 大于 Window EndTime 则触发,否则不触发,窗口将继续等待。
ProcessTimeTrigger:通过对比 ProcessTime 和窗口 EndTime 确定是否触发窗口,如果 ProcessTime 大于 EndTime 则触发计算,否则窗口继续等待。
ContinuousEventTimeTrigger:根据间隔时间周期性触发窗口,或者当 Window 的结束时间小于当前 Watermark 时触发窗口计算。
ContinuousProcessingTimeTrigger:根据间隔时间周期性触发窗口,或者当 Window 的结束时间小于当前 ProcessTime 时触发窗口计算。
CountTrigger:根据接入数据量是否超过设定的阈值判断是否触发窗口计算。
DeltaTrigger:根据接入数据计算出来的 Delta 指标是否超过指定的 Threshold 判断是否触发窗口计算。
PurgingTrigger:可以将任意触发器作为参数转换为 Purge 类型的触发器,计算完成后数据将被清理。
最终实现的代码:
dataStream
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
Flink 计算 PV、UV 代码实现
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setStateBackend(new MemoryStateBackend(true));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_user_action", new SimpleStringSchema(), properties);
//设置从最早的offset消费
consumer.setStartFromEarliest();
DataStream<UserClick> dataStream = env
.addSource(consumer)
.name("log_user_action")
.map(message -> {
JSONObject record = JSON.parseObject(message);
return new UserClick(
record.getString("user_id"),
record.getLong("timestamp"),
record.getString("action")
);
})
.returns(TypeInformation.of(UserClick.class));
SingleOutputStreamOperator<UserClick> userClickSingleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClick>(Time.seconds(30)) {
@Override
public long extractTimestamp(UserClick element) {
return element.getTimestamp();
}
});
userClickSingleOutputStreamOperator
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.process(new MyProcessFunction());
public class MyProcessFunction extends ProcessAllWindowFunction<UserClick,Tuple2<String,Integer>,TimeWindow> {
@Override
public void process(Context context, Iterable<UserClick> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
HashSet<String> uv = Sets.newHashSet();
Integer pv = 0;
Iterator<UserClick> iterator = elements.iterator();
while (iterator.hasNext()){
String userId = iterator.next().getUserId();
uv.add(userId);
pv = pv + 1;
}
out.collect(Tuple2.of("pv",pv));
out.collect(Tuple2.of("uv",uv.size()));
}
}
这种方法代码简单清晰,但是有很严重的内存占用问题。如果我们的数据量很大,那么所定义的 TumblingProcessingTimeWindows 窗口会缓存一整天的数据,内存消耗非常大。
分组窗口 + 过期数据剔除
分组窗口
为了减少窗口内缓存的数据量,我们可以根据用户的访问时间戳所在天进行分组,然后将数据分散在各个窗口内进行计算,接着在 State 中进行汇总。
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return DateUtil.timeStampToDate(value.getTimestamp());
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
...
public class DateUtil {
//SimpleDateFormat 非线程安全,用 ThreadLocal 为每个线程持有一个实例
private static final ThreadLocal<SimpleDateFormat> threadLocal
= ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static String timeStampToDate(Long timestamp){
String format = threadLocal.get().format(new Date(timestamp));
return format.substring(0,10);
}
}
数据剔除
CountEvictor:数量剔除器。在 Window 中保留指定数量的元素,并从窗口头部开始丢弃其余元素。
DeltaEvictor:阈值剔除器。计算 Window 中最后一个元素与其余每个元素之间的增量,丢弃增量大于或等于阈值的元素。
TimeEvictor:时间剔除器。保留 Window 中最近一段时间内的元素,并丢弃其余元素。
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction());
public class MyProcessWindowFunction extends ProcessWindowFunction<UserClick,Tuple3<String,String, Integer>,String,TimeWindow>{
private transient MapState<String, String> uvState;
private transient ValueState<Integer> pvState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
uvState = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("uv", String.class, String.class));
pvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("pv", Integer.class));
}
@Override
public void process(String s, Context context, Iterable<UserClick> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
Integer pv = 0;
Iterator<UserClick> iterator = elements.iterator();
while (iterator.hasNext()){
pv = pv + 1;
String userId = iterator.next().getUserId();
uvState.put(userId,null);
}
//pvState 保存的是历史累计的 pv,首次触发时为 null,需要判空后再累加
Integer value = pvState.value();
if(null == value){
pvState.update(pv);
}else {
pvState.update(value + pv);
}
Integer uv = 0;
Iterator<String> uvIterator = uvState.keys().iterator();
while (uvIterator.hasNext()){
uvIterator.next();
uv = uv + 1;
}
out.collect(Tuple3.of(s,"uv",uv));
out.collect(Tuple3.of(s,"pv",pvState.value()));
}
}
使用 BitMap / 布隆过滤器
public class MyProcessWindowFunctionBitMap extends ProcessWindowFunction<UserClick,Tuple3<String,String, Integer>,String,TimeWindow>{
private transient ValueState<Integer> pvState;
private transient ValueState<Roaring64NavigableMap> bitMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
uvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("uv", Integer.class));
pvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("pv", Integer.class));
bitMapState = this.getRuntimeContext().getState(new ValueStateDescriptor("bitMap", TypeInformation.of(new TypeHint<Roaring64NavigableMap>() {
})));
}
@Override
public void process(String s, Context context, Iterable<UserClick> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
Integer pv = pvState.value();
Roaring64NavigableMap bitMap = bitMapState.value();
if(bitMap == null){
bitMap = new Roaring64NavigableMap();
pv = 0;
}
Iterator<UserClick> iterator = elements.iterator();
while (iterator.hasNext()){
pv = pv + 1;
String userId = iterator.next().getUserId();
//如果userId可以转成long
bitMap.add(Long.valueOf(userId));
}
//把累计值写回状态,否则下一次窗口触发时会丢失历史数据
pvState.update(pv);
bitMapState.update(bitMap);
out.collect(Tuple3.of(s,"uv",bitMap.getIntCardinality()));
out.collect(Tuple3.of(s,"pv",pv));
}
}
Flink 和 Redis 整合
<!-- 添加依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
// 自定义 Redis Sink
public class MyRedisSink implements RedisMapper<Tuple3<String,String, Integer>>{
/**
* 设置redis数据类型
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"flink_pv_uv");
}
//指定key
@Override
public String getKeyFromData(Tuple3<String, String, Integer> data) {
return data.f1;
}
//指定value
@Override
public String getValueFromData(Tuple3<String, String, Integer> data) {
return data.f2.toString();
}
}
// 调用 sink
...
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction())
.addSink(new RedisSink<>(conf,new MyRedisSink()));
...
Flink 整合 MySQL
<!-- 添加依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
// JDBC Sink
String driverClass = "com.mysql.jdbc.Driver";
String dbUrl = "jdbc:mysql://127.0.0.1:3306/test";
String userNmae = "root";
String passWord = "123456";
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction())
.addSink(
JdbcSink.sink(
"replace into pvuv_result (type,value) values (?,?)",
(ps, value) -> {
ps.setString(1, value.f1);
ps.setInt(2,value.f2);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(dbUrl)
.withDriverName(driverClass)
.withUsername(userName)
.withPassword(passWord)
.build())
);
// MySQL Sink
public class MyMysqlSink extends RichSinkFunction<Tuple3<String, String, Integer>> {
private PreparedStatement ps = null;
private Connection connection = null;
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://127.0.0.1:3306/test";
String username = "root";
String password = "123456";
// 初始化方法
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取连接
connection = getConn();
connection.setAutoCommit(false);
}
private Connection getConn() {
try {
Class.forName(driver);
connection = DriverManager.getConnection(url, username, password);
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
//每一个元素的插入,都会调用一次
@Override
public void invoke(Tuple3<String,String,Integer> data, Context context) throws Exception {
ps.prepareStatement("replace into pvuv_result (type,value) values (?,?)")
ps.setString(1,data.f1);
ps.setInt(2,data.f2);
ps.execute();
connection.commit();
}
@Override
public void close() throws Exception {
super.close();
if(connection != null){
connection.close();
}
if (ps != null){
ps.close();
}
}
}
Flink 整合 HBase
<!-- 添加依赖 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
// 自定义 HBase Sink
public class MyHbaseSink extends RichSinkFunction<Tuple3<String, String, Integer>> {
private transient Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181");
connection = ConnectionFactory.createConnection(conf);
}
@Override
public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
String tableName = "database:pvuv_result";
String family = "f";
Table table = connection.getTable(TableName.valueOf(tableName));
Put put = new Put(value.f0.getBytes());
put.addColumn(Bytes.toBytes(family),Bytes.toBytes(value.f1),Bytes.toBytes(value.f2));
table.put(put);
table.close();
}
@Override
public void close() throws Exception {
super.close();
connection.close();
}
}
// 使用批量写入的方式提高效率
public class MyHbaseSink extends RichSinkFunction<Tuple3<String, String, Integer>> {
private transient Connection connection;
private transient List<Put> puts;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:2181");
connection = ConnectionFactory.createConnection(conf);
puts = new ArrayList<>(100);
}
@Override
public void invoke(Tuple3<String, String, Integer> value, Context context) throws Exception {
String tableName = "database:pvuv_result";
String family = "f";
Put put = new Put(value.f0.getBytes());
put.addColumn(Bytes.toBytes(family),Bytes.toBytes(value.f1),Bytes.toBytes(value.f2));
puts.add(put);
//攒满 100 条再批量写入,减少和 HBase 的交互次数
if(puts.size() == 100){
Table table = connection.getTable(TableName.valueOf(tableName));
table.put(puts);
table.close();
puts.clear();
}
}
@Override
public void close() throws Exception {
super.close();
//把没有攒满一批的剩余数据也写出去
if(puts != null && !puts.isEmpty()){
Table table = connection.getTable(TableName.valueOf("database:pvuv_result"));
table.put(puts);
table.close();
puts.clear();
}
connection.close();
}
}