您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
 
   
 
 
     
   
 订阅
  捐助
storm-编程入门
 
火龙果软件    发布于 2014-09-01
  54  次浏览      13
 

一 编程接口

Spout 接口

Spout组件的实现可以通过继承BaseRichSpout类或者其他*Spout类来完成,也可以通过实现IRichSpout接口来实现。需要根据情况实方法有:

open方法

当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。示例如下:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {   
_collector = collector;
}

declareOutputFields方法

此方法用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的,其中参数为域名。示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {  
declarer.declare(new Fields("word"));
}

getComponentConfiguration方法

此方法定义在BaseComponent类内,用于声明针对当前组件的特殊的Configuration配置。示例如下:

public Map<String, Object> getComponentConfiguration() {  
if(!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 3);
return ret;
} else {
return null;
}
}

这里便是设置了Topology中当前Component的线程数量上限。

nextTuple方法

这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是通过这个方法来实现的。示例如下:

public void nextTuple() {  
Utils.sleep(100);
final String[] words = new String[] {"twitter","facebook","google"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}

这里便是从一个数组中随机选取一个单词作为Tuple,然后通过_collector发送到Topology。

另外,除了上述几个方法之外,还有ack、fail和close方法等。Storm在监测到一个Tuple被成功处理之后会调用ack方法,处理失败会调用fail方法,这两个方法在BaseRichSpout类中已经被隐式的实现了。

Bolts 接口

Bolt类接收由Spout或者其他上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现可以通过继承BasicRichBolt类或者IRichBolt接口来完成。Bolt类需要实现的主要方法有:

prepare方法

此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。示例如下:

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {         
_collector = collector;
}

注:Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。

declareOutputFields 方法

用于声明当前Bolt发送的Tuple中包含的字段,和Spout中类似。示例如下:

public void declareOutputFields(OutputFieldsDeclarer declarer) {  
declarer.declare(new Fields("obj", "count", "length"));
}

此例说明当前Bolt类发送的Tuple包含了三个字段:"obj", "count", "length"。

getComponentConfiguration方法

和Spout类一样,在Bolt中也可以有getComponentConfiguration方法。示例如下:

public Map<String, Object> getComponentConfiguration() {  
Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
return conf;

此例定义了从系统组件“_system”的“_tick”流中发送Tuple到当前Bolt的频率,当系统需要每隔一段时间执行特定的处理时,就可以利用这个系统的组件的特性来完成。

execute方法

这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的。此时,有两种情况,一种是emit方法中有两个参数,另一个种是有一个参数。

(1) emit有一个参数:此唯一的参数是发送到下游Bolt的Tuple,此时,由上游发来的旧的Tuple在此隔断,新的Tuple和旧的Tuple不再属于同一棵Tuple树。新的Tuple另起一个新的Tuple树。

(2) emit有两个参数:第一个参数是旧的Tuple的输入流,第二个参数是发往下游Bolt的新的Tuple流。此时,新的Tuple和旧的Tuple是仍然属于同一棵Tuple树,即如果下游的Bolt处理Tuple失败,则会向上传递到当前Bolt,当前Bolt根据旧的Tuple流继续往上游传递,申请重发失败的Tuple。保证Tuple处理的可靠性。

这两种情况要根据自己的场景来确定。示例如下:

public void execute(Tuple tuple) {  
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}

注:输入Tuple一般在最后一行被ack

public void execute(Tuple tuple) {  
_collector.emit(new Values(tuple.getString(0) + "!!!"));
}

此外还有ack方法、fail方法、cleanup方法等。其中cleanup方法和Spout中的close方法类似,都是在当前Component关闭时调用,但是针对实时计算来说,除非一些特殊的场景要求以外,这两个方法一般都很少用到。

注:cleanup方法在bolt被关闭的时候调用, 它应该清理所有被打开的资源。但是集群不保证这个方法一定会被执行。比如执行task的机器down掉了,那么根本就没有办法来调用那个方法。cleanup设计的时候是被用来在local mode的时候才被调用(也就是说在一个进程里面模拟整个storm集群), 并且你想在关闭一些topology的时候避免资源泄漏。

有几点需要说明的地方:

1.每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。

2.open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,即负责运行组件中的task的线程的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。

3.nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产生无界的Tuple流,体现实时性。

4.在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节点),在每一个任务上反序列化component。

5. Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。

二 作业的提交

下面的代码展示了以本地运行方式提交一个Topology作业

//Topology definition  
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader",new WordReader());
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCount(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));

//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(true);

//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();

此例中的builder是TopologyBuilder对象,通过它的createTopology方法可以创建一个Topology对象,同时此builder还要定义当前Topology中用到的Spout和Bolt对象,分别通过setSpout方法和setBolt方法来完成。

setSpout方法和setBolt方法中的第一个参数是当前的Component组件的Stream流ID号;第二个参数是具体的Component实现类的构造;第三个参数是当前Component的并行执行的线程数目,Storm会根据这个数字的累加和来确定Topology的Task数目。

通过一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology本身。

下面对worker、executor以及task做一下说明:

worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。其数目可以通过设置yaml中的topology.workers属性以及在代码中通过Config的setNumWorkers方法设定。

executor:产生于worker进程内部的线程,会执行同一个component的一个或者多个task。 其数目可以在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;

task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;

有几点需要说明的地方:

1.Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件;

2.在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的executor数目和task数目,默认情况下,一个Topology的task的总和是和executor的总和一致的。之后,系统根据worker的数目,尽量平均的分配这些task的执行。worker在哪个supervisor节点上运行是由storm本身决定的;

3. 任务分配好之后,Nimbes节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的所有worker进程的心跳信息;

4. Supervisor节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动worker进程运行;

5.一个Topology运行之后,就会不断的通过Spouts来发送Stream流,通过Bolts来不断的处理接收到的Stream流,Stream流是无界的。最后一步会不间断的执行,除非手动结束Topology。

6.通过在Nimbus节点利用如下命令来终止一个Topology的运行:storm kill topologyName kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。

三 分组策略

1.shuffleGrouping 随机分组

builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");

它只有一个参数(数据源组件),并且数据源会向随机选择的bolt发送元组,保证每个消费者收到近似数量的元组。

2.fieldsGrouping 域数据流组

builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGrouping("word-normalizer", new Fields("word"));

域数据流组允许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。

注: 在域数据流组中的所有域集合必须存在于数据源的域声明中

3.allGrouping 全部数据流组

builder.setBolt("word-counter", new WordCounter(),2)
.fieldsGroupint("word-normalizer",new Fields("word"))
.allGrouping("signals-spout","signals");

全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向所有bolts发送信号。比如,你要刷新缓存,你可以向所有的bolts发送一个刷新缓存信号。

4.customGrouping 自定义数据流组

builder.setBolt("word-normalizer", new WordNormalizer())
.customGrouping("word-reader", new ModuleGrouping());

5.directGrouping 直接数据流组

builder.setBolt("word-counter", new WordCounter(),2)
.directGrouping("word-normalizer");

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。

6.全局数据流组

全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。

四 配置选项

在运行Topology之前,可以通过一些参数的配置来调节运行时的状态,参数的配置是通过Storm框架部署目录下的conf/storm.yaml文件来完成的。在此文件中可以配置运行时的Storm本地目录路径、运行时Worker的数目等。

在代码中,也可以设置Config的一些参数,但是优先级是不同的,不同位置配置Config参数的优先级顺序为:

default.yaml < storm.yaml <Topology内部的configuration <内部组件的special configuration < 外部组件的special configuration

在storm.yaml中常用的几个选项为:

   
54 次浏览       13
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训
最新课程计划
信息架构建模(基于UML+EA)3-21[北京]
软件架构设计师 3-21[北京]
图数据库与知识图谱 3-25[北京]
业务架构设计 4-11[北京]
SysML和EA系统设计与建模 4-22[北京]
DoDAF规范、模型与实例 5-23[北京]

MySQL索引背后的数据结构
MySQL性能调优与架构设计
SQL Server数据库备份与恢复
让数据库飞起来 10大DB2优化
oracle的临时表空间写满磁盘
数据库的跨平台设计
更多...   


并发、大容量、高性能数据库
高级数据库架构设计师
Hadoop原理与实践
Oracle 数据仓库
数据仓库和数据挖掘
Oracle数据库开发与管理


GE 区块链技术与实现培训
航天科工某子公司 Nodejs高级应用开发
中盛益华 卓越管理者必须具备的五项能力
某信息技术公司 Python培训
某博彩IT系统厂商 易用性测试与评估
中国邮储银行 测试成熟度模型集成(TMMI)
中物院 产品经理与产品管理
更多...