
ÏÖÔÚÓ²¼þÔ½À´Ô½±ãÒË£¬Ò»Ì¨·ÇÆ·ÅÆ·þÎñÆ÷£¬2¿Å24ºËCPU£¬Åä48GÄڴ棬2TµÄÓ²ÅÌ£¬ÒѾ½µµ½2Íò¿éÈËÃñ±ÒÒÔÏÂÁË¡£ÕâÖÖÅäÖÃÈç¹û¼òµ¥µØ·Å¼¸¸öwebÓ¦Óã¬ÏÔÈ»ÊÇÉݳ޵ÄÀË·Ñ¡£¾ÍËãÊÇÓÃÀ´ÊµÏÖµ¥½ÚµãµÄhadoop£¬¶Ô¼ÆËã×ÊÔ´ÀË·ÑÒ²ÊǷdz£¸ßµÄ¡£¶ÔÓÚÕâô¸ßÐÔÄܵļÆËã»ú£¬ÈçºÎÓÐЧÀûÓüÆËã×ÊÔ´£¬¾Í³ÉΪ³É±¾¿ØÖƵÄÒ»ÏîÖØÒªÒéÌâÁË¡£
ͨ¹ýÐéÄ⻯¼¼Êõ£¬ÎÒÃÇ¿ÉÒÔ½«Ò»Ì¨·þÎñÆ÷£¬²ð·Ö³É12̨VPS£¬Ã¿Ì¨2ºËCPU£¬4GÄڴ棬40GÓ²ÅÌ£¬²¢ÇÒÖ§³Ö×ÊÔ´ÖØÐ·ÖÅä¡£¶àôΰ´óµÄ¼¼Êõ°¡£¡ÏÖÔÚÎÒÃÇÓÐÁË12¸ö½ÚµãµÄhadoop¼¯Èº£¬
ÈÃHadoopÅÜÔÚÔÆ¶Ë£¬ÈÃÊÀ½ç¼ÓËÙ¡£
ǰÑÔ
ZooKeeperÊÇÒ»¸ö·Ö²½Ê½µÄÐ×÷ϵͳ£¬ºÎΪÐ×÷£¬ZooKeeper¼ÛÖµÓÖÓкÎÌåÏÖ¡£Í¨¹ýÕâÆªÎÄÕµķֲ¼Ê½¶ÓÁеݸÀý£¬Ä㽫Á˽⵽ZooKeeperµÄÇ¿´ó¡£¹ØÓÚZooKeeperµÄ»ù±¾Ê¹Óã¬Çë²Î¿¼£ºZooKeeperα·Ö²½Ê½¼¯Èº°²×°¼°Ê¹ÓÃ
1. ·Ö²¼Ê½¶ÓÁÐ
¶ÓÁÐÓкܶàÖÖ²úÆ·£¬´ó¶¼ÊÇÏûϢϵͳËùʵÏֵģ¬ÏñActiveMQ,JBossMQ,RabbitMQ,IBM-MQµÈ¡£·Ö²½Ê½¶ÓÁвúÆ·²¢²»Ì«¶à£¬ÏñBeanstalkd¡£
±¾ÎÄʵÏֵķֲ¼Ê½¶ÔÁУ¬ÊÇ»ùÓÚZooKeeperÏÖʵµÄÒ»ÖÖͬ²½µÄ·Ö²½Ê½¶ÓÁУ¬µ±Ò»¸ö¶ÓÁеijÉÔ±¶¼¾ÛÆëʱ£¬Õâ¸ö¶ÓÁвſÉÓ㬷ñÔòÒ»Ö±µÈ´ýËùÓгÉÔ±µ½´ï¡£
2. Éè¼ÆË¼Â·
´´½¨Ò»¸ö¸¸Ä¿Â¼ /queue£¬Ã¿¸ö³ÉÔ±¶¼¼à¿Ø(Watch)±ê־λĿ¼/queue/start ÊÇ·ñ´æÔÚ£¬È»ºóÿ¸ö³ÉÔ±¶¼¼ÓÈëÕâ¸ö¶ÓÁУ¬¼ÓÈë¶ÓÁеķ½Ê½¾ÍÊÇ´´½¨
/queue/x(i)µÄÁÙʱĿ¼½Úµã£¬È»ºóÿ¸ö³ÉÔ±»ñÈ¡ /queue Ŀ¼µÄËùÓÐĿ¼½Úµã£¬Ò²¾ÍÊÇ x(i)¡£ÅжÏ
i µÄÖµÊÇ·ñÒѾÊdzÉÔ±µÄ¸öÊý£¬Èç¹ûСÓÚ³ÉÔ±¸öÊýµÈ´ý /queue/start µÄ³öÏÖ£¬Èç¹ûÒѾÏàµÈ¾Í´´½¨
/queue/start¡£
²úÆ·Á÷³Ìͼ

Ó¦ÓÃʵÀý

ͼ±ê½âÊÍ
app1,app2,app3,app4ÊÇ4¸ö¶ÀÁ¢µÄÒµÎñϵͳ
zk1,zk2,zk3ÊÇZooKeeper¼¯ÈºµÄ3¸öÁ¬½Óµã
/queue£¬ÊÇznodeµÄ¶ÓÁУ¬¼ÙÉè¶ÓÁ㤶ÈΪ3
/queue/x1£¬ÊÇznode¶ÓÁÐÖУ¬1ºÅÅŶÔÕߣ¬ÓÉapp1Ìá½»£¬Í¬²½ÇëÇó£¬app1¹ÒÔØµÈ´ý
/queue/x2£¬ÊÇznode¶ÓÁÐÖУ¬2ºÅÅŶÔÕߣ¬ÓÉapp2Ìá½»£¬Í¬²½ÇëÇó£¬app2¹ÒÆðµÈ´ý
/queue/x3£¬ÊÇznode¶ÓÁÐÖУ¬3ºÅÅŶÔÕߣ¬ÓÉapp3Ìá½»£¬Í¬²½ÇëÇó£¬app3¹ÒÆðµÈ´ý
/queue/start£¬µ±znode¶ÓÁÐÖÐÂúÁË£¬´¥·¢´´½¨¿ªÊ¼½Úµã
µ±/qeueu/start±»´´½¨ºó£¬app4±»Æô¶¯£¬ËùÓÐzkµÄÁ¬½Ó֪ͨͬ²½³ÌÐò(ºìÉ«Ïß)£¬¶ÓÁÐÒÑÍê³É£¬ËùÓгÌÐò½áÊø
×¢£º
1). ´´½¨/queue/x1,/queue/x2,/queue/x3ûÓÐǰºó˳Ðò£¬Ìá½»ºó³ÌÐò¾Íͬ²½¹ÒÆð¡£
2). app1¿ÉÒÔͨ¹ýzk2Ìá½»£¬app2Ò²¿Éͨ¹ýzk3Ìá½»
3). app1¿ÉÒÔÌá½»3´ÎÇëÇó£¬Éú³Éx1,x2,x3ʹÓöÓÁгäÂú
4). /queue/start±»´´½¨ºó£¬zk1»á¼àÌýµ½Õâ¸öʼþ£¬ÔÙ¸æËßapp1£¬¶ÓÁÐÒÑÍê³É£¡
3. ³ÌÐòʵÏÖ
1). µ¥½ÚµãÄ£ÄâʵÑé
Ä£Äâapp1£¬Í¨¹ýzk1£¬Ìá½»3¸öÇëÇó
public static void doOne() throws Exception { String host1 = "192.168.1.201:2181"; ZooKeeper zk = connection(host1); initQueue(zk); joinQueue(zk, 1); joinQueue(zk, 2); joinQueue(zk, 3); zk.close(); } |
´´½¨Ò»¸öÓë·þÎñÆ÷µÄÁ¬½Ó
public static ZooKeeper connection(String host) throws IOException { ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() { // ¼àÌý/queue/start´´½¨µÄʼþ public void process(WatchedEvent event) { if (event.getPath() != null && event.getPath().equals("/queue/start")
&& event.getType() == Event.EventType.NodeCreated) { System.out.println("Queue has Completed.Finish testing!!!"); } } }); return zk; } |
³öʼ»¯¶ÓÁÐ
public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException { System.out.println("WATCH => /queue/start"); zk.exists("/queue/start", true);
if (zk.exists("/queue", false) ==
null) {
System.out.println("create /queue task-queue");
zk.create("/queue", "task-queue".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
System.out.println("/queue is exist!");
}
} |
Ôö¼Ó¶ÓÁнڵã
public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException { System.out.println("create /queue/x" + x + " x" + x); zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); isCompleted(zk); } |
¼ì²é¶ÓÁÐÊÇ·ñÍêÕû
public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException { int size = 3; int length = zk.getChildren("/queue", true).size();
System.out.println("Queue Complete:"
+ length + "/" + size);
if (length >= size) {
System.out.println("create /queue/start start");
zk.create("/queue/start", "start".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
} |
Æô¶¯º¯Êýmain
public static void main(String[] args) throws Exception { doOne(); } |
ÔËÐнá¹û£º
WATCH => /queue/start /queue is exist! create /queue/x1 x1 Queue Complete:1/3 create /queue/x2 x2 Queue Complete:2/3 create /queue/x3 x3 Queue Complete:3/3 create /queue/start start Queue has Completed.Finish testing!!! |
ÍêÈ«·ûºÏÎÒµÄÃÇÔ¤ÆÚ¡£½ÓÏÂÀ´ÎÒÃÇ¿´·Ö²¼Ê½»·¾³
2). ·Ö²¼Ê½Ä£ÄâʵÑé
Ä£Äâapp1ͨ¹ýzk1Ìá½»x1,app2ͨ¹ýzk2Ìá½»x2,app3ͨ¹ýzk3Ìá½»x3
public static void doAction(int client) throws Exception { String host1 = "192.168.1.201:2181"; String host2 = "192.168.1.201:2182"; String host3 = "192.168.1.201:2183";
ZooKeeper zk = null;
switch (client) {
case 1:
zk = connection(host1);
initQueue(zk);
joinQueue(zk, 1);
break;
case 2:
zk = connection(host2);
initQueue(zk);
joinQueue(zk, 2);
break;
case 3:
zk = connection(host3);
initQueue(zk);
joinQueue(zk, 3);
break;
}
} |
×¢£º
1). ΪÁ˼òµ¥Æð¼û£¬ÎÒÃÇûÓÐÔö¼Ó¸´ÔӵĶàÏ߳̿ØÖƵĻúÖÆ¡£
2). ûÓе÷ÓÃzk.close()·½·¨£¬Ò²¾ÍÊÇ˵£¬app1Ö´ÐÐÍêµ¥¶ÀµÄÌá½»£¬app1¾Í½áÊøÁË£¬µ«zk1»¹´æÔÚ×Å£¬ËùÒÔ/queue/x1´æÔÚÓÚ¶ÓÁС£
3). ³ÌÐòÆô¶¯·½·¨£¬·Ö3´ÎÆô¶¯£¬ÃüÁîÐд«²»Í¬µÄ²ÎÊý£¬·Ö±ðÊÇ1,2,3
Ö´ÐÐapp1¨C>zk1
#ÈÕÖ¾Êä³ö WATCH => /queue/start /queue is exist! create /queue/x1 x1 Queue Complete:1/3
#zookeeper¿ØÖÆÌ¨
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011] |
Ö´ÐÐapp2¨C>zk2
#ÈÕÖ¾Êä³ö WATCH => /queue/start /queue is exist! create /queue/x2 x2 Queue Complete:2/3
#zookeeper¿ØÖÆÌ¨
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011] |
Ö´ÐÐapp3¨C>zk3
#ÈÕÖ¾Êä³ö WATCH => /queue/start /queue is exist! create /queue/x3 x3 Queue Complete:3/3 create /queue/start start Queue has Completed.Finish testing!!!
#zookeeper¿ØÖÆÌ¨
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015] |
/queue/stats±»½¨Á¢£¬´òÓ¡³ö¡°Queue has Completed.Finish testing!!!¡±£¬´ú±íµ÷ÓÃapp4Íê³É£¡
ÎÒÃÇÍê³É·Ö²¼Ê½¶ÓÁеÄʵÑ飬ÓÉÓÚʱ¼ä²Ö´Ù¡£ÎÄ×Ö˵Ã÷¼°´úÂëÄÑÃâÓÐһЩÎÊÌ⣬Çë·¢ÏÖÎÊÌâµÄͬѧ°ïæָÕý¡£
ÏÂÃæÌùÒ»ÏÂÍêÕûµÄ´úÂ룺
package org.conan.zookeeper.demo;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
public class QueueZooKeeper {
public static void main(String[] args) throws
Exception {
if (args.length == 0) {
doOne();
} else {
doAction(Integer.parseInt(args[0]));
}
}
public static void doOne() throws Exception
{
String host1 = "192.168.1.201:2181";
ZooKeeper zk = connection(host1);
initQueue(zk);
joinQueue(zk, 1);
joinQueue(zk, 2);
joinQueue(zk, 3);
zk.close();
}
public static void doAction(int client) throws
Exception {
String host1 = "192.168.1.201:2181";
String host2 = "192.168.1.201:2182";
String host3 = "192.168.1.201:2183";
ZooKeeper zk = null;
switch (client) {
case 1:
zk = connection(host1);
initQueue(zk);
joinQueue(zk, 1);
break;
case 2:
zk = connection(host2);
initQueue(zk);
joinQueue(zk, 2);
break;
case 3:
zk = connection(host3);
initQueue(zk);
joinQueue(zk, 3);
break;
}
}
// ´´½¨Ò»¸öÓë·þÎñÆ÷µÄÁ¬½Ó
public static ZooKeeper connection(String host)
throws IOException {
ZooKeeper zk = new ZooKeeper(host, 60000, new
Watcher() {
// ¼à¿ØËùÓб»´¥·¢µÄʼþ
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeCreated
&& event.getPath().equals("/queue/start"))
{
System.out.println("Queue has Completed.Finish
testing!!!");
}
}
});
return zk;
}
public static void initQueue(ZooKeeper zk) throws
KeeperException, InterruptedException {
System.out.println("WATCH => /queue/start");
zk.exists("/queue/start", true);
if (zk.exists("/queue", false) ==
null) {
System.out.println("create /queue task-queue");
zk.create("/queue", "task-queue".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
System.out.println("/queue is exist!");
}
}
public static void joinQueue(ZooKeeper zk, int
x) throws KeeperException, InterruptedException
{
System.out.println("create /queue/x"
+ x + " x" + x);
zk.create("/queue/x" + x, ("x"
+ x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
isCompleted(zk);
}
public static void isCompleted(ZooKeeper zk)
throws KeeperException, InterruptedException {
int size = 3;
int length = zk.getChildren("/queue",
true).size();
System.out.println("Queue Complete:"
+ length + "/" + size);
if (length >= size) {
System.out.println("create /queue/start start");
zk.create("/queue/start", "start".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
} |
|