在信息流推荐业务场景中,数据是模型迭代的原材料,是指标增长的重要基石,而「数据流」则贯穿整个推荐业务场景。
一些系统创建了数据,而另一些系统需要使用这些数据。因此高效的数据流托管和自动化传输,是很长时间以来一直困扰我们的问题。为了使不同系统间的数据标准得到统一,企业常用 Flink 或 Storm(以下简称 FS)构建系统间数据流传输的解决方案。但是在推荐架构侧,很多场景下直接搬运数据流即可,不需要进行大量转化,这时使用 FS 的代价就有点大了,而且并不高效。
对于数据流的处理和分发,Apache 家族的另一大成员 NiFi 则更擅长做这种事情。假如老板给你一个新需求,要增加一路数据到 ES。如果使用 FS 的话,代码开发、测试、部署再到线上验证,这个复杂的流程可能会用掉一上午的时间,但对于 NiFi 来说,你可能只用 5 分钟就可以完成。
目前可供查阅的 NiFi 资料并不多,本文将结合 NiFi 在信息流推荐引擎中的使用,简单介绍 NiFi 的特点和使用方式,以期抛转引玉。
NiFi 最初由美国国家安全局(NSA)开发和使用的一个可视化、可定制的数据集成产品。2014 年 NSA 将其贡献给了 Apache 开源社区,2015 年 7 月成为 Apache 顶级项目。
NiFi 为数据流而设计,它可以用来在不同的数据中心之间搭建数据流通的管道。NiFi 通过拖拽界面、配置参数、简单地连接,即可完成对数据流的托管和系统间的自动化传输,使用者可以可视化整个过程并实时进行更改。相比于 FS,它还有很多优秀的特性:
Web 界面拖放组件,并支持图形化配置使用人员无需进行代码开发支持多种数据源自动进行负载均衡和反压方便监控便于扩展且易恢复支持模板复用 下面,我们试着通过 Nifi 的框架来了解它在界面上搭建的工作流,到底是以什么形式在后端进行解析和运转的。NiFi 是基于 Java 的,通过主机上的 JVM 来进行执行,主要由Web Server、Flow Controller、Repository 这三个核心部件组成: Web Server:NiFi 提供了基于 HTTP 协议的 Web 页面,我们可以通过 Web 页面来操作自己的 Task。 Flow Controller:NiFi 的核心部分,可以理解成文件交流的处理器;Processer 则是实际处理单元。
NiFi 将每一个功能集成到一个 Processor 上,Flow Controller 维持着多个处理器的连接并管理各个Processer。NiFi 提供了许多可用的 Processor,如 HDFS、Attributes、Kafka 等,我们在使用时可以直接拖拽 Processor 并更改其配置。当官方的 Processor 不能支持我们的业务需求时,我们还可以利用 Nifi 的可扩展性进行定制开发。Repository:NiFi 提供了三个数据库 FlowFile、Content、Provenance,分别来存储数据流的运行状态、实际数据以及数据源信息。
NiFi 也支持集群模式,运行时每个节点执行相同操作、不同数据。集群依赖 ZooKeeper。ZK 会选出主节点以及集群协调器,负责监督其他节点的心跳: 综上我们可以看到,NiFi 为自动化系统之间的数据流提供了优秀的解决方案。无论数据源头是 MySQL、NoSQL、Kafka、Spark,NiFi 都可以提供相应的支持和丰富的 Processor。基于 WEB 图形界面,通过拖拽、连接、配置就可以完成基于流程的编程,实现数据采集、处理等功能。而且对集群模式的支持,也赋予了 Nifi 良好的横向扩展能力。
目前,Nifi 已经应用在推荐引擎平台的很多在线任务中,包括用户行为实时数据落入 HDFS、曝光事件落盘到 ES、Session 数据的同步任务、兴趣标签落盘到 MySQL 等。 下面是一个用户行为数据实时落盘到 HDFS 的实例: 其中每个 Processor 都可以通过图形化配置相应的属性,无需代码开发即可完成数据的传输。其中 EvaluateJsonPath 的作用是取 FlowFile 中的属性给某个字段赋值以方便后续使用,UpdateAttribute 的作用是对某些字段进行简单处理。从配置这些 Processor 到完成一个 Job 只需要几分钟的时间,极大提升了大家的工作效率。
以下是我们推荐引擎团队使用 NiFi 的一些主要的 Processor Group/Job,主要包括:
实时用户行为数据落盘到 Hive 表,为算法组同学计算小时级模型提供了数据支撑;实时行为数据落入 ES,帮助提供包括首页在内的各个场景的实时监控(CTR 监控、各路召回监控、排序打分情况等);用户每一屏的画像快照落盘到 ES,为推荐架构历史查询系统提供数据支持,对线上修复各种 Bad Case 提供了便利;帮助内容挖掘组的同学解决了新增推荐池笔记图片的打分存储问题。虽然 NiFi 已经集成了很多 Processor 供我们使用,但是随着业务复杂度不断升级,有时候我们需要定制开发适合团队的 Processor。比如我们业务场景中用到了 RocketMQ,NiFi 本身是不支持 RocketMQ 数据流的传输的,这时就需要进行扩展性开发。
通常,通过继承 AbstractProcessor 抽象类,并复写 onTrigger 与 onScheduled 方法来进行开发。每一个 Processor 都有相应的配置,而配置信息可用下边这段代码来进行添加:
private final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() .name("TOPIC") .displayName("TOPIC") .description("TOPIC") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build();下图是我们建好的一组 Processor Group。它包含了一个 Job,主要功能是从 RocketMQ 的某个 topic 不断消费数据,将数据写入 HDFS 中: 当我们右键点某个 Processor 的 Start 按钮,该 Processor 就会启动,而启动后程序会马上去调用对应类中的 onScheduled 方法。当所有 Processor 启动后,程序会按图中流程进行数据流的传递,当 Queue 中一旦有了数据,后置的 Processor 会调用 OnScheduled 方法来处理 Queue 中的数据流。所以,onScheduled 方法一般来编写数据库连接信息、配置属性等相关程序,是处理 Session 的真正逻辑。
下边的程序是我们开发的一个写入 RocketMQ 的简单 Processor,onTrigger 是处理数据流的逻辑,onScheduled 方法是 RocketMQ 的 Producer 的配置信息:
@Override //ProcessSession类专门用来处理queue中传来的dataflow(nifi中为flowFlie)的一些列操作; public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { try { String topic = context.getProperty(TOPIC).getValue(); FlowFile flowfile = session.get(); session.read(flowfile, in -> { String message = IOUtils.toString(in); byte[] messageBytes = message.getBytes(RemotingHelper.DEFAULT_CHARSET); Message msg = new Message(topic, messageBytes); try { producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable e) { getLogger().error("producer:" + e); } }); } catch (Exception e) { getLogger().error("producer:", e.getStackTrace()); } }); //清除queue中已经处理过的message; session.getProvenanceReporter().send(flowfile, "rocketMQ"); session.transfer(flowfile, SUCCESS); } catch (Exception e) { getLogger().error("start fail", e.getStackTrace()); } } @OnScheduled public void onScheduled(final ProcessContext context) { try { if (consumer == null) { this.producer = new DefaultMQProducer(); String producerGroup = context.getProperty(PRODUCER_GROUP).getValue(); if (StringUtils.isBlank(producerGroup)) producerGroup = "your group"; this.producer.setNamesrvAddr("your address of rocketMq"); this.producer.setProducerGroup(producerGroup); this.producer.start(); this.producer.setRetryTimesWhenSendFailed(0); } } } catch (Exception e) { getLogger().error("start fail", e.getStackTrace()); } }当某个 Processor 需要停止时,可以通过点击运行中 Processor 的 Stop 按键来进行停止。此时程序会调用 OnStopped 修饰的方法来对相应的服务进行终止:
@OnStopped public void stopConsumer() { getLogger().error("OnStopped:" + String.valueOf(producer)); invalidProducer(); } private synchronized void invalidProducer() { if (producer != null) { producer.shutdown(); } producer = null; }由于 NiFi 集群是依赖 ZK 的,所以在搭建 NiFi 之前,必须先搭建一个可用的 ZK 集群。NiFi 支持使用自己的内嵌 ZK,但由于我们团队拥有自己的 ZK 集群,所以在搭建时使用了外部 ZK 作为 NiFi 的依赖而非内嵌 ZK。NiFi 的核心配置文件是 nifi.properties,该文件是用于控制 NiFi 运行方式的主要配置文件:
nifi.web.http.host= your's host #web Server的Ip,当前节点机器的IP地址 nifi.web.http.port= your's port #Web Server的端口,可以通过该ip和端口访问NIfi的web界面 nifi.remote.input.host=your's host #当前节点机器的IP,站点到站点通信的主机名 nifi.remote.input.socket.port= port #用于站点到站点通信的远程输入套接字端口 nifi.cluster.is.node=true # nifi.cluster.node.address=your's host #节点地址 nifi.cluster.node.protocol.port=port #用于节点之间通信的端口 nifi.zookeeper.connect.string= address1,address2..... #zk的地址 nifi.cluster.load.balance.port=port #指定要侦听传入连接的端口,以便跨群集负载平衡数据。默认值为6342其次是 bootstrap.conf 文件,通过该文件来更改每个节点 JVM 的运行空间参数,以及 log 日志参数。由于 NiFi 的 log 日志默认情况是没有内存使用信息的,所以必须通过该文件配置相应的参数,才能在 log 日志中看到内存的 log 信息:
java.arg.2=-Xms5120m java.arg.3=-Xmx5120m #这两个参数来控制jvm的运行内存参数 #下边这四个参数控制GC日志, nifi默认是没有这些配置参数的 java.arg.20=-XX:+PrintGCDetails java.arg.21=-XX:+PrintGCTimeStamps java.arg.22=-XX:+PrintGCDateStamps java.arg.23=-Xloggc:path of you gc.log最后一个需要配置的地方是 NiFi 的启动文件 nifi.sh,通过该文件可以配置启动 Web 的内存空间大小:
run_nifi_cmd="'${JAVA}' -cp '${BOOTSTRAP_CLASSPATH}' -Xms1024m -Xmx1024m....."NiFi 集群搭建好后,就可以通过配置的 IP 和端口来访问集群的 Web 页面。通过操作栏按钮可以添加Processor Group 和 Processor。通常每个 Processor Group 包含一个或多个 Job,而每个 Job 包含多个Processor。Operate可以控制某个Processor Group的状态转换。 下面我们通过一个从 KafKa 写入 Redis 的实际需求,来分析 NiFi 的使用方法。
如下图所示,可以通过拖拽导航栏中的按钮来进行 Processor Group 的添加。添加过程中,可以键入 Processor Group 的名字,并按 ADD 键来进行 Processor Group 的创建。
创建完 Processor Group 后,可以通过拖拽导航栏中的 Processor 按钮,在该 Processor Group 下添加自己需要的 Processor,点击后我们可以看到很多已经内置的 Processor。当然还有你自己定制开发的 Processor。通过搜索栏来检索相应的 Processor,选中想要的 Processor 后,可以看到该 Processor 的解释信息。
因为实例中是从 Kafka 消费数据到 Redis,所以选择 GetKafka 作为消费数据的 Processor,通过该 Processor 的 Properties 栏对 Kafka 的集群地址、Topic 等基础信息进行配置,当然也可以通过 Setting 栏来配置 Processor 的调度方法。同理,通过该方法添加写入 Redis 的 Processor(PutRedis),并配置 PutRedis 相应的的基础信息即可完成 Processor 的添加。
目前 NiFi 支持的常用 Processor 算子有:
GetKafka/PutKafka:消费 Kafka 的数据/生产数据 GetHDFS/PutHDFS:从 HDFS 中读取数据/写入 HDFSPutElasticsearch:将数据写入 ESUpdateCounter:数据流条数计数器GetRocketMQ/PutRocketMQ(自研):消费 RocketMQ 的数据/生产数据PutSQL:将数据导入 MySQLExecuteSQL:执行 MySQL 语句所有 Processor 配置完毕后,可以右键相应的 Processor 并点击 Start 键来进行启动。
以上是我对 NiFi 在信息流推荐引擎中应用与实践的总结。目前,推荐引擎团队正在调研所有推荐线上相关业务通过 NiFi 界面化来进行配置,无需手动更改数据库,欢迎感兴趣的同学可以一起参与进来。
为了方便大家快速上手,我们团队搭建了一个测试 NiFi 供大家练习。下边附上推荐引擎团队开发的扩展组件 Git 地址,以及 NiFi 的官方 Git 地址和测试 NiFi 地址:
Extension NiFi : https://github.com/chebacca/nifi-external-bundleApache NiFi : https://github.com/apache/nifi Apache NiFi Doc : https://nifi.apache.org/docs.html