1
Á÷Êý¾Ý¼à¿ØÉè¼Æ¸ÅÊö
1.1 ¸ÅÊöǰÑÔ
1.֮ǰ¸ú´ó¼Ò˵Ҫ¸ø´ó¼ÒдһЩstormʵʱ´¦ÀíµÄ´úÂ룬±¾À´´òËãÖÜĩдµÄ£¬µ«ÖÜĩȥÅÀÏãɽÁË£¬ËùÒÔ¡³ÙÁ˼¸Ìì(ÕâЩËãÊÇ·Ï»°)¡£
2.ÍøÉÏÓÐÈËÌù³öµÄ¹ØÓÚGPSʵʱ´¦ÀíµÄ´úÂ룬¸öÈ˸оõÆäʵʱ´¦ÀíÖ»ÊÇÔÚËÙ¶ÈÕâÒ»ÊôÐÔÉϽøÐÐÉÏÏÞ´¦ÀíÓÐЩ¼òµ¥ÁË£¬ËùÒÔÏë×Ô¼ºÉè¼Æ¸öÏîÄ¿£¬ËùÒÔÁË¡°Á÷Êý¾Ý¼à¿Ø¡±Õâ¸öÄ£ÄâÏîÄ¿¡£
3.ĿǰÕâ¸öÄ£ÄâÏîÄ¿±È½Ï¼òµ¥£¨¸ßÊÖÑÛÖУ©£¬µ«×ÜÌå¿ò¼ÜÓÐÁË£¬ÎÒ»áÒ»²½Ò»²½ÍêÉÆ£¬ÂýÂý»á²¹³äÍêÕû¡£
1.2 Éè¼Æ´óÌå¸ÅÊö
1.2.1 Êý¾ÝÁ÷²úÉú:Spout
Êý¾ÝÁ÷µÄ²úÉúĿǰʹÓõıȽ϶àµÄÊÇ£ºlogÎļþ¶ÁÈ¡¡¢´Ómysql£¨»òÕßÊÇÏà¹Ødb£©ÖлñÈ¡¡¢´ÓÏûÏ¢Öмä¼þ£¨Èçmetaq£©ÖлñÈ¡¼°Ê¹ÓÃsocket´ÓÍøÂçÖлñÈ¡¡£
²¹³ä£º
ÔÚ¸ÃÏîÄ¿ÖУ¬ÓÉÓÚÎÒµÄmetaq»¹Ã»´îºÃ£¬ËùÒÔ¾ÍÖ±½Ó²ÉÓöÁÈ¡logµÄ·½Ê½×÷ΪԴÊý¾Ý£¬Íùºó»á¸ø³ömetaq×÷ΪÊý¾ÝÔ´µÄ½Ó¿Ú¼°mysql×÷ΪÊý¾ÝÔ´µÄ½Ó¿ÚµÈ¡£
1.2.2 ´¦ÀíÊý¾Ý£ºHandleBolt
ÕâÀïµÄHandleBoltÊÇ¿í·ºµÄ¸ÅÄָ¶ÔÊý¾Ý½øÐд¦ÀíµÄÏà¹ØBolt£¬Ä¿Ç°±È½Ï³£¼ûµÄ´¦Àí·½Ê½ÊÇÊý¾Ý¹ýÂË¡¢Êý¾ÝÌí¼Ó¡¢²¿·ÖÊý¾Ýͳ¼Æ¡¢Êý¾Ý¼à¿ØµÈµÈ¡£ÕâЩ¶¼ÊDZȽϳ£¼ûµÄÊý¾Ýʵʱ´¦Àí·½Ê½¡£
²¹³ä£º
¸ÃÏîÄ¿ÖÐÊý¾Ý´¦Àí²¿·ÖʹÓÃÊý¾Ý¼à¿Ø´¦Àí£¬¼°¶ÔÊý¾ÝÁ÷½øÐÐÌõ¼þ¹ýÂË£¬½«²¿·Ö·ûºÏÌõ¼þµÄÊý¾Ýɸѡ³öÀ´×ö½øÒ»²½´¦Àí£¬´ïµ½Ìõ¼þÊý¾Ý¼à¿ØµÄÄ¿µÄ¡£Ä¿Ç°¸Ã²¿·ÖÖ§³Ö¶àÖÖÌõ¼þÅжϷ½Ê½×éºÏ£¬¶à¸ö×Ö¶Î×éºÏÅжϼ°¶àÖÖÂß¼ÅжϷ½Ê½¡£Íùºó»á½øÒ»²½ÍíÉÏ¡£
1.2.3 Êý¾Ý³Ö¾Ã»¯£ºLastingBolt
LastingBolt·ºÖ¸Êý¾ÝÔÚ´¦ÀíÖ®ºó½øÐг־û¯²Ù×÷µÄ½Ó¿Ú£¬³£¼ûµÄ³Ö¾Ã»¯²Ù×÷½Ó¿ÚÊÇ£ºÖ±½Ó´òÓ¡£¨Õâ¸öÃ²ËÆ²»Ë㣩¡¢Ð´ÈëfileÖС¢Ð´Èëmysql£¨¼°ÆäËûdb£©ÖС¢Ð´ÈëÏûÏ¢Öмä¼þ£¨metaq£©¹©ÆäËûÒµÎñµ÷Óá¢Ê¹ÓÃSocketдÈëÍøÂç¶Ë¿ÚÖеȵȡ£
²¹³ä£º
¸ÃÏîÄ¿ÖÐÔÝʱÉè¼ÆÁ½¸öÊý¾Ý³Ö¾Ã»¯Bolt£¬Ò»¸öÊÇÖ±½Ó´òÓ¡³öÀ´£¨±È½ÏÖ±¹Û£©£¬¶þÊÇ´æÈëmysqlÖС£ÆäËû·½Ê½½Ó¿Ú»áÂýÂýµÄ¸ø³ö¡£
2 Êý¾Ý¼à¿ØÉè¼Æ¿ò¼Ü
2.1 Êý¾Ý¼à¿ØÉè¼ÆÍØÆË

ͼ2.1 Êý¾Ý¼à¿ØÉè¼ÆÍØÆË
Êý¾Ý¼à¿ØÉè¼ÆÍØÆË˵Ã÷£º
ReadLogSpout:
¸Ã²¿·Öspout´Ódomain.log£¨ÉÔºó¸ø³ölog˵Ã÷£©¶ÁÈ¡Êý¾Ý£¬Ã¿´Î¶ÁȡһÐмǼ£¬¸ÃÊý¾ÝΪÓòÃû³öÊÛlog£¬¶ÁÈ¡Êý¾Ýºó½»¸øMonitorBolt½øÐд¦Àí¡£
MonitorBolt£º
¸Ã²¿·ÖBolt¶ÔÊý¾Ý½øÐнâÎö£¬¶ÁÈ¡ÅäÖÃÎļþMonitorBolt.xmlÖеÄÂß¼Åжϼ°Ïà¹Ø¹ýÂ˹æÔòµÈ£¬½øÐÐÊý¾Ý¹ýÂË£¬½«·ûºÏÌõ¼þµÄÊý¾Ý·¢Éäµ½ÏÂÒ»¼¶£¨ÉÔºóÓÐBolt·ÖÎö£©¡£
MysqlBolt£º
ÔÚMonitorBolt´¦ÀíÍêÊý¾ÝÖ®ºó½«Êý¾Ý´æÈëmysqlµÄ¿â±íÖУ¬Êý¾Ý¿âÏà¹ØÅäÖôÓÅäÖÃÎļþMysqlBolt.xmlÖжÁÈ¡¡£
PrintBolt£º
½«½á¹ûÖ±½Ó´òÓ¡³öÀ´¡£
2.2 Êý¾ÝÁ÷¼à¿Ø»·¾³ÍØÆË

ͼ2.2 ÍøÂç»·¾³ÍØÆË
ÍØÆË˵Ã÷£º
¸ÃÏîĿֻ´î½¨Èý¸östorm½ÚµãÒ»¸öΪNimbus½Úµã£¬Á½¸ösupervisor½Úµã£¬ÆäÖÐÔÚNimbus½ÚµãÖÐÓÖ°²×°ÓÐmysql¡£
3 Êý¾Ý¼à¿ØÏêϸÉè¼Æ
3.1 Ô´Êý¾Ý˵Ã÷
ĿǰԴÊý¾Ý´Ódomain.logÖжÁÈ¡£¬¸ÃlogΪÈ˹¤¹¹Ô죬ģÄâÓòÃûÅÄÂôµÄlog£¬ÆäÖÐÓÐÎå¸ö×ֶΣ¬·Ö±ðΪdomain£¨ÓòÃû£©¡¢value£¨ÊÛ¼Û£©¡¢time£¨ÉêÇëÄê·Ý£©¡¢validity£¨ÓÐЧÆÚ£©¡¢seller£¨Âô¼Ò£©¡£ÏêϸÈçÏ£º

ͼ3.1 Ô´Êý¾Ý˵Ã÷
Ò»ÐÐÊý¾ÝΪһÌõ¼Ç¼£¬Ã¿Ìõ¼Ç¼ÓÐ5¸öÊôÐÔ¡£
3.2 Êý¾Ý¼à¿ØÉè¼Æ
´ÓMonitorµÄÅäÖÃÎļþÖÐ˵Ã÷Êý¾Ý¼à¿ØµÄÉè¼Æ£º

ͼ3.2 MonitorBolt.xml½ØÍ¼
²ÎÊý˵Ã÷£º
MatchLogic£ºÌõ¼þ¼äµÄÂß¼¹ØÏµ£¬ÓÃÓÚÈçϼ¸¸öÌõ¼þ¼äµÄÂß¼¹ØÏµÖ¸Ã÷£¬ÆäÓС°AND¡±¼°¡°OR¡±Á½ÖÖÂß¼¹ØÏµÉèÖá£
MatchType£ºÅжÏÀàÐÍÁÐ±í£¬¸ÃÁбíÖ¸Ã÷ÁËij¸ö×Ö¶ÎÓúÎÖÖÆ¥ÅäËã·¨½øÐÐÅжϣ¬regularΪÕýÔòÆ¥Åä¡¢rangeΪ·¶Î§Æ¥Åä¡¢routine0Ϊ³£¹æÄ£ºýÆ¥Åä¡¢routine1Ϊ³£¹æÍêȫƥÅä¡£
MatchField£ºÆ¥Åä×Ö¶ÎÁÐ±í£¬Ö¸Ã÷¶ÔÄö×ֶνøÐÐÅжϡ£
FieldValue£º¶ÔÓ¦µÄ×Ö¶ÎÖµ¡£
ÈçÉÏÅäÖÃ˵Ã÷£º¶Ô×Ö¶Î1/2/5·Ö±ð½øÐÐÕýÔò¡¢·¶Î§¼°³£¹æÄ£ºýÆ¥Å䣬×Ö¶Î1Âú×ãÕýÔòÆ¥Åä.*google.*£¬×Ö¶Î2Âú×ã´Ó200µ½2001£¬×Ö¶Î5Âú×ãÄ£ºýÆ¥Åäina£¬Ö»ÓÐÈý¸öÌõ¼þͬʱÂú×ã¡°AND¡±£¬¸ÃÊý¾Ý²Å»á·¢Éäµ½ÏÂÒ»¼¶¡£
3.3 Êý¾ÝMysql´¦Àí
´ÓMysqlBolt.xmlÖнøÐÐ˵Ã÷£º

ͼ3.3 MyslqBolt.xml½ØÍ¼
MyslqÊý¾Ý´æ´¢´¦ÀíÖ¸Ã÷myslqµÄhost£¬Ö¸Ã÷database¼°from£¬Ê¹ÓÃusername¼°password½«Êý¾Ý´æ´¢ÒѾ´´½¨ºÃµÄmysql±íÖС£
3.4 Ô´Âë¼ò½é

ͼ3.4 Ô´ÂëÊ÷
Ô´Âë¼òµ¥ËµÃ÷£ºStorm°üÖÐΪ×ÜÌåÔËÐеÄTopology£¬Storm.baseĿǰֻÓÐmyslqÔ¤´¦ÀíµÄÒ»¸öÀ࣬storm.boltΪbolt´¦ÀíÀ࣬°üÀ¨monitorbolt¼°printbolt£¬storm.spout°üÖÐΪspoutÔ´Êý¾Ý½Ó¿Ú£¬storm.sourceΪ¹¹ÔìÔ´Êý¾ÝµÄÒ»¸öÀࣨÕâ¸ö¿ÉÒÔºöÂÔ£©£¬storm.xmlΪÅäÖÃÎļþ¶ÁÈ¡À࣬domain.logΪԴÊý¾Ý£¬MonitorBolt.xml¼°MyslqBolt.xml·Ö±ðΪÅäÖÃÎļþ¡£
StormÏîÄ¿£ºÁ÷Êý¾Ý¼à¿Ø <2>Á÷Êý¾Ý¼à¿Ø´úÂëÏê½â
1 ÏîÄ¿¸ÅÊö
1.1 Êý¾ÝÁ÷Ïò
Á÷Êý¾Ý¼à¿ØÎªstormÄ£ÄâÏîÄ¿£¬Ä£ÄâÊý¾ÝÔ´´ÓlogÎļþÖжÁÈ¡Êý¾Ý£¬²¢ÖðÌõ·¢Éäµ½¼à¿ØBoltÖУ¬MonitorsBolt¶ÁÈ¡ÅäÖÃÎļþMonitorBolt.xmlÖÐµÄÆ¥Å乿Ôò£¬°üÀ¨ÕýÔòÆ¥Åä¡¢·¶Î§Æ¥Åä¡¢³£¹æÄ£ºýÆ¥Åä¼°³£¹æÍêȫƥÅ䣬¶à¸öÌõ¼þ¿ÉÒÔ×éºÏ¶àÖÖÆ¥Å䷽ʽ£¬¶à¸öÌõ¼þ×ֶοÉÒÔÓÐÁ½ÖÖ²»Í¬µÄÂß¼¹ØÏµ¡£MonitorBoltÔÚ´¦ÀíÊý¾ÝÖ®ºó£¨¹ýÂ˳ö·ûºÏÆ¥Å乿ÔòµÄÊý¾Ý£©£¬·¢Éäµ½Êý¾Ý³Ö¾Ã»¯BoltÖУ¬MysqlBolt¶ÁÈ¡ÅäÖÃÎļþMysqlBolt.xmlÖÐmysqlÏà¹ØÐÅÏ¢£¬°üÀ¨mysqlµÄhost¼°¶Ë¿Ú£¬username¼°password£¬database¼°from£¬×îºó½«Êý¾Ý²åÈëmysqlÖС£
1.2 ´úÂëÊ÷

ͼ1.2 ´úÂëÊ÷
Ô´Âë¼òµ¥ËµÃ÷£º
Storm°üÖÐΪ×ÜÌåÔËÐеÄTopology£¬Storm.baseĿǰֻÓÐmyslqÔ¤´¦ÀíµÄÒ»¸öÀ࣬storm.boltΪbolt´¦ÀíÀ࣬°üÀ¨monitorbolt¼°printbolt£¬storm.spout°üÖÐΪspoutÔ´Êý¾Ý½Ó¿Ú£¬storm.sourceΪ¹¹ÔìÔ´Êý¾ÝµÄÒ»¸öÀࣨÕâ¸ö¿ÉÒÔºöÂÔ£©£¬storm.xmlΪÅäÖÃÎļþ¶ÁÈ¡À࣬domain.logΪԴÊý¾Ý£¬MonitorBolt.xml¼°MyslqBolt.xml·Ö±ðΪÅäÖÃÎļþ¡£
2 ´úÂëÏê½â
2.1 Package storm
* @author blogchong * @Blog www.blogchong.com * @email blogchong@gmail.com * @QQ_G 191321336 * @version 2014Äê11ÔÂ9ÈÕ ÉÏÎç11:26:29 */ // ÉèÖÃÅç·¢½Úµã²¢·ÖÅä²¢·¢Êý£¬¸Ã²¢·¢Êý½«»á¿ØÖƸöÔÏóÔÚ¼¯ÈºÖеÄÏß³ÌÊý¡£ builder.setSpout("readlog", new ReadLogSpout(), 1); //´´½¨monitor¼à¿Ø¹ýÂ˽ڵã,Ö¸¶¨¸Ã½Úµã½ÓÊÕÅç·¢½ÚµãµÄ²ßÂÔÎªËæ»ú·½Ê½¡£ builder.setBolt("monitor", new MonitorBolt("MonitorBolt.xml"), 3) .shuffleGrouping("readlog"); //´´½¨mysqlÊý¾Ý´æ´¢½Úµã,²¢´«ÈëÅäÖÃÎļþ builder.setBolt("mysql", new MysqlBolt("MysqlBolt.xml"), 3) .shuffleGrouping("monitor"); |
×¢£º¸Ã²¿·Ö´úÂëÏÔʾÁËÕû¸ötopologyµÄ½á¹¹£¬Ã¿¸ö½ÚµãÓë½ÚµãÖ®¼äµÄ¹ØÏµ£¨·¢²¼Óë¶©ÔÄ£©£¬²¢ÇÒÖ¸Ã÷ÁËÿ¸ö½ÚµãµÄÅç·¢·½Ê½¡£
2.2 Package storm.xml
MonitorXml.java: import java.io.File; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilder; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.NodeList; File file = new File(fd); //´´½¨xmlÎļþÄ£°å DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); DocumentBuilder db = dbf.newDocumentBuilder(); Document doc = db.parse(file); //½«ParameterÀïµÄÏî´æÈëlistÖÐ NodeList nl = doc.getElementsByTagName("Parameter"); //´ÓlistµÄitemÖлñÈ¡²ÎÊýÖµ Element e = (Element) nl.item(0); MatchLogic = e.getElementsByTagName("MatchLogic").item(0) .getFirstChild().getNodeValue(); MatchType = e.getElementsByTagName("MatchType").item(0) .getFirstChild().getNodeValue(); MatchField = e.getElementsByTagName("MatchField").item(0) .getFirstChild().getNodeValue(); FieldValue = e.getElementsByTagName("FieldValue").item(0) .getFirstChild().getNodeValue(); |
×¢£ºMyslqXml.javaÓëMonitorXml.javaºËÐÄ´úÂëÏàËÆ£¬Ö÷ÒªÊǵ÷ÓÃjavaÖнâÎöxmlµÄÀà,Ö÷ÒªÀà¼ûÈçÉÏimport¡£
2.3 Package storm.spout
ReadLogSpout.java£º public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) { this.collector = collector; try { this.fis = new FileInputStream("domain.log"); this.isr = new InputStreamReader(fis, "UTF-8"); this.br = new BufferedReader(isr); } catch (Exception e) { e.printStackTrace(); } } public void nextTuple() { String str = ""; try { //ÖðÐжÁÈ¡·¢É䣬ֱµ½Ä©Î² while ((str = this.br.readLine()) != null) { this.collector.emit(new Values(str)); Thread.sleep(100); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } |
×¢£º¸ÃÀàΪ²úÉúÔ´Êý¾ÝµÄÀ࣬¸ÃÀàÖðÐжÁÈ¡logÎļþÖеÄÊý¾Ý£¬·¢Éäµ½ÏÂÒ»¼¶´¦ÀíBoltÖУ¬¶ÁÈ¡ÎļþʱעÒâ±àÂëת»»¡£
2.4 Package storm.base
MysqlOpt.java import java.io.Serializable; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class MysqlOpt implements Serializable { public Connection conn = null; PreparedStatement statement = null; // Á¬½ÓÊý¾Ý¿â public boolean connSQL(String host_p, String database, String username, String password) { String url = "jdbc:mysql://" + host_p + "/" + database + "?characterEncoding=UTF-8"; try { //ʹÓÃjdbcÇý¶¯ Class.forName("com.mysql.jdbc.Driver"); conn = DriverManager.getConnection(url, username, password); return true; } catch (ClassNotFoundException cnfex) { System.out .println("MysqlBolt-- Error: Loading JDBC/ODBC dirver failed!"); cnfex.printStackTrace(); } catch (SQLException sqlex) { System.out.println("MysqlBolt-- Error: Connect database failed!"); sqlex.printStackTrace(); } return false; } // ²åÈëÊý¾Ý public boolean insertSQL(String sql) { try { statement = conn.prepareStatement(sql); statement.executeUpdate(); return true; } catch (SQLException e) { System.out.println("MysqlBolt-- Error: Insert database failed!"); e.printStackTrace(); } catch (Exception e) { System.out.println("MysqlBolt-- Error: Insert failed!"); e.printStackTrace(); } return false; } // ¹Ø±ÕÁ¬½Ó public void deconnSQL() { try { if (conn != null) conn.close(); } catch (Exception e) { System.out.println("MysqlBolt-- Error: Deconnect database failed!"); e.printStackTrace(); } } } |
×¢£º¸ÃÀàÊÇmysqlµÄ²Ù×÷À࣬°üÀ¨mysqlµÄÁ´½Ó¡¢Êý¾Ý²åÈë¼°Êý¾Ý¿â¹Ø±ÕµÈ²Ù×÷£¬¹©Mysqlboltµ÷Óá£
2.5 Package storm.bolt
Monitorbolt.java: public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { System.out.println("MonitorBolt -- Start!"); this.collector = collector; // ´ÓconfÖлñÈ¡²ÎÊý new MonitorXml(this.monitorXml).read(); this.MatchLogic = MonitorXml.MatchLogic; this.MatchType = MonitorXml.MatchType; this.MatchField = MonitorXml.MatchField; this.FieldValue = MonitorXml.FieldValue; } public void execute(Tuple input) { //¶©ÔÄstr String str = input.getString(0); if (this.flag_par == false) { System.out .println("MonitorBolt-- Erre: can't get the path of Monitor.xml!"); } else { //µ÷ÓÃMonitor½øÐÐÌõ¼þÅжϣ¬³ýÁËstr£¬ÆäËû²ÎÊýΪÅäÖÃÎļþÖжÁÈ¡µÄÁбí boolean moni = Monitor(str, this.MatchLogic, this.MatchType, this.MatchField, this.FieldValue); if (moni == true) { // System.out.println("Monitor!!!"); this.collector.emit(new Values(str)); } } } private boolean Monitor(String str, String logic, String type, String field, String value) { //½«Áбí²ð·Ö String[] types = type.split("::"); String[] fields = field.split("::"); String[] values = value.split("::"); int flag_init = types.length; int flag = 0;//ÅжϱêÖ¾ if (logic.equals("AND")) {//Âß¼AND for (int i = 0; i < flag_init; i++) { if (types[i].equals("regular")) { //µ÷ÓÃÕýÔòÆ¥Åä·½·¨regular boolean regu = regular(str, fields[i], values[i]); if (regu == true) { flag++; } } else if (types[i].equals("range")) { //µ÷Ó÷¶Î§Æ¥Åä·½·¨range boolean ran = range(str, fields[i], values[i]); if (ran == true) { flag++; } } else if (types[i].equals("routine0")) { //µ÷Ó󣹿ģºýÆ¥Åä·½·¨routine0 boolean rou0 = routine0(str, fields[i], values[i]); if (rou0 == true) { flag++; } } else if (types[i].equals("routine1")) { //µ÷Ó󣹿ÍêȫƥÅä·½·¨routine1 boolean rou1 = routine1(str, fields[i], values[i]); if (rou1 == true) { flag++; } } } if (flag == flag_init) { //ËùÓÐÌõ¼þ¶¼Âú×ãʱ return true; } else { return false; } } else if (logic.equals("OR")) {//Âß¼OR for (int i = 0; i < flag_init; i++) { if (types[i].equals("regular")) { boolean regu = regular(str, fields[i], values[i]); if (regu == true) { flag++; } } else if (types[i].equals("range")) { boolean ran = range(str, fields[i], values[i]); if (ran == true) { flag++; } } else if (types[i].equals("routine0")) { boolean rou0 = routine0(str, fields[i], values[i]); if (rou0 == true) { flag++; } } else if (types[i].equals("routine1")) { boolean rou1 = routine1(str, fields[i], values[i]); if (rou1 == true) { flag++; } } } if (flag != 0) { return true; } else { return false; } } return false; } // ÕýÔòÆ¥ÅäÅÐ¶Ï private boolean regular(String str, String field, String value) { String[] strs = str.split("\t"); Pattern p = Pattern.compile(value); Matcher m = p.matcher(strs[Integer.parseInt(field) - 1]); boolean result = m.matches(); if (result == true) { return true; } else { return false; } } // ·¶Î§Æ¥Åä private boolean range(String str, String field, String value) { String[] strs = str.split("\t"); String[] values = value.split(","); int strss = Integer.parseInt(strs[Integer.parseInt(field) - 1]); if (values.length == 1) { if (strss > Integer.parseInt(values[0])) { return true; } else { return false; } } else if (values.length == 2 && values[0].length() == 0) { if (strss < Integer.parseInt(values[1])) { return true; } else { return false; } } else if (values.length == 2 && values[0].length() != 0) { if (strss > Integer.parseInt(values[0]) && strss < Integer.parseInt(values[1])) { return true; } else { return false; } } else { return false; } } // ³£¹æÄ£ºýÆ¥Åä private boolean routine0(String str, String field, String value) { String[] strs = str.split("\t"); String strss = strs[Integer.parseInt(field) - 1]; if (strss.contains(value) && !strss.equals(value)) { return true; } else { return false; } } // ³£¹æÍêȫƥÅä private boolean routine1(String str, String field, String value) { String[] strs = str.split("\t"); String strss = strs[Integer.parseInt(field) - 1]; if (strss.equals(value)) { return true; } else { return false; } } |
×¢1£º¸ÃÀàÖ÷ÒªÉè¼ÆÁËÆ¥Å乿Ôò£¬Ö§³Ö¶àÖÖÆ¥Å䷽ʽ£¬°üÀ¨ÕýÔò¡¢·¶Î§¡¢³£¹æÄ£ºý¼°ÍêȫƥÅ䣬ÇÒÖ§³ÖÁ½ÖÖÂß¼ÅжϹØÏµ¡£
MyslqBolt.java: public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { System.out.println("MysqlBolt -- Start!"); this.collector = collector; // ³õʼ»¯mysql Loading(); } // ²ÎÊý³õʼ»¯ public void Loading() { new MysqlXml(this.mysqlXml).read(); String host_port = MysqlXml.Host_port; // mysqlµØÖ·¼°¶Ë¿Ú String database = MysqlXml.Database; // Êý¾Ý¿âÃû String username = MysqlXml.Username; // Óû§Ãû String password = MysqlXml.Password; // ÃÜÂë this.from = MysqlXml.From; // ±íÃû if (this.mysql.connSQL(host_port, database, username, password) == false) { System.out .println("MysqlBolt--Config errer, Please check Mysql-conf: " + this.mysqlXml); flag_xml = false; } else { System.out.println("MysqlBolt-- Conf Loaded: " + this.mysqlXml); } } public void execute(Tuple input) { String str = input.getString(0); if (this.flag_par == false) { System.out .println("MysqlBolt-- Erre: can't get the path of Mysql.xml!"); } else { if (this.flag_xml == true) { String insert = send_str(str); if (this.mysql.insertSQL(insert) == false) { System.out .println("MysqlBolt-- Erre: can't insert tuple into database!"); System.out.println("MysqlBolt-- Error Tuple: " + str); } } } } //²åÈëmysqlÓï¾ä¹¹Ôì·½·¨ public String send_str(String str) { String send_tmp = null; String field[] = str.split("\t"); for (int i = 0; i < field.length; i++) { if (i == 0) { send_tmp = "'" + field[0] + "', '"; } else if (i == (field.length - 1)) { send_tmp = send_tmp + field[i] + "'"; } else { send_tmp = send_tmp + field[i] + "', '"; } } String send = "insert into " + this.from + " values (" + send_tmp + ");"; return send; } |
×¢2£º¸ÃÀàÖ÷ÒªÓÃÓÚÊý¾Ý´æ´¢£¬µ÷ÓÃÁËbase°üÖеÄmysqlOptÀàÖеĶà¸ö·½·¨£¬¶Ômysql½øÐÐÁ¬½Ó£¬Êý¾Ý²åÈë¼°Êý¾Ý¿â¹Ø±ÕµÈµÈ¡£
StormÏîÄ¿£ºÁ÷Êý¾Ý¼à¿Ø <3>Á÷Êý¾Ý¼à¿ØÊ¾ÀýÔËÐÐ
1 ÎĵµËµÃ÷
¸ÃÎĵµÎªstormÄ£ÄâÏîÄ¿µÚÈý·ÝÎĵµ£¬µÚÒ»·ÝÎĵµ¼òµ¥µÄ½éÉÜÁËÄ£ÄâÏîÄ¿µÄÉè¼Æ£¬µÚ¶þ·ÝÎĵµÎª¹Ø¼ü´úÂëÏê½â£¬Õâ·ÝÎĵµÔòÊÇʾÀýÔËÐеÄÎĵµ£¬´ÓÔ´´úÂë´ò°üµ½ÅäÖÃÎļþÅäÖã¬libÎļþµ¼È룬¼°ÈÎÎñÌá½»£¬×îºóµ½´¦ÀíÊý¾ÝÊä³öµ½mysqlÖУ¬ÕâÒ»Õû¸öÁ÷³Ì¡£
¹ØÓڸò¿·Ö´úÂë¿ÉÒÔµ½²©¿ÍÖÐÁôÑÔ»ñÈ¡£¬»òÕßÊǼÓÈë191321336¿Û¿ÛȺ»ñÈ¡¡£
2 ʾÀý˵Ã÷
2.1 Êý¾ÝÔ´
//Õâ¸öÊý¾ÝÔ´¹¹ÔìµÄÔµÓÉÊÇ£¬µ±ÄêÏë¹ýÒª½¨Õ¾µÄ£¬¹Ø×¢¹ý£¬ËùÒÔÓдˡ£
Èç½ñÍøÕ¾ÖÕÓÚ½¨ÆðÀ´ÁË£¬ÈÈÀáÓ¯Ó¯°¡~~~

ͼ2.1 Ô´Êý¾Ý
Êý¾ÝÔ´ÐÎÈçÒÔÉϽØÍ¼£¬¹²ÓÐÎå¸ö×ֶΣ¬×ֶνâÊÍÔò²»Ò»Ò»ËµÃ÷£¬Ïê¼ûÎĵµÒ»¡£ÒÔÉÏÐÎʽÊý¾Ý×÷ΪÊý¾ÝÔ´¡£
2.2 ¹ýÂ˹æÔò

ͼ2.2 monitorBolt.xml¹ýÂËÅäÖÃ
ÉèÖÃÈçÉϹýÂ˹æÔò£¬¼°¹ýÂË·ûºÏÒÔϹæÔòµÄÊý¾ÝÁ÷£ºµÚÒ»¸ö×Ö¶ÎÕýÔòÆ¥Åä.*google.*£¬µÚ¶þ×Ö¶ÎÔÚ200ÖÁ2001µÄ·¶Î§ÄÚ£¬µÚÎå¸ö×ֶηûºÏ³£¹æÄ£ºýÆ¥Å䣬¼´×Ö¶ÎÖµÖаüº¬ina¼¸¸ö×Ö·û¡£ÇÒÒÔÉÏÈý¸ö×Ö¶ÎÂß¼¹ØÏµÎª¡°AND¡±£¬¼´Í¬Ê±Âú×ãÒÔÉÏÈý¸öÌõ¼þ£¬Êý¾Ý²Å»á±»¹ýÂ˳öÀ´¡£
2.3 Êý¾Ý´æ´¢

ͼ2.3 mysqlBolt.xml´æ´¢ÅäÖÃ
ÒÔÉÏΪmysql´æ´¢ÅäÖã¬mysqlµØÖ·Îª192.168.2.240£¨ÔÚnimbus½ÚµãÉϰ²×°ÓÐmysql£©£¬Ìáǰ´´½¨ºÃstormÊý¾Ý¿â£¬ÔڸÿâÖд´½¨±ímonitor¡£
3 Ïêϸ²½Öè
3.1 ÅäÖÃÎļþ
ͼ3.1-1 ÅäÖÃÎļþ·¾¶
ÅäÖÃÎļþ·¾¶ÎÒдµÄÊÇ/root/hcy/jar/£¬½«ÕâÁ½¸öxmlÎļþ·ÅÔÚsupervisor½ÚµãµÄ¸ÃĿ¼Ï£¬³ÌÐòÔËÐÐʱ»áÈ¥¸Ã·¾¶²éÕÒ¡£²¢ÇÒ½«Êý¾ÝÔ´domian.logÒ²·Åµ½¸ÃĿ¼Ï¡£
¾ßÌåÅäÖÃÅäÖ÷½Ê½£¬²Î¿¼µÚ¶þÕÂÖеÄͼ2.2¼°2.3
3.2 ´úÂë´ò°ü

ͼ3.2 ´úÂë´ò°ü
ʹÓÃeclipseÖеÄfileÑ¡ÏîϵÄexportÑ¡Ïµ¼³öÀàÐÍΪJar file¡£½«Éú³ÉµÄjar°ü·ÅÈënimbus½ÚµãÏ¡£
3.3 Êý¾Ý¿â×¼±¸
ÔÚmysqlÖд´½¨³östormÊý¾Ý¿â£¬ÔڸÿâÖд´½¨±ímonitor£¬ÈçÏ£º

ͼ3.3-1 ´´½¨±ímonitor

ͼ3.3-2 ±íÃèÊö
Ps£ºÎªÊ¡ÊÂÎÒÈ«²¿´´½¨ÁËcharÀàÐ͵ÄŶ¡£
×¢Ò⣺
±ØÐ뽫mysqlÅäÖóÉÄܹ»Ô¶³Ì·ÃÎʵġ£
ÈçÏ£º

ͼ3.3-3 ÅäÖÃmysql
3.4 »·¾³×¼±¸
3.4.1 ÅäÖÃ
¹ØÓÚstorm.yamlÅäÖÃÈçÏ£º

ͼ3.4.1 storm.yamlÅäÖÃ
3.4.2 Æô¶¯¼¯Èº
ÔÚÈý¸ö½ÚµãÉÏÆô¶¯ZK£¬ÔÚnimbusÉÏÆô¶¯ÓÃÃüÁîstorm nimbus&Æô¶¯nimbus¼°ÓÃstorm
ui&ÃüÁîÆô¶¯UI¼à¿ØÒ³Ã棬ÔÚsupervisorÉÏÆô¶¯storm supervisor&¡£
ÔÚnimbusËùÔÚ»úÆ÷ÉÏÆô¶¯mysql¡£
3.5 Ìá½»ÈÎÎñ

ͼ3.5-1 Ìá½»ÈÎÎñÃüÁî
ÎÒ°Ñjar°ü·ÅÔÚnimbusÏÂ/root/hcy/jarĿ¼Ï¡£Ö´ÐÐÒÔÉÏÃüÁʹÓÃ¼à¿ØÒ³Ãæ¼à¿ØÈÎÎñ״̬¡£

ͼ3.5-2 UI¼à¿ØÒ³Ãæ
3.6 Êä³ö²éѯ
Ô¶³ÌµÇ¼myslq£º

ͼ3.6-1 ½øÈëstormÊý¾Ý¿â
½øÐйýÂ˽á¹û²éѯ£º

ͼ3.6-2 10Ìõ¹ýÂËÊý¾Ý²éѯ½á¹û
4 ÏîÄ¿À©Õ¹
ÏÂÒ»²½¼Æ»®¸ÄÉÆspout½Ó¿Ú£¬¼Æ»®Ô´Êý¾Ý´Ómetaq£¨ÏûÏ¢¶ÓÁÐÖжÁÈ¡£©»ñÈ¡£¬Ð´Ò»¸ömetaqÓëstormµÄ½Ó¿Ú¡£
ÓÐÐËȤµÄÅóÓÑÇë¼ÌÐø¹Ø×¢²©¿Í³æ¡£
ÎһὫÕâ¸öÄ£ÄâÏîĿһ²½Ò»²½µÄÍêÉÆ£¬³õ²½¼Æ»®ÈçÏ£º
1¡¢²¼ÖÃmetaq¼¯Èº£¬Ð´metaqÓëstormµÄ½Ó¿Ú£»
2¡¢ÊµÏÖÏßÉϸüУ¨ÀýÈ綯̬¸ü¸Ä¹ýÂ˹æÔò£©£»
3¡¢²¿Êðhadoop¼¯Èº£¬Ð´hdfsÓëstorm½Ó¿Ú£»
4¡¢Ö§³ÖÀàTop Nͳ¼Æ´¦Àí¡£ |