您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Storm大数据实时处理
 
  2176  次浏览      16
 2019-2-20
 
编辑推荐:
本文来自于jdon,本文主要介绍了Storm、Storm和Hadoop的区别、Storm的容错性和可靠性等相关内容。

什么是Storm?

Storm是:

快速且可扩展伸缩

容错

确保消息能够被处理

易于设置和操作

开源的分布式实时计算系统

- 最初由Nathan Marz开发

- 使用Java 和 Clojure 编写

Storm和Hadoop主要区别是实时和批处理的区别:

Storm概念 组成:Spout 和Bolt组成Topology。

Tuple是Storm的数据模型,如['jdon',12346]

多个Tuple组成事件流:

Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。

Bolt 处理Tuples然后再创建新的Tuples流,Bolt相当于事件流的消费者。

Bolt 作为真正业务处理者,主要实现大数据处理的核心功能,比如转换数据,应用相应过滤器,计算和聚合数据(比如统计总和等等) 。

以Twitter的某个Tweet为案例,看看Storm如何处理:

这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."

这些贴被Spout读取以后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式类似:['No Small Cell Lung #Cancer',133221]。

然后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;然后经过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再经过HasTagCount计数,可以本地内存缓存这个计数结果,最后通过PrinterBolt打印出标签单词统计结果 。

我们使用Stom所要做的就是编制Spout和Bolt代码:

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector collector;
  Random random;

//读入外部数据
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    this.collector = collector;
    random = new Random();
  }
  //产生Tuple
   public void nextTuple() {
    String[] sentences = new String[] {
      "No Small Cell Lung #Cancer",
      "An #OnCology Consultant apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature"
    };
    String tweet = sentences[random.nextInt(sentences.length)];
    //定义字段名"tweet" 的值
    collector.emit(new Values(tweet));

 }

  // 定义字段名"tweet"

 public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweet"));
  }
  @Override
  public void ack(Object msgId) {}
  @Override
  public void fail(Object msgId) {}
}

下面是Bolt的代码编写:

public class SplitSentenceBolt extends BaseRichBolt {
  OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }
  @Override 消费者激活主要方法:分离成单个单词
  public void execute(Tuple input) {
    for (String s : input.getString(0).split("\\s")) {
      collector.emit(new Values(s));
    }
  }
  @Override 定义新的字段名
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

最后是装配运行Spout和Bolt的客户端调用代码:

public class WordCountTopology {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("tweet", new RandomSentenceSpout(), 2);
    builder.setBolt("split", new SplitSentenceBolt(), 4)
      .shuffleGrouping("tweet")
      .setNumTasks(8);
    builder.setBolt("count", new WordCountBolt(), 6)
      .fieldsGrouping("split", new Fields("word"));
    ..设置多个Bolt

    Config config = new Config();
    config.setNumWorkers(4);
    
    StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
// Local testing
//LocalCluster cluster = new LocalCluster();
// cluster.submitTopology("wordcount", config, builder.createTopology());
//Thread.sleep(10000);
//cluster.shutdown();
}
}

在这个代码中定义了一些参数比如Works的数目是4,其含义在后面详细分析。

下面我们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图:

Nimbus是一个主后台处理器,主要负责:

1.发布分发代码

2.分配任务

3.监控失败。

Supervisor是负责当前这个节点的后台工作处理器的监听。

Work类似Java的线程,采取JDK的Executor 。

下面开始将我们的代码部署到这个网络拓扑中:

将代码Jar包上传到Nimbus的inbox,包括所有的依赖包,然后提交。

Nimbus将保存在本地文件系统,然后开始配置网络拓扑,分配开始拓扑。

见下图:

Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。

下面是我们代码对拓扑分配的参数示意图:

Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?如下图:

图中RsSpout代表我们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt;

现在开始分析Storm内部架构,首先看看Work之间的消息传递,如下图,

Work之间的通讯是通过ZeroMQ,但是Yahoo后来发现使用异步的Netty能够提升Storm一倍性能,数据使用Kryo进行序列化,本地通讯使用Lmax的Disruptor ,内部无需序列化。

容错性

如下图,executor发送心跳到Zookeeper,Supervisor从本地文件读取所在服务器的worker心跳状态,然后同步分配发送到zooKeeper。Nimbus监控集群状态。这样能确保worker一直活着。

如果某个节点也就是服务器没有心跳,Nimbus将重新分配新的服务器上线工作。

如果某个节点服务器中work没有心跳,那么Supervisor将负责重启线程。

如果某个Supervisor完蛋,整个处理正常,但是分配的同步工作就无法进行了。

如果Nimbus崩溃,整个系统可以运行,但是拓扑分配工作无法进行了。

可靠性:确保消息被处理

public class RandomSentenceSpout extends BaseRichSpout {
  public void nextTuple() {
     UUID msgId = getMsgId();//用消息ID发送消息
    collector.emit(new Values(tweet), msgId);
  }
  public void ack(Object msgId) {
    // Do something with acked message id. 确认消息ID
  }
  public void fail(Object msgId) {
     // Do something with failed message id. 消息ID失败了
  }
}

public class SplitSentenceBolt extends BaseRichBolt {
  public void execute(Tuple input) {
    for (String s : input.getString(0).split("\\s")) {
      collector.emit(input, new Values(s));
    }
    //当整个词语都被切分后,确认输入的事件已经被接受处理。
    collector.ack(input);
  }
}

下面是一个Ack确认流程,注意到Acker Implicit bolt。

对于一个树形结构Tuple流,也就是Tuple里面嵌套Tuple。

如果事件被下一个节点成功接受和处理,这个节点将更新相应初始事件的签名,通过异或操作,将输入事件的ID和所有基于该输入事件产生的所有事件的ID进行异或操作,如下图,事件 01111 产生子事件 01100, 10010, 和 00010, 这样事件 01111的签名是11100 (= 01111 (initial value) xor 01111 xor 01100 xor 10010 xor 00010).

当Ack值变成0,Acker implicit bolt就知道tuple树形数据集合全部被处理完成,一个事务确保可靠结束。例如语句分成一个个单词全部完成。

Storm的集群设置

设置ZooKeeper cluster

(1)安装Storm依赖的库包到服务器上:

- ZeroMQ 2.1.7 and JZMQ

- Java 6 and Python 2.6.6

- unzip

(2)下载解压Storm。

填写强制性配置到storm.yaml

用storm脚本启动守护流程的监督

通过Web界面能够观察管理拓扑网络情况和组件情况。

   
2176 次浏览       16
相关文章

基于EA的数据库建模
数据流建模(EA指南)
“数据湖”:概念、特征、架构与案例
在线商城数据库系统设计 思路+效果
 
相关文档

Greenplum数据库基础培训
MySQL5.1性能优化方案
某电商数据中台架构实践
MySQL高扩展架构设计
相关课程

数据治理、数据架构及数据标准
MongoDB实战课程
并发、大容量、高性能数据库设计与优化
PostgreSQL数据库实战培训