Source Code Implementation of ZooKeeper-Based High Availability for the Master
 
Author: anzhsoft  Source: CSDN  Published: 2015-03-03
 

If Spark is deployed in Standalone mode, a typical Master/Slaves architecture, the Master is a SPOF (Single Point of Failure). Spark can use ZooKeeper to implement HA.

ZooKeeper provides a Leader Election mechanism. With it, a cluster can run several Masters while guaranteeing that only one of them is Active and all the others are Standby. When the Active Master fails, one of the Standby Masters is elected to take over. Because the cluster state, including the Worker, Driver and Application information, has already been persisted (to ZooKeeper in this mode), a failover only affects the submission of new Jobs; Jobs that are already running are not affected at all.

[Figure: overall architecture of the cluster with ZooKeeper added]
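The election idea can be illustrated without a ZooKeeper cluster. The sketch below is a toy, in-memory model, not ZooKeeper or Curator code: each candidate Master registers under an increasing sequence number, much like the ephemeral sequential znodes that ZooKeeper election recipes rely on, and whoever holds the lowest surviving number is the Active Master. The names `ElectionSketch`, `join`, `fail` and `active` are made up for this illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Toy in-memory model of leader election; hypothetical names, not the ZooKeeper API. */
final class ElectionSketch {
    // Sequence number -> master URL, kept sorted so the lowest number comes first.
    private final TreeMap<Long, String> candidates = new TreeMap<>();
    private long nextSequence = 0;

    /** Registers a candidate and returns its sequence number. */
    long join(String masterUrl) {
        long seq = nextSequence++;
        candidates.put(seq, masterUrl);
        return seq;
    }

    /** Simulates a crash: the candidate's entry disappears. */
    void fail(long seq) {
        candidates.remove(seq);
    }

    /** The active master is the candidate with the lowest surviving sequence number. */
    Optional<String> active() {
        Map.Entry<Long, String> first = candidates.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }

    public static void main(String[] args) {
        ElectionSketch election = new ElectionSketch();
        long a = election.join("spark://master-a:7077");
        election.join("spark://master-b:7077");
        System.out.println("active: " + election.active().orElse("none"));
        election.fail(a); // the active master goes down
        System.out.println("active after failover: " + election.active().orElse("none"));
    }
}
```

A Standby never has to be told explicitly that it was promoted in this model; it becomes Active simply because every entry ahead of it has disappeared.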

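1. Master restart strategies

When the Master starts, it chooses one of several failure-recovery strategies according to its startup parameters:

1. ZOOKEEPER: implements HA.

2. FILESYSTEM: lets the Master restart without losing data; the cluster's runtime data is saved to a local or network file system.

3. Restart and discard all previous data.

Master::preStart() shows how these three different strategies are implemented.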

override def preStart() {
  logInfo("Starting Spark master at " + masterUrl)
  ...
  // persistenceEngine persists the Worker, Driver and Application information, so that
  // a Master restart does not affect Jobs that have already been submitted.
  persistenceEngine = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
    case "FILESYSTEM" =>
      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
      new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
    case _ =>
      new BlackHolePersistenceEngine()
  }
  // leaderElectionAgent is responsible for electing the Leader.
  leaderElectionAgent = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
    case _ => // a cluster with a single Master: the current Master is the Active one
      context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
  }
}

RECOVERY_MODE is a string that can be set through spark-env.sh.

val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

If spark.deploy.recoveryMode is not set, all of the cluster's runtime data is lost when the Master restarts. This follows from the implementation of BlackHolePersistenceEngine.

private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
  override def addApplication(app: ApplicationInfo) {}
  override def removeApplication(app: ApplicationInfo) {}
  override def addWorker(worker: WorkerInfo) {}
  override def removeWorker(worker: WorkerInfo) {}
  override def addDriver(driver: DriverInfo) {}
  override def removeDriver(driver: DriverInfo) {}

  override def readPersistedData() = (Nil, Nil, Nil)
}
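The effect of falling through to the catch-all case can be sketched in a few lines. The following is a simplified model under stated assumptions, not Spark's code: `Engine`, `BlackHoleEngine`, `DurableEngine` and `forMode` are hypothetical names, a plain list stands in for the external store (ZooKeeper or a file system), and only Worker registration is modelled.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of choosing a persistence engine by recovery mode; all names are illustrative. */
final class RecoveryModeSketch {
    interface Engine {
        void addWorker(String workerId);
        List<String> readPersistedWorkers();
    }

    /** Mirrors the black-hole idea: writes are dropped, reads return nothing. */
    static final class BlackHoleEngine implements Engine {
        @Override public void addWorker(String workerId) { }
        @Override public List<String> readPersistedWorkers() { return new ArrayList<>(); }
    }

    /** Stand-in for a durable engine: state lives in a store that outlives the master. */
    static final class DurableEngine implements Engine {
        private final List<String> store;
        DurableEngine(List<String> store) { this.store = store; }
        @Override public void addWorker(String workerId) { store.add(workerId); }
        @Override public List<String> readPersistedWorkers() { return new ArrayList<>(store); }
    }

    /** Any mode other than the durable ones falls through to the black hole. */
    static Engine forMode(String mode, List<String> store) {
        switch (mode) {
            case "ZOOKEEPER":
            case "FILESYSTEM":
                return new DurableEngine(store);
            default:
                return new BlackHoleEngine();
        }
    }

    /** Registers one worker, then "restarts" with a fresh engine and reads state back. */
    static List<String> recoverAfterRestart(String mode) {
        List<String> externalStore = new ArrayList<>();
        forMode(mode, externalStore).addWorker("worker-1");
        return forMode(mode, externalStore).readPersistedWorkers();
    }

    public static void main(String[] args) {
        System.out.println("NONE       -> " + recoverAfterRestart("NONE"));
        System.out.println("FILESYSTEM -> " + recoverAfterRestart("FILESYSTEM"));
    }
}
```

Because the caller only ever talks to the `Engine` interface, the Master code path is the same in every mode; only what survives a restart differs.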

BlackHolePersistenceEngine implements every method of the interface as a no-op. PersistenceEngine is a trait. For comparison, here is the ZooKeeper implementation.

class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
  extends PersistenceEngine
  with Logging
{
  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // Serialize the app information into the node WORKING_DIR/app_{app.id}
  override def addApplication(app: ApplicationInfo) {
    serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
  }

  override def removeApplication(app: ApplicationInfo) {
    zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
  }
  ...
}
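To make the naming scheme concrete, here is a minimal sketch that applies the same app_<id> convention to a local directory instead of ZooKeeper nodes. It is an assumption-laden stand-in: it uses plain java.io serialization rather than the Akka Serialization object the Spark class receives, it stores a simple string instead of an ApplicationInfo, and the class `FilePersistenceSketch` and its methods are hypothetical, not part of Spark.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of the app_<id> naming scheme on a local directory; not Spark's actual engine. */
final class FilePersistenceSketch {
    private final Path workingDir;

    FilePersistenceSketch(Path workingDir) throws IOException {
        this.workingDir = Files.createDirectories(workingDir);
    }

    /** Serializes the application description into workingDir/app_<id>. */
    void addApplication(String id, String description) throws IOException {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(Files.newOutputStream(workingDir.resolve("app_" + id)))) {
            out.writeObject(description);
        }
    }

    /** Deletes workingDir/app_<id> if it exists. */
    void removeApplication(String id) throws IOException {
        Files.deleteIfExists(workingDir.resolve("app_" + id));
    }

    /** Reads back every persisted application, as a restarted master would. */
    List<String> readPersistedApplications() throws IOException, ClassNotFoundException {
        List<String> apps = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(workingDir, "app_*")) {
            for (Path file : files) {
                try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
                    apps.add((String) in.readObject());
                }
            }
        }
        Collections.sort(apps); // directory listing order is unspecified
        return apps;
    }

    /** Runs an add/remove/read cycle in a temporary directory. */
    static List<String> demo() {
        try {
            FilePersistenceSketch engine =
                new FilePersistenceSketch(Files.createTempDirectory("master_status"));
            engine.addApplication("0001", "word-count");
            engine.addApplication("0002", "page-rank");
            engine.removeApplication("0001");
            return engine.readPersistedApplications();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

One entry per application keeps add and remove independent of each other: removing an application is a single delete, and recovery is a scan over everything that matches the prefix.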

Spark does not use the ZooKeeper API directly. Instead it uses org.apache.curator.framework.CuratorFramework and org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}. Curator provides a very friendly wrapper on top of ZooKeeper.

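2. Configuring the cluster startup parameters

To briefly summarize the parameter settings: from the code analysis above, we know that using ZooKeeper requires at least the following parameters (in fact, these are the only ones that need to be set). They are set through spark-env.sh: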

spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181
spark.deploy.zookeeper.dir=/dir
# OR set them as follows
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER"
export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181"

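The meaning of each parameter:

spark.deploy.recoveryMode: set to ZOOKEEPER to enable recovery through standby Masters; the default is NONE.
spark.deploy.zookeeper.url: the address list of the ZooKeeper cluster, for example zk_server_1:2181,zk_server_2:2181.
spark.deploy.zookeeper.dir: the directory in ZooKeeper under which the recovery state is stored; the default is /spark.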
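How such -D options end up as settings with defaults can be sketched as follows. In a real deployment the JVM itself turns each -Dkey=value flag into a system property, so no parsing code like this exists in Spark; `RecoveryConfSketch` and `parseJavaOpts` are hypothetical names used only to make the flow visible. The defaults "NONE" and "/spark" are the ones that appear in the Spark excerpts above.

```java
import java.util.Properties;

/** Sketch of turning -Dkey=value JVM options into settings with defaults; names are illustrative. */
final class RecoveryConfSketch {
    /** Parses a SPARK_DAEMON_JAVA_OPTS-style string, keeping only -Dkey=value tokens. */
    static Properties parseJavaOpts(String javaOpts) {
        Properties props = new Properties();
        for (String token : javaOpts.trim().split("\\s+")) {
            int eq = token.indexOf('=');
            if (token.startsWith("-D") && eq > 2) {
                props.setProperty(token.substring(2, eq), token.substring(eq + 1));
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parseJavaOpts(
            "-Dspark.deploy.recoveryMode=ZOOKEEPER "
            + "-Dspark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181");
        // An explicitly set key wins; an unset key falls back to its default.
        System.out.println(props.getProperty("spark.deploy.recoveryMode", "NONE"));
        System.out.println(props.getProperty("spark.deploy.zookeeper.dir", "/spark"));
    }
}
```

The second lookup shows why spark.deploy.zookeeper.dir can be left out of the export lines: an unset key simply resolves to its default.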

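3. A brief introduction to CuratorFramework

CuratorFramework greatly simplifies working with ZooKeeper. It provides a high-level API and adds many features on top of ZooKeeper, including:

1. Automatic connection management: a client connected to ZooKeeper may lose its connection. Curator handles this case, and reconnection is transparent to the client.

2. A cleaner API: it simplifies the raw ZooKeeper methods, events and so on, and offers a simple, easy-to-use interface.

3. Recipe implementations (see the Curator Recipes documentation for more):

1) Leader election

2) Shared locks

3) Caches and watches

4) Distributed queues

5) Distributed priority queues

CuratorFramework instances are created through CuratorFrameworkFactory and are thread-safe ZooKeeper clients.

CuratorFrameworkFactory.newClient() offers a simple way to create such an instance, and different arguments can be passed in to control it fully. After obtaining an instance, you must start it with start(), and you need to call close() when you are done with it.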

/**
 * Create a new client
 *
 *
 * @param connectString list of servers to connect to
 * @param sessionTimeoutMs session timeout
 * @param connectionTimeoutMs connection timeout
 * @param retryPolicy retry policy to use
 * @return client
 */
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
    return builder().
        connectString(connectString).
        sessionTimeoutMs(sessionTimeoutMs).
        connectionTimeoutMs(connectionTimeoutMs).
        retryPolicy(retryPolicy).
        build();
}

Two more types from the leader recipe deserve attention: org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}.
First, look at LeaderLatchListener. It is notified whenever the state of the LeaderLatch changes:

1. When this node is elected Leader, isLeader() is called.

2. When this node is stripped of leadership, notLeader() is called.

Because the notifications are asynchronous, the state may no longer be accurate by the time the method is called, so you need to check whether LeaderLatch's hasLeadership() really is true/false. Spark's implementation, shown later, does exactly this.

/**
 * LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
 *
 * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
 * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes
 * before these methods get called. The contract is that if that happens, you should see another call to the other
 * method pretty quickly.
 */
public interface LeaderLatchListener
{
    /**
     * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
     *
     * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
     * this occurs, you can expect {@link #notLeader()} to also be called.
     */
    public void isLeader();

    /**
     * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
     *
     * Note that it is possible that by the time this method call happens, hasLeadership has become true. If
     * this occurs, you can expect {@link #isLeader()} to also be called.
     */
    public void notLeader();
}
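The contract described in these comments can be reproduced with a small, deterministic model. The sketch below only imitates the behaviour and is not Curator code: `StaleCallbackSketch` is a hypothetical class in which state changes take effect immediately while the callbacks are queued and delivered later, the way an executor thread would deliver them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Toy model of delayed listener callbacks; it imitates the contract, not Curator's code. */
final class StaleCallbackSketch {
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    /** Changes the state immediately but only queues the notification. */
    void setLeadership(boolean leader) {
        hasLeadership.set(leader);
        pendingCallbacks.add(leader ? this::isLeader : this::notLeader);
    }

    /** Delivers queued notifications later, as an executor thread would. */
    void deliverPending() {
        Runnable callback;
        while ((callback = pendingCallbacks.poll()) != null) {
            callback.run();
        }
    }

    private void isLeader() {
        if (!hasLeadership.get()) {   // leadership was already lost again
            log.add("isLeader ignored");
            return;
        }
        log.add("gained leadership");
    }

    private void notLeader() {
        if (hasLeadership.get()) {    // leadership was already regained
            log.add("notLeader ignored");
            return;
        }
        log.add("lost leadership");
    }

    public static void main(String[] args) {
        StaleCallbackSketch latch = new StaleCallbackSketch();
        latch.setLeadership(true);   // elected ...
        latch.setLeadership(false);  // ... and revoked before any callback ran
        latch.deliverPending();
        System.out.println(latch.log);
    }
}
```

Without the re-check inside the callbacks, the stale isLeader() notification would be treated as a real promotion even though leadership had already been revoked.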

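LeaderLatch is responsible for choosing one Leader among the many contenders connected to the ZooKeeper cluster. For the election mechanism itself, see ZooKeeper's implementation; LeaderLatch wraps it up nicely. All we need to know is that, after creating an instance, we attach our listener through addListener: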

public class LeaderLatch implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final String latchPath;
    private final String id;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final AtomicReference<String> ourPath = new AtomicReference<String>();
    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
    private final CloseMode closeMode;
    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();
    ...
    /**
     * Attaches a listener to this LeaderLatch
     * <p/>
     * Attaching the same listener multiple times is a noop from the second time on.
     * <p/>
     * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
     * them being called out of order you are welcome to use multiple threads.
     *
     * @param listener the listener to attach
     */
    public void addListener(LeaderLatchListener listener)
    {
        listeners.addListener(listener);
    }
    ...
}

Through addListener we can attach our own Listener to the LeaderLatch. In the Listener, we only need to implement, in its two methods, the logic for being elected Leader and for being stripped of the Leader role.
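Two points from the addListener comment, that attaching the same listener twice is a no-op and that a single-threaded executor keeps callbacks in order, can be tried out with a simplified stand-in. This is not Curator's ListenerContainer; `ListenerRegistrySketch` and its nested `Listener` interface are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch of a listener registry; a simplified stand-in, not Curator's ListenerContainer. */
final class ListenerRegistrySketch {
    interface Listener {
        void isLeader();
        void notLeader();
    }

    private final Set<Listener> listeners = new LinkedHashSet<>();
    // One thread, so callbacks run in the order they were submitted.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Adding the same listener twice has no further effect. */
    synchronized void addListener(Listener listener) {
        listeners.add(listener);
    }

    synchronized void fireLeadershipChange(boolean leader) {
        for (Listener l : listeners) {
            executor.execute(leader ? l::isLeader : l::notLeader);
        }
    }

    /** Waits for queued callbacks to finish, then stops the executor. */
    void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Registers one listener twice, fires two events, and returns what it observed. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener recorder = new Listener() {
            @Override public void isLeader() { events.add("isLeader"); }
            @Override public void notLeader() { events.add("notLeader"); }
        };
        ListenerRegistrySketch registry = new ListenerRegistrySketch();
        registry.addListener(recorder);
        registry.addListener(recorder); // no-op the second time
        registry.fireLeadershipChange(true);
        registry.fireLeadershipChange(false);
        try {
            registry.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each event reaches the listener once despite the double registration, and the two callbacks arrive in the order in which the state changes happened.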

4. The implementation of ZooKeeperLeaderElectionAgent

In fact, thanks to Curator, implementing Master HA in Spark becomes very simple. ZooKeeperLeaderElectionAgent implements the LeaderLatchListener interface. Once isLeader() confirms that its Master has been elected Leader, it sends the ElectedLeader message to the Master, and the Master changes its own state to ALIVE. When notLeader() is called, it sends the RevokedLeadership message to the Master, and the Master shuts down.

private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
    masterUrl: String, conf: SparkConf)
  extends LeaderElectionAgent with LeaderLatchListener with Logging {
  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
  // zk is the ZooKeeper client instance created through CuratorFrameworkFactory
  private var zk: CuratorFramework = _
  // leaderLatch: the Curator recipe responsible for electing the Leader
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  override def preStart() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }

In preStart, the leaderLatch is started to take part in the Leader election in ZooKeeper. As analysed in the previous section, the main logic lives in isLeader and notLeader.

override def isLeader() {
  synchronized {
    // Could have lost leadership by now; see the Curator implementation for details.
    if (!leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // Could have regained leadership by now; see the Curator implementation for details.
    if (leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}

The logic of updateLeadershipStatus is simple: it sends a message to the Master, and only when the leadership status has actually changed.

def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    masterActor ! ElectedLeader
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    masterActor ! RevokedLeadership
  }
}
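The guard conditions in updateLeadershipStatus make it a two-state machine that emits a message only on a transition. A minimal Java rendering of the same logic is shown below as a sketch: `LeadershipStatusSketch` is a hypothetical class, and a plain list of strings stands in for the messages that the Scala code sends to the Master actor.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the leadership state machine; the message list stands in for the Master actor. */
final class LeadershipStatusSketch {
    enum Status { LEADER, NOT_LEADER }

    private Status status = Status.NOT_LEADER;
    final List<String> sentMessages = new ArrayList<>();

    /** Sends a message only when the status actually changes. */
    void updateLeadershipStatus(boolean isLeader) {
        if (isLeader && status == Status.NOT_LEADER) {
            status = Status.LEADER;
            sentMessages.add("ElectedLeader");
        } else if (!isLeader && status == Status.LEADER) {
            status = Status.NOT_LEADER;
            sentMessages.add("RevokedLeadership");
        }
    }

    public static void main(String[] args) {
        LeadershipStatusSketch agent = new LeadershipStatusSketch();
        agent.updateLeadershipStatus(true);
        agent.updateLeadershipStatus(true);   // duplicate notification, no second message
        agent.updateLeadershipStatus(false);
        agent.updateLeadershipStatus(false);  // duplicate again
        System.out.println(agent.sentMessages);
    }
}
```

Repeated notifications of the same kind therefore never reach the Master twice, which matters because a RevokedLeadership message causes the Master to shut down.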

5. Design philosophy

To eliminate the Master SPOF in Standalone mode, Spark relies on the election facility provided by ZooKeeper. Rather than using ZooKeeper's native Java API, Spark uses Curator, a framework that wraps ZooKeeper. With Curator, Spark does not have to manage its connection to ZooKeeper; all of that is transparent to Spark. Spark implements Master HA in only about 100 lines of code. Of course, Spark is standing on the shoulders of giants. After all, who would want to reinvent the wheel?

   