目录
前言:
基本概念:
1.1 Spout
1.1 业务 SentenceSpout
1.2 SentenceSpout引用的部分类源码
BaseRichSpout源码
Values源码
ISpout的源码
2. Bolt
2.1 业务分隔语句 SplitSentenceBolt
2.2 业务单词计数 bolt
2.3 业务实时上报 bolt
2.4 相关引用类源码
BaseRichBolt 源码
OutputCollector
3. 拓扑Topology
前言:
阅读《Storm分布式实时计算模式》书中的例子;
demo可以直接执行输出;
代码写在:wordcount chapter1 v1
代码链接
基本概念:
storm 包括 拓扑 数据流 和spout(数据流生成者) bolt(运算)组成。
拓扑很像hadoop中的job,但是会一直运行下去。
Stream
stream的核心数据结构是tuple,tuple是一个或者多个键值对的列表,
Stream 是由无限制的tuple组成的序列。
Spout
代表着拓扑的入口,充当采集器的角色,连接到数据源,将数据转化为一个个tuple
并将tuple作为数据流进行发射。
主要工作是:编写代码从数据源或者API消费数据。
比如采集:应用的日志时间
spout通常不会实现业务逻辑,所以在多个topology可以复用。
bolt
运算或者函数,将一个或者多个数据流作为输入,对于数据实施运算后,选择性的输出一个或者多个数据流。
bolt可以订阅多个由spout或者其他bolt发射的数据流,这样可以建立复杂的数据流转换网络。
bolt 可以进行:过滤 链接 计算 数据库读写
1.1 Spout
1.1 业务 SentenceSpout
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;
import storm.blueprints.utils.Utils;
/**
* BaseRichSpout相当于一个比较简单的实现
*
* */
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {
"my dog has fleas",
"i like cold beverages",
"the dog ate my homework",
"don't have a cow man",
"i don't think i like fleas"
};
private int index = 0;
/**
* 所有的storm的组件都应该实现这个接口
* 通过这个方法会告诉storm 这个组件这个类将会发射那些数据流
* 就是发射sentence这个数据流
* 这边声明的数据流,在下面算子中,也就是SplitSentenceBolt 会去获取这个值;
* */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
/**
* 所有的spout的组件在初始化的时候调用这个方法
* map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。
* 但是这个地方因为open只是简单地将SpoutOutputCollector对象的引用保存在变量中
* 保存在这个类中
* */
public void open(Map config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
/**
* 核心实现;
* storm通过这个方法向输出的collector发射tuple
* 这个意思就是我们发射当前索引对应的语句,然后递增索引指向下一个语句
* */
public void nextTuple() {
//这个sentences[index]相当于tuple,其实就是一个list
//emit相当于发出发射的意思
this.collector.emit(new Values(sentences[index]));
index++;
//这个应该是一个循环
if (index >= sentences.length) {
index = 0;
}
Utils.waitForMillis(1);
}
public static void main(String[] args) {
String[] sentences = {
"my dog has fleas",
"i like cold beverages",
"the dog ate my homework",
"don't have a cow man",
"i don't think i like fleas"
};
for (String str : sentences) {
System.out.println(str);
}
}
}
1.2 SentenceSpout引用的部分类源码
BaseRichSpout源码
package backtype.storm.topology.base;
import backtype.storm.topology.IRichSpout;
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
public BaseRichSpout() {
}
public void close() {
}
public void activate() {
}
public void deactivate() {
}
public void ack(Object msgId) {
}
public void fail(Object msgId) {
}
}
Values源码
package backtype.storm.tuple;
import java.util.ArrayList;
public class Values extends ArrayList<Object> {
public Values() {
}
public Values(Object... vals) {
super(vals.length);
Object[] arr$ = vals;
int len$ = vals.length;
for(int i$ = 0; i$ < len$; ++i$) {
Object o = arr$[i$];
this.add(o);
}
}
}
其实就是一个ArrayList,只不过是入参不定数量的。
ISpout的源码
package backtype.storm.spout;
import backtype.storm.task.TopologyContext;
import java.io.Serializable;
import java.util.Map;
public interface ISpout extends Serializable {
void open(Map var1, TopologyContext var2, SpoutOutputCollector var3);
void close();
void activate();
void deactivate();
void nextTuple();
void ack(Object var1);
void fail(Object var1);
}
对于open方法所有的Spout在初始化的时候都需要调用这个方法。
map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。
2. Bolt
2.1 业务分隔语句 SplitSentenceBolt
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
/**
* 业务代码
* 实现语句分割bolt
* BaseRichBolt 是一个简单的实现,继承这个类,就可以不用去实现本例子不需要关心的方法
*
* */
public class SplitSentenceBolt extends BaseRichBolt{
private OutputCollector collector;
/**
*
* 类同于Spout 中的open方法;
* 用于初始化;
* 初始化可以做一些例如初始化数据库连接等操作;
* 不过这个例子没有额外的操作;
* */
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 核心功能执行;
* 从哪来的数据?从订阅中的数据流中,接收到一个tuple;
* 然后这个方法读取sentence
* 然后去根据空格分隔成为单词,然后在发射出去;
* */
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words){
this.collector.emit(new Values(word));
}
}
/**
* 声明输出流;
* 这里面声明了输出流中包含了word这个字段;
*
* */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
2.2 业务单词计数 bolt
package storm.demo.chapter1.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
* 业务代码
* 实现单词计数bolt
* */
public class WordCountBolt extends BaseRichBolt{
private OutputCollector collector;
private HashMap<String, Long> counts = null;
/**
* 初始化;
* 初始化counts 这是一个map,用来存储单词和对应的计数;
* 在我们storm应用中其实很多也是各种维度分隔计数;
* 重点:
* 通常最好是在构造函数中对基本数据类型和可序列话的 对象进行复制和实例化;
* prepare进行不可序列化的对象进行实例化;
* HashMap 是可以序列化的;
* */
public void prepare(Map config, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
this.counts = new HashMap<String, Long>();
}
/**
* 获得单词 ;
* 然后获取单词的数量;
* 进行单词数量++;
* 重新刷新发射
* */
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = this.counts.get(word);
if(count == null){
count = 0L;
}
count++;
this.counts.put(word, count);
this.collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
2.3 业务实时上报 bolt
package storm.demo.chapter1.v1;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 实现上报的BOLT
* 对所有的单词计数生成一份报告;
* 在这边是简单的将接收到的计数BOLT发射出的计数tuple 进行存储;
* */
public class ReportBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = tuple.getLongByField("count");
this.counts.put(word, count);
}
/**
* 数据流末端的bolt,不会发射tuple;
* */
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
/**
* stoorm在种植一个bolt之前会调用这个方法;
* 通常用来释放bolt占用的资源,比如释放打开的句柄 或者数据库连接;
* 重点:
* 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的;
* 不能保证会执行,涉及到了strom的容错机制;
* 如果是在开发环境中,是可以保证这个被调用的;
* */
@Override
public void cleanup() {
System.out.println("--- FINAL COUNTS ---");
List<String> keys = new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for (String key : keys) {
System.out.println(key + " : " + this.counts.get(key));
}
System.out.println("--------------");
}
}
2.4 相关引用类源码
BaseRichBolt 源码
package backtype.storm.topology.base;
import backtype.storm.topology.IRichBolt;
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
public BaseRichBolt() {
}
public void cleanup() {
}
}
* 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的; * 不能保证会执行,涉及到了strom的容错机制; * 如果是在开发环境中,是可以保证这个被调用的;
OutputCollector
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package backtype.storm.task;
import backtype.storm.tuple.Tuple;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class OutputCollector implements IOutputCollector {
private IOutputCollector _delegate;
public OutputCollector(IOutputCollector delegate) {
this._delegate = delegate;
}
public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {
return this.emit(streamId, (Collection)Arrays.asList(anchor), tuple);
}
public List<Integer> emit(String streamId, List<Object> tuple) {
return this.emit(streamId, (Collection)((List)null), tuple);
}
public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
return this.emit("default", anchors, tuple);
}
public List<Integer> emit(Tuple anchor, List<Object> tuple) {
return this.emit("default", anchor, tuple);
}
public List<Integer> emit(List<Object> tuple) {
return this.emit("default", tuple);
}
public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> tuple) {
this.emitDirect(taskId, streamId, (Collection)Arrays.asList(anchor), tuple);
}
public void emitDirect(int taskId, String streamId, List<Object> tuple) {
this.emitDirect(taskId, streamId, (Collection)((List)null), tuple);
}
public void emitDirect(int taskId, Collection<Tuple> anchors, List<Object> tuple) {
this.emitDirect(taskId, "default", anchors, tuple);
}
public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {
this.emitDirect(taskId, "default", anchor, tuple);
}
public void emitDirect(int taskId, List<Object> tuple) {
this.emitDirect(taskId, "default", tuple);
}
public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
return this._delegate.emit(streamId, anchors, tuple);
}
public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
this._delegate.emitDirect(taskId, streamId, anchors, tuple);
}
public void ack(Tuple input) {
this._delegate.ack(input);
}
public void fail(Tuple input) {
this._delegate.fail(input);
}
public void reportError(Throwable error) {
this._delegate.reportError(error);
}
}
对于这个发射方法,其实就是发射List类型的tuple;
我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。 在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。 在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);
引用自:https://blog.csdn.net/zengqiang1/article/details/71124004
3. 拓扑Topology
package storm.demo.chapter1.v1;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import static storm.demo.utils.Utils.*;
/**
* 业务代码
* 实现拓扑;
* 定义好了输入 和计算单元bolt进行整合成为一个可以运行的拓扑;
* */
public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
/**
* 输出结果:
* --- FINAL COUNTS ---
* a : 1344
* ate : 1344
* beverages : 1344
* cold : 1344
* cow : 1344
* dog : 2688
* don't : 2687
* fleas : 2687
* has : 1344
* have : 1344
* homework : 1344
* i : 4030
* like : 2687
* man : 1344
* my : 2688
* the : 1344
* think : 1343
* --------------
*
* */
public static void main(String[] args) throws Exception {
//输入
SentenceSpout spout = new SentenceSpout();
//bolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
//官方类,这个类用来提供流式的接口风格的API来定义拓扑组件之间的数据流
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout);
/**
* 对于这个builder,我们首先注册一个bolt订阅
* 使用.shuffleGrouping(SENTENCE_SPOUT_ID); 来订阅语句的数据源;
* shuffleGrouping这个方法用来告诉storm,要将spout发射的tuple随机均匀的分发给SPLIT_BOLT
* 对于setBolt这个方法会注册bolt,并且返回BoltDeclarer 这个实例;
* 这边涉及到了"数据流分组"的内容;
* */
// SentenceSpout --> SplitSentenceBolt
builder.setBolt(SPLIT_BOLT_ID, splitBolt)
.shuffleGrouping(SENTENCE_SPOUT_ID);
/**
* 将特定的tuple路由到特殊的bolt实例中;
* 重点:
* 可以使用fieldsGrouping方法来保证所有的word字段值相同的tuple 会路由到同一个wordCountBolt中
* */
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
/**
* globalGrouping 统一路由到唯一的ReportBolt的任务重
*
* */
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID);
//上述数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上
Config config = new Config();
//这边使用的是Storm本地模式 LocalCluster在本地的开发环境中来模拟一个完整的storm集群
LocalCluster cluster = new LocalCluster();
//当一个拓扑提交的时候,storm会将默认配置和config实例中的配置合并然后作为参数传递给submitTopology
//合并之后的配置将会分发给各个spout的bolt的open() prepare()方法
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
//控制执行的时长,修改这个会输出结果不一样
waitForSeconds(10);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
}
}