Submitting a topology
As everyone knows, to submit a Storm topology to the cluster you run the following command:
${STORM_HOME}/bin/storm jar xxxxxxxxxxx.jar ${main class} [args ...]
The storm file in the bin directory is a Python script; let's look at the main method of that script:
def main():
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)
    global CONFIG_OPTS
    config_list, args = parse_config_opts(sys.argv[1:])
    parse_config(config_list)
    COMMAND = args[0]
    ARGS = args[1:]
    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)

if __name__ == "__main__":
    main()
It first parses the command-line arguments and then hands them to COMMANDS, which dispatches to the right function. COMMANDS is a dict whose keys are strings and whose values are functions:
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui,
            "logviewer": logviewer, "drpc": drpc, "supervisor": supervisor,
            "localconfvalue": print_localconfvalue, "remoteconfvalue": print_remoteconfvalue,
            "repl": repl, "classpath": print_classpath, "activate": activate,
            "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version,
            "monitor": monitor}
In our case the jar function is invoked:
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])
exec_storm_class is called with a few default arguments; jvmtype is -client. Why start the JVM in client mode rather than server mode? For the difference between the two, see the earlier post: Real differences between "java -server" and "java -client". Beyond that, it simply passes the system configuration through:
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    storm_log_dir = confvalue("storm.log.dir", [CLUSTER_CONF_DIR])
    if(storm_log_dir == None or storm_log_dir == "nil"):
        storm_log_dir = STORM_DIR + "/logs"
    all_args = [
        JAVA_CMD, jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
    else:
        os.execvp(JAVA_CMD, all_args)  # replaces the current process and never returns
Component initialization
Once the process starts, your own topology code begins to run. We normally use TopologyBuilder to construct a topology. TopologyBuilder has three fields:
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
_bolts and _spouts are self-explanatory: they hold the bolts and spouts you define and register via setBolt()/setSpout(), with the componentId as the key and your component implementation as the value.
_commons holds extra information about each component: parallelism, additional configuration, and so on. Every time a component is set, the initCommon method is called:
private void initCommon(String id, IComponent component, Number parallelism) {
    ComponentCommon common = new ComponentCommon();
    common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
    if(parallelism!=null) common.set_parallelism_hint(parallelism.intValue());
    Map conf = component.getComponentConfiguration();
    if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
    _commons.put(id, common);
}
When createTopology() later assembles the topology, it calls getComponentCommon for each component:
private ComponentCommon getComponentCommon(String id, IComponent component) {
    ComponentCommon ret = new ComponentCommon(_commons.get(id));
    OutputFieldsGetter getter = new OutputFieldsGetter();
    component.declareOutputFields(getter);
    ret.set_streams(getter.getFieldsDeclaration());
    return ret;
}
Notice that this method calls the component's declareOutputFields method. Among the methods you normally override (a Spout overrides open, nextTuple, etc.; a Bolt overrides prepare, execute, etc.), declareOutputFields is the first to be invoked. You therefore must not use variables in declareOutputFields that have not been initialized yet (we usually initialize them in open or prepare, and generally avoid initializing them in the constructor because of Storm's own serialization mechanism); doing so would throw a NullPointerException.
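A minimal sketch of this pitfall (the class and field names are made up for illustration): the state is created in prepare, so declareOutputFields, which runs first, must only declare the output schema and never touch that state.

import java.util.Map;
import java.util.HashMap;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CounterBolt extends BaseRichBolt {
    private transient Map<String, Long> counts;   // initialized in prepare(), not in the constructor
    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Called before prepare(): only declare the output schema here;
        // this.counts is still null at this point.
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.counts = new HashMap<String, Long>();   // safe place to initialize state
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        long n = counts.containsKey(word) ? counts.get(word) + 1 : 1;
        counts.put(word, n);
        collector.emit(input, new Values(word, n));
        collector.ack(input);
    }
}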
µ±ËùÓеÄboltºÍspout¶¼setÍê±ÏÖ®ºó£¬ÎÒÃǾͻáµ÷ÓÃcreateTopology·½·¨Éú³ÉÒ»¸öStormTopology£¬ÓÉStormSubmitterÀ´submit
topology
/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @param opts to manipulate the starting of the topology
 * @param progressListener to track the progress of the jar upload process
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
        ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if(localNimbus!=null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf, progressListener);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch(InvalidTopologyException e) {
                LOG.warn("Topology submission exception: " + e.get_msg());
                throw e;
            } catch(AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch(TException e) {
        throw new RuntimeException(e);
    }
}
Submitting a topology thus boils down to: initialize a NimbusClient, check whether a topology with the same name already exists, and upload the jar; once all of that is done, the rest is handed over to Nimbus.
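Putting the client-side pieces together, a typical user program looks roughly like this (MySpout, MyBolt, and the component ids are placeholders for your own components):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // setSpout/setBolt fill _spouts/_bolts and call initCommon with the parallelism hint
        builder.setSpout("spout", new MySpout(), 2);          // MySpout/MyBolt: user-defined placeholders
        builder.setBolt("bolt", new MyBolt(), 4).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setNumWorkers(2);

        // createTopology() calls getComponentCommon for every component;
        // StormSubmitter then uploads the jar and hands the topology to Nimbus
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}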
Nimbus
Nimbus is arguably the core of Storm. It has two main responsibilities:
Allocating resources for a topology's tasks
Receiving user commands and handling them accordingly, e.g. submitting, killing, or activating a topology
Nimbus itself is built on the Thrift framework and uses Thrift's THsHaServer, the half-sync/half-async server model: a single dedicated thread handles network I/O while a separate thread pool processes the messages, which greatly improves concurrent message throughput.
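Storm wires this up in Clojure, but the half-sync/half-async model is easier to see in a bare Thrift example. A minimal sketch in Java, assuming Nimbus.Processor was generated from storm.thrift and NimbusHandler is a hypothetical implementation of Nimbus.Iface:

import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;

public class ThriftServerSketch {
    public static void main(String[] args) throws Exception {
        Nimbus.Iface handler = new NimbusHandler();                 // hypothetical service implementation
        TNonblockingServerSocket socket = new TNonblockingServerSocket(6627);
        THsHaServer.Args serverArgs = new THsHaServer.Args(socket)
                .processor(new Nimbus.Processor(handler));          // one selector thread for network I/O,
        THsHaServer server = new THsHaServer(serverArgs);           // a worker pool runs the handler methods
        server.serve();
    }
}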
The service interface is defined in the storm.thrift file; here is an excerpt:
service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology)
      throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf,
      4: StormTopology topology, 5: SubmitOptions options)
      throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options)
      throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  string beginFileDownload(1: string file);
  //can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();

  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  //returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}
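The Thrift compiler turns this IDL into the Nimbus.Client/Nimbus.Iface classes that StormSubmitter used above. As a sketch (assuming a reachable nimbus and the default storm config on the classpath), the same client can call any of these methods, e.g. getClusterInfo:

import java.util.Map;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class ClusterInfoSketch {
    public static void main(String[] args) throws Exception {
        Map conf = Utils.readStormConfig();                      // same config lookup StormSubmitter uses
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            // getClusterInfo() is the Thrift method declared in storm.thrift above
            ClusterSummary summary = client.getClient().getClusterInfo();
            System.out.println("supervisors: " + summary.get_supervisors().size()
                    + ", topologies: " + summary.get_topologies().size());
        } finally {
            client.close();
        }
    }
}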
When you run nohup ${STORM_HOME}/bin/storm nimbus &, the nimbus service is started. Concretely, the storm Python script launches backtype.storm.daemon.nimbus by default:
def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.
    See Setting up a Storm cluster for more information.
    (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
    """
    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)
This in turn runs the nimbus.clj script, which mainly involves two functions: launch-server! (the entry point that starts nimbus) and service-handler (where the actual handling logic is defined).
Once nimbus is running, it exposes a number of services: submitting topologies, serving UI information, killing and rebalancing topologies, and so on. As described in Part One, topologies are submitted to nimbus, and the handling logic for all of these services lives in service-handler. Below is the excerpt of service-handler that handles topology submission:
(reify Nimbus$Iface
  (^void submitTopologyWithOpts
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
     ^SubmitOptions submitOptions]
    (try
      (assert (not-nil? submitOptions))
      (validate-topology-name! storm-name)
      (check-storm-active! nimbus storm-name false)
      (let [topo-conf (from-json serializedConf)]
        (try
          (validate-configs-with-schemas topo-conf)
          (catch IllegalArgumentException ex
            (throw (InvalidTopologyException. (.getMessage ex)))))
        (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                   storm-name
                   topo-conf
                   topology))
      (swap! (:submitted-count nimbus) inc)
      (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
            storm-conf (normalize-conf
                         conf
                         (-> serializedConf
                             from-json
                             (assoc STORM-ID storm-id)
                             (assoc TOPOLOGY-NAME storm-name))
                         topology)
            total-storm-conf (merge conf storm-conf)
            topology (normalize-topology total-storm-conf topology)
            storm-cluster-state (:storm-cluster-state nimbus)]
        (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
        (log-message "Received topology submission for " storm-name " with conf " storm-conf)
        ;; lock protects against multiple topologies being submitted at once and
        ;; cleanup thread killing topology in b/w assignment and starting the topology
        (locking (:submit-lock nimbus)
          (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
          (.setup-heartbeats! storm-cluster-state storm-id)
          (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                          TopologyInitialStatus/ACTIVE :active}]
            (start-storm nimbus storm-name storm-id
                         (thrift-status->kw-status (.get_initial_status submitOptions))))
          (mk-assignments nimbus)))
      (catch Throwable e
        (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
        (throw e))))

  (^void submitTopology
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
    (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                             (SubmitOptions. TopologyInitialStatus/ACTIVE)))
It checks that the topology's DAG is a valid connected graph and that the topology name does not already exist, then allocates resources and schedules tasks via the mk-assignments function. Once resources have been assigned, the assignment data is written to ZooKeeper; a watcher notices the new data and notifies the supervisors to read it and start new workers. A worker is a JVM process; after it starts, it launches as many tasks as the user configured, and each task runs as a thread.
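Storm's supervisor implements this notification in Clojure on top of its own cluster-state layer; purely as an illustration of the underlying mechanism, a plain ZooKeeper watch on an assignments node might look like this (the connect string and path are assumptions for the sketch):

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class AssignmentWatchSketch {
    public static void main(String[] args) throws Exception {
        final ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, null);
        Watcher watcher = new Watcher() {
            public void process(WatchedEvent event) {
                // fired when nimbus writes new assignment data; a supervisor would
                // re-read the node here and reconcile its local workers
                System.out.println("assignments changed: " + event.getPath());
            }
        };
        // register a one-shot watch on the (assumed) assignments node
        zk.getChildren("/storm/assignments", watcher);
        Thread.sleep(Long.MAX_VALUE);
    }
}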
In executor.clj, the mk-threads :spout and mk-threads :bolt methods are what start the tasks; each task corresponds to a spout or bolt component, and this is where the Spout's open and nextTuple methods and the Bolt's prepare and execute methods are actually invoked. Combined with what was said in Part One:
Spout method call order: declareOutputFields -> open -> nextTuple -> fail/ack or other
Bolt method call order: declareOutputFields -> prepare -> execute
Note that in a Spout, fail, ack, and nextTuple are called sequentially on the same thread, so avoid doing anything with high latency inside nextTuple.
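A minimal spout skeleton makes the ordering and the single-thread caveat concrete (the queue and the re-enqueue-on-failure logic are illustrative only):

import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class QueueSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private ConcurrentLinkedQueue<String> queue;   // filled by some external producer (illustrative)

    // 1. called first, while the topology is being built
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    // 2. called once on the worker before any tuples are emitted
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new ConcurrentLinkedQueue<String>();
    }

    // 3. called repeatedly on the same thread as ack/fail -- return quickly, never block
    public void nextTuple() {
        String line = queue.poll();
        if (line == null) {
            Utils.sleep(1);                          // back off briefly instead of busy-spinning
            return;
        }
        collector.emit(new Values(line), line);      // second argument is the message id used by ack/fail
    }

    // 4. ack/fail share the thread with nextTuple, so keep them cheap too
    public void ack(Object msgId) { }
    public void fail(Object msgId) { queue.offer((String) msgId); }   // naive re-enqueue on failure
}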
At this point, a topology is officially up and running.