Implementing a Distributed Queue with ZooKeeper
 
Author: Zhang Dan (Conan), 火龙果软件    Published 2014-08-01
 

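Hardware keeps getting cheaper. An unbranded server with two CPUs (24 cores), 48 GB of memory, and a 2 TB hard disk now costs less than 20,000 RMB. Using such a machine just to host a few web applications is an extravagant waste, and even a single-node Hadoop installation wastes much of its computing capacity. How to use the resources of such a powerful machine effectively has therefore become an important question in cost control.

With virtualization, we can split one server into 12 VPS instances, each with a 2-core CPU, 4 GB of memory, and a 40 GB disk, and the resources can be reallocated later. What a great technology! We now have a 12-node Hadoop cluster: let Hadoop run in the cloud and speed up the world.

Preface

ZooKeeper is a distributed coordination system. What does coordination mean, and where does ZooKeeper's value show? The distributed queue example in this article demonstrates what ZooKeeper can do. For basic ZooKeeper usage, see: ZooKeeper伪分步式集群安装及使用 (Installing and Using a ZooKeeper Pseudo-Distributed Cluster)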

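1. Distributed Queues

There are many queue products, most of them implemented as messaging systems, such as ActiveMQ, JBossMQ, RabbitMQ, and IBM-MQ. Dedicated distributed queue products are less common; Beanstalkd is one example.

The distributed queue implemented in this article is a synchronous distributed queue built on ZooKeeper: the queue becomes usable only when all of its members have arrived; until then, it keeps waiting for the remaining members.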
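As a point of comparison, the same "usable only once everyone has arrived" behavior can be sketched inside a single JVM with `java.util.concurrent.CountDownLatch`. This is only an analogy, not ZooKeeper code: the class and method names are hypothetical, counting down stands in for creating `/queue/x(i)`, and awaiting the latch stands in for waiting on `/queue/start`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Single-JVM analogy of the synchronous queue (hypothetical names, not ZooKeeper code).
class LocalSyncQueueDemo {

    // `arriving` of the `size` expected members join. Returns true only if
    // all `size` members arrived before the timeout elapsed.
    static boolean awaitAllMembers(int size, int arriving, long timeoutMillis) {
        CountDownLatch allArrived = new CountDownLatch(size);
        for (int i = 1; i <= arriving; i++) {
            final int member = i;
            new Thread(() -> {
                System.out.println("member x" + member + " joined");
                allArrived.countDown(); // analogous to creating /queue/x(i)
            }).start();
        }
        try {
            // analogous to waiting for /queue/start to appear
            return allArrived.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all 3 arrived: " + awaitAllMembers(3, 3, 2000));
        System.out.println("only 2 arrived: " + awaitAllMembers(3, 2, 200));
    }
}
```

A latch only works between threads of one process; the point of the ZooKeeper version is that the members can be separate programs on separate machines.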

2. Design

Create a parent node /queue. Each member sets a watch on the flag node /queue/start to learn whether it exists. Each member then joins the queue by creating an ephemeral node /queue/x(i), and afterwards reads all child nodes of /queue, that is, the x(i) nodes. It then checks whether the number of children has reached the number of members: if it is still smaller, the member waits for /queue/start to appear; if it is equal, the member creates /queue/start.
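The decision each member makes after joining can be sketched as a pure function over the list of child names, without a ZooKeeper connection. The class and method names are hypothetical, and skipping an already existing `start` node is an added assumption that the design text does not state.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: decides whether a member should create /queue/start.
class QueueDecision {
    static final String START = "start";

    // Counts member nodes under /queue, ignoring the start flag itself.
    static long memberCount(List<String> children) {
        return children.stream().filter(name -> !START.equals(name)).count();
    }

    // True when the queue is full and nobody has created the start flag yet.
    static boolean shouldCreateStart(List<String> children, int size) {
        return !children.contains(START) && memberCount(children) >= size;
    }

    public static void main(String[] args) {
        System.out.println(shouldCreateStart(Arrays.asList("x1", "x2"), 3));                // false
        System.out.println(shouldCreateStart(Arrays.asList("x1", "x2", "x3"), 3));          // true
        System.out.println(shouldCreateStart(Arrays.asList("x1", "x2", "x3", "start"), 3)); // false
    }
}
```

Keeping the rule separate from the ZooKeeper calls makes it easy to test with plain lists of names.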

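Product flow diagram

Application example

Legend

app1, app2, app3, and app4 are four independent business systems.

zk1, zk2, and zk3 are the three connection points of the ZooKeeper cluster.

/queue is the znode that holds the queue; the queue length is assumed to be 3.

/queue/x1 is member 1 of the znode queue, submitted by app1 as a synchronous request; app1 suspends and waits.

/queue/x2 is member 2 of the znode queue, submitted by app2 as a synchronous request; app2 suspends and waits.

/queue/x3 is member 3 of the znode queue, submitted by app3 as a synchronous request; app3 suspends and waits.

/queue/start is the start node, created when the znode queue is full.

Once /queue/start has been created, app4 is started, every zk connection notifies its waiting program (red line) that the queue is complete, and all programs finish.

Notes:

1). /queue/x1, /queue/x2, and /queue/x3 can be created in any order; after submitting, each program suspends synchronously.

2). app1 can submit through zk2, and app2 can submit through zk3.

3). app1 can submit three requests, creating x1, x2, and x3, and fill the queue by itself.

4). After /queue/start is created, zk1 observes the event and tells app1 that the queue is complete!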

3. Implementation

1). Single-node simulation

Simulate app1 submitting three requests through zk1.

public static void doOne() throws Exception {
    String host1 = "192.168.1.201:2181";
    ZooKeeper zk = connection(host1);
    initQueue(zk);
    joinQueue(zk, 1);
    joinQueue(zk, 2);
    joinQueue(zk, 3);
    zk.close();
}

Create a connection to the server

public static ZooKeeper connection(String host) throws IOException {
    ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
        // Listen for the creation of /queue/start
        public void process(WatchedEvent event) {
            // Constant-first equals() is null-safe: connection events carry no path
            if ("/queue/start".equals(event.getPath()) && event.getType() == Event.EventType.NodeCreated) {
                System.out.println("Queue has Completed.Finish testing!!!");
            }
        }
    });
    return zk;
}

Initialize the queue

public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
    System.out.println("WATCH => /queue/start");
    // Register the default watcher on the start flag (it may not exist yet)
    zk.exists("/queue/start", true);

    // Create the persistent parent node only if it is missing
    if (zk.exists("/queue", false) == null) {
        System.out.println("create /queue task-queue");
        zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } else {
        System.out.println("/queue is exist!");
    }
}

Add a queue node

public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
    System.out.println("create /queue/x" + x + " x" + x);
    // Ephemeral: removed when the session ends. Sequential: the server appends a counter to the name.
    zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    isCompleted(zk);
}

Check whether the queue is complete

public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
    int size = 3; // expected number of members
    int length = zk.getChildren("/queue", true).size();

    System.out.println("Queue Complete:" + length + "/" + size);
    if (length >= size) {
        // The queue is full: create the start flag, which triggers the watchers
        System.out.println("create /queue/start start");
        zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }
}
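Because every member runs isCompleted, two members that join at almost the same moment could both observe a full queue and both try to create /queue/start. In ZooKeeper, a create call on a path that already exists fails with KeeperException.NodeExistsException, so only one of them can succeed. The sketch below does not use ZooKeeper at all: it simulates that create-once behavior with an in-memory set (a hypothetical stand-in, as are all names in it) to illustrate that treating "already exists" as a normal outcome leaves exactly one creator.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// In-memory simulation of racing to create the start flag (not the ZooKeeper API).
class StartFlagRace {
    // Hypothetical stand-in for the znode tree: a thread-safe set of paths.
    private final Set<String> paths = ConcurrentHashMap.newKeySet();

    // Returns true if this caller created the path, false if it already existed.
    boolean createIfAbsent(String path) {
        return paths.add(path);
    }

    // Lets `members` threads race to create /queue/start; returns how many succeeded.
    static int race(int members) {
        StartFlagRace tree = new StartFlagRace();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch go = new CountDownLatch(1);
        Thread[] threads = new Thread[members];
        for (int i = 0; i < members; i++) {
            threads[i] = new Thread(() -> {
                try {
                    go.await(); // hold every thread until all are started
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (tree.createIfAbsent("/queue/start")) {
                    winners.incrementAndGet();
                }
            });
            threads[i].start();
        }
        go.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("members that created /queue/start: " + race(3));
    }
}
```

With a real ZooKeeper client, the equivalent would be to catch KeeperException.NodeExistsException around the create call for /queue/start and carry on, since another member has already started the queue.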

Entry point main

public static void main(String[] args) throws Exception {
    doOne();
}

Output:

WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3
create /queue/x2 x2
Queue Complete:2/3
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

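This matches our expectations exactly. Next, let's look at the distributed environment.

2). Distributed simulation

Simulate app1 submitting x1 through zk1, app2 submitting x2 through zk2, and app3 submitting x3 through zk3.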

public static void doAction(int client) throws Exception {
    String host1 = "192.168.1.201:2181";
    String host2 = "192.168.1.201:2182";
    String host3 = "192.168.1.201:2183";

    ZooKeeper zk = null;
    switch (client) {
    case 1:
        zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        break;
    case 2:
        zk = connection(host2);
        initQueue(zk);
        joinQueue(zk, 2);
        break;
    case 3:
        zk = connection(host3);
        initQueue(zk);
        joinQueue(zk, 3);
        break;
    }
}

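Notes:

1). For simplicity, we have not added any complex multi-thread control mechanism.

2). zk.close() is not called. In other words, after app1 performs its single submission, app1 exits, but its session on zk1 still exists (until the session times out), so /queue/x1 remains in the queue.

3). To run the program, start it three times, passing a different command-line argument each time: 1, 2, and 3.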

Run app1 -> zk1

# Log output
WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3

# ZooKeeper console
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011]

Run app2 -> zk2

# Log output
WATCH => /queue/start
/queue is exist!
create /queue/x2 x2
Queue Complete:2/3

# ZooKeeper console
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011]

Run app3 -> zk3

# Log output
WATCH => /queue/start
/queue is exist!
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

# ZooKeeper console
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015]

/queue/start is created and "Queue has Completed.Finish testing!!!" is printed, which means the call to app4 is complete!
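The child names in the console listings (for example x10000000011) are longer than the x1, x2, x3 passed to create. That is because joinQueue uses CreateMode.EPHEMERAL_SEQUENTIAL, for which ZooKeeper appends a zero-padded 10-digit counter to the requested name. A minimal sketch of splitting such a name back into its parts follows; the class and method names are hypothetical, and it assumes the input really is a sequential node name.

```java
// Hypothetical helper for names produced by sequential create modes.
class SequentialNodeName {
    // ZooKeeper pads the appended counter to 10 digits.
    static final int SUFFIX_DIGITS = 10;

    // The name originally passed to create(), e.g. "x1".
    static String prefix(String nodeName) {
        checkLength(nodeName);
        return nodeName.substring(0, nodeName.length() - SUFFIX_DIGITS);
    }

    // The counter appended by the server, e.g. 11.
    static int sequence(String nodeName) {
        checkLength(nodeName);
        return Integer.parseInt(nodeName.substring(nodeName.length() - SUFFIX_DIGITS));
    }

    private static void checkLength(String nodeName) {
        if (nodeName.length() <= SUFFIX_DIGITS) {
            throw new IllegalArgumentException("not a sequential node name: " + nodeName);
        }
    }

    public static void main(String[] args) {
        for (String name : new String[] {"x10000000011", "x20000000012"}) {
            System.out.println(prefix(name) + " -> " + sequence(name));
        }
        // prints "x1 -> 11" and then "x2 -> 12"
    }
}
```

A non-sequential child such as start is too short to carry the suffix, so the helper rejects it instead of guessing.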

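That completes our distributed queue experiment. Because time was short, the text and code inevitably contain some problems; readers who spot any are welcome to point them out.

The complete code follows: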

package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class QueueZooKeeper {

    // No argument: single-node simulation. Argument 1, 2 or 3: act as app1, app2 or app3.
    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    // Single-node simulation: app1 submits three requests through zk1
    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

    // Distributed simulation: each client joins through a different server
    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

    // Create a connection to the server
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // Receives every triggered event; only the creation of /queue/start is handled
            public void process(WatchedEvent event) {
                // Constant-first equals() is null-safe: connection events carry no path
                if (event.getType() == Event.EventType.NodeCreated && "/queue/start".equals(event.getPath())) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

    // Watch the start flag and create the parent node if it is missing
    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

    // Join the queue by creating an ephemeral, sequential member node
    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

    // Create /queue/start once the expected number of members has joined
    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
    }

}

   