Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Modeler   Code  
»áÔ±   
 
   
 
 
     
   
 ¶©ÔÄ
  ¾èÖú
ZooKeeperʵÏÖ·Ö²¼Ê½FIFO¶ÓÁÐ
 
×÷Õß Õŵ¤(Conan)£¬»ðÁú¹ûÈí¼þ    ·¢²¼ÓÚ 2014-07-31
  3305  次浏览      40
 

ÎÄÕ½éÉÜÁËÈçºÎÕûºÏÐéÄ⻯ºÍHadoop£¬ÈÃHadoop¼¯ÈºÅÜÔÚVPSÐéÄâÖ÷»úÉÏ£¬Í¨¹ýÔÆÏòÓû§Ìṩ´æ´¢ºÍ¼ÆËãµÄ·þÎñ¡£

ÏÖÔÚÓ²¼þÔ½À´Ô½±ãÒË£¬Ò»Ì¨·ÇÆ·ÅÆ·þÎñÆ÷£¬2¿Å24ºËCPU£¬Åä48GÄڴ棬2TµÄÓ²ÅÌ£¬ÒѾ­½µµ½2Íò¿éÈËÃñ±ÒÒÔÏÂÁË¡£ÕâÖÖÅäÖÃÈç¹û¼òµ¥µØ·Å¼¸¸öwebÓ¦Óã¬ÏÔÈ»ÊÇÉݳ޵ÄÀË·Ñ¡£¾ÍËãÊÇÓÃÀ´ÊµÏÖµ¥½ÚµãµÄhadoop£¬¶Ô¼ÆËã×ÊÔ´ÀË·ÑÒ²ÊǷdz£¸ßµÄ¡£¶ÔÓÚÕâô¸ßÐÔÄܵļÆËã»ú£¬ÈçºÎÓÐЧÀûÓüÆËã×ÊÔ´£¬¾Í³ÉΪ³É±¾¿ØÖƵÄÒ»ÏîÖØÒªÒéÌâÁË¡£

ͨ¹ýÐéÄ⻯¼¼Êõ£¬ÎÒÃÇ¿ÉÒÔ½«Ò»Ì¨·þÎñÆ÷£¬²ð·Ö³É12̨VPS£¬Ã¿Ì¨2ºËCPU£¬4GÄڴ棬40GÓ²ÅÌ£¬²¢ÇÒÖ§³Ö×ÊÔ´ÖØÐ·ÖÅä¡£¶àôΰ´óµÄ¼¼Êõ°¡£¡ÏÖÔÚÎÒÃÇÓÐÁË12¸ö½ÚµãµÄhadoop¼¯Èº£¬ ÈÃHadoopÅÜÔÚÔÆ¶Ë£¬ÈÃÊÀ½ç¼ÓËÙ¡£

ǰÑÔ

ZooKeeperÊÇÒ»¸öÇ¿´óµÄ·Ö²¼Ê½Ð­×÷ϵͳ£¬ÓÃZooKeeper¿ÉÒÔ·½±ãµØÊµÏÖÏȽøÏȳö(FIFO)¶ÓÁС£¸ø¡°¶ÓÁС±µÄ¼¼ÊõÏÖʵ¶àÒ»ÖÖÑ¡Ôñ£¬±ê×¼»¯ÎÒÃǵijÌÐò½á¹¹¡£Áíһƪ£¬·Ö²½Ê½Í¬²½¶ÓÁÐʵÏÖ£¬Çë²Î¿¼£ºZooKeeperʵÏÖ·Ö²¼Ê½¶ÓÁÐQueue

¹ØÓÚZooKeeperµÄ»ù±¾Ê¹Óã¬Çë²Î¿¼£ºZooKeeperα·Ö²½Ê½¼¯Èº°²×°¼°Ê¹Óá£

1. ·Ö²¼Ê½ÏȽøÏȳö(FIFO)¶ÓÁÐ

ÔÚ¼ÆËã»ú¿ÆÑ§ÖУ¬ÏûÏ¢¶ÓÁУ¨Message queue£©ÊÇÒ»ÖÖ½ø³Ì¼äͨÐÅ»òͬһ½ø³ÌµÄ²»Í¬Ï̼߳äµÄͨÐÅ·½Ê½¡£ÏûÏ¢¶ÓÁÐÌṩÁËÒì²½µÄͨÐÅЭÒ飬ÏûÏ¢µÄ·¢ËÍÕߺͽÓÊÕÕß²»ÐèҪͬʱÓëÏûÏ¢¶ÓÁл¥½»¡£ÏûÏ¢»á±£´æÔÚ¶ÓÁÐÖУ¬Ö±µ½½ÓÊÕÕßÈ¡»ØËü¡£

ÏȽøÏȳö(FIFO)¶ÓÁУ¬ÊÇÏûÏ¢¶ÓÁÐ×î»ù±¾µÄÒ»ÖÖʵÏÖÐÎʽ£¬ÏÈ·¢³öµÄÏÈÏû·Ñ¡£

2. Éè¼ÆË¼Â·

ʵÏÖµÄ˼·Ҳ·Ç³£¼òµ¥£¬ÔÚ/queue-fifoµÄĿ¼Ï´´½¨ SEQUENTIAL ÀàÐ͵Ä×ÓĿ¼ /x(i)£¬ÕâÑù¾ÍÄܱ£Ö¤ËùÓгÉÔ±¼ÓÈë¶ÓÁÐʱ¶¼ÊÇÓбàºÅµÄ£¬³ö¶ÓÁÐʱͨ¹ý getChildren( ) ·½·¨¿ÉÒÔ·µ»Øµ±Ç°ËùÓеĶÓÁÐÖеÄÔªËØ£¬È»ºóÏû·ÑÆäÖÐ×îСµÄÒ»¸ö£¬ÕâÑù¾ÍÄܱ£Ö¤FIFO¡£

Ó¦ÓÃʵÀý

ͼ±ê½âÊÍ

1.app1,app2,app3ÊÇ3¸ö¶ÀÁ¢µÄÒµÎñϵͳ

2.zk1,zk2,zk3ÊÇZooKeeper¼¯ÈºµÄ3¸öÁ¬½Óµã

3./queue-fifo£¬ÊÇznodeµÄ¶ÓÁУ¬°´Ë³Ðò´æ´¢Êý¾Ý

4./queue-fifo/x1£¬ÊÇznode¶ÓÁÐÖУ¬1ºÅÅŶÔÕߣ¬ÓÉapp1Ìá½»

5./queue-fifo/x2£¬ÊÇznode¶ÓÁÐÖУ¬2ºÅÅŶÔÕߣ¬ÓÉapp2Ìá½»

6.app3ÊÇÏû·ÑÕߣ¬Í¨¹ýzk3Á¬½Óµ½znode¶ÓÁÐÖУ¬ÕÒµ½/queue-fifoÖÐ˳Ðò×îÉٵĽڵãÏû·Ñ£¬É¾³ýÏû·ÑºóµÄ½Úµã(ºìÉ«Ïß±íʾ)

×¢£º

1). app1¿ÉÒÔͨ¹ýzk2Ìá½»£¬app2Ò²¿Éͨ¹ýzk3Ìá½»

2). app1¿ÉÒÔÌá½»3´ÎÇëÇó£¬Éú³Éx1,x2,x3¶à¸ö½Úµã

3). app1¿ÉÒÔ×÷ΪÏû·ÑÕߣ¬Ïû·Ñ¶ÓÁÐÊý¾Ý

3. ³ÌÐòʵÏÖ

1). µ¥½ÚµãÄ£ÄâʵÑé

Ä£Äâapp1£¬Í¨¹ýzk1£¬Éú²ú2¸ö½Úµã£¬È»ºóÔÙÏû·Ñ3¸ö½Úµã¡£

 public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

´´½¨Ò»¸öÓë·þÎñÆ÷µÄÁ¬½Ó

public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, null);
        return zk;
    }

³öʼ»¯¶ÓÁÐ

 public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

Éú²úÕß

 public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE,
 CreateMode.EPHEMERAL_SEQUENTIAL);
    }

Ïû·ÑÕß

public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

Æô¶¯mainº¯Êý

public static void main(String[] args) throws Exception {
            doOne();
    }

ÔËÐнá¹û£º

/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume

ÍêÈ«·ûºÏÎÒµÄÃÇÔ¤ÆÚ£¬ÓÉÓÚproduceʱ£¬ÎÒÃÇ´´½¨µÄ½ÚµãģʽÊÇEPHEMERAL_SEQUENTIAL£¬ËùÒÔϵͳ»áÔÚx(i)(n)£¬Ëæ»úÉú³Én=0000000032£¬Êä³öΪx10000000032¡£

½ÓÏÂÀ´ÎÒÃÇ¿´·Ö²¼Ê½»·¾³¡£

2). ·Ö²¼Ê½Ä£ÄâʵÑé

app1ͨ¹ýzk1Éú²úx1, app2ͨ¹ýzk2Éú²úx2, app3ͨ¹ýzk3Ïû·Ñ3¸ö½Úµã

public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

Æô¶¯mainº¯Êý

public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

³ÌÐòÆô¶¯·½·¨£¬·Ö3´ÎÆô¶¯£¬ÃüÁîÐд«²»Í¬µÄ²ÎÊý£¬·Ö±ðÊÇ1,2,3

run1: Ö´ÐÐapp1¨C>zk1

#ÈÕÖ¾Êä³ö
/queue-fifo is exist!
create /queue-fifo/x1 x1

run2: Ö´ÐÐapp2¨C>zk2

#ÈÕÖ¾Êä³ö
/queue-fifo is exist!
create /queue-fifo/x2 x2

run3: Ö´ÐÐapp3¨C>zk3

#ÈÕÖ¾Êä³ö
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume

ÎÒÃÇÍê³É·Ö²¼Ê½¶ÓÁеÄʵÑ飬ÓÉÓÚʱ¼ä²Ö´Ù¡£ÎÄ×Ö˵Ã÷¼°´úÂëÄÑÃâÓÐһЩÎÊÌ⣬Çë·¢ÏÖÎÊÌâµÄͬѧ°ïæָÕý¡£

ÏÂÃæÌùÒ»ÏÂÍêÕûµÄ´úÂ룺

package org.conan.zookeeper.demo;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class FIFOZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

    // ´´½¨Ò»¸öÓë·þÎñÆ÷µÄÁ¬½Ó
    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        if (zk.exists("/queue-fifo", false) == null) {
            System.out.println("create /queue-fifo task-queue-fifo");
            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue-fifo is exist!");
        }
    }

    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, 
CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

}
   
3305 ´Îä¯ÀÀ       40
Ïà¹ØÎÄÕÂ

»ùÓÚEAµÄÊý¾Ý¿â½¨Ä£
Êý¾ÝÁ÷½¨Ä££¨EAÖ¸ÄÏ£©
¡°Êý¾Ýºþ¡±£º¸ÅÄî¡¢ÌØÕ÷¡¢¼Ü¹¹Óë°¸Àý
ÔÚÏßÉ̳ÇÊý¾Ý¿âϵͳÉè¼Æ ˼·+Ч¹û
 
Ïà¹ØÎĵµ

GreenplumÊý¾Ý¿â»ù´¡Åàѵ
MySQL5.1ÐÔÄÜÓÅ»¯·½°¸
ijµçÉÌÊý¾ÝÖÐ̨¼Ü¹¹Êµ¼ù
MySQL¸ßÀ©Õ¹¼Ü¹¹Éè¼Æ
Ïà¹Ø¿Î³Ì

Êý¾ÝÖÎÀí¡¢Êý¾Ý¼Ü¹¹¼°Êý¾Ý±ê×¼
MongoDBʵս¿Î³Ì
²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿âÉè¼ÆÓëÓÅ»¯
PostgreSQLÊý¾Ý¿âʵսÅàѵ
×îл¼Æ»®
DeepSeekÔÚÈí¼þ²âÊÔÓ¦ÓÃʵ¼ù 4-12[ÔÚÏß]
DeepSeek´óÄ£ÐÍÓ¦Óÿª·¢Êµ¼ù 4-19[ÔÚÏß]
UAF¼Ü¹¹ÌåϵÓëʵ¼ù 4-11[±±¾©]
AIÖÇÄÜ»¯Èí¼þ²âÊÔ·½·¨Óëʵ¼ù 5-23[ÉϺ£]
»ùÓÚ UML ºÍEA½øÐзÖÎöÉè¼Æ 4-26[±±¾©]
ÒµÎñ¼Ü¹¹Éè¼ÆÓ뽨ģ 4-18[±±¾©]

MySQLË÷Òý±³ºóµÄÊý¾Ý½á¹¹
MySQLÐÔÄܵ÷ÓÅÓë¼Ü¹¹Éè¼Æ
SQL ServerÊý¾Ý¿â±¸·ÝÓë»Ö¸´
ÈÃÊý¾Ý¿â·ÉÆðÀ´ 10´óDB2ÓÅ»¯
oracleµÄÁÙʱ±í¿Õ¼äдÂú´ÅÅÌ
Êý¾Ý¿âµÄ¿çƽ̨Éè¼Æ


²¢·¢¡¢´óÈÝÁ¿¡¢¸ßÐÔÄÜÊý¾Ý¿â
¸ß¼¶Êý¾Ý¿â¼Ü¹¹Éè¼ÆÊ¦
HadoopÔ­ÀíÓëʵ¼ù
Oracle Êý¾Ý²Ö¿â
Êý¾Ý²Ö¿âºÍÊý¾ÝÍÚ¾ò
OracleÊý¾Ý¿â¿ª·¢Óë¹ÜÀí


GE Çø¿éÁ´¼¼ÊõÓëʵÏÖÅàѵ
º½Ìì¿Æ¹¤Ä³×Ó¹«Ë¾ Nodejs¸ß¼¶Ó¦Óÿª·¢
ÖÐÊ¢Òæ»ª ׿Խ¹ÜÀíÕß±ØÐë¾ß±¸µÄÎåÏîÄÜÁ¦
ijÐÅÏ¢¼¼Êõ¹«Ë¾ PythonÅàѵ
ij²©²ÊITϵͳ³§ÉÌ Ò×ÓÃÐÔ²âÊÔÓëÆÀ¹À
ÖйúÓÊ´¢ÒøÐÐ ²âÊÔ³ÉÊì¶ÈÄ£Ðͼ¯³É(TMMI)
ÖÐÎïÔº ²úÆ·¾­ÀíÓë²úÆ·¹ÜÀí