Storm Miscellany: The Startup Process of a Topology
 
Author: joeywen | Source: CSDN | Published 2015-06-09
 

Submitting the topology

As most users know, submitting a Storm topology to the cluster means running the following command:

${STORM_HOME}/bin/storm jar xxxxxxxxxxx.jar ${main class} [args ...]  

binĿ¼ÏÂstormÊÇÒ»¸öPythonÎļþ£¬ÎÒÃÇ¿ÉÒÔ¿´Ò»ÏÂPython½Å±¾µÄmain·½·¨

def main():
    if len(sys.argv) <= 1:
        print_usage()
        sys.exit(-1)
    global CONFIG_OPTS
    config_list, args = parse_config_opts(sys.argv[1:])
    parse_config(config_list)
    COMMAND = args[0]
    ARGS = args[1:]
    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)

if __name__ == "__main__":
    main()

Ê×ÏȽâÎöargs²ÎÊý£¬½âÎöÍêÁËÖ®ºó£¬°ÑËùÓеIJÎÊý´«µÝ¸øCOMMANDS£¬ÓÉCOMMANDSµ÷ÓÃÕýÈ·µÄ·½·¨£¬COMMANDSÊÇÒ»¸öDict£¬keyÊÇstring£¬valueÊÇfunction

COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer,
            "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
            "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
            "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
            "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor}
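The same name-to-handler dispatch pattern can be sketched in Java (illustrative only, not Storm code; all names here are made up for the example):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal sketch of dict-based command dispatch, mirroring the Python
// COMMANDS table: look up the handler by name, with an "unknown command"
// fallback playing the role of unknown_command.
public class CommandDispatch {
    static final Map<String, Function<String[], String>> COMMANDS = new HashMap<>();
    static {
        COMMANDS.put("jar", args -> "jar:" + String.join(",", args));
        COMMANDS.put("kill", args -> "kill:" + args[0]);
    }

    static String dispatch(String command, String... args) {
        // Equivalent of (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
        return COMMANDS.getOrDefault(command, a -> "unknown command").apply(args);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("jar", "topo.jar", "com.example.Main")); // jar:topo.jar,com.example.Main
        System.out.println(dispatch("frobnicate"));                          // unknown command
    }
}
```

The fallback handler keeps a typo from crashing the script with a KeyError-style failure, which is exactly why the Python version passes a default to dict.get.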

ÎÒÃÇÊǵ÷ÓÃjar·½·¨£º

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile])

jar passes some default arguments to exec_storm_class. The jvmtype is -client; why start in client mode rather than server mode? For the difference, see the earlier blog post "Real differences between 'java -server' and 'java -client'". The rest simply passes the system configuration through:

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    storm_log_dir = confvalue("storm.log.dir", [CLUSTER_CONF_DIR])
    if(storm_log_dir == None or storm_log_dir == "nil"):
        storm_log_dir = STORM_DIR + "/logs"
    all_args = [
        JAVA_CMD, jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Dstorm.log.dir=" + storm_log_dir,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, JAVA_CMD, all_args)
    else:
        os.execvp(JAVA_CMD, all_args) # replaces the current process and never returns

Component initialization

Once the process starts, your own topology code runs. A topology is usually built with TopologyBuilder, which has three fields:

private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();  
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

_bolts and _spouts are self-explanatory: they hold the bolts and spouts you define and register via setBolt()/setSpout(), with key = componentId and value = your component implementation. _commons holds each component's extra information: parallelism, additional configuration, and so on. Every time a component is set, the common-initialization method is called:

private void initCommon(String id, IComponent component, Number parallelism) {
    ComponentCommon common = new ComponentCommon();
    common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
    if (parallelism != null) common.set_parallelism_hint(parallelism.intValue());
    Map conf = component.getComponentConfiguration();
    if (conf != null) common.set_json_conf(JSONValue.toJSONString(conf));
    _commons.put(id, common);
}

getComponentCommon is then called for each component:

private ComponentCommon getComponentCommon(String id, IComponent component) {
    ComponentCommon ret = new ComponentCommon(_commons.get(id));

    OutputFieldsGetter getter = new OutputFieldsGetter();
    component.declareOutputFields(getter);
    ret.set_streams(getter.getFieldsDeclaration());
    return ret;
}

Notice that this method calls each component's declareOutputFields. Among the methods you normally override (a Spout overrides open, nextTuple, etc.; a Bolt overrides prepare, execute, etc.), declareOutputFields is therefore invoked first. That means declareOutputFields must not use variables that have not yet been initialized; we usually initialize state in open or prepare rather than in the constructor, because of Storm's own serialization mechanism. Violating this throws a NullPointerException.
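The ordering pitfall can be modeled in plain Java (no Storm dependency; MySpout and its field names are hypothetical):

```java
// Sketch of why a field initialized in open() must not be used in
// declareOutputFields(): the framework calls declareOutputFields first,
// before open() has run, so the field is still null at that point.
public class LifecycleSketch {
    static class MySpout {
        private String fieldNames; // initialized in open(), as the article recommends

        void open() { fieldNames = "word,count"; }

        String declareOutputFields() {
            // Touching fieldNames unguarded here would be the NullPointerException
            // case; the guard just makes the call order visible.
            return fieldNames == null ? "NOT-READY" : fieldNames;
        }
    }

    public static void main(String[] args) {
        MySpout spout = new MySpout();
        // Storm's order: declareOutputFields first, then open
        System.out.println(spout.declareOutputFields()); // NOT-READY
        spout.open();
        System.out.println(spout.declareOutputFields()); // word,count
    }
}
```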

µ±ËùÓеÄboltºÍspout¶¼setÍê±ÏÖ®ºó£¬ÎÒÃǾͻáµ÷ÓÃcreateTopology·½·¨Éú³ÉÒ»¸öStormTopology£¬ÓÉStormSubmitterÀ´submit topology

/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @param opts to manipulate the starting of the topology
 * @param progressListener to track the progress of the jar upload process
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology,
        SubmitOptions opts, ProgressListener progressListener)
        throws AlreadyAliveException, InvalidTopologyException {
    if (!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if (localNimbus != null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if (topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf, progressListener);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if (opts != null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch (InvalidTopologyException e) {
                LOG.warn("Topology submission exception: " + e.get_msg());
                throw e;
            } catch (AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch (TException e) {
        throw new RuntimeException(e);
    }
}

So submitting a topology means: initialize a NimbusClient, upload the jar, check whether a topology with the same name already exists, and once that is done, hand everything over to Nimbus.

Nimbus

Nimbus is arguably the core of Storm. It has two main responsibilities:

Allocating resources for a topology's tasks

Receiving user commands and handling them accordingly: topology submission, kill, activate, and so on

Nimbus itself is built on the Thrift framework, using Thrift's THsHaServer: the half-sync/half-async server mode, where a single dedicated thread handles network I/O and an independent thread pool processes messages, greatly improving concurrent message throughput.
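The half-sync/half-async shape can be sketched with plain java.util.concurrent primitives (illustrative only, not Thrift's actual implementation):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// One "IO" thread enqueues incoming messages; a worker pool does the
// (potentially slow) handling, so slow handlers never stall the IO thread.
public class HalfSyncHalfAsync {
    public static int process(int nMessages) throws Exception {
        BlockingQueue<Integer> inbox = new LinkedBlockingQueue<>();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        CountDownLatch done = new CountDownLatch(nMessages);

        // single "IO" thread: only reads messages and hands them off
        Thread io = new Thread(() -> {
            for (int i = 0; i < nMessages; i++) inbox.add(i);
        });
        io.start();

        // dispatch each queued message to the worker pool
        for (int i = 0; i < nMessages; i++) {
            inbox.take();
            workers.submit(done::countDown); // stand-in for real message handling
        }
        done.await();
        workers.shutdown();
        return nMessages;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(100)); // 100
    }
}
```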

The service interface is defined in the storm.thrift file; here is part of it:

service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);

  string beginFileDownload(1: string file);
  // can stop downloading chunks when receive 0-length byte array back
  binary downloadChunk(1: string id);

  // returns json
  string getNimbusConf();
  // stats functions
  ClusterSummary getClusterInfo();
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  // returns json
  string getTopologyConf(1: string id) throws (1: NotAliveException e);
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

Running nohup ${STORM_HOME}/bin/storm nimbus & starts the Nimbus service. Concretely, the storm Python script launches the backtype.storm.daemon.nimbus program by default:

def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster)
    """
    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)

This executes the nimbus.clj script, which mainly involves two functions: launch-server! (the Nimbus startup entry point) and service-handler (where the actual handling logic is defined).

Once Nimbus is up, it offers services such as topology submission, UI information, topology kill, rebalance, and so on. As described in part one of this series, the topology is submitted to Nimbus; the handling logic for all these services lives in the service-handler method. Here is the part of service-handler that handles topology submission:

(reify Nimbus$Iface
  (^void submitTopologyWithOpts
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
     ^SubmitOptions submitOptions]
    (try
      (assert (not-nil? submitOptions))
      (validate-topology-name! storm-name)
      (check-storm-active! nimbus storm-name false)
      (let [topo-conf (from-json serializedConf)]
        (try
          (validate-configs-with-schemas topo-conf)
          (catch IllegalArgumentException ex
            (throw (InvalidTopologyException. (.getMessage ex)))))
        (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                   storm-name
                   topo-conf
                   topology))
      (swap! (:submitted-count nimbus) inc)
      (let [storm-id (str storm-name "-" @(:submitted-count nimbus) "-" (current-time-secs))
            storm-conf (normalize-conf
                         conf
                         (-> serializedConf
                             from-json
                             (assoc STORM-ID storm-id)
                             (assoc TOPOLOGY-NAME storm-name))
                         topology)
            total-storm-conf (merge conf storm-conf)
            topology (normalize-topology total-storm-conf topology)
            storm-cluster-state (:storm-cluster-state nimbus)]
        (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
        (log-message "Received topology submission for " storm-name " with conf " storm-conf)
        ;; lock protects against multiple topologies being submitted at once and
        ;; cleanup thread killing topology in b/w assignment and starting the topology
        (locking (:submit-lock nimbus)
          (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
          (.setup-heartbeats! storm-cluster-state storm-id)
          (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                          TopologyInitialStatus/ACTIVE :active}]
            (start-storm nimbus storm-name storm-id (thrift-status->kw-status (.get_initial_status submitOptions))))
          (mk-assignments nimbus)))
      (catch Throwable e
        (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
        (throw e))))

  (^void submitTopology
    [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
    (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                             (SubmitOptions. TopologyInitialStatus/ACTIVE)))

This checks whether the topology's DAG is a valid connected graph and whether the topology name already exists, then allocates resources and schedules tasks (the mk-assignments method). Once resources are assigned, the assignment data is written to ZooKeeper; a watcher notices the new data and notifies the supervisors to read it and start new workers. A worker is a JVM process; once started, it launches as many tasks as the user configured, and each task is a thread.
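As a rough illustration of the worker/task relationship (not Storm's real scheduler), spreading a component's tasks round-robin over the configured workers looks like:

```java
import java.util.Arrays;

// Each worker is a JVM process; each task is a thread inside one of them.
// A simple round-robin spread assigns task t to worker t % numWorkers.
public class TaskSpread {
    public static int[] tasksPerWorker(int numTasks, int numWorkers) {
        int[] counts = new int[numWorkers];
        for (int t = 0; t < numTasks; t++) counts[t % numWorkers]++;
        return counts;
    }

    public static void main(String[] args) {
        // 10 tasks over 3 workers: the first worker gets the extra task
        System.out.println(Arrays.toString(tasksPerWorker(10, 3))); // [4, 3, 3]
    }
}
```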

In executor.clj, the mk-threads: spout and mk-threads: bolt methods start the tasks, where each task corresponds to a spout or bolt component. This is where the Spout's open and nextTuple methods and the Bolt's prepare and execute methods are invoked. Combined with what part one described:

Spout method call order: declareOutputFields -> open -> nextTuple -> fail/ack or other

Bolt method call order: declareOutputFields -> prepare -> execute

Note that in a Spout, the fail, ack, and nextTuple methods are called sequentially on the same thread, so avoid long-latency operations in nextTuple.
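A minimal sketch of that single-threaded loop (plain Java, names hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// The spout executor runs nextTuple and ack sequentially on one thread:
// a long pause inside nextTuple delays every pending ack/fail callback.
public class SpoutLoopSketch {
    public static List<String> runLoop(int iterations) {
        List<String> calls = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            calls.add("nextTuple"); // emit; if this blocks, the whole loop blocks
            calls.add("ack");       // acks are processed only after nextTuple returns
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(runLoop(2)); // [nextTuple, ack, nextTuple, ack]
    }
}
```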

At this point, the topology is officially up and running.

   