[Storm]分布式单词计数(一)一个简单的storm demo

it2023-04-10  80

目录

前言:

基本概念:

1.1 Spout

1.1 业务 SentenceSpout

1.2 SentenceSpout引用的部分类源码

BaseRichSpout源码

Values源码

ISpout的源码

2. Bolt

2.1 业务分隔语句 SplitSentenceBolt

2.2 业务单词计数 bolt

2.3 业务实时上报 bolt

2.4 相关引用类源码

BaseRichBolt 源码

OutputCollector

3. 拓扑Topology


前言:

阅读《Storm分布式实时计算模式》书中的例子;

demo可以直接执行输出;

代码写在:wordcount chapter1 v1 

代码链接

基本概念:

storm 包括 拓扑 数据流 和spout(数据流生成者) bolt(运算)组成。

拓扑很像hadoop中的job,但是会一直运行下去。

Stream

stream的核心数据结构是tuple,tuple是一个或者多个键值对的列表,

Stream 是由无限制的tuple组成的序列。

Spout

代表着拓扑的入口,充当采集器的角色,连接到数据源,将数据转化为一个个tuple

并将tuple作为数据流进行发射。

主要工作是:编写代码从数据源或者API消费数据。

比如采集:应用的日志时间

spout通常不会实现业务逻辑,所以在多个topology可以复用。

bolt

运算或者函数,将一个或者多个数据流作为输入,对于数据实施运算后,选择性的输出一个或者多个数据流。

bolt可以订阅多个由spout或者其他bolt发射的数据流,这样可以建立复杂的数据流转换网络。

bolt 可以进行:过滤 链接 计算 数据库读写

1.1 Spout

1.1 业务 SentenceSpout

import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import java.util.Map; import storm.blueprints.utils.Utils; /** * BaseRichSpout相当于一个比较简单的实现 * * */ public class SentenceSpout extends BaseRichSpout { private SpoutOutputCollector collector; private String[] sentences = { "my dog has fleas", "i like cold beverages", "the dog ate my homework", "don't have a cow man", "i don't think i like fleas" }; private int index = 0; /** * 所有的storm的组件都应该实现这个接口 * 通过这个方法会告诉storm 这个组件这个类将会发射那些数据流 * 就是发射sentence这个数据流 * 这边声明的数据流,在下面算子中,也就是SplitSentenceBolt 会去获取这个值; * */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence")); } /** * 所有的spout的组件在初始化的时候调用这个方法 * map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。 * 但是这个地方因为open只是简单地将SpoutOutputCollector对象的引用保存在变量中 * 保存在这个类中 * */ public void open(Map config, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } /** * 核心实现; * storm通过这个方法向输出的collector发射tuple * 这个意思就是我们发射当前索引对应的语句,然后递增索引指向下一个语句 * */ public void nextTuple() { //这个sentences[index]相当于tuple,其实就是一个list //emit相当于发出发射的意思 this.collector.emit(new Values(sentences[index])); index++; //这个应该是一个循环 if (index >= sentences.length) { index = 0; } Utils.waitForMillis(1); } public static void main(String[] args) { String[] sentences = { "my dog has fleas", "i like cold beverages", "the dog ate my homework", "don't have a cow man", "i don't think i like fleas" }; for (String str : sentences) { System.out.println(str); } } }

1.2 SentenceSpout引用的部分类源码

BaseRichSpout源码

package backtype.storm.topology.base; import backtype.storm.topology.IRichSpout; public abstract class BaseRichSpout extends BaseComponent implements IRichSpout { public BaseRichSpout() { } public void close() { } public void activate() { } public void deactivate() { } public void ack(Object msgId) { } public void fail(Object msgId) { } }

 

Values源码

package backtype.storm.tuple; import java.util.ArrayList; public class Values extends ArrayList<Object> { public Values() { } public Values(Object... vals) { super(vals.length); Object[] arr$ = vals; int len$ = vals.length; for(int i$ = 0; i$ < len$; ++i$) { Object o = arr$[i$]; this.add(o); } } }

其实就是一个ArrayList,只不过是入参不定数量的。

ISpout的源码

package backtype.storm.spout; import backtype.storm.task.TopologyContext; import java.io.Serializable; import java.util.Map; public interface ISpout extends Serializable { void open(Map var1, TopologyContext var2, SpoutOutputCollector var3); void close(); void activate(); void deactivate(); void nextTuple(); void ack(Object var1); void fail(Object var1); }

对于open方法所有的Spout在初始化的时候都需要调用这个方法。

map是包含了Storm的配置信息的map,第二个TopologyContext对象提供了拓扑中组件的信息,SpoutOutputCollector提供了发射tuple的方法。

 

2. Bolt

2.1 业务分隔语句 SplitSentenceBolt

import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.Map; /** * 业务代码 * 实现语句分割bolt * BaseRichBolt 是一个简单的实现,继承这个类,就可以不用去实现本例子不需要关心的方法 * * */ public class SplitSentenceBolt extends BaseRichBolt{ private OutputCollector collector; /** * * 类同于Spout 中的open方法; * 用于初始化; * 初始化可以做一些例如初始化数据库连接等操作; * 不过这个例子没有额外的操作; * */ public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.collector = collector; } /** * 核心功能执行; * 从哪来的数据?从订阅中的数据流中,接收到一个tuple; * 然后这个方法读取sentence * 然后去根据空格分隔成为单词,然后在发射出去; * */ public void execute(Tuple tuple) { String sentence = tuple.getStringByField("sentence"); String[] words = sentence.split(" "); for(String word : words){ this.collector.emit(new Values(word)); } } /** * 声明输出流; * 这里面声明了输出流中包含了word这个字段; * * */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }

2.2 业务单词计数 bolt

package storm.demo.chapter1.v1; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import java.util.HashMap; import java.util.Map; /** * 业务代码 * 实现单词计数bolt * */ public class WordCountBolt extends BaseRichBolt{ private OutputCollector collector; private HashMap<String, Long> counts = null; /** * 初始化; * 初始化counts 这是一个map,用来存储单词和对应的计数; * 在我们storm应用中其实很多也是各种维度分隔计数; * 重点: * 通常最好是在构造函数中对基本数据类型和可序列话的 对象进行复制和实例化; * prepare进行不可序列化的对象进行实例化; * HashMap 是可以序列化的; * */ public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.collector = collector; this.counts = new HashMap<String, Long>(); } /** * 获得单词 ; * 然后获取单词的数量; * 进行单词数量++; * 重新刷新发射 * */ public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = this.counts.get(word); if(count == null){ count = 0L; } count++; this.counts.put(word, count); this.collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }

2.3 业务实时上报 bolt

package storm.demo.chapter1.v1; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; /** * 实现上报的BOLT * 对所有的单词计数生成一份报告; * 在这边是简单的将接收到的计数BOLT发射出的计数tuple 进行存储; * */ public class ReportBolt extends BaseRichBolt { private HashMap<String, Long> counts = null; public void prepare(Map config, TopologyContext context, OutputCollector collector) { this.counts = new HashMap<String, Long>(); } public void execute(Tuple tuple) { String word = tuple.getStringByField("word"); Long count = tuple.getLongByField("count"); this.counts.put(word, count); } /** * 数据流末端的bolt,不会发射tuple; * */ public void declareOutputFields(OutputFieldsDeclarer declarer) { // this bolt does not emit anything } /** * stoorm在种植一个bolt之前会调用这个方法; * 通常用来释放bolt占用的资源,比如释放打开的句柄 或者数据库连接; * 重点: * 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的; * 不能保证会执行,涉及到了strom的容错机制; * 如果是在开发环境中,是可以保证这个被调用的; * */ @Override public void cleanup() { System.out.println("--- FINAL COUNTS ---"); List<String> keys = new ArrayList<String>(); keys.addAll(this.counts.keySet()); Collections.sort(keys); for (String key : keys) { System.out.println(key + " : " + this.counts.get(key)); } System.out.println("--------------"); } }

2.4 相关引用类源码

BaseRichBolt 源码

package backtype.storm.topology.base; import backtype.storm.topology.IRichBolt; public abstract class BaseRichBolt extends BaseComponent implements IRichBolt { public BaseRichBolt() { } public void cleanup() { } }

     * 但是对于拓扑在storm集群上面运行的时候,这个cleanup方法是不可靠的;      * 不能保证会执行,涉及到了strom的容错机制;      * 如果是在开发环境中,是可以保证这个被调用的;

OutputCollector

// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package backtype.storm.task; import backtype.storm.tuple.Tuple; import java.util.Arrays; import java.util.Collection; import java.util.List; public class OutputCollector implements IOutputCollector { private IOutputCollector _delegate; public OutputCollector(IOutputCollector delegate) { this._delegate = delegate; } public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) { return this.emit(streamId, (Collection)Arrays.asList(anchor), tuple); } public List<Integer> emit(String streamId, List<Object> tuple) { return this.emit(streamId, (Collection)((List)null), tuple); } public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) { return this.emit("default", anchors, tuple); } public List<Integer> emit(Tuple anchor, List<Object> tuple) { return this.emit("default", anchor, tuple); } public List<Integer> emit(List<Object> tuple) { return this.emit("default", tuple); } public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> tuple) { this.emitDirect(taskId, streamId, (Collection)Arrays.asList(anchor), tuple); } public void emitDirect(int taskId, String streamId, List<Object> tuple) { this.emitDirect(taskId, streamId, (Collection)((List)null), tuple); } public void emitDirect(int taskId, Collection<Tuple> anchors, List<Object> tuple) { this.emitDirect(taskId, "default", anchors, tuple); } public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) { this.emitDirect(taskId, "default", anchor, tuple); } public void emitDirect(int taskId, List<Object> tuple) { this.emitDirect(taskId, "default", tuple); } public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) { return this._delegate.emit(streamId, anchors, tuple); } public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) { this._delegate.emitDirect(taskId, streamId, anchors, tuple); } public void ack(Tuple input) { this._delegate.ack(input); } public void fail(Tuple input) { this._delegate.fail(input); } public void reportError(Throwable error) { this._delegate.reportError(error); } }

对于这个发射方法,其实就是发射List类型的tuple;

我们在学习ack机制的时候,我们知道Storm的Bolt有BaseBasicBolt和BaseRichBolt。 在BaseBasicBolt中,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack。 在使用BaseRichBolt需要在emit数据的时候,显示指定该数据的源tuple要加上第二个参数anchor tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple);并且需要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple);

引用自:https://blog.csdn.net/zengqiang1/article/details/71124004

3. 拓扑Topology

package storm.demo.chapter1.v1; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import static storm.demo.utils.Utils.*; /** * 业务代码 * 实现拓扑; * 定义好了输入 和计算单元bolt进行整合成为一个可以运行的拓扑; * */ public class WordCountTopology { private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt"; private static final String TOPOLOGY_NAME = "word-count-topology"; /** * 输出结果: * --- FINAL COUNTS --- * a : 1344 * ate : 1344 * beverages : 1344 * cold : 1344 * cow : 1344 * dog : 2688 * don't : 2687 * fleas : 2687 * has : 1344 * have : 1344 * homework : 1344 * i : 4030 * like : 2687 * man : 1344 * my : 2688 * the : 1344 * think : 1343 * -------------- * * */ public static void main(String[] args) throws Exception { //输入 SentenceSpout spout = new SentenceSpout(); //bolt SplitSentenceBolt splitBolt = new SplitSentenceBolt(); WordCountBolt countBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); //官方类,这个类用来提供流式的接口风格的API来定义拓扑组件之间的数据流 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, spout); /** * 对于这个builder,我们首先注册一个bolt订阅 * 使用.shuffleGrouping(SENTENCE_SPOUT_ID); 来订阅语句的数据源; * shuffleGrouping这个方法用来告诉storm,要将spout发射的tuple随机均匀的分发给SPLIT_BOLT * 对于setBolt这个方法会注册bolt,并且返回BoltDeclarer 这个实例; * 这边涉及到了"数据流分组"的内容; * */ // SentenceSpout --> SplitSentenceBolt builder.setBolt(SPLIT_BOLT_ID, splitBolt) .shuffleGrouping(SENTENCE_SPOUT_ID); /** * 将特定的tuple路由到特殊的bolt实例中; * 重点: * 可以使用fieldsGrouping方法来保证所有的word字段值相同的tuple 会路由到同一个wordCountBolt中 * */ // SplitSentenceBolt --> WordCountBolt builder.setBolt(COUNT_BOLT_ID, countBolt) .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); /** * globalGrouping 统一路由到唯一的ReportBolt的任务重 * * */ // WordCountBolt --> ReportBolt builder.setBolt(REPORT_BOLT_ID, reportBolt) .globalGrouping(COUNT_BOLT_ID); //上述数据流都已经定义好,运行单词计数计算的最后一步是编译并提交到集群上 Config config = new Config(); //这边使用的是Storm本地模式 LocalCluster在本地的开发环境中来模拟一个完整的storm集群 LocalCluster cluster = new LocalCluster(); //当一个拓扑提交的时候,storm会将默认配置和config实例中的配置合并然后作为参数传递给submitTopology //合并之后的配置将会分发给各个spout的bolt的open() prepare()方法 cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); //控制执行的时长,修改这个会输出结果不一样 waitForSeconds(10); cluster.killTopology(TOPOLOGY_NAME); cluster.shutdown(); } }

 

 

最新回复(0)