Editor's recommendation:
This article comes from cnblogs (博客园). It uses local test code to give a brief introduction to learning Storm, and we hope it helps with your studies.
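1. Comparing Hadoop and Storm
Data source: Hadoop processes terabytes of data stored on HDFS (historical data). Storm processes individual records as they arrive (real-time data) and usually applies fairly simple business logic to them.
Processing model: Hadoop runs a Map phase followed by a Reduce phase. In Storm the user defines the processing flow, which can contain several steps, and each step is either a data source (spout) or a processing step (bolt).
Termination: a Hadoop job eventually finishes. A Storm topology has no finished state. When a record reaches the last step, the topology simply waits there until new data arrives, and then processing starts again from the beginning (a spout keeps calling its nextTuple() method in a loop, and a bolt calls its execute(Tuple input) method whenever it receives a message).
Speed: Hadoop is designed to process terabytes of data on HDFS, so it is slow. Storm only needs to process each newly arrived record, so it can be very fast.
Use cases: Hadoop is used for batch processing, where latency does not matter. Storm is used for processing newly arriving data, where timeliness matters.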
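To make the batch-versus-stream contrast concrete, here is a plain-Java analogy. It is not Hadoop or Storm code, and the class and method names are hypothetical. The batch method needs the complete, finite input before it can return. The streaming class keeps running state, has a current answer after every record, and has no natural end.

```java
import java.util.List;

public class BatchVsStream {
    // Batch style (Hadoop-like): the whole, finite data set is available up front
    // and the job finishes after a single pass over it.
    static long batchSum(List<Integer> history) {
        long total = 0;
        for (int v : history) {
            total += v;
        }
        return total;
    }

    // Streaming style (Storm-like): state is updated as each new record arrives,
    // and the computation never "finishes".
    static class RunningSum {
        private long total = 0;

        // Returns the up-to-date result right after this record is processed.
        long onRecord(int v) {
            total += v;
            return total;
        }
    }

    public static void main(String[] args) {
        System.out.println(batchSum(List.of(3, 5, 7))); // 15
        RunningSum live = new RunningSum();
        for (int v : new int[] {3, 5, 7}) {
            System.out.println(live.onRecord(v)); // 3, then 8, then 15
        }
    }
}
```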
2. Storm's design
Storm is an abstraction over streams. A stream is an unbroken, unbounded sequence of tuples. When Storm models an event stream, it represents each event in the stream as a tuple.
A tuple is a value list in which every value has a name. A value can be a primitive type, a string, or a byte array, and it can also be any other serializable type.
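The tuple model can be sketched as an ordered list of values paired with field names. The class below is a hypothetical stand-in, not Storm's Tuple class. Its getValueByField and getValue methods are only named after the equivalents on Storm's Tuple interface so that the analogy is easy to follow.

```java
import java.util.Arrays;
import java.util.List;

public class SimpleTuple {
    private final List<String> fields; // field names, in declaration order
    private final List<Object> values; // one value per field, in the same order

    SimpleTuple(List<String> fields, List<Object> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("each value needs exactly one field name");
        }
        this.fields = fields;
        this.values = values;
    }

    // Look a value up by its name, the way a bolt reads a field such as "num".
    Object getValueByField(String name) {
        int idx = fields.indexOf(name);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(idx);
    }

    // Positional access also works, because a tuple is an ordered list.
    Object getValue(int i) {
        return values.get(i);
    }

    public static void main(String[] args) {
        SimpleTuple t = new SimpleTuple(
                Arrays.asList("num", "field"),
                Arrays.<Object>asList(7, 1));
        System.out.println(t.getValueByField("num")); // 7
        System.out.println(t.getValue(1));            // 1
    }
}
```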
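Storm assumes that every stream has a source, the origin of the raw tuples, and it calls that source a spout.
A spout gives you a stream, and the next question is how to process the tuples in it. Storm calls a stream transformation a bolt. A bolt can consume any number of input streams, as long as those streams are routed to it, and it can also emit new streams for other bolts to use. To build a pipeline, you open a particular spout (the tap) and route the tuples flowing out of it to particular bolts. Each bolt processes the stream it receives and passes the result on to other bolts or to a final destination.
This whole processing graph is called a topology. The topology is the highest-level abstraction in Storm, and it is what gets submitted to a Storm cluster for execution. A topology is a graph of stream transformations: each node is a spout or a bolt, and each edge shows which streams a bolt subscribes to. When a spout or bolt emits a tuple to a stream, the tuple is sent to every bolt that subscribes to that stream. This means you do not wire up the pipes by hand: as long as the subscriptions are declared in advance, the spout's tuples are delivered to the right bolts.
Every node in the topology declares the names of the fields in the tuples it emits. Downstream nodes subscribe to that node's stream and can then read the values by those field names.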
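The subscription model described above (emit once, and every subscribed bolt receives the tuple) can be sketched in a few lines of plain Java. This is a single-threaded illustration under simplifying assumptions, not how Storm is implemented: the MiniTopology class is hypothetical, component ids are plain strings, and each bolt is just a Consumer callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class MiniTopology {
    // component id -> the bolts subscribed to that component's output stream
    private final Map<String, List<Consumer<Object>>> subscribers = new HashMap<>();

    // Declare, in advance, that `bolt` subscribes to the stream of `sourceId`.
    void subscribe(String sourceId, Consumer<Object> bolt) {
        subscribers.computeIfAbsent(sourceId, k -> new ArrayList<>()).add(bolt);
    }

    // Emitting from a component delivers the value to every subscriber; no manual wiring.
    void emit(String sourceId, Object value) {
        for (Consumer<Object> bolt : subscribers.getOrDefault(sourceId, List.of())) {
            bolt.accept(value);
        }
    }

    public static void main(String[] args) {
        MiniTopology topo = new MiniTopology();
        List<Object> sink = new ArrayList<>();
        // Bolt "upper" subscribes to "spout" and re-emits on its own stream.
        topo.subscribe("spout", v -> topo.emit("upper", v.toString().toUpperCase()));
        // A final bolt subscribes to "upper" and collects the results.
        topo.subscribe("upper", sink::add);
        topo.emit("spout", "hello");
        System.out.println(sink); // [HELLO]
    }
}
```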
3. Stream processing flow
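4. Basic Storm concepts
Topology: a complete processing flow for one application, comparable to a job in Hadoop MapReduce.
Stream: a message stream. It is an unbounded sequence of tuples, and these tuples are created and processed in parallel in a distributed fashion.
Tuple: the unit of data. Every piece of data that needs to be processed is wrapped in a tuple.
Spout: a message source, i.e. a message producer. It reads data from an external source and emits it into the topology as tuples.
Bolt: a message processor. All message-processing logic is encapsulated in bolts. A bolt processes its input streams and produces new output streams, and it can filter, aggregate, query databases, and so on.
Task: every spout and bolt runs as many tasks across the cluster. A task is one instance of the spout or bolt, and by default each task runs in its own executor thread (section 7 explains how tasks map to executors).
Stream grouping: the message distribution strategy. Part of defining a topology is specifying which streams each bolt receives as input, and a stream grouping defines how the tuples of a stream are distributed among the bolt's tasks.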
5. Local test code
package stormNew;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalTopology {
    public static void main(String[] args) {
        // Assemble the topology
        TopologyBuilder build = new TopologyBuilder();
        // Register the spout under the id "spout"
        build.setSpout("spout", new Spout());
        // Register the bolt under the id "bolt"; it subscribes to "spout",
        // and tuples are grouped by the value of their "field" field
        build.setBolt("bolt", new Bolt()).fieldsGrouping("spout", new Fields("field"));
        try {
            Config conf = new Config();
            // Run the topology in-process, without a real cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalTopology", conf, build.createTopology());
            // To submit to a real cluster instead (also import backtype.storm.StormSubmitter):
            // Config stormConf = new Config();
            // stormConf.setNumWorkers(2);
            // StormSubmitter.submitTopology("luluTology", stormConf, build.createTopology());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class Spout extends BaseRichSpout {
        private Map conf;
        private TopologyContext context;
        private SpoutOutputCollector collector;
        private int i;
        // msgId -> emitted "num" value, so that fail() can report which message failed
        private HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();

        // Storm calls this method in a loop; each call fetches and emits one message
        public void nextTuple() {
            // Unique id of this message, used by the ack mechanism to track it
            int msgId = i;
            int num = i++;
            // Emit the tuple (num, num % 3) to the bolt, tagged with msgId
            this.collector.emit(new Values(num, num % 3), msgId);
            // The spout itself keeps the mapping between message id and message
            map.put(msgId, num);
            try {
                // Sleep for a second so the output is easy to follow
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Declare the names of the emitted fields
            outputFieldsDeclarer.declare(new Fields("num", "field"));
        }

        // Called back by Storm with the msgId after the bolt acks the tuple
        @Override
        public void ack(Object msgId) {
            System.out.println("Ack received-----------");
        }

        // Called back by Storm with the msgId after the bolt fails the tuple
        @Override
        public void fail(Object msgId) {
            System.out.println("Message failed-----------" + map.get(msgId));
        }
    }

    public static class Bolt extends BaseRichBolt {
        private Map conf;
        private TopologyContext context;
        private OutputCollector collector;
        private int sum = 0;

        // Storm calls execute for every tuple this bolt receives from the spout
        public void execute(Tuple input) {
            // Read the value by its field name
            int num = input.getIntegerByField("num");
            System.err.println("--------------------num:" + num);
            /*
             * To make the effect of the ack mechanism easy to see, this bolt
             * calls ack or fail directly, depending on the value.
             *
             * Normally ack/fail would be called like this:
             * try {
             *     // business logic
             *     this.collector.ack(input);
             * } catch (Exception e) {
             *     this.collector.fail(input);
             * }
             */
            if (num >= 10 && num <= 20) {
                // System.err.println("sum:" + (sum += num));
                this.collector.ack(input);
            } else {
                this.collector.fail(input);
            }
        }

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // This bolt emits nothing, so it declares no output fields
        }
    }
}
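In the sample above, fail() only prints which message failed. Storm does not re-send a failed tuple by itself. A spout that wants at-least-once delivery typically keeps un-acked messages around and re-emits them when fail() is called. The sketch below simulates that bookkeeping in plain Java, without the Storm API. AckReplaySketch, its methods, and the rule that message 1 fails once are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AckReplaySketch {
    private final Map<Integer, Integer> pending = new HashMap<>(); // msgId -> payload, not yet acked
    private final Deque<Integer> replayQueue = new ArrayDeque<>();  // failed msgIds waiting for re-emit
    private int nextId = 0;

    // Like nextTuple(): re-emit a failed message first, otherwise create a new one.
    // Returns {msgId, payload}.
    int[] nextTuple() {
        Integer msgId = replayQueue.poll();
        if (msgId == null) {
            msgId = nextId++;
            pending.put(msgId, msgId * 10); // payload derived from the id to keep the sketch short
        }
        return new int[] {msgId, pending.get(msgId)};
    }

    // Like ack(msgId): the message was fully processed, so stop tracking it.
    void ack(int msgId) {
        pending.remove(msgId);
    }

    // Like fail(msgId): keep the message pending and schedule it for replay.
    void fail(int msgId) {
        replayQueue.add(msgId);
    }

    int pendingCount() {
        return pending.size();
    }

    // Drives the spout for four rounds with a hypothetical bolt rule:
    // the message with msgId 1 fails the first time and succeeds on replay.
    static List<Integer> demo(AckReplaySketch spout) {
        List<Integer> emittedIds = new ArrayList<>();
        boolean failedOnce = false;
        for (int round = 0; round < 4; round++) {
            int[] t = spout.nextTuple();
            emittedIds.add(t[0]);
            if (t[0] == 1 && !failedOnce) {
                failedOnce = true;
                spout.fail(t[0]);
            } else {
                spout.ack(t[0]);
            }
        }
        return emittedIds;
    }

    public static void main(String[] args) {
        AckReplaySketch spout = new AckReplaySketch();
        System.out.println(demo(spout));          // [0, 1, 1, 2]
        System.out.println(spout.pendingCount()); // 0
    }
}
```

Message 1 appears twice in the emitted ids because its failure put it back on the replay queue, and nothing is left pending once every message has been acked.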
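6. Storm cluster architecture
Master node (Nimbus): Nimbus distributes code across the cluster, assigns tasks to workers, and monitors for failures.
Worker nodes (Supervisor): a supervisor listens for work assigned to its machine and starts or stops worker processes based on what Nimbus has assigned to it. Each worker process executes a subset of a topology (a sub-topology), and a running topology consists of many worker processes spread across multiple machines.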
Starting the cluster
On the Nimbus node, run "nohup bin/storm nimbus >/dev/null 2>&1 &" to start the Nimbus daemon in the background.
On each supervisor node, run "nohup bin/storm supervisor >/dev/null 2>&1 &" to start the Supervisor daemon in the background.
On the Nimbus node, run "nohup bin/storm ui >/dev/null 2>&1 &" to start the UI daemon in the background. Once it is running, open http://{nimbus host}:8080 to see the cluster's worker resource usage, the status of each topology, and so on.
On all nodes, run "nohup bin/storm logviewer >/dev/null 2>&1 &" to start the log viewer daemon in the background. Once it is running, open http://{host}:8000 to view the logs. (The Nimbus node does not need a logviewer process: the logviewer mainly makes task execution logs easy to read, and those logs live on the supervisor nodes.)
Stopping a topology
First list the running topologies: storm list
Then kill one from the command line: storm kill TopologyName
Alternatively, click the kill button in the Storm UI.
7. Parallelism
By default, a node can run at most four workers, one per slot (four slots).

A worker process executes a subset of one topology (note: a single worker never serves more than one topology). A worker process starts one or more executor threads to run the topology's components (spouts or bolts). A running topology is therefore made up of multiple worker processes on multiple physical machines in the cluster.
An executor is a single thread started by a worker process. Each executor runs tasks of only one component (spout or bolt) of one topology. (Note: a component can have one or more tasks, and by default Storm creates only one task per component. An executor thread calls all of its task instances in turn on each pass of its loop.)
A task is the unit that finally runs the code of a spout or bolt (note: one task is one instance of the spout or bolt, and the executor thread calls that task's nextTuple or execute method). Once a topology is running, the number of tasks of a component is fixed, but the number of executor threads the component uses can be adjusted dynamically (for example, one executor thread can run one or more of the component's task instances). So for every component, #threads <= #tasks, i.e. the number of threads is at most the number of tasks. By default the number of tasks equals the number of executor threads, so each executor thread runs exactly one task.
By default, a supervisor node can run 4 worker processes, and each executor started in a worker runs 1 task.
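The relationship #executors <= #tasks can be illustrated by distributing a component's task ids over its executor threads. This is a hypothetical plain-Java sketch: it gives each executor a contiguous, nearly equal range of task ids, which matches the idea described above, but it is not taken from Storm's source.

```java
import java.util.ArrayList;
import java.util.List;

public class ExecutorTaskSketch {
    // Split task ids 0..numTasks-1 over numExecutors threads as evenly as possible,
    // giving each executor a contiguous range of task ids.
    static List<List<Integer>> assign(int numTasks, int numExecutors) {
        if (numExecutors < 1 || numExecutors > numTasks) {
            throw new IllegalArgumentException("need 1 <= #executors <= #tasks");
        }
        List<List<Integer>> result = new ArrayList<>();
        int base = numTasks / numExecutors;
        int extra = numTasks % numExecutors; // the first `extra` executors get one more task
        int nextTask = 0;
        for (int e = 0; e < numExecutors; e++) {
            int count = base + (e < extra ? 1 : 0);
            List<Integer> tasks = new ArrayList<>();
            for (int k = 0; k < count; k++) {
                tasks.add(nextTask++);
            }
            result.add(tasks);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 2)); // [[0, 1], [2, 3]]  two tasks per thread
        System.out.println(assign(4, 4)); // [[0], [1], [2], [3]]  the default one task per thread
    }
}
```

Because the task count stays fixed, changing the executor count only regroups the same task ids, which is why the number of tasks caps how far a component can later be scaled out.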
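Increasing parallelism
Workers (slots)
By default a worker node can start 4 worker processes. This is controlled by the supervisor.slots.ports setting, with one port per worker. It already has a value out of the box: the default comes from defaults.yaml inside storm-core.jar, and it can be overridden in storm.yaml.
By default a Storm topology uses only one worker process, and you can configure it in code to use more.
Set the number of workers with config.setNumWorkers(workers).
conf.setNumAckers(0) disables the acker tasks, so tuples are no longer tracked by the ack mechanism.
Ideally a topology should use only one worker per machine, mainly because this reduces data transfer between workers.
If all worker slots are already in use, a newly submitted topology will not run; it stays in a waiting state.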
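The worker-slot arithmetic can be sketched as follows. Under the default configuration each supervisor lists four ports in supervisor.slots.ports, so the cluster's slot total is the number of supervisors times four. The grantedWorkers rule is a simplifying assumption of this hypothetical sketch, not a description of Storm's scheduler: a topology gets at most as many workers as there are free slots, and it waits when there are none.

```java
public class SlotSketch {
    // Total worker slots = number of supervisors x ports listed in supervisor.slots.ports.
    static int totalSlots(int supervisors, int portsPerSupervisor) {
        return supervisors * portsPerSupervisor;
    }

    // Simplifying assumption of this sketch: a topology receives min(requested, free)
    // workers, and 0 means it sits waiting for a slot to free up.
    static int grantedWorkers(int requested, int freeSlots) {
        return Math.max(0, Math.min(requested, freeSlots));
    }

    public static void main(String[] args) {
        int total = totalSlots(3, 4);                   // 3 supervisors, 4 default ports each
        System.out.println(total);                      // 12
        System.out.println(grantedWorkers(2, total - 10)); // 2 free slots -> 2
        System.out.println(grantedWorkers(2, total - 12)); // 0 free slots -> 0, topology waits
    }
}
```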
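Executors
By default one executor runs one task. The number of executors for a component is set in code through the parallelism_hint argument:
builder.setSpout(id, spout, parallelism_hint);
builder.setBolt(id, bolt, parallelism_hint);
Tasks
Use boltDeclarer.setNumTasks(num); to set the number of instances (tasks).
The number of executors is at most the number of tasks (this leaves room to rebalance later).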
Elastic scaling
Adjusting in code (2 executors running 4 tasks, i.e. 2 tasks per executor):
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
        .setNumTasks(4)
        .shuffleGrouping("blue-spout");
Adjusting from the shell
# Start rebalancing after 10 seconds.
# Reconfigure the topology "mytopology" to use 5 worker processes,
# the spout "blue-spout" to use 3 executors and
# the bolt "yellow-bolt" to use 10 executors.
storm rebalance mytopology -w 10 -n 5 -e blue-spout=3 -e yellow-bolt=10
-w: number of seconds to wait before rebalancing
-n: number of workers
-e: number of executors for the named component
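Stream grouping types
Shuffle grouping: random grouping. The tuples in the stream are distributed randomly, while guaranteeing that each task of the bolt receives the same number of tuples (this gives good load balancing).
Fields grouping: grouping by field. For example, when grouping by userid, tuples with the same userid always go to the same task, while tuples with different userids may go to different tasks.
All grouping: broadcast. Every tuple is received by all tasks of the bolt.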
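The three groupings can be simulated in plain Java to show what each one guarantees. Keep two substitutions in mind. Shuffle grouping is modeled with round-robin: Storm randomizes, but both give each task an equal share. Fields grouping is modeled as the key's hash modulo the task count, and Storm's exact hash function may differ. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class GroupingSketch {
    // Shuffle-like: round-robin gives every task the same number of tuples.
    static int shuffleTarget(int tupleIndex, int numTasks) {
        return tupleIndex % numTasks;
    }

    // Fields-like: the same key always maps to the same task (hash mod #tasks);
    // different keys may or may not collide on one task.
    static int fieldsTarget(Object key, int numTasks) {
        return Math.floorMod(Objects.hashCode(key), numTasks);
    }

    // All: broadcast, so every task receives the tuple.
    static List<Integer> allTargets(int numTasks) {
        List<Integer> targets = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            targets.add(t);
        }
        return targets;
    }

    public static void main(String[] args) {
        int numTasks = 3;
        int[] counts = new int[numTasks];
        for (int i = 0; i < 9; i++) {
            counts[shuffleTarget(i, numTasks)]++;
        }
        System.out.println(Arrays.toString(counts)); // [3, 3, 3]
        System.out.println(fieldsTarget("user-42", numTasks) == fieldsTarget("user-42", numTasks)); // true
        System.out.println(allTargets(numTasks)); // [0, 1, 2]
    }
}
```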