1. Programming model
We know that Hadoop has the MapReduce programming model. So what is Storm's corresponding programming model? Let's look at it in detail.
Storm's programming model has three main components. The first is the Topology, a graph made up of several computation nodes. Computation nodes come in two kinds, Spouts and Bolts, and together they form a multi-node directed graph like the one shown below (data flows from left to right). The whole graph is one Topology, made up of 2 Spouts and 4 Bolts.

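A Spout is the entry point of a Storm Topology: every data stream enters the topology through a Spout to be processed. A Spout receives data from outside, processes it, and then emits it. Every Topology must have at least one Spout.
A Bolt is a data-processing unit. It receives data from one or more other computation nodes, processes it, and emits the results. It can receive data from Spouts as well as from other Bolts. A Topology may contain zero Bolts.
Which node a Spout emits to, and which node a Bolt receives from, is not declared in the Spout or Bolt classes themselves. It is specified when the Topology is assembled.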
2. Programming steps
Writing a complete Storm program takes four steps:
Write one or more Spouts.
Write one or more Bolts.
Write a class with a main method that assembles them into a Topology.
Finally, submit the Topology to the Storm cluster to run.
3. Detailed implementation
Omitted.
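4. Requirements
Suppose mobile users' Internet access log files are continuously being stored in a directory on HDFS. We need to parse newly arrived files quickly into the structure we need and store them in an access-log table in HBase. We also need to count each user's access records and, when a count exceeds a threshold, store it in an over-threshold table in HBase.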
4.1 Implementation
After analysing the requirement, we chose Storm to implement it. First, we designed the processing architecture shown below:

Based on this design, we need one Spout class (com.ygc.mobilenet.MobileNetLogAnalyseSpout) that reads data from HDFS, does some initial processing, and emits it. We also need three Bolt classes. com.ygc.mobilenet.MobileNetLogSaveBolt takes each record and stores it in HBase. com.ygc.mobilenet.MobileNetLogStatisticsBolt counts access records per phone number within a time window. com.ygc.mobilenet.MobileNetLogThresholdBolt checks whether each phone number's count within the time window exceeds the threshold, and if it does, saves it to HBase.
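The design above calls for counting records per phone number within a time window and comparing the count with a threshold. A minimal sketch of a tumbling (fixed, non-overlapping) window counter in plain Java is shown below. The class name `TumblingWindowCounter`, the window length, and passing the timestamp in explicitly are assumptions made for illustration. A threshold of 2 with a strict `>` comparison would match the check in MobileNetLogThresholdBolt later in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: counts records per MSISDN inside fixed, non-overlapping
// time windows and reports whether a count exceeds a threshold.
final class TumblingWindowCounter {
    private final long windowMillis;
    private final int threshold;
    private long currentWindowStart = -1;
    private final Map<String, Integer> counts = new HashMap<>();

    TumblingWindowCounter(long windowMillis, int threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    // Records one access for msisdn at nowMillis and returns that number's
    // count within the current window.
    int record(String msisdn, long nowMillis) {
        long windowStart = nowMillis - (nowMillis % windowMillis);
        if (windowStart != currentWindowStart) {
            // A new window has begun: forget the previous window's counts.
            counts.clear();
            currentWindowStart = windowStart;
        }
        return counts.merge(msisdn, 1, Integer::sum);
    }

    // True when the count is strictly greater than the threshold.
    boolean exceeds(int count) {
        return count > threshold;
    }
}
```

Clearing the map at each window boundary keeps memory bounded, which a running total with no window does not. Inside a Storm bolt this state would live in a single task, so it still depends on grouping records by MSISDN so that every record for a given number reaches the same task.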
Before implementing anything we need to import the dependencies. I use Maven to manage them, so my Maven dependency configuration is as follows:
<dependencies>
    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.8.1</version>
        <!-- provided: the cluster already supplies Storm, so keep it out of the packaged jar -->
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.ygc.hadoop</groupId>
        <artifactId>hadoop2client</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>
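I have wrapped access to both HBase and HDFS in the hadoop2client jar. The Hadoop and HBase artifacts that jar depends on are declared in its own pom.xml, so we don't need to declare them explicitly. This is one of Maven's advantages: you only declare your direct dependencies.
Of course, we still have to declare Storm itself.
The following sections describe the implementation of each part.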
4.2.1 Spout implementation
4.2.1.1 The com.ygc.mobilenet.MobileNetLogVal class
This is a helper class that describes the structure of the data file being read. It needs little explanation.
package com.ygc.mobilenet;
import backtype.storm.tuple.Fields;
/**
* Helper class describing the structure of the log records to be parsed.
*/
public class MobileNetLogVal {
//Session start time
public static final String START_TIME_FIELD = "START_TIME";
//Response time
public static final String RESPONSE_TIME_FIELD = "RESPONSE_TIME";
//Response end time
public static final String END_TIME_FIELD = "END_TIME";
//Source device IP
public static final String SOURCE_DEV_IP_FIELD = "SOURCE_DEV_IP";
//Source user IP
public static final String SOURCE_USER_IP_FIELD = "SOURCE_USER_IP";
//Source port
public static final String SOURCE_PORT_FIELD = "SOURCE_PORT";
//Destination device IP
public static final String DEST_DEV_IP_FIELD = "DEST_DEV_IP";
//Destination user IP
public static final String DEST_USER_IP_FIELD = "DEST_USER_IP";
//Destination port
public static final String DEST_PORT_FIELD = "DEST_PORT";
//IMSI, identifies the device
public static final String IMSI_FIELD = "IMSI";
//MSISDN, identifies the phone number
public static final String MSISDN_FIELD = "MSISDN";
//Access point name (APN) used for mobile Internet access
public static final String APN_FIELD = "APN";
//User agent, usually the browser model or the program name
public static final String USER_AGENT_FIELD = "USER_AGENT";
//Destination URL
public static final String URL_FIELD = "URL";
//Destination host, contained in the URL
public static final String HOST_FIELD = "HOST";
/**
* Creates the data structure (field names) that the Spout emits.
*
* @return the Fields the Spout emits, in emit order.
*/
public Fields createFields() {
return new Fields(
START_TIME_FIELD,
RESPONSE_TIME_FIELD,
END_TIME_FIELD,
SOURCE_DEV_IP_FIELD,
SOURCE_USER_IP_FIELD,
SOURCE_PORT_FIELD,
DEST_DEV_IP_FIELD,
DEST_USER_IP_FIELD,
DEST_PORT_FIELD,
IMSI_FIELD,
MSISDN_FIELD,
APN_FIELD,
USER_AGENT_FIELD,
URL_FIELD,
HOST_FIELD);
}
}
4.2.1.2 The com.ygc.mobilenet.MobileNetLogAnalyseSpout class
package com.ygc.mobilenet;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.ygc.hadoop.hdfs.HDFSClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.BufferedReader;
import java.io.File;
import java.util.Map;
/**
* @author inspur research
* @since 2014-01-09
* Receives mobile users' Internet access records and emits them.
*/
public class MobileNetLogAnalyseSpout implements
IRichSpout {
private SpoutOutputCollector outputCollector;
private HDFSClient hdfsClient;
private Log log = LogFactory.getLog(MobileNetLogAnalyseSpout.class);
//HDFS directory to monitor
private String MONITOR_PATH = "/storm/mobilelog";
@Override
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
//Every Spout or Bolt that emits data must declare the structure of that data.
outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
}
/**
* Parses one line into the structure used downstream. 15 columns are read from each line.
* @param line one line of data received from the data source
* @return the formatted Values object.
*/
public Values createValues(String line) {
String[] cols = line.split("[|]");
return new Values(
cols[0],
cols[1],
cols[2],
cols[4],
cols[6],
cols[12],
cols[5],
cols[7],
cols[13],
cols[28],
cols[29],
cols[32],
cols[33],
cols[30],
cols[31]
);
}
@Override
public Map<String, Object> getComponentConfiguration()
{
return null;
}
@Override
public void open(Map map, TopologyContext topologyContext,
SpoutOutputCollector spoutOutputCollector) {
//Keep the SpoutOutputCollector; it is needed to emit the data stream
this.outputCollector = spoutOutputCollector;
try {
//Client for reading HDFS files (self-implemented)
hdfsClient = new HDFSClient();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
Utils.sleep(10000);
try {
//Lists the matching files in the monitored directory, picks one at random,
//locks it for exclusive use and returns its name (null if there is none)
String lockFileName = hdfsClient.lockFileName(MONITOR_PATH,
".TXT");
if (lockFileName != null) {
//Open the locked text file from HDFS
BufferedReader read = hdfsClient.getBufferedReader(lockFileName);
String line;
int count = 0;
try {
while ((line = read.readLine()) != null) {
if (line.trim().length() > 0) {
count++;
this.outputCollector.emit(createValues(line));
}
}
} finally {
//Always release the HDFS stream, even if reading or parsing fails
read.close();
}
log.info("deal file " + lockFileName
+ " " + count + " rows!");
}
} catch (Exception e) {
log.error(e);
}
}
@Override
public void ack(Object o) {
}
@Override
public void fail(Object o) {
}
}
4.2.1.3 Key points
ҪʵÏÖÒ»¸öSpoutÀ࣬һ°ãµÄ×ö·¨ÊÇʵÏÖbacktype.storm.topology.IRichSpout½Ó¿Ú£¬ÖصãµÄÊÇҪʵÏÖÏÂÃæµÄ¼¸¸ö½Ó¿Ú4.2.1.3.1public
void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
½Ó¿ÚÔÚÕâ¸ö½Ó¿ÚÀÎÒÃÇÒªÉùÃ÷±¾SpoutÒª·¢É䣨emit£©µÄÊý¾ÝµÄ½á¹¹£¬¼°Ò»¸öbacktype.storm.tuple.Fields¶ÔÏó¡£Õâ¸ö¶ÔÏóºÍpublic
void nextTuple()½Ó¿ÚÖÐemitµÄbacktype.storm.tuple.Values¹²Í¬×é³ÉÁËÒ»¸öÔª×é¶ÔÏó£¨backtype.storm.tuple.Tuple£©¹©ºóÃæ½ÓÊÕ¸ÃÊý¾ÝµÄBlotʹÓÃ
4.2.1.3.2 The public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) method
This is the initialization method. Here we keep a reference to the SpoutOutputCollector spoutOutputCollector object so that public void nextTuple() can use it later. Other setup can be done here too. In this example, the HDFSClient object is initialized here.
4.2.1.3.3 The public void nextTuple() method
This method implements the actual reading from the data source. In this example it reads one data file from HDFS, parses it line by line, and emits the resulting data. What gets emitted is the backtype.storm.tuple.Values object produced by public Values createValues(String line), which picks 15 target values out of one line of source data. A Values object can hold objects of different types. For example, you can emit a String and a Long together in one backtype.storm.tuple.Values. In fact, any object that Storm knows how to serialize can be stored in it. When emitting, make sure the contents match the backtype.storm.tuple.Fields declared in declareOutputFields one-to-one, in the same order. Together they form a backtype.storm.tuple.Tuple that is used by whatever consumes the stream downstream.
4.2.2 Bolt implementation
4.2.2.1 The com.ygc.mobilenet.MobileNetLogSaveBolt class
package com.ygc.mobilenet;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.inspur.hadoop.hbase.HTableClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
/**
* Saves every Internet access record to the HBase database.
*/
public class MobileNetLogSaveBolt implements IRichBolt
{
private OutputCollector outputCollector;
private HTableClient tableClient;
private Log log = LogFactory.getLog(MobileNetLogSaveBolt.class);
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.outputCollector = outputCollector;
try {
//Initialize the HBase client
tableClient = new HTableClient("192.168.1.230");
} catch (Exception e) {
log.error(e);
}
}
@Override
public void execute(Tuple tuple) {
try {
log.info("access tuple " + tuple.getValues() + " fields info = "
+ tuple.getFields().toString());
saveTupleToHBase(tuple);
this.outputCollector.emit(tuple, tuple.getValues());
} catch (Exception e) {
log.error(e);
} finally {
outputCollector.ack(tuple);
}
}
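/**
* Saves the received tuple to the database.
* @param tuple data received from another computation node, here the stream emitted by com.ygc.mobilenet.MobileNetLogAnalyseSpout. Its contents correspond to the backtype.storm.tuple.Fields declared and the backtype.storm.tuple.Values emitted in that class.
* @return true if all columns were written, false if an exception occurred
*/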
private boolean saveTupleToHBase(Tuple tuple)
{
try {
String rowKey = createRowKeyByeTuple(tuple);
long ts = System.currentTimeMillis();
tableClient.insertRow("t_mobilenet_log",
rowKey, "TIME",
MobileNetLogVal.START_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "TIME",
MobileNetLogVal.RESPONSE_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.RESPONSE_TIME_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "TIME",
MobileNetLogVal.END_TIME_FIELD, ts, tuple.getStringByField(MobileNetLogVal.END_TIME_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "SOURCE_INFO",
MobileNetLogVal.SOURCE_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_DEV_IP_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "SOURCE_INFO",
MobileNetLogVal.SOURCE_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_USER_IP_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "SOURCE_INFO",
MobileNetLogVal.SOURCE_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.SOURCE_PORT_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "DEST_INFO",
MobileNetLogVal.DEST_DEV_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_DEV_IP_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "DEST_INFO",
MobileNetLogVal.DEST_USER_IP_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_USER_IP_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "DEST_INFO",
MobileNetLogVal.DEST_PORT_FIELD, ts, tuple.getStringByField(MobileNetLogVal.DEST_PORT_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "BASIC_INFO",
MobileNetLogVal.APN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.APN_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "BASIC_INFO",
MobileNetLogVal.IMSI_FIELD, ts, tuple.getStringByField(MobileNetLogVal.IMSI_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "BASIC_INFO",
MobileNetLogVal.MSISDN_FIELD, ts, tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "BASIC_INFO",
MobileNetLogVal.URL_FIELD, ts, tuple.getStringByField(MobileNetLogVal.URL_FIELD));
tableClient.insertRow("t_mobilenet_log",
rowKey, "BASIC_INFO",
MobileNetLogVal.HOST_FIELD, ts, tuple.getStringByField(MobileNetLogVal.HOST_FIELD));
} catch (Exception e) {
log.error(e);
return false;
}
return true;
}
private String createRowKeyByeTuple(Tuple tuple)
{
return tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD)
+ tuple.getStringByField(MobileNetLogVal.START_TIME_FIELD);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new MobileNetLogVal().createFields());
}
@Override
public Map<String, Object> getComponentConfiguration()
{
return null;
}
}
A few points to note:
Because this Bolt re-emits the data unchanged after processing, its public void declareOutputFields method is the same as the one in com.ygc.mobilenet.MobileNetLogAnalyseSpout.
In private boolean saveTupleToHBase(Tuple tuple), note that the contents of the backtype.storm.tuple.Tuple object correspond to the backtype.storm.tuple.Fields declared, and the backtype.storm.tuple.Values emitted, by the upstream unit (com.ygc.mobilenet.MobileNetLogAnalyseSpout).
4.2.2.2 The com.ygc.mobilenet.MobileNetLogStatisticsBolt class
package com.ygc.mobilenet;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
/**
* Counts Internet access records per phone number. No time window is implemented here:
* the counts accumulate for the lifetime of the bolt.
*/
public class MobileNetLogStatisticsBolt implements
IRichBolt {
private OutputCollector outputCollector;
private Map<String, Integer> requestStatistic
= new HashMap<String, Integer>();
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.outputCollector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String msisdn = tuple.getStringByField(MobileNetLogVal.MSISDN_FIELD);
Integer count = 1;
if (requestStatistic.containsKey(msisdn)) {
count = requestStatistic.get(msisdn) + 1;
}
requestStatistic.put(msisdn, count);
this.outputCollector.emit(tuple, new Values(msisdn,
count));
this.outputCollector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields(MobileNetLogVal.MSISDN_FIELD,"count"));
}
@Override
public Map<String, Object> getComponentConfiguration()
{
return null;
}
}
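Key points:
The backtype.storm.tuple.Fields declared here contains two fields, MobileNetLogVal.MSISDN_FIELD and "count".
The backtype.storm.tuple.Values emitted here likewise has two fields, one a String and the other an Integer.
In the emit call, the first argument is the original input backtype.storm.tuple.Tuple, and the second is the backtype.storm.tuple.Values to emit. Passing the input tuple anchors the new tuple to it. This builds a tree from the source message to the messages derived from it, so that Storm can automatically track whether the message has been fully processed. It is one of Storm's mechanisms for guaranteeing reliable processing, and I won't go into it here. You can also omit the first argument, in which case Storm no longer cares whether the emitted Values are processed successfully. Note that this tracking only takes effect when the Spout emits its tuples with a message id. MobileNetLogAnalyseSpout calls emit(createValues(line)) without one, so in this example the tuples are not actually tracked.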
4.2.2.3 The com.ygc.mobilenet.MobileNetLogThresholdBolt class
package com.ygc.mobilenet;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import com.inspur.hadoop.hbase.HTableClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.Map;
/**
* Checks whether a user's access count exceeds the threshold and, if so, saves it to HBase.
*/
public class MobileNetLogThresholdBolt implements
IRichBolt {
private OutputCollector outputCollector;
//Client for accessing the HBase database
private HTableClient tableClient;
private Log log = LogFactory.getLog(MobileNetLogThresholdBolt.class);
@Override
public void prepare(Map map, TopologyContext topologyContext,
OutputCollector outputCollector) {
this.outputCollector = outputCollector;
try {
tableClient = new HTableClient("192.168.1.230");
} catch (Exception e) {
log.error(e);
}
}
@Override
public void execute(Tuple tuple) {
log.info("deal data " + tuple.getString(0)
+ "=" + tuple.getInteger(1));
//Threshold is hard-coded: more than 2 records counts as over the limit
if (tuple.getInteger(1) > 2) {
try {
tableClient.insertRow("t_mobilenet_threshold",
tuple.getString(0), "STAT_INFO", "COUNT",
String.valueOf(tuple.getInteger(1)));
} catch (Exception e) {
log.error(e);
}
}
//This is the last bolt and declareOutputFields declares no stream,
//so nothing is emitted here; just ack the input tuple
this.outputCollector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer
outputFieldsDeclarer) {
}
@Override
public Map<String, Object> getComponentConfiguration()
{
return null;
}
}
Key points:
Just note that when reading from the tuple, tuple.getString(0) and tuple.getInteger(1) correspond to the MobileNetLogVal.MSISDN_FIELD and "count" fields emitted by com.ygc.mobilenet.MobileNetLogStatisticsBolt. In com.ygc.mobilenet.MobileNetLogStatisticsBolt we read tuple data by field name. Here we read it by position.
4.2.3 Assembling the topology
Now the components we developed need to be assembled into a Topology. This is done in a class with a main method. According to the design diagram above, com.ygc.mobilenet.MobileNetLogAnalyseSpout is the Topology's data source, and the data it emits is received by both com.ygc.mobilenet.MobileNetLogSaveBolt and com.ygc.mobilenet.MobileNetLogStatisticsBolt. The stream emitted by com.ygc.mobilenet.MobileNetLogStatisticsBolt is in turn received by com.ygc.mobilenet.MobileNetLogThresholdBolt. The code that assembles the Topology is therefore:
package com.ygc.mobilenet;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
* Assembles the Storm Topology.
*/
public class MobileNetLogTopology {
public static void main(String[] args) {
//Expects two arguments: the number of workers and the parallelism of each component
if (args.length >= 2) {
int worknum = Integer.parseInt(args[0]);
int execunum = Integer.parseInt(args[1]);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("analyseMobileNetlog",
new MobileNetLogAnalyseSpout(), execunum);
builder.setBolt("saveMobileNetlog",
new MobileNetLogSaveBolt(), execunum).shuffleGrouping("analyseMobileNetlog");
builder.setBolt("countMobileNetlog",
new MobileNetLogStatisticsBolt(), execunum).fieldsGrouping("analyseMobileNetlog",
new Fields(MobileNetLogVal.MSISDN_FIELD));
builder.setBolt("thresholdMobileNetlog",
new MobileNetLogThresholdBolt(), execunum).shuffleGrouping("countMobileNetlog");
Config conf = new Config();
conf.setNumWorkers(worknum);
conf.setMaxSpoutPending(5000);
try {
StormSubmitter.submitTopology("mobilenetlogtopology",
conf, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
builder.setSpout("analyseMobileNetlog", new MobileNetLogAnalyseSpout(), execunum);
This sets the Topology's Spout; a Topology may have several. The first argument is the name (id) of this computation node, which must be unique within the Topology. The third argument is the parallelism hint: the number of executors, that is, threads, that run this component's logic.
builder.setBolt("saveMobileNetlog", new MobileNetLogSaveBolt(), execunum).shuffleGrouping("analyseMobileNetlog");
This sets a Bolt. The first argument is again the node's name, which must be unique. The third argument is again the parallelism hint.
The shuffleGrouping("analyseMobileNetlog") call means this Bolt receives data, distributed randomly across its tasks, from the node whose id is "analyseMobileNetlog", that is, from the Spout set up above.
builder.setBolt("countMobileNetlog", new MobileNetLogStatisticsBolt(), execunum).fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD));
This sets the second Bolt. Because the per-phone-number counts are kept in memory, all records for the same phone number must be sent to the same MobileNetLogStatisticsBolt thread. That is why the data is distributed with fieldsGrouping("analyseMobileNetlog", new Fields(MobileNetLogVal.MSISDN_FIELD)).
builder.setBolt("thresholdMobileNetlog", new MobileNetLogThresholdBolt(), execunum).shuffleGrouping("countMobileNetlog");
Nothing new here: the threshold Bolt receives the counting Bolt's output through shuffle grouping.
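Assembling a Topology involves the concept of a Stream Grouping: the strategy by which a data stream is distributed between the computation units of a Topology. Different strategies suit different scenarios. For example, the third statement above requires that all data with the same value of one field (the phone number) be sent to the same task of the downstream node. Storm provides the following groupings out of the box:
shuffleGrouping: the upstream unit sends tuples randomly to the tasks of the downstream unit, so that each task receives roughly the same number of tuples.
fieldsGrouping: tuples are routed by the value of one or more fields, and all tuples with the same value go to the same task of the downstream unit. For example, if the phone number is the grouping field and a record for 138111111111 was sent to Task1, later records for 138111111111 will not be sent to any other task.
All grouping: every tuple is sent to all tasks of the downstream node. If that node has 5 tasks, all 5 receive every tuple. I can't yet think of a scenario where I would use this.
Global grouping: the entire stream goes to a single task of the consuming Bolt, the one with the lowest task id.
None grouping: you don't care how the stream is distributed. Currently it is equivalent to shuffleGrouping.
Direct grouping: the producer of a tuple decides which task of the consuming unit receives it. The tuple must be emitted with the emitDirect method, specifying the taskId, and the stream must be declared as a direct stream.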
4.2.4 Submitting and running
Before submitting this Storm Topology, all the jars the code above needs, other than Storm 0.8.1 itself, must be packed into a single jar. Here that means our own HDFSClient and HTableClient plus the Hadoop and HBase jars they depend on, all packaged into one stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar file. Maven's assembly plugin can do this packaging:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>
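The assembly plugin unpacks all the jars your project depends on and packages them, together with your own classes, into your jar. There are pitfalls, of course. For example, I ran into a problem where several jars contained the same configuration file and the copies overwrote each other.
Once the jar is built, upload it to the server running the Storm nimbus process and run the following command: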
storm jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.ygc.mobilenet.MobileNetLogTopology 16 4
Here 16 and 4 are the command-line arguments read by com.ygc.mobilenet.MobileNetLogTopology: the number of workers and the number of executors, respectively. They set the number of worker processes the Topology runs in and the number of threads that run each computation unit.
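With the arguments above, the arithmetic works out as follows. `execunum` is passed as the parallelism hint of each of the four components (one Spout and three Bolts). That gives 4 × 4 = 16 executors in total, not counting Storm's own acker executors, spread over 16 worker processes, or roughly one executor per worker. The small helper below is only a back-of-the-envelope sketch with hypothetical names. How executors are actually placed on workers is decided by Storm's scheduler.

```java
// Hypothetical back-of-the-envelope calculator for this topology's parallelism.
final class ParallelismMath {
    // Total executors when every component gets the same parallelism hint.
    static int totalExecutors(int components, int executorsPerComponent) {
        return components * executorsPerComponent;
    }

    // Average number of executors per worker, rounded up.
    static int executorsPerWorkerCeil(int totalExecutors, int workers) {
        return (totalExecutors + workers - 1) / workers;
    }
}
```

The second command-line argument is a per-component figure, not a total. Raising it to 8 would double the executor count to 32 while the worker count stays at 16.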
If the submission succeeds, output like the following is shown:
0 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
10 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar stormdemo-1.0-SNAPSHOT-jar-with-dependencies.jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
593 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/storm/stormlocale/nimbus/inbox/stormjar-a3bdefb1-6ec5-40ab-8153-6baab889b043.jar
593 [main] INFO backtype.storm.StormSubmitter - Submitting topology mobilenetlogtopology in distributed mode with conf {"topology.workers":16,"topology.max.spout.pending":5000}
719 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: mobilenetlogtopology
The Storm UI will then show a page like this:

4.2.5 Results
4.2.5.1 Checking the Storm UI
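Now upload a few Internet access log files to the /storm/mobilelog directory on HDFS, which is the directory the Spout monitors through MONITOR_PATH. After a short while, refresh the Storm UI and click the mobilenetlogtopology link on the page above. You will see a page like the following, showing the details of the running Topology.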

4.2.5.2 Checking the HBase data
On the HBase HMaster, run
hbase shell
to enter the HBase interactive shell, then run the following command:
scan 't_mobilenet_log'

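I won't look at the other table's data. At this point a complete Storm stream-processing application is finished, and I hope it is useful to you. If anything here is wrong, please point it out. I'm new to Storm myself, and there is still plenty I don't fully understand, so I'd welcome discussion.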