Editor's note:
This article comes from csdn. It briefly introduces how Storm runs by comparing the roles in Storm and Hadoop. We hope it helps your study.
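I. Storm is a distributed computation engine for real-time stream processing. It makes up for Hadoop's weakness in real-time computation (Hadoop is essentially a batch processing system).
II. In a real-world deployment, Storm usually sits in the following position: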

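The numbers 1 to 5 are explained below:
1. Flume collects the log information;
2. Using its data transport capability, the collected log information can be sent in real time to a Kafka cluster or saved to Hadoop HDFS.
A Kafka cluster is chosen here because it provides buffering. This prevents data loss when the collection rate and the processing rate do not match, and so improves reliability.
3. Storm processes the data in real time;
4. The results produced by Storm are saved. When the data volume is not especially large, MySQL can be used; when it is very large, HDFS can be chosen.
5. The processing results are displayed in real time.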
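The buffering role described in item 2 can be illustrated with a small in-memory sketch. It is not the Kafka API: a bounded `BlockingQueue` stands in for the Kafka cluster, and the class name `BufferSketch`, the method `run`, and the `"log-N"` strings are made up for this illustration. A fast collector and a slower processor share the queue, and no line is lost even though their speeds differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BufferSketch {
    /**
     * Runs a fast producer and a slower consumer joined by a bounded queue.
     * Returns the lines the consumer processed, in order.
     */
    public static List<String> run(int lines, int capacity) {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        List<String> processed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < lines; i++) {
                    String line = buffer.take(); // waits until a line is available
                    Thread.sleep(1);             // pretend that processing is slow
                    processed.add(line);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 0; i < lines; i++) {
                buffer.put("log-" + i);          // waits when the buffer is full
            }
            consumer.join();                     // after join, 'processed' is safe to read
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while buffering", e);
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> out = run(10, 3);
        System.out.println("processed " + out.size() + " of 10 lines");
    }
}
```

A real Kafka cluster also persists the data to disk, so the collector does not have to wait the way `put` does here; the sketch only shows why a buffer between the two stages avoids dropping data.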
III. Storm's abstract execution model:

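Where:
a spout is the source of a data stream;
a tuple is the unit that carries data as it flows through the stream;
a bolt is an intermediate stage that processes the data stream.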
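IV. How do spouts and bolts form a running program?
A program that runs in Storm is called a Topology. A Topology assembles spouts and bolts so that together they carry out a real-time computation task. This is done with the setSpout and setBolt methods of TopologyBuilder, for example: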
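// The hyphenated names (your-spout-program, thread-number, ...) are placeholders
// for your own spout and bolt instances and settings.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout-name", your-spout-program);
// bolt-name-one reads from spout-name, partitioned by field-key-name-one
builder.setBolt("bolt-name-one", your-bolt-program-one, thread-number)
       .fieldsGrouping("spout-name", new Fields("field-key-name-one"));
// bolt-name-two reads from bolt-name-one, partitioned by field-key-name-two
builder.setBolt("bolt-name-two", your-bolt-program-two)
       .fieldsGrouping("bolt-name-one", new Fields("field-key-name-two"));
Config conf = new Config();
StormSubmitter.submitTopology("your-Topology-name", conf, builder.createTopology());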
V. How is the direction of the data flow determined?
(1) By the names given to the spout and bolt programs in the first argument of TopologyBuilder's setSpout and setBolt methods, for example "spout-name", "bolt-name-one", and "bolt-name-two" in the sample code above.
Supplement: prototype of the setBolt method:
setBolt(String id, IBasicBolt bolt, Number parallelism_hint)
Define a new bolt in this topology.
Prototype of the setSpout method:
setSpout(String id, IRichSpout spout, Number parallelism_hint)
Define a new spout in this topology with the specified parallelism.
(2) The BoltDeclarer object returned by setBolt calls the fieldsGrouping method with the spout and bolt names from (1) to specify the direction of the data flow.
Supplement: prototype of the fieldsGrouping method:
T fieldsGrouping(String componentId, Fields fields)
The stream is partitioned by the fields specified in the grouping.
Parameters:
componentId -
fields -
Returns:
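What "partitioned by the fields" means can be shown with a short sketch. It is not Storm's internal code: the class `FieldsGroupingSketch` and the method `chooseTask` are hypothetical, and the hash-modulo rule is only one plausible way to get the behaviour. The point it illustrates is that tuples with the same value in the grouping field always reach the same bolt task.

```java
public class FieldsGroupingSketch {
    /** Picks a task index in [0, numTasks) from the value of the grouping field. */
    public static int chooseTask(Object fieldValue, int numTasks) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // comparable to the parallelism given to setBolt
        for (String user : new String[] {"alice", "bob", "alice", "carol", "bob"}) {
            // every "alice" line prints the same task index, and likewise for "bob"
            System.out.println(user + " -> task " + chooseTask(user, numTasks));
        }
    }
}
```

Because equal keys land on the same task, a bolt can keep per-key state such as a running count in local memory.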
VI. What is the structure of a tuple, the unit that carries data in the stream?
The official documentation says:
The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result. Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If you want to use another type, you'll need to implement and register a serializer for that type. See http://github.com/nathanmarz/storm/wiki/Serialization for more info.
Put simply, a tuple is a list of values, and the values can be of any type. The types supported by default are byte, integer, short, long, float, double, string, and byte[].
The tuple data structure is as follows:

Here, fieldName comes from the Fields object defined in the declareOutputFields method, and fieldValue comes from the Values object sent in the emit method.
Tuples are always emitted (transmitted) by spouts and bolts.
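The pairing of field names and field values can be modelled in plain Java. The class `TupleSketch` below is a hypothetical stand-in written for this article, not Storm's Tuple class; its accessor names only echo the getString and getInteger helpers mentioned in the documentation quoted above.

```java
import java.util.Arrays;
import java.util.List;

public class TupleSketch {
    private final List<String> fieldNames; // what declareOutputFields would declare
    private final List<Object> values;     // what emit would send

    public TupleSketch(List<String> fieldNames, Object... values) {
        if (fieldNames.size() != values.length) {
            throw new IllegalArgumentException("need exactly one value per field name");
        }
        this.fieldNames = fieldNames;
        this.values = Arrays.asList(values);
    }

    /** Looks a value up by its field name. */
    public Object getValueByField(String name) {
        int i = fieldNames.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return values.get(i);
    }

    /** Positional helpers that do the cast for the caller. */
    public String getString(int i) { return (String) values.get(i); }
    public Integer getInteger(int i) { return (Integer) values.get(i); }

    public static void main(String[] args) {
        TupleSketch t = new TupleSketch(Arrays.asList("word", "count"), "storm", 3);
        System.out.println(t.getValueByField("word") + " = " + t.getInteger(1));
    }
}
```

The values are stored as plain `Object`s, which mirrors the "dynamically typed" remark in the documentation: the types are checked only when a helper casts them.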
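For example:
The spout program is as follows:
// Imports assume the pre-1.0 backtype.storm packages; Storm 1.0+ uses org.apache.storm instead.
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ParallelFileSpout extends BaseRichSpout {
    // kept so that nextTuple() can emit through it
    private SpoutOutputCollector collector;

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * called in SpoutTracker. called once, send a single tuple.
     */
    public void nextTuple() {
        // keep fetching data and emit it
        collector.emit(new Values("your-sent-fieldValue"));
    }

    /**
     * define field. used for grouping by field.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("your-sent-fieldName"));
    }
}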
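The bolt program is as follows:
// Imports assume the pre-1.0 backtype.storm packages; Storm 1.0+ uses org.apache.storm instead.
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DetectionBolt extends BaseBasicBolt {
    public void prepare(Map stormConf, TopologyContext context) {
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        // process the received data, then emit the result
        collector.emit(new Values("your-sent-fieldValue"));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("your-sent-fieldName"));
    }
}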
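VII. How does a spout emit an unbounded data stream, and how does a bolt process the tuples it receives?
(1) As in the spout sample code in the previous part, a spout must have a nextTuple method. For the whole lifetime of the spout, nextTuple is called over and over, so the spout can keep fetching data from the stream and keep emitting it to the bolt programs.
(2) As in the bolt sample code in the previous part, a bolt must have an execute method. For the whole lifetime of the bolt, it processes each tuple as soon as it receives one and, when needed, emits the processed data onward.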
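The two points above can be sketched as a single driver loop. This is a simplified single-threaded model rather than Storm's worker code: the interfaces `MiniSpout` and `MiniBolt`, the class `SpoutBoltLoopSketch`, and the fixed iteration count are all assumptions made so that the example terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class SpoutBoltLoopSketch {
    /** Hypothetical spout: each nextTuple() call may emit one value through the collector. */
    public interface MiniSpout {
        void nextTuple(Consumer<String> collector);
    }

    /** Hypothetical bolt: execute() runs once per received value and may emit onward. */
    public interface MiniBolt {
        void execute(String input, Consumer<String> collector);
    }

    /** Calls nextTuple repeatedly and hands every emitted value to the bolt. */
    public static List<String> run(MiniSpout spout, MiniBolt bolt, int iterations) {
        Queue<String> inFlight = new ArrayDeque<>();
        List<String> output = new ArrayList<>();
        // a real topology keeps looping until it is killed; we stop after 'iterations'
        for (int i = 0; i < iterations; i++) {
            spout.nextTuple(inFlight::add);
            while (!inFlight.isEmpty()) {
                bolt.execute(inFlight.poll(), output::add);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        MiniSpout spout = collector -> collector.accept("line-" + counter[0]++);
        MiniBolt bolt = (input, collector) -> collector.accept(input.toUpperCase());
        System.out.println(run(spout, bolt, 3)); // prints [LINE-0, LINE-1, LINE-2]
    }
}
```

The stream is "unbounded" only because the outer loop never has to end; each individual nextTuple or execute call does a small, finite piece of work.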
VIII. How is every emitted tuple guaranteed to be processed correctly?
Whether a tuple ends up being processed successfully or fails, it is emitted and tracked by the spout that created it.
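One way to picture "emitted and tracked by the spout that created it" is a spout that remembers each tuple under a message id until it hears back. The class `ReliableSpoutSketch` below is a hypothetical model, not Storm code. In Storm the spout interface does have ack and fail callbacks that receive the message id, but here nextTuple returns the id directly to keep the example small, and replaying on failure is a policy chosen for this sketch.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class ReliableSpoutSketch {
    private final Queue<String> source = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>(); // msgId -> value not yet acked
    private long nextId = 0;

    public ReliableSpoutSketch(String... values) {
        for (String v : values) {
            source.add(v);
        }
    }

    /** Emits the next value and remembers it under a fresh id; returns -1 if nothing is left. */
    public long nextTuple() {
        String value = source.poll();
        if (value == null) {
            return -1;
        }
        long id = nextId++;
        pending.put(id, value);
        return id;
    }

    /** Downstream processing succeeded: the spout can forget the tuple. */
    public void ack(long msgId) {
        pending.remove(msgId);
    }

    /** Downstream processing failed: put the value back so that it is emitted again. */
    public void fail(long msgId) {
        String value = pending.remove(msgId);
        if (value != null) {
            source.add(value);
        }
    }

    public int pendingCount() { return pending.size(); }
    public int queuedCount() { return source.size(); }

    public static void main(String[] args) {
        ReliableSpoutSketch spout = new ReliableSpoutSketch("a", "b");
        long first = spout.nextTuple();
        long second = spout.nextTuple();
        spout.ack(first);   // "a" is done
        spout.fail(second); // "b" goes back to the source for replay
        System.out.println("pending=" + spout.pendingCount() + ", queued for replay=" + spout.queuedCount());
    }
}
```

Since a failed tuple may be emitted more than once, downstream processing in this model has to tolerate duplicates.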
IX. Comparison of the roles in Storm and Hadoop

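X. Storm is more real-time than Hadoop because Hadoop outputs its results only after an entire batch of data has been processed, whereas Storm outputs the result for each small piece of data as soon as that piece is processed.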
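The difference in when results appear can be seen in a toy running-sum example. Neither method uses Hadoop or Storm; the class `BatchVsStreamSketch` and both method names are made up for this comparison. The returned list records every result that was output and in what order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchVsStreamSketch {
    /** Batch style: read the whole input, then output one result at the end. */
    public static List<Integer> batchSum(List<Integer> input) {
        int total = 0;
        for (int x : input) {
            total += x;
        }
        List<Integer> outputs = new ArrayList<>();
        outputs.add(total); // the only output, produced after all data is processed
        return outputs;
    }

    /** Stream style: output an updated result after every element. */
    public static List<Integer> streamingSum(List<Integer> input) {
        List<Integer> outputs = new ArrayList<>();
        int total = 0;
        for (int x : input) {
            total += x;
            outputs.add(total); // a result is available as soon as this element is processed
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        System.out.println("batch:  " + batchSum(data));     // prints batch:  [10]
        System.out.println("stream: " + streamingSum(data)); // prints stream: [1, 3, 6, 10]
    }
}
```

Both styles reach the same final value; the streaming version simply makes the intermediate results visible without waiting for the end of the input, which for an unbounded stream never comes.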