Reading this article requires a basic knowledge of YARN; the introductory article "YARN应用开发流程" (YARN application development workflow) is strongly recommended.
The code in this article is based on Spark 1.3.0.
Spark on YARN supports two deploy modes: Cluster vs. Client. The biggest difference between them is where the Spark Driver runs: in Cluster mode the Driver runs inside the ApplicationMaster, while in Client mode the Driver runs locally on the submitting machine.
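For illustration, the mode is chosen through the master URL at submit time (Spark 1.3 accepts the yarn-cluster / yarn-client shorthands; the class and jar names below are placeholders):
spark-submit --master yarn-cluster --class com.example.App app.jar  # Driver runs inside the AM
spark-submit --master yarn-client --class com.example.App app.jar   # Driver runs on the submitting machine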
YARN Cluster mode
YARN Client mode
Spark exploits Akka's location transparency, which lets the two modes share the same code. The Spark on YARN call flow is walked through step by step below:
1: Client
SparkSubmit is the entry point of a Spark program:
if (isYarnCluster) {
// launch the yarn Client class
childMainClass = "org.apache.spark.deploy.yarn.Client"
//...
}
Client.main
// parse the arguments
val args = new ClientArguments(argStrings, sparkConf)
new Client(args, sparkConf).run()
Client.run
//submit & monitor application
val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
1.1: submitApplication
Client.submitApplication
// create a new YARN application
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
val appId = newAppResponse.getApplicationId()
// build the environment, java options, and the command that launches the AM
val containerContext = createContainerLaunchContext(newAppResponse)
// build the submission context for the AM: name, queue, app type, memory, CPU and arguments
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// submit the application to YARN
yarnClient.submitApplication(appContext)
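For reference, the raw YARN-side calls that Client drives boil down to the following (a minimal hedged sketch against the Hadoop 2.x YarnClient API; all of the context setup Spark performs is elided):
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnConf = new YarnConfiguration()
val yarnClient = YarnClient.createYarnClient()
yarnClient.init(yarnConf)
yarnClient.start()
// ask the RM for a new application id
val newApp = yarnClient.createApplication()
val appId = newApp.getNewApplicationResponse.getApplicationId
// fill in the submission context (AM container spec, queue, resources) and submit
val appContext = newApp.getApplicationSubmissionContext
yarnClient.submitApplication(appContext)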
Client.createContainerLaunchContext
// build the environment, java options, and the command that launches the AM
val launchEnv = setupLaunchEnv(appStagingDir)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
val amClass =
if (isClusterMode) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
// Command for the ApplicationMaster
val commands = prefixEnv ++ Seq(
YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server"
) ++
javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands)
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
setupSecurityToken(amContainer)
UserGroupInformation.getCurrentUser().addCredentials(credentials)
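After createContainerLaunchContext runs, the command YARN executes for the AM container looks roughly like this (an illustrative reconstruction; options, memory settings, and paths depend on configuration, and the bracketed values are placeholders):
{{JAVA_HOME}}/bin/java -server -Xmx512m -Djava.io.tmpdir={{PWD}}/tmp \
  org.apache.spark.deploy.yarn.ApplicationMaster \
  --class <userClass> --jar <userJar> --arg <userArgs> \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr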
Client.createApplicationSubmissionContext
// build the submission context for the AM: name, queue, app type, memory, CPU and arguments
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName(args.appName)
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
case Some(v) => appContext.setMaxAppAttempts(v)
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
"Cluster's default value will be used.")
}
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setVirtualCores(args.amCores)
appContext.setResource(capability)
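The amMemoryOverhead added to the capability above is computed as follows in Spark 1.3 (a sketch; the constants come from YarnSparkHadoopUtil and the config key depends on the mode):
val amMemoryOverheadConf =
  if (isClusterMode) "spark.yarn.driver.memoryOverhead" else "spark.yarn.am.memoryOverhead"
val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384 // MB
val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
  math.max((MEMORY_OVERHEAD_FACTOR * args.amMemory).toInt, MEMORY_OVERHEAD_MIN))
// e.g. with amMemory = 512 MB the container requests 512 + 384 = 896 MB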
1.2: monitorApplication
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState
val details = Seq[(String, String)](
("client token", getClientToken(report)),
("diagnostics", report.getDiagnostics),
("ApplicationMaster host", report.getHost),
("ApplicationMaster RPC port", report.getRpcPort.toString),
("queue", report.getQueue),
("start time", report.getStartTime.toString),
("final status", report.getFinalApplicationStatus.toString),
("tracking URL", report.getTrackingUrl),
("user", report.getUser)
)
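Around this report, monitorApplication is essentially a polling loop; a condensed sketch (the real code reads its sleep interval from spark.yarn.report.interval and also logs the report):
import org.apache.hadoop.yarn.api.records.YarnApplicationState

var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
while (state != YarnApplicationState.FINISHED &&
       state != YarnApplicationState.FAILED &&
       state != YarnApplicationState.KILLED) {
  Thread.sleep(1000)
  state = yarnClient.getApplicationReport(appId).getYarnApplicationState
}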
2: Launching the ApplicationMaster
ApplicationMaster.main is the entry point of the AM:
// parse the arguments and start the ApplicationMaster
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
System.exit(master.run())
}
ApplicationMaster.run
// the two modes diverge here
if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
2.1: ApplicationMaster.runDriver
// set up the AM IP filter for the web UI
addAmIpFilter()
// start the user program on a separate thread
userClassThread = startUserApplication()
// wait for the user code to create the SparkContext
val sc = waitForSparkContextInitialized()
if (sc == null) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_SC_NOT_INITED,
"Timed out waiting for SparkContext.")
} else {
actorSystem = sc.env.actorSystem
// start the AMActor
runAMActor(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
// register the AM with the ResourceManager
registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
userClassThread.join()
}
2.2: startUserApplication
// invoke the user program's main method
val mainArgs = new Array[String](args.userArgs.size)
args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
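For context, mainMethod above is obtained by reflection from the --class argument, roughly like this (simplified; Spark 1.3 actually loads the class through a dedicated class loader that holds the user jar):
val mainClass = Class.forName(args.userClass)
val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
// main is static, hence the null receiver in invoke(null, mainArgs)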
2.3: SparkContext
// the user program creates a SparkContext, which in yarn-cluster mode
// instantiates YarnClusterScheduler and YarnClusterSchedulerBackend
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning(
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
}
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
2.4: YarnClusterSchedulerBackend -> YarnSchedulerBackend -> CoarseGrainedSchedulerBackend -> SchedulerBackend
SchedulerBackend API
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
def defaultParallelism(): Int
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit
def isReady(): Boolean = true
def applicationId(): String = appId
CoarseGrainedSchedulerBackend API
def doRequestTotalExecutors(requestedTotal: Int): Boolean
def doKillExecutors(executorIds: Seq[String]): Boolean
def sufficientResourcesRegistered(): Boolean
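Of these, sufficientResourcesRegistered is the gate that lets the first job start; YarnSchedulerBackend implements it roughly as below (a sketch of the Spark 1.3 logic; the ratio defaults to 0.8 and can be tuned via spark.scheduler.minRegisteredResourcesRatio):
override def sufficientResourcesRegistered(): Boolean =
  totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio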
YarnSchedulerBackend
// connects to the AMActor inside the ApplicationMaster; resource requests to YARN go through this actor
private val yarnSchedulerActor: ActorRef =
actorSystem.actorOf(
Props(new YarnSchedulerActor),
name = YarnSchedulerBackend.ACTOR_NAME)
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
AkkaUtils.askWithReply[Boolean](
RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
}
override def doKillExecutors(executorIds: Seq[String]): Boolean = {
AkkaUtils.askWithReply[Boolean](
KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
}
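These two hooks are what SparkContext's executor-management API (used by dynamic allocation) ultimately drives; an illustrative call from user code:
// developer API available since Spark 1.2
sc.requestExecutors(2)          // reaches doRequestTotalExecutors via RequestExecutors
sc.killExecutors(Seq("1", "2")) // reaches doKillExecutors via KillExecutors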
2.5: waitForSparkContextInitialized
// wait for the SparkContext to finish initializing
while (sparkContextRef.get() == null && System.currentTimeMillis < deadline && !finished) {
logInfo("Waiting for spark context initialization ... ")
sparkContextRef.wait(10000L)
}
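The other half of this handshake is ApplicationMaster.sparkContextInitialized, which is triggered once the SparkContext finishes initializing (Spark 1.3 code, slightly condensed):
private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextRef.synchronized {
    sparkContextRef.compareAndSet(null, sc)
    sparkContextRef.notifyAll()
  }
}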
3: AMActor
The AMActor receives messages from YarnSchedulerBackend and turns them into resource requests against YARN.
ApplicationMaster.runAMActor
actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM")
ApplicationMaster.AMActor
override def preStart() = {
driver = context.actorSelection(driverUrl)
// Send a hello message to establish the connection, after which
// we can monitor Lifecycle Events.
driver ! "Hello"
driver ! RegisterClusterManager
}
override def receive = {
case x: DisassociatedEvent =>
// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
case x: AddWebUIFilter =>
driver ! x
case RequestExecutors(requestedTotal) =>
Option(allocator) match {
case Some(a) => a.requestTotalExecutors(requestedTotal)
case None => logWarning("Container allocator is not ready to request executors yet.")
}
sender ! true
case KillExecutors(executorIds) =>
Option(allocator) match {
case Some(a) => executorIds.foreach(a.killExecutor)
case None => logWarning("Container allocator is not ready to kill executors yet.")
}
sender ! true
}
ApplicationMaster.registerAM
// register the AM with the RM (UI address, history address, SecurityManager, preferredNodeLocationData) and start a thread that requests resources
allocator = client.register(yarnConf,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
historyAddress,
securityMgr)
// first allocation round
allocator.allocateResources()
// start the reporter thread, which keeps heartbeating resource requests to YARN
reporterThread = launchReporterThread()
ApplicationMaster.launchReporterThread
// body of the reporter thread's loop: give up after too many executor failures, otherwise keep heartbeating
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Max number of executor failures reached")
} else {
logDebug("Sending progress")
allocator.allocateResources()
}
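The loop above runs on a daemon thread; its sleep interval is capped at half the RM's AM-expiry window so the heartbeat always arrives in time (a sketch of the Spark 1.3 computation):
import org.apache.hadoop.yarn.conf.YarnConfiguration

val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
val interval = math.max(0, math.min(expiryInterval / 2,
  sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)))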
4: YarnAllocator
Used to request resources from YARN and to track the containers that are granted.
API
def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsFailed: Int = numExecutorsFailed
def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
def requestTotalExecutors(requestedTotal: Int)
def killExecutor(executorId: String)
def allocateResources(): Unit
def updateResourceRequests(): Unit
YarnAllocator.allocateResources
updateResourceRequests()
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
handleAllocatedContainers(allocatedContainers)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
processCompletedContainers(completedContainers)
}
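handleAllocatedContainers (not shown) matches each granted container against the outstanding requests and launches an executor in it through ExecutorRunnable (section 6). A condensed, hedged sketch of that hand-off, with locality matching and most bookkeeping omitted:
import java.util.concurrent.Executors
import org.apache.hadoop.yarn.api.records.Container

val launcherPool = Executors.newCachedThreadPool()
var numExecutorsRunning = 0 // mirrors YarnAllocator's counter

def launchAllocated(containers: Seq[Container]): Unit =
  containers.foreach { container =>
    numExecutorsRunning += 1
    launcherPool.execute(new Runnable {
      override def run(): Unit = {
        // build an ExecutorRunnable for this container and call its run()
      }
    })
  }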
YarnAllocator.updateResourceRequests
val numPendingAllocate = getNumPendingAllocate
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
for (i <- 0 until missing) {
val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
}
} else if (missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
}
5: AMRMClient[ContainerRequest]
The API YARN provides for the AM to request resources from the ResourceManager. The calls YarnAllocator makes are recapped below.
YarnAllocator.getNumPendingAtLocation
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
YarnAllocator.allocateResources
amClient.allocate(progressIndicator)
YarnAllocator.updateResourceRequests
val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
amClient.addContainerRequest(request)
amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.head.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
YarnAllocator.internalReleaseContainer
amClient.releaseAssignedContainer(container.getId())
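Putting these calls together, the AMRMClient lifecycle the AM drives looks roughly like this (a minimal hedged sketch against the Hadoop 2.x API; error handling and the heartbeat loop are elided, and the host/URL values are placeholders):
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
amClient.init(new YarnConfiguration())
amClient.start()
amClient.registerApplicationMaster("am-host", 0, "http://tracking-url")

val resource = Records.newRecord(classOf[Resource])
resource.setMemory(1024)
resource.setVirtualCores(1)
amClient.addContainerRequest(new ContainerRequest(resource, null, null, Priority.newInstance(1)))

val response = amClient.allocate(0.1f) // heartbeat; also returns newly granted containers
// launch executors in response.getAllocatedContainers, then eventually:
amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "")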
6: ExecutorRunnable
Used to launch the executor process in a YARN container.
run
nmClient = NMClient.createNMClient()
nmClient.init(yarnConf)
nmClient.start()
startContainer // defined below
startContainer
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
val localResources = prepareLocalResources
ctx.setLocalResources(localResources)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
appId, localResources)
ctx.setCommands(commands)
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))
}
nmClient.startContainer(container, ctx)
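prepareCommand assembles the container command that starts the executor backend; after environment expansion it looks roughly like this (an illustrative reconstruction; in Spark 1.3 the arguments are positional, and the bracketed values are placeholders):
{{JAVA_HOME}}/bin/java -server -Xmx1024m \
  org.apache.spark.executor.CoarseGrainedExecutorBackend \
  <driverUrl> <executorId> <hostname> <executorCores> <appId> \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
CoarseGrainedExecutorBackend then connects back to the driver's CoarseGrainedSchedulerBackend, registers itself, and starts accepting tasks.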
7: How the application submission is triggered (yarn-client mode)
The user creates a SparkContext:
// which instantiates YarnScheduler and YarnClientSchedulerBackend
case "yarn-client" =>
val scheduler = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
YarnClientSchedulerBackend.start
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += ("--arg", hostport)
argsArrayBuf ++= getExtraClientArguments
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
//7.1
appId = client.submitApplication()
//7.2
waitForApplication()
//7.3
asyncMonitorApplication()
2: ApplicationMaster (slightly different from cluster mode)
ExecutorLauncher.main
// entry point of the ApplicationMaster in yarn-client mode
// Why not call ApplicationMaster.main directly? Because jps can then tell client and cluster mode apart by the class name.
def main(args: Array[String]) = {
ApplicationMaster.main(args)
}
ApplicationMaster.run
// this time the runExecutorLauncher branch is taken
if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
2.6: ApplicationMaster.runExecutorLauncher
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
conf = sparkConf, securityManager = securityMgr)._1
// wait for the user program to bring up the driver (SparkContext)
waitForSparkDriver()
addAmIpFilter()
// register the AM with the RM
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
2.7: ApplicationMaster.waitForSparkDriver
var driverUp = false
val hostport = args.userArgs(0)
val (driverHost, driverPort) = Utils.parseHostPort(hostport)
val totalWaitTime = sparkConf.getLong("spark.yarn.am.waitTime", 100000L)
val deadline = System.currentTimeMillis + totalWaitTime
while (!driverUp && !finished && System.currentTimeMillis < deadline) {
try {
val socket = new Socket(driverHost, driverPort)
socket.close()
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
driverUp = true
} catch {
case e: Exception =>
logError("Failed to connect to driver at %s:%s, retrying ...".
format(driverHost, driverPort))
Thread.sleep(100L)
}
}
if (!driverUp) {
throw new SparkException("Failed to connect to driver!")
}
sparkConf.set("spark.driver.host", driverHost)
sparkConf.set("spark.driver.port", driverPort.toString)
// start the AMActor
runAMActor(driverHost, driverPort.toString, isClusterMode = false)
With preferredNodeLocationData, YARN can be asked to allocate containers close to the input data:
val locData = InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("hdfs:///myfile.txt"))))
val sc = new SparkContext(conf, locData)