[SPARK-3388] Expose application ID in ApplicationStart event, use it in history server.

This change exposes the application ID generated by the Spark Master, Mesos, or YARN
via the SparkListenerApplicationStart event. The history server then uses that ID to expose
the application, instead of using the internal directory name generated by the event logger
as the application ID. Besides looking better, this lets anyone who knows the application ID
easily figure out the URL of the application's entry in the history server.
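As a rough, hedged sketch of what this enables (not code from the patch; the listener class name is made up), a SparkListener can now read the cluster manager's ID straight off the event:

import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical listener: logs the application ID reported by the cluster manager, if any.
class AppIdLoggingListener extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    // appId is an Option[String]; backends that do not provide an ID leave it as None.
    event.appId match {
      case Some(id) => println(s"Application '${event.appName}' started with ID $id")
      case None     => println(s"Application '${event.appName}' started without a cluster manager ID")
    }
  }
}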

In YARN mode, this is used to generate a direct link from the ResourceManager application list to the
Spark history server entry (thus providing a fix for SPARK-2150).
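The construction of that link mirrors the registerAM() change further down in this diff; a minimal standalone sketch, assuming a made-up history server address and application ID, looks like this:

import org.apache.spark.SparkConf

// Hypothetical values for illustration only.
val conf = new SparkConf()
  .set("spark.yarn.historyServer.address", "http://historyserver.example.com:18080")
val appId = "application_1409777030510_0001"     // example YARN application ID
val uiPathPrefix = "/history"                    // corresponds to HistoryServer.UI_PATH_PREFIX

// Same pattern as registerAM(): fall back to an empty string when no history server is configured.
val historyAddress = conf.getOption("spark.yarn.historyServer.address")
  .map { address => s"$address$uiPathPrefix/$appId" }
  .getOrElse("")
// => http://historyserver.example.com:18080/history/application_1409777030510_0001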

Note that this assumes the different cluster managers generate application IDs that are
sufficiently distinct from one another that clashes will not occur.

Author: Marcelo Vanzin <vanzin@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Andrew Or <andrewor14@gmail.com>

Closes #1218 from vanzin/yarn-hs-link-2 and squashes the following commits:

2d19f3c [Marcelo Vanzin] Review feedback.
6706d3a [Marcelo Vanzin] Implement applicationId() in base classes.
56fe42e [Marcelo Vanzin] Fix cluster mode history address, plus a cleanup.
44112a8 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
8278316 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a86bbcf [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a0056e6 [Marcelo Vanzin] Unbreak test.
4b10cfd [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
cb0cab2 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
25f2826 [Marcelo Vanzin] Add MIMA excludes.
f0ba90f [Marcelo Vanzin] Use BufferedIterator.
c90a08d [Marcelo Vanzin] Remove unused code.
3f8ec66 [Marcelo Vanzin] Review feedback.
21aa71b [Marcelo Vanzin] Fix JSON test.
b022bae [Marcelo Vanzin] Undo SparkContext cleanup.
c6d7478 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
4e3483f [Marcelo Vanzin] Fix test.
57517b8 [Marcelo Vanzin] Review feedback. Mostly, more consistent use of Scala's Option.
311e49d [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
d35d86f [Marcelo Vanzin] Fix yarn backend after rebase.
36dc362 [Marcelo Vanzin] Don't use Iterator::takeWhile().
0afd696 [Marcelo Vanzin] Wait until master responds before returning from start().
abc4697 [Marcelo Vanzin] Make FsHistoryProvider keep a map of applications by id.
26b266e [Marcelo Vanzin] Use Mesos framework ID as Spark application ID.
b3f3664 [Marcelo Vanzin] [yarn] Make the RM link point to the app directly in the HS.
2fb7de4 [Marcelo Vanzin] Expose the application ID in the ApplicationStart event.
ed10348 [Marcelo Vanzin] Expose application id to spark context.
Authored by Marcelo Vanzin on 2014-09-03 14:57:38 -07:00, committed by Andrew Or
parent ccc69e26ec
commit f2b5b619a9
25 changed files with 230 additions and 135 deletions

@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {
/** Post the application start event */
private def postApplicationStart() {
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
startTime, sparkUser))
}
/** Post the application end event */

@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
*
* @return List of all known applications.
*/
def getListing(): Seq[ApplicationHistoryInfo]
def getListing(): Iterable[ApplicationHistoryInfo]
/**
* Returns the Spark UI for a specific application.
*
* @param appId The application ID.
* @return The application's UI, or null if application is not found.
* @return The application's UI, or None if application is not found.
*/
def getAppUI(appId: String): SparkUI
def getAppUI(appId: String): Option[SparkUI]
/**
* Called when the server is shutting down.

@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
with Logging {
private val NOT_STARTED = "<Not Started>"
// Interval between each check for event log updates
private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
conf.getInt("spark.history.updateInterval", 10)) * 1000
@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTimeMs = -1L
// List of applications, in order from newest to oldest.
@volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
// The modification time of the newest log detected during the last scan. This is used
// to ignore logs that are older during subsequent scans, to avoid processing data that
// is already known.
private var lastModifiedTime = -1L
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
= new mutable.LinkedHashMap()
/**
* A background thread that periodically checks for event log updates on disk.
@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
logCheckingThread.start()
}
override def getListing() = appList
override def getListing() = applications.values
override def getAppUI(appId: String): SparkUI = {
override def getAppUI(appId: String): Option[SparkUI] = {
try {
val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
ui
applications.get(appId).map { info =>
val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
new Path(logDir, info.logDir)))
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}
replayBus.replay()
ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so they are properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
appListener.viewAcls.getOrElse(""))
ui
}
} catch {
case e: FileNotFoundException => null
case e: FileNotFoundException => None
}
}
@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
try {
val logStatus = fs.listStatus(new Path(resolvedLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs.filter { dir =>
fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
}
val currentApps = Map[String, ApplicationHistoryInfo](
appList.map(app => app.id -> app):_*)
// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
for (dir <- logInfos) {
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
try {
val (app, _) = loadAppInfo(dir, renderUI = false)
newApps += app
} catch {
case e: Exception => logError(s"Failed to load app info from directory $dir.")
// Load all new logs from the log directory. Only directories that have a modification time
// later than the last known log directory will be loaded.
var newLastModifiedTime = lastModifiedTime
val logInfos = logDirs
.filter { dir =>
if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
val modTime = getModificationTime(dir)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime > lastModifiedTime
} else {
false
}
} else {
newApps += curr
}
}
.flatMap { dir =>
try {
val (replayBus, appListener) = createReplayBus(dir)
replayBus.replay()
Some(new FsApplicationHistoryInfo(
dir.getPath().getName(),
appListener.appId.getOrElse(dir.getPath().getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED)))
} catch {
case e: Exception =>
logInfo(s"Failed to load application log data from $dir.", e)
None
}
}
.sortBy { info => -info.endTime }
appList = newApps.sortBy { info => -info.endTime }
lastModifiedTime = newLastModifiedTime
// When there are new logs, merge the new list with the existing one, maintaining
// the expected ordering (descending end time). Maintaining the order is important
// to avoid having to sort the list every time there is a request for the log list.
if (!logInfos.isEmpty) {
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
if (!newApps.contains(info.id)) {
newApps += (info.id -> info)
}
}
val newIterator = logInfos.iterator.buffered
val oldIterator = applications.values.iterator.buffered
while (newIterator.hasNext && oldIterator.hasNext) {
if (newIterator.head.endTime > oldIterator.head.endTime) {
addIfAbsent(newIterator.next)
} else {
addIfAbsent(oldIterator.next)
}
}
newIterator.foreach(addIfAbsent)
oldIterator.foreach(addIfAbsent)
applications = newApps
}
} catch {
case t: Throwable => logError("Exception in checking for event log updates", t)
}
}
/**
* Parse the application's logs to find out the information we need to build the
* listing page.
*
* When creating the listing of available apps, there is no need to load the whole UI for the
* application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
* clicks on a specific application.
*
* @param logDir Directory with application's log files.
* @param renderUI Whether to create the SparkUI for the application.
* @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
*/
private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
val path = logDir.getPath
val appId = path.getName
private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
val path = logDir.getPath()
val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
val ui: SparkUI = if (renderUI) {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId,
HistoryServer.UI_PATH_PREFIX + s"/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
} else {
null
}
replayBus.replay()
val appInfo = ApplicationHistoryInfo(
appId,
appListener.appName,
appListener.startTime,
appListener.endTime,
getModificationTime(logDir),
appListener.sparkUser)
if (ui != null) {
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
// make sure to set admin acls before view acls so properly picked up
ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
}
(appInfo, ui)
(replayBus, appListener)
}
/** Return when this directory was last modified. */
@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
}
private class FsApplicationHistoryInfo(
val logDir: String,
id: String,
name: String,
startTime: Long,
endTime: Long,
lastUpdated: Long,
sparkUser: String)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)
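The merge performed by checkForLogs() above can be pictured with a small self-contained sketch (not from the patch; the case class and sample data are made up): two inputs already sorted by descending end time are merged into a LinkedHashMap keyed by application ID, so the listing never needs re-sorting and rescanned entries shadow stale ones.

import scala.collection.mutable

// Minimal stand-in for FsApplicationHistoryInfo: just an ID and an end time.
case class AppInfo(id: String, endTime: Long)

// Both inputs are assumed to be sorted by descending end time, as in checkForLogs().
def merge(newApps: Seq[AppInfo], oldApps: Seq[AppInfo]): mutable.LinkedHashMap[String, AppInfo] = {
  val merged = new mutable.LinkedHashMap[String, AppInfo]()
  def addIfAbsent(info: AppInfo): Unit = {
    if (!merged.contains(info.id)) {
      merged += (info.id -> info)
    }
  }
  val newIt = newApps.iterator.buffered
  val oldIt = oldApps.iterator.buffered
  while (newIt.hasNext && oldIt.hasNext) {
    // Take whichever head ends later, so the output stays in descending end-time order.
    if (newIt.head.endTime > oldIt.head.endTime) addIfAbsent(newIt.next()) else addIfAbsent(oldIt.next())
  }
  newIt.foreach(addIfAbsent)
  oldIt.foreach(addIfAbsent)
  merged
}

// Example with made-up IDs: the rescanned "app-2" shadows the stale entry, "app-3" is new.
val result = merge(
  Seq(AppInfo("app-3", 300L), AppInfo("app-2", 250L)),
  Seq(AppInfo("app-2", 200L), AppInfo("app-1", 100L)))
// result.keys: app-3, app-2, app-1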

@ -52,10 +52,7 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
val ui = provider.getAppUI(key)
if (ui == null) {
throw new NoSuchElementException()
}
val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
attachSparkUI(ui)
ui
}

@ -24,38 +24,31 @@ package org.apache.spark.scheduler
* from multiple applications are seen, the behavior is unspecified.
*/
private[spark] class ApplicationEventListener extends SparkListener {
var appName = "<Not Started>"
var sparkUser = "<Not Started>"
var startTime = -1L
var endTime = -1L
var viewAcls = ""
var adminAcls = ""
def applicationStarted = startTime != -1
def applicationCompleted = endTime != -1
def applicationDuration: Long = {
val difference = endTime - startTime
if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
}
var appName: Option[String] = None
var appId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
var viewAcls: Option[String] = None
var adminAcls: Option[String] = None
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = applicationStart.appName
startTime = applicationStart.time
sparkUser = applicationStart.sparkUser
appName = Some(applicationStart.appName)
appId = applicationStart.appId
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
endTime = applicationEnd.time
endTime = Some(applicationEnd.time)
}
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
val environmentDetails = environmentUpdate.environmentDetails
val allProperties = environmentDetails("Spark Properties").toMap
viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
adminAcls = allProperties.getOrElse("spark.admin.acls", "")
viewAcls = allProperties.get("spark.ui.view.acls")
adminAcls = allProperties.get("spark.admin.acls")
}
}
}

@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
def isReady(): Boolean = true
/**
* The application ID associated with the job, if any.
*
* @return The application ID, or None if the backend does not provide an ID.
*/
def applicationId(): Option[String] = None
}

@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
extends SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
sparkUser: String) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
*/
def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
blockManagerId: BlockManagerId): Boolean
/**
* The application ID associated with the job, if any.
*
* @return The application ID, or None if the backend does not provide an ID.
*/
def applicationId(): Option[String] = None
}

@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
}
}
}
override def applicationId(): Option[String] = backend.applicationId()
}
@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {
retval.toList
}
}

@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
// if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTime =
conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
val createTime = System.currentTimeMillis()

@ -69,4 +69,5 @@ private[spark] class SimrSchedulerBackend(
fs.delete(new Path(driverFilePath), false)
super.stop()
}
}

@ -34,6 +34,10 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
var appId: String = _
val registrationLock = new Object()
var registrationDone = false
val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
val totalExpectedCores = maxCores.getOrElse(0)
@ -68,6 +72,8 @@ private[spark] class SparkDeploySchedulerBackend(
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
waitForRegistration()
}
override def stop() {
@ -81,15 +87,19 @@ private[spark] class SparkDeploySchedulerBackend(
override def connected(appId: String) {
logInfo("Connected to Spark cluster with app ID " + appId)
this.appId = appId
notifyContext()
}
override def disconnected() {
notifyContext()
if (!stopping) {
logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
}
}
override def dead(reason: String) {
notifyContext()
if (!stopping) {
logError("Application has been killed. Reason: " + reason)
scheduler.error(reason)
@ -116,4 +126,22 @@ private[spark] class SparkDeploySchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
override def applicationId(): Option[String] = Option(appId)
private def waitForRegistration() = {
registrationLock.synchronized {
while (!registrationDone) {
registrationLock.wait()
}
}
}
private def notifyContext() = {
registrationLock.synchronized {
registrationDone = true
registrationLock.notifyAll()
}
}
}

@ -309,4 +309,5 @@ private[spark] class CoarseMesosSchedulerBackend(
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
}
}

@ -349,4 +349,5 @@ private[spark] class MesosSchedulerBackend(
// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
}

@ -114,4 +114,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}
}

@ -232,7 +232,7 @@ private[spark] object UIUtils extends Logging {
def listingTable[T](
headers: Seq[String],
generateDataRow: T => Seq[Node],
data: Seq[T],
data: Iterable[T],
fixedWidth: Boolean = false): Seq[Node] = {
var listingTableClass = TABLE_CLASS

@ -171,6 +171,7 @@ private[spark] object JsonProtocol {
def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
("Event" -> Utils.getFormattedClassName(applicationStart)) ~
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser)
}
@ -484,9 +485,10 @@ private[spark] object JsonProtocol {
def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
val appName = (json \ "App Name").extract[String]
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
SparkListenerApplicationStart(appName, time, sparkUser)
SparkListenerApplicationStart(appName, appId, time, sparkUser)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {

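As a hedged illustration of the new field on the wire (values are invented, and JsonProtocol is private[spark], so this would only compile inside Spark itself), serializing events with and without an ID looks roughly like:

import org.json4s.jackson.JsonMethods.{compact, render}
import org.apache.spark.scheduler.SparkListenerApplicationStart
import org.apache.spark.util.JsonProtocol

val withId    = SparkListenerApplicationStart("Greatest App (N)ever", Some("app-20140903141234-0001"), 125L, "Mickey")
val withoutId = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey")

println(compact(render(JsonProtocol.sparkEventToJson(withId))))
// roughly: {"Event":"SparkListenerApplicationStart","App Name":"Greatest App (N)ever",
//           "App ID":"app-20140903141234-0001","Timestamp":125,"User":"Mickey"}
println(compact(render(JsonProtocol.sparkEventToJson(withoutId))))
// the "App ID" key is simply absent (JNothing is dropped), which is what the
// backwards-compatibility test further down relies on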

@ -229,7 +229,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
val conf = getLoggingConf(logDirPath, compressionCodec)
val eventLogger = new EventLoggingListener("test", conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
// A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite

@ -83,7 +83,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
val fstream = fileSystem.create(logFilePath)
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val writer = new PrintWriter(cstream)
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
val applicationEnd = SparkListenerApplicationEnd(1000L)
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))

@ -60,7 +60,7 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
testEvent(stageSubmitted, stageSubmittedJsonString)
@ -176,6 +176,13 @@ class JsonProtocolSuite extends FunSuite {
deserializedBmRemoved)
}
test("SparkListenerApplicationStart backwards compatibility") {
// SparkListenerApplicationStart events in Spark 1.0.0 do not have an "appId" property.
val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
.removeField({ _._1 == "App ID" })
assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
}
/** -------------------------- *
| Helper test running methods |

@ -111,6 +111,8 @@ object MimaExcludes {
MimaBuild.excludeSparkClass("storage.Values") ++
MimaBuild.excludeSparkClass("storage.Entry") ++
MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
// Class was missing "@DeveloperApi" annotation in 1.0.
MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
Seq(
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Gini.calculate"),
@ -119,14 +121,14 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.mllib.tree.impurity.Variance.calculate")
) ++
Seq ( // Package-private classes removed in SPARK-2341
Seq( // Package-private classes removed in SPARK-2341
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
) ++
) ++
Seq( // package-private classes removed in MLlib
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
private val sparkContextRef = new AtomicReference[SparkContext](null)
final def run(): Int = {
val appAttemptId = client.getAttemptId()
if (isDriver) {
// Set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
// Set the master property to match the requested mode.
System.setProperty("spark.master", "yarn-cluster")
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
}
logInfo("ApplicationAttemptId: " + client.getAttemptId())
logInfo("ApplicationAttemptId: " + appAttemptId)
val cleanupHook = new Runnable {
override def run() {
@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}
private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
private def registerAM(uiAddress: String) = {
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
val historyAddress =
sparkConf.getOption("spark.yarn.historyServer.address")
.map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
.getOrElse("")
allocator = client.register(yarnConf,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
uiHistoryAddress)
historyAddress)
allocator.allocateResources()
reporterThread = launchReporterThread()
@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
registerAM(sc.ui.appUIHostPort)
try {
userThread.join()
} finally {
@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
sparkConf.get("spark.driver.appUIHistoryAddress", ""))
registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
// In client mode the actor will stop the reporter thread.
reporterThread.join()

@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@ -156,19 +155,6 @@ object YarnSparkHadoopUtil {
}
}
def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
val eventLogDir = sc.eventLogger match {
case Some(logger) => logger.getApplicationLogDir()
case None => ""
}
val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
if (historyServerAddress != "" && eventLogDir != "") {
historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
} else {
""
}
}
/**
* Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
* using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The

@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend(
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
override def applicationId(): Option[String] = Option(appId).map(_.toString())
}

@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend(
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
var totalExpectedExecutors = 0
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend(
override def sufficientResourcesRegistered(): Boolean = {
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
}