If Spark is deployed in Standalone mode, a typical Master/Slaves architecture, then the Master is a SPOF (Single Point of Failure). Spark can use ZooKeeper to provide HA.
ZooKeeper provides a Leader Election mechanism. With it, the cluster can run several Masters while only one of them is Active and all the others are Standby; when the Active Master fails, one of the Standby Masters is elected to take over. Because the cluster state, including information about Workers, Drivers and Applications, has already been persisted (to ZooKeeper, or to a file system in FILESYSTEM mode), a failover only affects the submission of new Jobs; Jobs that are already running are not affected. The overall architecture of the cluster with ZooKeeper added is shown in the figure below.

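To make the Active/Standby idea concrete, here is a minimal in-memory sketch of the election rule that ZooKeeper-based recipes such as Curator's LeaderLatch build on: each contender registers an ephemeral, sequentially numbered node, and the contender holding the lowest live sequence number is the leader. ZooKeeper is simulated with a TreeMap, and the ElectionSketch class and master names are hypothetical; this is not Spark or Curator code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical in-memory model of "lowest sequence number wins" leader election.
class ElectionSketch {
    // Sequence number -> contender id; TreeMap keeps the keys sorted.
    private final TreeMap<Long, String> nodes = new TreeMap<>();
    private long nextSeq = 0;

    // A contender joins: comparable to creating an EPHEMERAL_SEQUENTIAL znode.
    long join(String contenderId) {
        long seq = nextSeq++;
        nodes.put(seq, contenderId);
        return seq;
    }

    // A contender crashes or its session ends: its ephemeral node disappears.
    void leave(long seq) {
        nodes.remove(seq);
    }

    // The leader (Active Master) is the contender with the smallest live sequence number.
    Optional<String> leader() {
        Map.Entry<Long, String> first = nodes.firstEntry();
        return first == null ? Optional.empty() : Optional.of(first.getValue());
    }
}
```

If master-1 joins first and later fails, removing its node makes master-2 the leader while master-3 stays Standby, which is the failover behaviour described above.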
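1. Master restart strategies
When the Master starts, it chooses one of several failure-recovery strategies according to its startup configuration:
1. ZOOKEEPER: HA through ZooKeeper
2. FILESYSTEM: the Master restarts without losing data; the cluster's runtime state is saved to a local or network file system
3. Any other value: the Master restarts and discards all previous state
The three strategies can be seen in Master::preStart():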
override def preStart() {
  logInfo("Starting Spark master at " + masterUrl)
  ...
  // persistenceEngine persists Worker, Driver and Application information, so that
  // restarting the Master does not affect Jobs that have already been submitted
  persistenceEngine = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
    case "FILESYSTEM" =>
      logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
      new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
    case _ =>
      new BlackHolePersistenceEngine()
  }

  // leaderElectionAgent is responsible for electing the Leader
  leaderElectionAgent = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
    case _ =>
      // A cluster with only one Master: that Master is always the Active one
      context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
  }
}
RECOVERY_MODE is a string read from the spark.deploy.recoveryMode setting, which can be configured in spark-env.sh:
val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
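The dispatch in preStart() can be mirrored in a few lines of Java. This sketch relies only on what the Scala code above shows: the mode string defaults to "NONE", "ZOOKEEPER" and "FILESYSTEM" select persistent engines, every other value falls back to the black-hole engine, and only ZOOKEEPER mode runs a real election. The RecoveryModeSketch class is hypothetical, and a plain Map stands in for SparkConf.

```java
import java.util.Map;

// Hypothetical mirror of Master.preStart(): map spark.deploy.recoveryMode to the
// persistence engine and leader-election agent that would be created.
class RecoveryModeSketch {
    // Same default as conf.get("spark.deploy.recoveryMode", "NONE").
    static String recoveryMode(Map<String, String> conf) {
        return conf.getOrDefault("spark.deploy.recoveryMode", "NONE");
    }

    static String persistenceEngineFor(String mode) {
        switch (mode) {
            case "ZOOKEEPER":  return "ZooKeeperPersistenceEngine";
            case "FILESYSTEM": return "FileSystemPersistenceEngine";
            default:           return "BlackHolePersistenceEngine"; // state is lost on restart
        }
    }

    static String leaderAgentFor(String mode) {
        // Only ZOOKEEPER mode elects a leader; otherwise the single Master is always Active.
        return "ZOOKEEPER".equals(mode) ? "ZooKeeperLeaderElectionAgent" : "MonarchyLeaderAgent";
    }
}
```

Note that FILESYSTEM mode persists state but still assumes a single Master, which is why it pairs with MonarchyLeaderAgent.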
If spark.deploy.recoveryMode is not set, all of the cluster's runtime data is lost when the Master restarts. This follows from the implementation of BlackHolePersistenceEngine:
private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
  override def addApplication(app: ApplicationInfo) {}
  override def removeApplication(app: ApplicationInfo) {}
  override def addWorker(worker: WorkerInfo) {}
  override def removeWorker(worker: WorkerInfo) {}
  override def addDriver(driver: DriverInfo) {}
  override def removeDriver(driver: DriverInfo) {}
  override def readPersistedData() = (Nil, Nil, Nil)
}
It implements every method as a no-op. PersistenceEngine is a trait. For comparison, here is the ZooKeeper implementation:
class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
  extends PersistenceEngine
  with Logging
{
  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
  val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

  SparkCuratorUtil.mkdir(zk, WORKING_DIR)

  // Serialize the app's information into the node WORKING_DIR/app_{app.id}
  override def addApplication(app: ApplicationInfo) {
    serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
  }

  override def removeApplication(app: ApplicationInfo) {
    zk.delete().forPath(WORKING_DIR + "/app_" + app.id)
  }
  // ...
}
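The difference between the two engines can be reduced to a small Java sketch. It assumes a simplified persistence interface that stores only application ids (the real trait also covers Workers and Drivers and stores serialized objects), and a HashMap stands in for the ZooKeeper tree, keyed by the same WORKING_DIR + "/app_" + id paths as above. AppPersistence, BlackHoleSketch and MapBackedSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of Spark's PersistenceEngine: applications only.
interface AppPersistence {
    void addApplication(String appId);
    void removeApplication(String appId);
    List<String> readPersistedApps();
}

// Like BlackHolePersistenceEngine: every write is a no-op, so nothing survives a restart.
class BlackHoleSketch implements AppPersistence {
    public void addApplication(String appId) {}
    public void removeApplication(String appId) {}
    public List<String> readPersistedApps() { return new ArrayList<>(); }
}

// Like ZooKeeperPersistenceEngine, but a shared Map plays the role of the ZooKeeper tree.
class MapBackedSketch implements AppPersistence {
    private final String workingDir;
    private final Map<String, String> store; // node path -> app id

    MapBackedSketch(String zkDir, Map<String, String> sharedStore) {
        this.workingDir = zkDir + "/master_status";
        this.store = sharedStore;
    }

    public void addApplication(String appId) {
        store.put(workingDir + "/app_" + appId, appId);
    }

    public void removeApplication(String appId) {
        store.remove(workingDir + "/app_" + appId);
    }

    public List<String> readPersistedApps() {
        List<String> apps = new ArrayList<>();
        for (Map.Entry<String, String> e : store.entrySet()) {
            if (e.getKey().startsWith(workingDir + "/app_")) apps.add(e.getValue());
        }
        return apps;
    }
}
```

Creating a second MapBackedSketch over the same map plays the role of a restarted (or newly elected) Master: it reads back the applications that are still registered, whereas the black-hole engine always returns nothing.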
Spark does not use the ZooKeeper API directly; instead it uses org.apache.curator.framework.CuratorFramework and org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}. Curator provides a very friendly wrapper on top of ZooKeeper.
2. Configuring the cluster startup parameters
To summarize the parameter settings: from the code analyzed above, we know that to use ZooKeeper we need to set at least the following parameters (in fact, these are the only ones required):
spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181
spark.deploy.zookeeper.dir=/dir

# Or set them through SPARK_DAEMON_JAVA_OPTS in spark-env.sh:
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "
export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181"
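The meaning of each parameter:
1. spark.deploy.recoveryMode: set it to ZOOKEEPER to enable Master recovery through ZooKeeper; the default is NONE.
2. spark.deploy.zookeeper.url: the ZooKeeper cluster to connect to, as a comma-separated list of host:port addresses.
3. spark.deploy.zookeeper.dir: the ZooKeeper directory under which Spark stores its recovery state; the default is /spark.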
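To see how these settings are consumed, the sketch below resolves them the way the Spark code in this article does: spark.deploy.zookeeper.dir defaults to /spark, persisted state lives under dir + "/master_status" (ZooKeeperPersistenceEngine) and the election latch under dir + "/leader_election" (ZooKeeperLeaderElectionAgent). ZooKeeperPathsSketch is hypothetical, a Map stands in for SparkConf, and splitting the URL on commas is only an illustration of its format.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper: resolve the ZooKeeper-related settings with the defaults used above.
class ZooKeeperPathsSketch {
    static String baseDir(Map<String, String> conf) {
        return conf.getOrDefault("spark.deploy.zookeeper.dir", "/spark");
    }

    // Where ZooKeeperPersistenceEngine keeps Worker/Driver/Application state.
    static String masterStatusDir(Map<String, String> conf) {
        return baseDir(conf) + "/master_status";
    }

    // Where ZooKeeperLeaderElectionAgent places its LeaderLatch.
    static String leaderElectionDir(Map<String, String> conf) {
        return baseDir(conf) + "/leader_election";
    }

    // Illustration only: the URL is a comma-separated list of host:port pairs.
    static List<String> servers(Map<String, String> conf) {
        String url = conf.get("spark.deploy.zookeeper.url");
        if (url == null) {
            throw new IllegalArgumentException("spark.deploy.zookeeper.url is not set");
        }
        return Arrays.asList(url.split(","));
    }
}
```

With the example configuration above (dir = /dir), all Masters would share /dir/master_status and /dir/leader_election, which is what lets a newly elected Master find the state written by its predecessor.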

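3. A brief introduction to CuratorFramework
CuratorFramework greatly simplifies working with ZooKeeper. It provides a high-level API and adds many features on top of ZooKeeper, including:
1. Automatic connection management: a client connected to ZooKeeper may lose its connection; Curator handles this, and reconnection is transparent to the client.
2. A clean API: it simplifies the raw ZooKeeper methods, events and so on, and exposes an easy-to-use interface.
3. Recipe implementations (see the Curator Recipes documentation for more):
1) Leader election
2) Shared locks
3) Caching and watching
4) Distributed queues
5) Distributed priority queues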
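Automatic connection management relies on retrying failed operations after a delay. As a rough, self-contained illustration of that idea (not Curator's code), here is an exponential-backoff retry loop in Java: the delay before each retry doubles, up to a maximum number of retries. BackoffRetrySketch, baseSleepMs and maxRetries are hypothetical names; Curator ships its own retry policies, such as ExponentialBackoffRetry, which also randomizes the delay.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of retrying with exponential backoff (not Curator's implementation).
class BackoffRetrySketch {
    private final long baseSleepMs;
    private final int maxRetries;

    BackoffRetrySketch(long baseSleepMs, int maxRetries) {
        this.baseSleepMs = baseSleepMs;
        this.maxRetries = maxRetries;
    }

    // Delay before retry number retryCount (0-based): baseSleepMs * 2^retryCount.
    long delayMs(int retryCount) {
        return baseSleepMs << retryCount;
    }

    // Run the operation, retrying on any exception until maxRetries retries are used up.
    <T> T call(Callable<T> operation) throws Exception {
        for (int retry = 0; ; retry++) {
            try {
                return operation.call();
            } catch (Exception e) {
                if (retry >= maxRetries) {
                    throw e; // give up: the caller finally sees the failure
                }
                Thread.sleep(delayMs(retry));
            }
        }
    }
}
```

The point of the backoff is that a briefly unavailable ZooKeeper ensemble is not hammered with reconnection attempts, while short outages remain invisible to the caller.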
CuratorFramework instances are created through CuratorFrameworkFactory and are thread-safe.
CuratorFrameworkFactory.newClient() offers a simple way to create an instance, and its parameters give full control over the instance. After obtaining an instance you must call start() to start it, and call close() when you are done with it.
/**
 * Create a new client
 *
 * @param connectString       list of servers to connect to
 * @param sessionTimeoutMs    session timeout
 * @param connectionTimeoutMs connection timeout
 * @param retryPolicy         retry policy to use
 * @return client
 */
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs,
                                         int connectionTimeoutMs, RetryPolicy retryPolicy)
{
    return builder().
        connectString(connectString).
        sessionTimeoutMs(sessionTimeoutMs).
        connectionTimeoutMs(connectionTimeoutMs).
        retryPolicy(retryPolicy).
        build();
}
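newClient() is a thin convenience over a fluent builder, and the resulting client must be started before use and closed afterwards. The Java sketch below shows that shape with a hypothetical ClientSketch class: it is not Curator's API, opens no real connection, omits the retry policy, and its default timeouts are illustrative only. The builder collects the settings, and the client enforces start-before-use and supports try-with-resources through AutoCloseable.

```java
// Hypothetical sketch of the builder + start()/close() lifecycle described above.
// It mimics the shape of CuratorFrameworkFactory.newClient(); it connects to nothing.
class ClientSketch implements AutoCloseable {
    enum State { LATENT, STARTED, STOPPED }

    final String connectString;
    final int sessionTimeoutMs;
    final int connectionTimeoutMs;
    private State state = State.LATENT;

    private ClientSketch(Builder b) {
        this.connectString = b.connectString;
        this.sessionTimeoutMs = b.sessionTimeoutMs;
        this.connectionTimeoutMs = b.connectionTimeoutMs;
    }

    static Builder builder() { return new Builder(); }

    // Same shape as newClient(): delegate everything to the builder.
    static ClientSketch newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
        return builder().connectString(connectString)
                        .sessionTimeoutMs(sessionTimeoutMs)
                        .connectionTimeoutMs(connectionTimeoutMs)
                        .build();
    }

    void start() {
        if (state != State.LATENT) throw new IllegalStateException("Cannot start from " + state);
        state = State.STARTED;
    }

    // Any operation requires a started client.
    String describe() {
        if (state != State.STARTED) throw new IllegalStateException("Client not started: " + state);
        return "connected to " + connectString;
    }

    State state() { return state; }

    @Override
    public void close() { state = State.STOPPED; }

    static class Builder {
        private String connectString;
        private int sessionTimeoutMs = 60_000;    // illustrative default
        private int connectionTimeoutMs = 15_000; // illustrative default

        Builder connectString(String s) { this.connectString = s; return this; }
        Builder sessionTimeoutMs(int ms) { this.sessionTimeoutMs = ms; return this; }
        Builder connectionTimeoutMs(int ms) { this.connectionTimeoutMs = ms; return this; }
        ClientSketch build() { return new ClientSketch(this); }
    }
}
```

A real CuratorFramework follows the same pattern: build it, call start(), and release it with close(), for example in a finally block or with try-with-resources, since CuratorFramework extends Closeable.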
Two more Recipes deserve attention: org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}.
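First, LeaderLatchListener, which is notified when the state of a LeaderLatch changes:
1. When this node is elected Leader, isLeader() is called.
2. When this node loses leadership, notLeader() is called.
Because notifications are asynchronous, the state may already have changed again by the time one of these methods is called, so the listener should check whether LeaderLatch.hasLeadership() really is true (or false). Spark's implementation, discussed below, does exactly this.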
/**
* LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
*
* Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
* hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes
* before these methods get called. The contract is that if that happens, you should see another call to the other
* method pretty quickly.
*/
public interface LeaderLatchListener
{
/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/
public void isLeader();
/**
* This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true. If
* this occurs, you can expect {@link #isLeader()} to also be called.
*/
public void notLeader();
}
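LeaderLatch elects one Leader among the many contenders connected to a ZooKeeper cluster. The details of the election mechanism can be found in ZooKeeper's implementation; LeaderLatch wraps it nicely. All we need to know is that, after creating an instance, we attach our listener to it through addListener: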
public class LeaderLatch implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final String latchPath;
    private final String id;
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final AtomicReference<String> ourPath = new AtomicReference<String>();
    private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();
    private final CloseMode closeMode;
    private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();

    // . . .

    /**
     * Attaches a listener to this LeaderLatch
     * <p/>
     * Attaching the same listener multiple times is a noop from the second time on.
     * <p/>
     * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded
     * executor so that you can be certain that listener methods are called in sequence, but if you are fine with
     * them being called out of order you are welcome to use multiple threads.
     *
     * @param listener the listener to attach
     */
    public void addListener(LeaderLatchListener listener)
    {
        listeners.addListener(listener);
    }

    // . . .
}
addListener registers our own Listener implementation with the LeaderLatch. In that Listener, all we have to do is implement, in its two methods, the logic to run when we are elected Leader and when we lose the Leader role.
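The contract quoted in the addListener Javadoc (adding the same listener twice is a no-op; listener methods run on a supplied Executor, and a single-threaded executor keeps them in order) can be sketched as follows. LatchListenerSketch and ListenerContainerSketch are hypothetical and much simpler than Curator's ListenerContainer; here a map keyed on the listener makes a second add of the same listener have no effect.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for Curator's LeaderLatchListener.
interface LatchListenerSketch {
    void isLeader();
    void notLeader();
}

// Hypothetical, simplified listener container: idempotent add, executor-based dispatch.
class ListenerContainerSketch {
    // listener -> executor that runs its callbacks
    private final Map<LatchListenerSketch, Executor> listeners = new ConcurrentHashMap<>();

    void addListener(LatchListenerSketch listener, Executor executor) {
        listeners.putIfAbsent(listener, executor); // a second add of the same listener is a no-op
    }

    int size() { return listeners.size(); }

    // Deliver one notification to every listener on that listener's executor.
    void forEach(Consumer<LatchListenerSketch> callback) {
        listeners.forEach((listener, executor) -> executor.execute(() -> callback.accept(listener)));
    }
}
```

Because both callbacks in a single-threaded executor run in submission order, a listener sees isLeader before a later notLeader; with a thread pool, that order would not be guaranteed.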
4. Implementation of ZooKeeperLeaderElectionAgent
Thanks to Curator, implementing Master HA in Spark becomes very simple. ZooKeeperLeaderElectionAgent implements the LeaderLatchListener interface. When isLeader() confirms that its Master has been elected Leader, it sends the message ElectedLeader to the Master, which then changes its own state to ALIVE. When notLeader() is called, it sends the message RevokedLeadership to the Master, and the Master shuts down.
private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
    masterUrl: String, conf: SparkConf)
  extends LeaderElectionAgent with LeaderLatchListener with Logging {

  val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"

  // zk is the ZooKeeper client created through CuratorFrameworkFactory
  private var zk: CuratorFramework = _
  // leaderLatch: the Curator recipe that elects the Leader
  private var leaderLatch: LeaderLatch = _
  private var status = LeadershipStatus.NOT_LEADER

  override def preStart() {
    logInfo("Starting ZooKeeper LeaderElection agent")
    zk = SparkCuratorUtil.newClient(conf)
    leaderLatch = new LeaderLatch(zk, WORKING_DIR)
    leaderLatch.addListener(this)
    leaderLatch.start()
  }
  // ...
}
In preStart, leaderLatch is started so that this Master takes part in the Leader election in ZooKeeper. As analyzed in the previous section, the main logic lives in isLeader and notLeader:
override def isLeader() {
  synchronized {
    // could have lost leadership by now (see Curator's implementation for details).
    if (!leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have gained leadership")
    updateLeadershipStatus(true)
  }
}

override def notLeader() {
  synchronized {
    // could have gained leadership again by now (see Curator's implementation for details).
    if (leaderLatch.hasLeadership) {
      return
    }

    logInfo("We have lost leadership")
    updateLeadershipStatus(false)
  }
}
The logic of updateLeadershipStatus is simple: it just sends a message to the Master.
def updateLeadershipStatus(isLeader: Boolean) {
  if (isLeader && status == LeadershipStatus.NOT_LEADER) {
    status = LeadershipStatus.LEADER
    masterActor ! ElectedLeader
  } else if (!isLeader && status == LeadershipStatus.LEADER) {
    status = LeadershipStatus.NOT_LEADER
    masterActor ! RevokedLeadership
  }
}
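Put together, the agent behaves like a small state machine: a callback is ignored if hasLeadership() disagrees with it (the notification was stale), and a message reaches the Master only on an actual transition from NOT_LEADER to LEADER or back. The Java sketch below reproduces that logic under stated assumptions: the BooleanSupplier standing in for leaderLatch.hasLeadership and the list standing in for masterActor are hypothetical test doubles, and ElectionAgentSketch is not a Spark class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical Java mirror of ZooKeeperLeaderElectionAgent's isLeader/notLeader logic.
class ElectionAgentSketch {
    enum LeadershipStatus { LEADER, NOT_LEADER }

    private final BooleanSupplier hasLeadership;         // stands in for leaderLatch.hasLeadership
    final List<String> sentToMaster = new ArrayList<>(); // stands in for "masterActor ! msg"
    private LeadershipStatus status = LeadershipStatus.NOT_LEADER;

    ElectionAgentSketch(BooleanSupplier hasLeadership) {
        this.hasLeadership = hasLeadership;
    }

    synchronized void isLeader() {
        if (!hasLeadership.getAsBoolean()) return; // stale: leadership already lost again
        updateLeadershipStatus(true);
    }

    synchronized void notLeader() {
        if (hasLeadership.getAsBoolean()) return;  // stale: leadership already regained
        updateLeadershipStatus(false);
    }

    // Only real transitions produce a message to the Master.
    private void updateLeadershipStatus(boolean isLeader) {
        if (isLeader && status == LeadershipStatus.NOT_LEADER) {
            status = LeadershipStatus.LEADER;
            sentToMaster.add("ElectedLeader");
        } else if (!isLeader && status == LeadershipStatus.LEADER) {
            status = LeadershipStatus.NOT_LEADER;
            sentToMaster.add("RevokedLeadership");
        }
    }
}
```

The two checks mirror the Scala code: the hasLeadership re-check filters stale callbacks, and the status field filters repeated ones, so the Master receives at most one ElectedLeader per term of leadership.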
5. Design philosophy
To remove the Master SPOF in Standalone mode, Spark uses the election feature provided by ZooKeeper. Rather than ZooKeeper's native Java API, Spark uses Curator, a framework that wraps ZooKeeper. With Curator, Spark does not need to manage its connection to ZooKeeper; all of that is transparent to Spark. Spark implements Master HA in only about 100 lines of code. Of course, Spark stands on the shoulders of giants: why reinvent the wheel?