Spark UI详解
在大型分布式系统中,采用事件监听机制是最常见的。如果Spark UI采用Scala的函数调用方式,由于函数调用多数情况下是同步调用,导致线程被阻塞。将函数调用更换为发送事件,事件的处理时异步的,当前线程可以继续执行后续逻辑,线程池中的线程还可以被重用,这样整个系统的并发度会大大增加。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。
DAGScheduler是主要的产生各类SparkListenerEvent的源头,它将各种SparkListenerEvent发送到ListenerBus的事件队列中,ListenerBus通过定时器将SparkListenerEvent事件匹配到具体的SparkListener,改变SparkListener中的统计监控数据,最终由SparkUI的界面展示。
listenerBus详解
listenerBus的类型是LiveListenerBus。LiveListenerBus实现了监听器模型,通过监听事件触发对各种监听器监听状态信息的修改,达到UI界面的数据刷新效果。LiveListenerBus继承自AsynchronousListenerBus,AsynchronousListenerBus由如下部分组成:
·事件阻塞队列:LinkedBlockingQueue[SparkListenerEvent],固定大小是10000;
·监听器数组:类型为ArrayBuffer[SparkListener],存放各类监听器SparkListener;
·事件匹配监听器的线程:此Thread不断拉取LinkedBlockingQueue中的事件,遍历监听器,调用监听器的方法。任何事件都会在LinkedBlockingQueue中存在一段时间,然后Thread处理了此事件后,会将其清除。
构造JobProgressListener
以JobProgressListener为例讲解SparkListener。通过监听ListenerBus中的事件更新任务进度。SparkStatusTracker和SparkUI实际上也是通过JobProgressListener来实现任务状态跟踪的。
JobProgressListener实现了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,这些方法是在listenerBus的驱动下,改变JobProgressListener中的各种Job、Stage相关的数据。
SparkUI的创建与初始化
private def create( sc: Option[SparkContext], conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener val executorsListener = new ExecutorsListener(storageStatusListener) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) }
在create方法里除了JobprogressListener是外部传入的之外,又增加了一些SparkListener。例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展示在ExecutorTab的ExecutorListener;用于准备将Executor相关存储信息展示在BlockManagerUI的storageListener等。最后,创建SparkUI,SparkUI服务默认是可以杀掉 的,设置spark.ui.killEnabled为false可以保证不被杀死。
Initialize方法会组织前端页面各个Tab和Page的展示及布局。
SparkUI的页面布局与展示
JobsTab展示所有Job的进度,状态信息,以它为例说明。JobsTab会复用SparkUI的killEnabled,SparkContext,jobProgressListener,包括AllJobsPage和JobPage两个页面。
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { val sc = parent.sc val killEnabled = parent.killEnabled val jobProgresslistener = parent.jobProgressListener val executorListener = parent.executorsListener val operationGraphListener = parent.operationGraphListener def isFairScheduler: Boolean = jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR) attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) }
AllJobsPage由render方法渲染,利用JobProgressListener中的统计监控数据生成激活、完成、失败等状态的Job摘要信息,并调用jobsTable方法生成表格等html元素,最终使用UIUtils的headerSparkPage封装好css、js、header及页面布局等。
上面的attachPage方法存在于JobsTab的父类WebUITab中,WebUITab维护有ArrayBuffer[WebUIPage]的数据结构,AllJobsPage和JobPage将被放入此ArrayBuffer中。
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { val pages = ArrayBuffer[WebUIPage]() val name = prefix.capitalize def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } def headerTabs: Seq[WebUITab] = parent.getTabs def basePath: String = parent.getBasePath }JobsTab创建之后,被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,并且通过attachPage方法,将每个page生成org.apache.jetty.servlet.ServletContextHandler,最后调用attachHandler方法将ServletContextHandler绑定到SparkUI。
def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab }
def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix val renderHandler = createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath) val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath) attachHandler(renderHandler) attachHandler(renderJsonHandler) pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) .append(renderHandler) }
def attachHandler(handler: ServletContextHandler) { handlers += handler serverInfo.foreach { info => info.rootHandler.addHandler(handler) if (!handler.isStarted) { handler.start() } } }
SparkUI的启动
SparkUI创建好后,需要调用父类WebUI的bind方法,绑定服务和端口。
def bind() { assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) try { serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name)) logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) } catch { case e: Exception => logError("Failed to bind %s".format(className), e) System.exit(1) } }startJettyServer是JettyUtils的静态方法,最终启动Jetty提供的服务。
- 上一篇:没有了
- 下一篇:没有了