
nimbus是storm集群的"控制器",是storm集群的重要组成部分。我们可以通过执行bin/storm nimbus >/dev/null 2>&1 &来启动nimbus。bin/storm是一个python脚本,在这个脚本中定义了一个nimbus函数:
def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]

    Launches the nimbus daemon. This command should be run under
    supervision with a tool like daemontools or monit.

    See Setting up a Storm cluster for more information.
    (https://github.com/nathanmarz/storm/wiki/Setting-up-a-Storm-cluster)
    """
    # Extra classpath entries: the log4j/ and conf/ directories under STORM_DIR.
    cppaths = [STORM_DIR + "/log4j", STORM_DIR + "/conf"]
    # User JVM options from nimbus.childopts, followed by the logging options.
    # NOTE(review): in this copy both logging options were blank ("") strings,
    # which would pass an empty argument to java in front of the main class.
    # They are restored from upstream bin/storm (log4j-based releases);
    # confirm against the Storm release in use.
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlog4j.configuration=storm.log.properties",
    ]
    exec_storm_class(
        klass,
        jvmtype="-server",
        extrajars=cppaths,
        jvmopts=jvmopts)
def exec_storm_class(klass, jvmtype="-server", jvmopts=None, extrajars=None, args=None, fork=False):
    """Build the java command line for ``klass`` and run it.

    :param klass: fully-qualified name of the JVM class whose main method runs.
    :param jvmtype: flag placed right after ``java`` (e.g. "-server").
    :param jvmopts: extra JVM options, placed after the classpath and before
        ``klass``; any iterable of strings, defaults to none.
    :param extrajars: extra classpath entries, also passed to ``confvalue``
        when resolving ``java.library.path``; defaults to none.
    :param args: arguments for ``klass``'s main method; defaults to none.
    :param fork: if True, run java as a child process and wait for it;
        otherwise replace the current process with java via ``os.execvp``.
    """
    # None defaults instead of shared mutable [] defaults.
    extrajars = list(extrajars or [])
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + list(jvmopts or []) + [klass] + list(args or [])
    # Parenthesised single-argument print behaves the same on Python 2 and 3.
    print("Running: " + " ".join(all_args))
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args)  # replaces the current process and never returns
get_config_opts()获取通过命令行-c参数指定的storm配置项(以-Dstorm.options=...的形式传给JVM,Java端由readCommandLineOpts读取),confvalue("java.library.path", extrajars)获取storm使用的本地库(如JZMQ)的加载路径,get_classpath(extrajars)获取所有依赖jar包的完整路径,然后拼接出一条java -cp命令来运行klass的main方法。klass默认值为backtype.storm.daemon.nimbus,所以exec_storm_class函数最终调用的是backtype.storm.daemon.nimbus类的main方法。
(ns backtype.storm.daemon.nimbus
  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
  (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
  (:import [org.apache.thrift.exception])
  (:import [org.apache.thrift.transport TNonblockingServerTransport TNonblockingServerSocket])
  (:import [java.nio ByteBuffer])
  (:import [java.io FileNotFoundException])
  (:import [java.nio.channels Channels WritableByteChannel])
  (:use [backtype.storm.scheduler.DefaultScheduler])
  (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
            Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
  (:use [backtype.storm bootstrap util])
  (:use [backtype.storm.config :only [validate-configs-with-schemas]])
  (:use [backtype.storm.daemon common])
  ;; Declare a static launch(INimbus) method on the generated class;
  ;; it is implemented by -launch.
  (:gen-class
   :methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))

;; ...
;; other definitions omitted
;; ...

;; Entry point run by bin/storm: start Nimbus with the standalone INimbus.
(defn -main []
  (-launch (standalone-nimbus)))
(defn standalone-nimbus []
  ;; Minimal INimbus implementation; -main starts Nimbus with it.
  (reify INimbus
    ;; No-op.
    (prepare [this conf local-dir]
      nil)
    ;; supervisors is a collection of SupervisorDetails. Every value in a
    ;; supervisor's metadata (presumably its configured ports) becomes a
    ;; WorkerSlot of (node id, port); the result is a set.
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)]
                       (WorkerSlot. (.getId s) p))))
           set))
    ;; No-op.
    (assignSlots [this topology slots]
      nil)
    ;; No forced scheduler: mk-scheduler then picks a custom or the default one.
    (getForcedScheduler [this]
      nil)
    ;; Host name of the supervisor with the given node id, or nil if unknown.
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))))
(defn -launch [nimbus]
  ;; Read the cluster configuration (see read-storm-config) and start serving.
  (launch-server! (read-storm-config) nimbus))

;; launch-server! is defined as follows:
(defn launch-server! [conf nimbus]
  ;; Nimbus only runs in distributed mode; local mode is rejected with an
  ;; IllegalArgumentException.
  (validate-distributed-mode! conf)
  ;; service-handler (defined with the defserverfn macro) returns the
  ;; Nimbus$Iface implementation that serves every call of the Thrift
  ;; `service Nimbus` declared in storm.thrift.
  (let [service-handler (service-handler conf nimbus)
        ;; Half-sync/half-async Thrift server on NIMBUS-THRIFT-PORT with 64
        ;; worker threads and the binary protocol.
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
                    (THsHaServer$Args.)
                    (.workerThreads 64)
                    (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                    (.processor (Nimbus$Processor. service-handler)))
        ;; Bound the memory the server allocates for client read buffers.
        _ (set! (. options maxReadBufferBytes) (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE))
        server (THsHaServer. options)]
    ;; On JVM shutdown: shut down the handler, then stop the Thrift server.
    (add-shutdown-hook-with-force-kill-in-1-sec (fn []
                                                  (.shutdown service-handler)
                                                  (.stop server)))
    (log-message "Starting Nimbus server...")
    ;; Serves requests until the server is stopped.
    (.serve server)))
(defn read-storm-config
  []
  ;; The merged configuration (defaults.yaml, overridden by storm.yaml or the
  ;; file named by storm.conf.file, overridden by command-line options),
  ;; converted to Clojure data structures.
  (let [conf (clojurify-structure (Utils/readStormConfig))]
    ;; Validate conf against the config schemas. The result is discarded and
    ;; conf is returned unchanged; invalid values appear to be reported by
    ;; throwing IllegalArgumentException (the type submitTopologyWithOpts
    ;; catches) rather than being removed.
    (validate-configs-with-schemas conf)
    conf))
public static Map readStormConfig() { & & & & &// 调用readDefaultConfig从defaults.yaml配置文件读取默认配置信息存入ret & & & &Map ret = readDefaultConfig();
& & & &// 获取用户自定义配置文件路径 & & & &String confFile = System.getProperty("storm.conf.file"); & & & & &Map storm; & & & & &if (confFile==null || confFile.equals("")) { & & & & & & &storm = findAndReadConfigFile("storm.yaml", false); & & & & &} else {
& & & & & &// 读取用户自定义配置信息 & & & & & &storm = findAndReadConfigFile(confFile, true); & & & & &} & & & & &// 将用户自定义的配置信息覆盖更新到ret中 & & & &ret.putAll(storm);
& & & &// 将命令行方式提供的配置信息覆盖更新到ret中
& & & &ret.putAll(readCommandLineOpts()); & & & & &// 返回覆盖更新后的配置信息ret & & & &return ret; & & &}
defserverfn是一个宏,(defserverfn service-handler [conf inimbus] ... )会定义一个名为service-handler的函数。宏展开在编译时进行。
(defserverfn service-handler [conf inimbus]
  ;; Let the INimbus implementation prepare itself; for standalone-nimbus
  ;; prepare is a no-op.
  (.prepare inimbus conf (master-inimbus-dir conf))
  (log-message "Starting Nimbus with conf " conf)
  ;; nimbus is the map of Nimbus-side state built by nimbus-data.
  (let [nimbus (nimbus-data conf inimbus)]
    ;; Prepare the topology validator instantiated from NIMBUS-TOPOLOGY-VALIDATOR
    ;; (presumably DefaultTopologyValidator, whose prepare does nothing).
    (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
    ;; Remove from ZooKeeper every active topology whose code is missing from
    ;; this Nimbus's local storage.
    (cleanup-corrupt-topologies! nimbus)
    ;; Deliver :startup to every active topology. Only topologies that were
    ;; being killed or rebalanced react: their delayed events are rescheduled.
    (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
      (transition! nimbus storm-id :startup))
    ;; Periodic task: recompute assignments (when NIMBUS-REASSIGN is set),
    ;; then clean up topologies that are no longer active.
    (schedule-recurring (:timer nimbus)
                        0
                        (conf NIMBUS-MONITOR-FREQ-SECS)
                        (fn []
                          (when (conf NIMBUS-REASSIGN)
                            (locking (:submit-lock nimbus)
                              (mk-assignments nimbus)))
                          (do-cleanup nimbus)))
    ;; Schedule Nimbus inbox cleaner: periodically delete expired uploaded jars.
    (schedule-recurring (:timer nimbus)
                        0
                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                        (fn []
                          (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
    (reify Nimbus$Iface
      ;; Validate and register a new topology, then compute its assignment.
      (^void submitTopologyWithOpts
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
         ^SubmitOptions submitOptions]
        (try
          (assert (not-nil? submitOptions))
          (validate-topology-name! storm-name)
          (check-storm-active! nimbus storm-name false)
          (let [topo-conf (from-json serializedConf)]
            (try
              (validate-configs-with-schemas topo-conf)
              (catch IllegalArgumentException ex
                (throw (InvalidTopologyException. (.getMessage ex)))))
            (.validate ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus)
                       storm-name
                       topo-conf
                       topology))
          ;; Use the value returned by swap!: a separate deref could observe
          ;; another concurrent submission's increment and reuse its count.
          (let [submitted-count (swap! (:submitted-count nimbus) inc)
                storm-id (str storm-name "-" submitted-count "-" (current-time-secs))
                storm-conf (normalize-conf
                            conf
                            (-> serializedConf
                                from-json
                                (assoc STORM-ID storm-id)
                                (assoc TOPOLOGY-NAME storm-name))
                            topology)
                total-storm-conf (merge conf storm-conf)
                topology (normalize-topology total-storm-conf topology)
                storm-cluster-state (:storm-cluster-state nimbus)]
            (system-topology! total-storm-conf topology) ;; this validates the structure of the topology
            (log-message "Received topology submission for " storm-name " with conf " storm-conf)
            ;; lock protects against multiple topologies being submitted at once and
            ;; cleanup thread killing topology in b/w assignment and starting the topology
            (locking (:submit-lock nimbus)
              (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
              (.setup-heartbeats! storm-cluster-state storm-id)
              (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
                                              TopologyInitialStatus/ACTIVE :active}]
                (start-storm nimbus storm-name storm-id
                             (thrift-status->kw-status (.get_initial_status submitOptions))))
              (mk-assignments nimbus)))
          (catch Throwable e
            (log-warn-error e "Topology submission exception. (topology name='" storm-name "')")
            (throw e))))

      ;; Submit with the topology initially active.
      (^void submitTopology
        [this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology]
        (.submitTopologyWithOpts this storm-name uploadedJarLocation serializedConf topology
                                 (SubmitOptions. TopologyInitialStatus/ACTIVE)))

      ;; Kill with default KillOptions.
      (^void killTopology [this ^String name]
        (.killTopologyWithOpts this name (KillOptions.)))

      ;; Move the topology to the :kill transition. wait-amt is the requested
      ;; delay in seconds, or nil when the options do not set one.
      (^void killTopologyWithOpts [this ^String storm-name ^KillOptions options]
        (check-storm-active! nimbus storm-name true)
        (let [wait-amt (if (.is_set_wait_secs options)
                         (.get_wait_secs options))]
          (transition-name! nimbus storm-name [:kill wait-amt] true)))

      ;; Change the number of workers and/or per-component executor counts.
      ;; Each unset option is passed on as nil (executor overrides as {}).
      (^void rebalance [this ^String storm-name ^RebalanceOptions options]
        (check-storm-active! nimbus storm-name true)
        (let [wait-amt (if (.is_set_wait_secs options)
                         (.get_wait_secs options))
              num-workers (if (.is_set_num_workers options)
                            (.get_num_workers options))
              executor-overrides (if (.is_set_num_executors options)
                                   (.get_num_executors options)
                                   {})]
          (doseq [[_ num-executors] executor-overrides]
            (when (<= num-executors 0)
              (throw (InvalidTopologyException. "Number of executors must be greater than 0"))))
          (transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)))

      (activate [this storm-name]
        (transition-name! nimbus storm-name :activate true))

      (deactivate [this storm-name]
        (transition-name! nimbus storm-name :inactivate true))

      ;; Open a fresh jar file in the inbox and return its path; the client
      ;; then sends chunks addressed by that path.
      (beginFileUpload [this]
        (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")]
          (.put (:uploaders nimbus)
                fileloc
                (Channels/newChannel (FileOutputStream. fileloc)))
          (log-message "Uploading file from client to " fileloc)
          fileloc))

      ;; Append one chunk to an in-progress upload.
      (^void uploadChunk [this ^String location ^ByteBuffer chunk]
        (let [uploaders (:uploaders nimbus)
              ^WritableByteChannel channel (.get uploaders location)]
          (when-not channel
            (throw (RuntimeException.
                    "File for that location does not exist (or timed out)")))
          (.write channel chunk)
          ;; Re-put the channel, presumably to refresh its entry in the
          ;; time-expiring upload cache.
          (.put uploaders location channel)))

      ;; Close the upload channel and forget it.
      (^void finishFileUpload [this ^String location]
        (let [uploaders (:uploaders nimbus)
              ^WritableByteChannel channel (.get uploaders location)]
          (when-not channel
            (throw (RuntimeException.
                    "File for that location does not exist (or timed out)")))
          (.close channel)
          (log-message "Finished uploading file from client: " location)
          (.remove uploaders location)))

      ;; Open file for download and return an id for downloadChunk.
      ;; NOTE(review): file comes straight from the Thrift client and is opened
      ;; without any path check, so any file readable by the Nimbus process can
      ;; be fetched. Consider restricting it to Nimbus-managed directories.
      (^String beginFileDownload [this ^String file]
        (let [is (BufferFileInputStream. file)
              id (uuid)]
          (.put (:downloaders nimbus) id is)
          id))

      ;; Return the next chunk of a download; an empty chunk (presumably end of
      ;; file) also removes the stream from the cache.
      (^ByteBuffer downloadChunk [this ^String id]
        (let [downloaders (:downloaders nimbus)
              ^BufferFileInputStream is (.get downloaders id)]
          (when-not is
            (throw (RuntimeException.
                    "Could not find input stream for that id")))
          (let [ret (.read is)]
            (.put downloaders id is)
            (when (empty? ret)
              (.remove downloaders id))
            (ByteBuffer/wrap ret))))

      ;; Cluster configuration as JSON.
      (^String getNimbusConf [this]
        (to-json (:conf nimbus)))

      ;; Stored configuration of topology id as JSON.
      (^String getTopologyConf [this ^String id]
        (to-json (try-read-storm-conf conf id)))

      ;; Stored topology passed through system-topology! with its stored conf.
      (^StormTopology getTopology [this ^String id]
        (system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

      ;; Stored topology as-is.
      (^StormTopology getUserTopology [this ^String id]
        (try-read-storm-topology conf id))

      ;; Summary of supervisors, Nimbus uptime and all active topologies.
      (^ClusterSummary getClusterInfo [this]
        (let [storm-cluster-state (:storm-cluster-state nimbus)
              ;; supervisor id -> SupervisorInfo
              supervisor-infos (all-supervisor-info storm-cluster-state)
              ;; TODO: need to get the port info about supervisors...
              ;; in standalone just look at metadata, otherwise just say N/A?
              supervisor-summaries (dofor [[id info] supervisor-infos]
                                     (let [ports (set (:meta info))] ;;TODO: this is only true for standalone
                                       (SupervisorSummary. (:hostname info)
                                                           (:uptime-secs info)
                                                           (count ports)
                                                           (count (:used-ports info))
                                                           id)))
              nimbus-uptime ((:uptime nimbus))
              bases (topology-bases storm-cluster-state)
              topology-summaries (dofor [[id base] bases]
                                   (let [assignment (.assignment-info storm-cluster-state id nil)
                                         executor->node+port (:executor->node+port assignment)]
                                     (TopologySummary. id
                                                       (:storm-name base)
                                                       ;; number of tasks
                                                       (->> executor->node+port
                                                            keys
                                                            (mapcat executor-id->tasks)
                                                            count)
                                                       ;; number of executors
                                                       (->> executor->node+port keys count)
                                                       ;; number of distinct node+port slots in use
                                                       (->> executor->node+port vals set count)
                                                       (time-delta (:launch-time-secs base))
                                                       (extract-status-str base))))]
          (ClusterSummary. supervisor-summaries
                           nimbus-uptime
                           topology-summaries)))

      ;; Detailed per-executor information for one topology.
      (^TopologyInfo getTopologyInfo [this ^String storm-id]
        (let [storm-cluster-state (:storm-cluster-state nimbus)
              ;; task id -> component id, e.g. {1 "boltA", 2 "boltA", 5 "boltB"}
              task->component (storm-task-info (try-read-storm-topology conf storm-id)
                                               (try-read-storm-conf conf storm-id))
              base (.storm-base storm-cluster-state storm-id nil)
              assignment (.assignment-info storm-cluster-state storm-id nil)
              ;; executor -> heartbeat for the executors in the assignment
              beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
              all-components (-> task->component reverse-map keys)
              ;; component id -> errors recorded for that component
              errors (->> all-components
                          (map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
                          (into {}))
              executor-summaries (dofor [[executor [node port]] (:executor->node+port assignment)]
                                   (let [host (-> assignment :node->host (get node))
                                         heartbeat (get beats executor)
                                         stats (:stats heartbeat)
                                         stats (if stats
                                                 (stats/thriftify-executor-stats stats))]
                                     (doto
                                       ;; The executor's first task id identifies its component.
                                       (ExecutorSummary. (thriftify-executor-id executor)
                                                         (-> executor first task->component)
                                                         host
                                                         port
                                                         (nil-to-zero (:uptime heartbeat)))
                                       (.set_stats stats))))]
          (TopologyInfo. storm-id
                         (:storm-name base)
                         (time-delta (:launch-time-secs base))
                         executor-summaries
                         (extract-status-str base)
                         errors)))

      Shutdownable
      ;; Stop the timer, disconnect from the cluster state and clean both file caches.
      (shutdown [this]
        (log-message "Shutting down master")
        (cancel-timer (:timer nimbus))
        (.disconnect (:storm-cluster-state nimbus))
        (.cleanup (:downloaders nimbus))
        (.cleanup (:uploaders nimbus))
        (log-message "Shut down master"))

      DaemonCommon
      (waiting? [this]
        (timer-waiting? (:timer nimbus))))))
(defn nimbus-data [conf inimbus]
  ;; Map of the state shared by the Nimbus service handler and its timer tasks.
  ;; (The previous version bound an unused forced-scheduler local here;
  ;; mk-scheduler performs that lookup itself.)
  {;; Cluster configuration.
   :conf conf
   ;; The INimbus implementation in use.
   :inimbus inimbus
   ;; Count of submitted topologies; part of every generated storm id.
   :submitted-count (atom 0)
   ;; StormClusterState: the interface used to talk to ZooKeeper.
   :storm-cluster-state (cluster/mk-storm-cluster-state conf)
   ;; Lock serialising submissions, assignments and cleanup.
   :submit-lock (Object.)
   ;; Heartbeat cache, keyed by topology id.
   :heartbeats-cache (atom {})
   ;; Time-expiring caches of open download streams and upload channels.
   :downloaders (file-cache-map conf)
   :uploaders (file-cache-map conf)
   ;; Zero-arg fn returning how long Nimbus has been up.
   :uptime (uptime-computer)
   ;; Topology validator created reflectively from NIMBUS-TOPOLOGY-VALIDATOR.
   :validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
   ;; Timer for recurring/delayed tasks; an error in any task terminates the
   ;; process with exit code 20.
   :timer (mk-timer :kill-fn (fn [t]
                               (log-error t "Error when processing event")
                               (exit-process! 20 "Error when processing an event")))
   ;; Scheduler chosen by mk-scheduler (forced, custom, or DefaultScheduler).
   :scheduler (mk-scheduler conf inimbus)})
(defn mk-scheduler [conf inimbus]
  ;; Pick the scheduler in priority order: a scheduler forced by the INimbus,
  ;; then the class named by STORM-SCHEDULER, then DefaultScheduler.
  ;; The chosen scheduler is prepared with conf and returned.
  (let [forced-scheduler (.getForcedScheduler inimbus)
        scheduler (cond
                    forced-scheduler
                    (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
                        forced-scheduler)

                    (conf STORM-SCHEDULER)
                    (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
                        (-> (conf STORM-SCHEDULER) new-instance))

                    :else
                    (do (log-message "Using default scheduler")
                        (DefaultScheduler.)))]
    (.prepare scheduler conf)
    scheduler))
(defn cleanup-corrupt-topologies! [nimbus]
  ;; Remove from ZooKeeper every topology that is active there but whose code
  ;; is not stored on this Nimbus.
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        ;; ids of topologies whose code is stored locally on Nimbus
        ;; (named local-code-ids to avoid shadowing the code-ids fn)
        local-code-ids (set (code-ids (:conf nimbus)))
        ;; ids of topologies currently active in ZooKeeper
        active-topologies (set (.active-storms storm-cluster-state))
        ;; active in ZooKeeper but without local code
        corrupt-topologies (set/difference active-topologies local-code-ids)]
    (doseq [corrupt corrupt-topologies]
      (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
      (.remove-storm! storm-cluster-state corrupt))))
(defn transition!
  "Apply event to topology storm-id while holding the submit lock.

  event is a keyword or a vector [event & args]. The handler is looked up in
  (state-transitions nimbus storm-id status) under the topology's current
  :type. A handler can be a keyword (the new status), nil (no change) or a
  function called with the event args whose result becomes the new status;
  a keyword result is wrapped as {:type kw} and a nil result changes nothing.
  When no handler exists, throws if error-on-no-transition? is true,
  otherwise logs (silently for system events such as :startup) and does
  nothing. Does nothing if the topology no longer exists."
  ([nimbus storm-id event]
   (transition! nimbus storm-id event false))
  ([nimbus storm-id event error-on-no-transition?]
   (locking (:submit-lock nimbus)
     (let [system-events #{:startup}
           ;; :startup -> event :startup, no args;
           ;; [:kill 30] -> event :kill, args (30)
           [event & event-args] (if (keyword? event) [event] event)
           ;; current status map from ZooKeeper, nil if the topology is gone
           status (topology-status nimbus storm-id)]
       ;; handles the case where event was scheduled but topology has been removed
       (if-not status
         (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
         (let [get-event (fn [m e]
                           (if (contains? m e)
                             (m e)
                             (let [msg (str "No transition for event: " event
                                            ", status: " status,
                                            " storm-id: " storm-id)]
                               (if error-on-no-transition?
                                 (throw-runtime msg)
                                 (do (when-not (contains? system-events event)
                                       (log-message msg))
                                     nil)))))
               ;; For :startup, only the :killed and :rebalancing states define
               ;; a handler; in every other state this yields nil.
               transition (-> (state-transitions nimbus storm-id status)
                              (get (:type status))
                              (get-event event))
               ;; Normalise nil/keyword handlers into zero-arg functions.
               transition (if (or (nil? transition)
                                  (keyword? transition))
                            (fn [] transition)
                            transition)
               new-status (apply transition event-args)
               new-status (if (keyword? new-status)
                            {:type new-status}
                            new-status)]
           (when new-status
             (set-topology-status! nimbus storm-id new-status))))))))
1、如果topology当前处于":killed"状态并收到":startup"事件(即kill topology的过程中nimbus挂掉了,重启nimbus时就可能出现这种情况),transition将绑定
;; :startup handler in the :killed state: reschedule the pending :remove
;; event after the kill delay recorded in the status, and keep the status.
(fn []
  (delay-event nimbus
               storm-id
               (:kill-time-secs status)
               :remove)
  nil)
new-status的值为transition所绑定函数的返回值nil,因此transition!不会修改topology的状态。transition绑定的函数通过调用delay-event函数,将#(transition! nimbus storm-id :remove false)函数添加到storm定时器中,由storm定时器在(:kill-time-secs status)秒后执行该函数。该函数再次调用transition!函数,不过这次处理的是topology在":killed"状态下收到的":remove"事件,调用的函数为
;; :remove handler in the :killed state.
(fn []
  (log-message "Killing topology: " storm-id)
  ;; Remove the topology's StormBase and assignment from ZooKeeper.
  (.remove-storm! (:storm-cluster-state nimbus)
                  storm-id)
  nil)
2、如果topology当前处于":rebalancing"状态并收到":startup"事件(即rebalance topology的过程中nimbus挂掉了,重启nimbus时就可能出现这种情况),transition将绑定
;; :startup handler in the :rebalancing state: reschedule the pending
;; :do-rebalance event after the recorded delay, and keep the status.
(fn []
  (delay-event nimbus
               storm-id
               (:delay-secs status)
               :do-rebalance)
  nil)
new-status的值为transition所绑定函数的返回值nil,因此transition!不会修改topology的状态。transition绑定的函数通过调用delay-event函数,将#(transition! nimbus storm-id :do-rebalance false)函数添加到storm定时器中,由storm定时器在(:delay-secs status)秒后执行该函数。该函数再次调用transition!函数,不过这次处理的是topology在":rebalancing"状态下收到的":do-rebalance"事件,调用的函数为
;; :do-rebalance handler in the :rebalancing state.
(fn []
  (do-rebalance nimbus storm-id status)
  ;; The returned :old-status becomes the new status, i.e. the status the
  ;; topology had before the rebalance started.
  (:old-status status))
由于这个函数返回的是进入:rebalancing状态之前的状态(:old-status),storm定时器执行的定时任务会把topology的状态由:rebalancing恢复为之前的状态。以上就是启动nimbus场景下topology可能发生的状态转换处理过程。delay-event函数定义如下,其主要功能是将#(transition! nimbus storm-id event false)函数作为"定时任务"添加到storm定时器中,在delay-secs秒后执行:
(defn delay-event [nimbus storm-id delay-secs event]
  ;; Schedule (transition! nimbus storm-id event false) on the Nimbus timer
  ;; to run after delay-secs seconds.
  (log-message "Delaying event " event " for " delay-secs " secs for " storm-id)
  (schedule (:timer nimbus)
            delay-secs
            #(transition! nimbus storm-id event false)))
(defn state-transitions [nimbus storm-id status]
  ;; current status type -> {event -> handler}. A handler is a keyword (the
  ;; new status), nil (no change) or a function whose return value becomes
  ;; the new status (nil meaning no change). Events missing from a state's
  ;; map have no transition in that state.
  {:active {:inactivate :inactive
            :activate nil
            :rebalance (rebalance-transition nimbus storm-id status)
            :kill (kill-transition nimbus storm-id)}
   :inactive {:activate :active
              :inactivate nil
              :rebalance (rebalance-transition nimbus storm-id status)
              :kill (kill-transition nimbus storm-id)}
   :killed {:startup (fn [] (delay-event nimbus
                                         storm-id
                                         (:kill-time-secs status)
                                         :remove)
                       nil)
            :kill (kill-transition nimbus storm-id)
            :remove (fn []
                      (log-message "Killing topology: " storm-id)
                      (.remove-storm! (:storm-cluster-state nimbus)
                                      storm-id)
                      nil)}
   :rebalancing {:startup (fn [] (delay-event nimbus
                                              storm-id
                                              (:delay-secs status)
                                              :do-rebalance)
                            nil)
                 :kill (kill-transition nimbus storm-id)
                 :do-rebalance (fn []
                                 (do-rebalance nimbus storm-id status)
                                 (:old-status status))}})
(defn do-cleanup [nimbus]
  ;; Remove all remaining traces of topologies that are no longer active.
  (let [storm-cluster-state (:storm-cluster-state nimbus)
        conf (:conf nimbus)
        ;; Computed under the submit lock so a topology being submitted is not
        ;; mistaken for an inactive one.
        to-cleanup-ids (locking (:submit-lock nimbus)
                         (cleanup-storm-ids conf storm-cluster-state))]
    ;; doseq over an empty collection is a no-op, so no emptiness check is needed.
    (doseq [id to-cleanup-ids]
      (log-message "Cleaning up " id)
      ;; heartbeats stored in ZooKeeper
      (.teardown-heartbeats! storm-cluster-state id)
      ;; error records stored in ZooKeeper
      (.teardown-topology-errors! storm-cluster-state id)
      ;; the topology's code directory on Nimbus
      (rmr (master-stormdist-root conf id))
      ;; the in-memory heartbeat cache entry
      (swap! (:heartbeats-cache nimbus) dissoc id))))
(defn cleanup-storm-ids [conf storm-cluster-state]
  ;; Ids of topologies that still have heartbeats, error records or local code
  ;; but are no longer active in ZooKeeper.
  (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
        error-ids (set (.error-topologies storm-cluster-state))
        ;; Wrapped in set like the other inputs: clojure.set/union and
        ;; difference are only defined for sets. Named local-code-ids to
        ;; avoid shadowing the code-ids fn.
        local-code-ids (set (code-ids conf))
        assigned-ids (set (.active-storms storm-cluster-state))]
    (set/difference (set/union heartbeat-ids error-ids local-code-ids) assigned-ids)))
