文章介绍了如何整合虚拟化和Hadoop，让Hadoop集群跑在VPS虚拟主机上，通过云向用户提供存储和计算的服务。
现在硬件越来越便宜，一台非品牌服务器，2颗24核CPU，配48G内存，2T的硬盘，已经降到2万块人民币以下了。这种配置如果简单地放几个web应用，显然是奢侈的浪费。就算是用来实现单节点的hadoop，对计算资源浪费也是非常高的。对于这么高性能的计算机，如何有效利用计算资源，就成为成本控制的一项重要议题了。
通过虚拟化技术，我们可以将一台服务器，拆分成12台VPS，每台2核CPU，4G内存，40G硬盘，并且支持资源重新分配。多么伟大的技术啊！现在我们有了12个节点的hadoop集群，
让Hadoop跑在云端，让世界加速。

前言
ZooKeeper是一个强大的分布式协作系统，用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术实现多一种选择，标准化我们的程序结构。另一篇，分布式同步队列实现，请参考：ZooKeeper实现分布式队列Queue
关于ZooKeeper的基本使用，请参考：ZooKeeper伪分布式集群安装及使用。
1. 分布式先进先出(FIFO)队列
在计算机科学中，消息队列（Message queue）是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议，消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中，直到接收者取回它。
先进先出(FIFO)队列，是消息队列最基本的一种实现形式，先发出的先消费。
2. 设计思路
实现的思路也非常简单，在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i)，这样就能保证所有成员加入队列时都是有编号的，出队列时通过 getChildren() 方法可以返回当前所有的队列中的元素，然后消费其中序号最小的一个，这样就能保证FIFO。

应用实例

图标解释
1.app1,app2,app3是3个独立的业务系统
2.zk1,zk2,zk3是ZooKeeper集群的3个连接点
3./queue-fifo，是znode的队列，按顺序存储数据
4./queue-fifo/x1，是znode队列中，1号排队者，由app1提交
5./queue-fifo/x2，是znode队列中，2号排队者，由app2提交
6.app3是消费者，通过zk3连接到znode队列中，找到/queue-fifo中序号最小的节点消费，删除消费后的节点(红色线表示)
注：
1). app1可以通过zk2提交，app2也可通过zk3提交
2). app1可以提交3次请求，生成x1,x2,x3多个节点
3). app1可以作为消费者，消费队列数据
3. 程序实现
1). 单节点模拟实验
模拟app1，通过zk1，生产2个节点，然后再消费3个节点。
/**
 * Single-client experiment: connects to one server, produces two queue
 * elements and then attempts to consume three (the third attempt finds the
 * queue empty).
 *
 * @throws Exception if connecting or any ZooKeeper operation fails
 */
public static void doOne() throws Exception {
    String host1 = "192.168.1.201:2181";
    ZooKeeper zk = connection(host1);
    try {
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
    } finally {
        // Release the session even when one of the steps above throws.
        zk.close();
    }
}
|
创建一个与服务器的连接
/**
 * Opens a ZooKeeper session to the given server with a 60000 ms session timeout.
 *
 * @param host connect string in {@code host:port} form
 * @return the newly created client handle
 * @throws IOException if the client cannot be created
 */
public static ZooKeeper connection(String host) throws IOException {
    // A no-op default watcher is registered instead of null so that calls
    // asking for the default watcher (e.g. getChildren(path, true)) always
    // have one; this matches the version used in the complete listing.
    return new ZooKeeper(host, 60000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Events are deliberately ignored in this demo.
        }
    });
}
|
初始化队列（注：下面贴出的仍是connection()的代码，initQueue()的实现见文末的完整代码）
/**
 * Creates a client session for {@code host} using a 60000 ms session timeout
 * and a default watcher that discards every event.
 *
 * @param host connect string in {@code host:port} form
 * @return the newly created client handle
 * @throws IOException if the client cannot be created
 */
public static ZooKeeper connection(String host) throws IOException {
    Watcher discardEvents = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Intentionally empty: the demo does not react to events.
        }
    };
    return new ZooKeeper(host, 60000, discardEvents);
}
|
生产者
/**
 * Appends one element to the queue.
 *
 * <p>The znode is created with {@code EPHEMERAL_SEQUENTIAL}, so the server
 * appends its own counter to the requested name {@code x<x>}; the payload is
 * the same {@code x<x>} label.
 *
 * @param zk connected client used to create the znode
 * @param x  producer-chosen number used in the node name and payload
 * @throws KeeperException      if the server rejects the create request
 * @throws InterruptedException if the calling thread is interrupted
 */
public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
    String label = "x" + x;
    String pathPrefix = "/queue-fifo/" + label;
    System.out.println("create " + pathPrefix + " " + label);
    byte[] payload = label.getBytes();
    zk.create(pathPrefix, payload, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
|
消费者
/**
 * Removes the oldest element of the queue, or reports that the queue is empty.
 *
 * <p>Age is decided by the sequence counter that ZooKeeper appends to each
 * child name (its last 10 digits), not by the producer label in front of it,
 * so elements leave the queue in the order they were created.
 *
 * <p>Limitation: the list-then-delete sequence is not atomic; if another
 * consumer deletes the same child first, the delete here fails with a
 * {@link KeeperException}.
 *
 * @param zk connected client used to read and delete queue children
 * @throws KeeperException       if listing or deleting fails on the server
 * @throws InterruptedException  if the calling thread is interrupted
 * @throws IllegalStateException if a child name is too short to carry a
 *                               sequence suffix
 */
public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
    // Width of the zero-padded counter appended to sequential znode names.
    final int sequenceDigits = 10;

    // One-shot poll: no watch is requested because nothing reacts to change events.
    List<String> children = zk.getChildren("/queue-fifo", false);
    if (children.isEmpty()) {
        System.out.println("No node to cosume");
        return;
    }

    String oldest = null;
    long oldestSequence = Long.MAX_VALUE;
    for (String child : children) {
        int suffixStart = child.length() - sequenceDigits;
        if (suffixStart < 0) {
            throw new IllegalStateException("child without sequence suffix: " + child);
        }
        long sequence = Long.parseLong(child.substring(suffixStart));
        if (oldest == null || sequence < oldestSequence) {
            oldestSequence = sequence;
            oldest = child;
        }
    }

    // The log text keeps the article's original "/queue/" wording so the
    // documented sample output stays valid; the real path is /queue-fifo/<child>.
    System.out.println("delete /queue/" + oldest);
    // Version -1 matches any node version.
    zk.delete("/queue-fifo/" + oldest, -1);
}
|
启动main函数
/**
 * Entry point for the single-node experiment; command-line arguments are ignored.
 *
 * @param args unused
 * @throws Exception if the experiment fails
 */
public static void main(String[] args) throws Exception {
    doOne();
}
|
运行结果：
/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume
|
完全符合我们的预期。由于produce时，我们创建的节点模式是EPHEMERAL_SEQUENTIAL，所以系统会在x(i)后面追加一个自动递增的10位序号n，例如n=0000000032，输出为x10000000032。
接下来我们看分布式环境。
2). 分布式模拟实验
app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点
/**
 * Distributed experiment: plays the role of one of three applications.
 *
 * <ul>
 *   <li>1 - producer connected to the first server, enqueues x1</li>
 *   <li>2 - producer connected to the second server, enqueues x2</li>
 *   <li>3 - consumer connected to the third server, dequeues three times</li>
 * </ul>
 *
 * @param client which application to run (1, 2 or 3)
 * @throws IllegalArgumentException if {@code client} is not 1, 2 or 3
 * @throws Exception                if connecting or any ZooKeeper operation fails
 */
public static void doAction(int client) throws Exception {
    String host1 = "192.168.1.201:2181";
    String host2 = "192.168.1.201:2182";
    String host3 = "192.168.1.201:2183";
    ZooKeeper zk;
    switch (client) {
    case 1:
        // Producer sessions are intentionally left open: the elements are
        // ephemeral znodes, which the server removes when the creating
        // session ends, so closing here would empty the queue immediately.
        zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        break;
    case 2:
        // Left open for the same reason as client 1.
        zk = connection(host2);
        initQueue(zk);
        produce(zk, 2);
        break;
    case 3:
        zk = connection(host3);
        try {
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
        } finally {
            // The consumer owns no queue elements, so its session can be released.
            zk.close();
        }
        break;
    default:
        throw new IllegalArgumentException(
                "unknown client id " + client + " (expected 1, 2 or 3)");
    }
}
|
启动main函数
/**
 * Entry point: with a command-line argument, runs the distributed role it
 * names; without arguments, runs the single-node experiment.
 *
 * @param args optional; {@code args[0]} is parsed as the client number
 * @throws Exception if the selected experiment fails
 */
public static void main(String[] args) throws Exception {
    if (args.length > 0) {
        int client = Integer.parseInt(args[0]);
        doAction(client);
        return;
    }
    doOne();
}
|
程序启动方法，分3次启动，命令行传不同的参数，分别是1,2,3

run1: 执行app1-->zk1
#日志输出
/queue-fifo is exist!
create /queue-fifo/x1 x1
|
run2: 执行app2-->zk2
#日志输出
/queue-fifo is exist!
create /queue-fifo/x2 x2
|
run3: 执行app3-->zk3
#日志输出
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume
|
我们完成了分布式队列的实验。由于时间仓促，文字说明及代码难免有一些问题，请发现问题的同学帮忙指正。
下面贴一下完整的代码：
package org.conan.zookeeper.demo;
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class FIFOZooKeeper {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
doOne();
} else {
doAction(Integer.parseInt(args[0]));
}
}
public static void doOne() throws Exception {
String host1 = "192.168.1.201:2181";
ZooKeeper zk = connection(host1);
initQueue(zk);
produce(zk, 1);
produce(zk, 2);
cosume(zk);
cosume(zk);
cosume(zk);
zk.close();
}
public static void doAction(int client) throws Exception {
String host1 = "192.168.1.201:2181";
String host2 = "192.168.1.201:2182";
String host3 = "192.168.1.201:2183";
ZooKeeper zk = null;
switch (client) {
case 1:
zk = connection(host1);
initQueue(zk);
produce(zk, 1);
break;
case 2:
zk = connection(host2);
initQueue(zk);
produce(zk, 2);
break;
case 3:
zk = connection(host3);
initQueue(zk);
cosume(zk);
cosume(zk);
cosume(zk);
break;
}
}
// ´´½¨Ò»¸öÓë·þÎñÆ÷µÄÁ¬½Ó
public static ZooKeeper connection(String host) throws IOException {
return new ZooKeeper(host, 60000, new Watcher() {
public void process(WatchedEvent event) {
}
});
}
public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
if (zk.exists("/queue-fifo", false) == null) {
System.out.println("create /queue-fifo task-queue-fifo");
zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
System.out.println("/queue-fifo is exist!");
}
}
public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
System.out.println("create /queue-fifo/x" + x + " x" + x);
zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
}
public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
List list = zk.getChildren("/queue-fifo", true);
if (list.size() > 0) {
long min = Long.MAX_VALUE;
for (String num : list) {
if (min > Long.parseLong(num.substring(1))) {
min = Long.parseLong(num.substring(1));
}
}
System.out.println("delete /queue/x" + min);
zk.delete("/queue-fifo/x" + min, 0);
} else {
System.out.println("No node to cosume");
}
}
}
|
|