[SPARK-34877][CORE][YARN] Add the code change for adding the Spark AM log link in spark UI

### What changes were proposed in this pull request?
When running a Spark job on YARN with the client deploy mode, the Spark driver and the Spark Application Master (AM) are launched in two separate containers. In various scenarios there is a need to look at the Spark Application Master logs to see resource allocation, decommissioning status, and other information exchanged between the YARN RM and the Spark Application Master.

In cluster mode the Spark driver and the Spark AM run in the same container, so the driver's log link is already available in the Spark UI.

This PR adds the Spark AM log link for Spark jobs running in client mode on YARN. Instead of searching for the container ID and then locating the logs, we can check them directly in the Spark UI.

This change only shows the AM log links in client mode when the resource manager is YARN.
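
The AM entry is exposed through a new REST endpoint, `/api/v1/applications/<appId>/allmiscellaneousprocess` (see the expectation JSON further down). As a rough illustration only (the UI root and application ID below are placeholders), the endpoint can also be read programmatically:

```scala
import scala.io.Source

object FetchAmLogLinks {
  def main(args: Array[String]): Unit = {
    // Placeholders: substitute the real UI root and application ID.
    val uiRoot = "http://localhost:4040"
    val appId = "application_1555004656427_0144"
    val url = s"$uiRoot/api/v1/applications/$appId/allmiscellaneousprocess"

    // The endpoint returns a JSON array of process summaries; the YARN AM
    // entry has id "yarn-am" and its processLogs map holds the log links.
    val src = Source.fromURL(url)
    try println(src.mkString) finally src.close()
  }
}
```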

### Why are the changes needed?
Until now, the only way to check these logs was to find the container ID of the AM and then look up the logs either with the YARN CLI (e.g. `yarn logs -applicationId <appId>`) or through the YARN RM / Application History Server.

This PR adds the Spark AM log link for Spark jobs running in client mode on YARN, so instead of searching for the container ID and then locating the logs, we can check them directly in the Spark UI.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests and also verified the change in the Spark UI.
**In YARN client mode**
Before the change:

![image](https://user-images.githubusercontent.com/34540906/112644861-e1733200-8e6b-11eb-939b-c76ca9902a4e.png)

After the change: the AM info is shown.

![image](https://user-images.githubusercontent.com/34540906/115264198-b7075280-a153-11eb-98f3-2aed66ffad2a.png)

AM Log

![image](https://user-images.githubusercontent.com/34540906/112645680-c0f7a780-8e6c-11eb-8b82-4ccc0aee927b.png)

**In YARN cluster mode** - the AM log link is not shown.

![image](https://user-images.githubusercontent.com/34540906/112649512-86900980-8e70-11eb-9b37-69d5c4b53ffa.png)

Closes #31974 from SaurabhChawla100/SPARK-34877.

Authored-by: SaurabhChawla <s.saurabhtim@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
SaurabhChawla 2021-04-20 08:56:07 -05:00 committed by Thomas Graves
parent 9c956abb1d
commit 1e64b4fa27
19 changed files with 288 additions and 3 deletions

@ -134,4 +134,21 @@ limitations under the License.
</tbody>
</table>
</div>
<div class="container-fluid active-process-container">
<h4 class="title-table">Miscellaneous Process</h4>
<table id="active-process-table" class="table table-striped compact cell-border" style="width: 100%">
<thead>
<tr>
<th>Process ID</th>
<th>Address</th>
<th>Status</th>
<th>Cores</th>
<th>Logs</th>
</tr>
</thead>
<tbody>
</tbody>
</table>
</div>
</script>

@ -39,6 +39,13 @@ function formatStatus(status, type, row) {
return "Dead"
}
function formatProcessStatus(activeStatus) {
if (activeStatus) {
return "Active"
}
return "Dead"
}
function formatResourceCells(resources) {
var result = ""
var count = 0
@ -548,7 +555,48 @@ $(document).ready(function () {
execDataTable.column('executorLogsCol:name').visible(logsExist(response));
execDataTable.column('threadDumpCol:name').visible(getThreadDumpEnabled());
$('#active-executors [data-toggle="tooltip"]').tooltip();
// This section should be visible once API gives the response.
$('.active-process-container').hide()
var endPoint = createRESTEndPointForMiscellaneousProcess(appId);
$.getJSON(endPoint, function( response, status, jqXHR ) {
if (response.length) {
var processSummaryResponse = response;
var processSummaryConf = {
"data": processSummaryResponse,
"columns": [{
data: "id"
},
{
data: "hostPort"
},
{
data: function(row) {
return formatProcessStatus(row.isActive);
}
},
{
data: "totalCores"
},
{
data: "processLogs",
render: formatLogsCells
},
],
"deferRender": true,
"order": [
[0, "asc"]
],
"bAutoWidth": false,
"oLanguage": {
"sEmptyTable": "No data to show yet"
}
};
$("#active-process-table").DataTable(processSummaryConf);
$('.active-process-container').show()
}
});
var sumSelector = "#summary-execs-table";
var sumConf = {
"data": [activeSummary, deadSummary, totalSummary],

@ -195,3 +195,25 @@ function createRESTEndPointForExecutorsPage(appId) {
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}
function createRESTEndPointForMiscellaneousProcess(appId) {
var words = document.baseURI.split('/');
var ind = words.indexOf("proxy");
if (ind > 0) {
var appId = words[ind + 1];
var newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
}
ind = words.indexOf("history");
if (ind > 0) {
var appId = words[ind + 1];
var attemptId = words[ind + 2];
var newBaseURI = words.slice(0, ind).join('/');
if (isNaN(attemptId)) {
return newBaseURI + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
} else {
return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allmiscellaneousprocess";
}
}
return uiRoot + "/api/v1/applications/" + appId + "/allmiscellaneousprocess";
}
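
This helper mirrors the existing `createRESTEndPointForExecutorsPage` logic: when the page is served behind the YARN proxy it derives the application ID and base URI from the `proxy` segment of the path, on the history server it additionally inserts the attempt ID when the path carries a numeric attempt, and otherwise it falls back to the live UI root.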

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Stores information about an Miscellaneous Process to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class MiscellaneousProcessDetails(
val hostPort: String,
val cores: Int,
val logUrlInfo: Map[String, String]) extends Serializable

@ -227,6 +227,10 @@ case class SparkListenerUnschedulableTaskSetRemoved(
@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerMiscellaneousProcessAdded(time: Long, processId: String,
info: MiscellaneousProcessDetails) extends SparkListenerEvent
/**
* Periodic updates from executors.
* @param execId executor id
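
Since the new event is a `@DeveloperApi` case class, user code can observe it as well. As a hedged sketch (not part of this change), a custom listener registered through `spark.extraListeners` could pick the event up in `onOtherEvent`, much like `AppStatusListener` does further down:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerMiscellaneousProcessAdded}

// Illustrative listener only: logs the AM host and log links whenever the
// scheduler backend posts the new event on the listener bus.
class AmLogLinkListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case SparkListenerMiscellaneousProcessAdded(time, processId, info) =>
      println(s"Process $processId added at $time on ${info.hostPort}: ${info.logUrlInfo}")
    case _ => // ignore other events
  }
}
```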

@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.scheduler.{ExecutorLossReason, MiscellaneousProcessDetails}
import org.apache.spark.util.SerializableBuffer
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@ -124,6 +124,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterClusterManager(am: RpcEndpointRef) extends CoarseGrainedClusterMessage
// Send Miscellaneous Process information to the driver
case class MiscellaneousProcessAdded(
time: Long, processId: String, info: MiscellaneousProcessDetails)
extends CoarseGrainedClusterMessage
// Used by YARN's client mode AM to retrieve the current set of delegation tokens.
object RetrieveDelegationTokens extends CoarseGrainedClusterMessage

@ -213,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
data.freeCores = data.totalCores
}
makeOffers(executorId)
case MiscellaneousProcessAdded(time: Long,
processId: String, info: MiscellaneousProcessDetails) =>
listenerBus.post(SparkListenerMiscellaneousProcessAdded(time, processId, info))
case e =>
logError(s"Received unexpected message. ${e}")
}

@ -78,6 +78,7 @@ private[spark] class AppStatusListener(
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
private[spark] val liveMiscellaneousProcess = new HashMap[String, LiveMiscellaneousProcess]()
private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
// Keep the active executor count as a separate variable to avoid having to do synchronization
@ -107,6 +108,8 @@ private[spark] class AppStatusListener(
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
case processInfoEvent: SparkListenerMiscellaneousProcessAdded =>
onMiscellaneousProcessAdded(processInfoEvent)
case _ =>
}
@ -1124,6 +1127,13 @@ private[spark] class AppStatusListener(
})
}
private def getOrCreateOtherProcess(processId: String,
addTime: Long): LiveMiscellaneousProcess = {
liveMiscellaneousProcess.getOrElseUpdate(processId, {
new LiveMiscellaneousProcess(processId, addTime)
})
}
private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
val storageLevel = event.blockUpdatedInfo.storageLevel
if (storageLevel.isValid) {
@ -1353,4 +1363,16 @@ private[spark] class AppStatusListener(
}
}
private def onMiscellaneousProcessAdded(
processInfoEvent: SparkListenerMiscellaneousProcessAdded): Unit = {
val processInfo = processInfoEvent.info
val miscellaneousProcess =
getOrCreateOtherProcess(processInfoEvent.processId, processInfoEvent.time)
miscellaneousProcess.processLogs = processInfo.logUrlInfo
miscellaneousProcess.hostPort = processInfo.hostPort
miscellaneousProcess.isActive = true
miscellaneousProcess.totalCores = processInfo.cores
update(miscellaneousProcess, System.nanoTime())
}
}

@ -92,6 +92,16 @@ private[spark] class AppStatusStore(
filtered.asScala.map(_.info).filter(_.id != FALLBACK_BLOCK_MANAGER_ID.executorId).toSeq
}
def miscellaneousProcessList(activeOnly: Boolean): Seq[v1.ProcessSummary] = {
val base = store.view(classOf[ProcessSummaryWrapper])
val filtered = if (activeOnly) {
base.index("active").reverse().first(true).last(true)
} else {
base
}
filtered.asScala.map(_.info).toSeq
}
def executorSummary(executorId: String): v1.ExecutorSummary = {
store.read(classOf[ExecutorSummaryWrapper], executorId).info
}

@ -914,3 +914,27 @@ private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
}
}
private[spark] class LiveMiscellaneousProcess(val processId: String,
creationTime: Long) extends LiveEntity {
var hostPort: String = null
var isActive = true
var totalCores = 0
val addTime = new Date(creationTime)
var removeTime: Date = null
var processLogs = Map[String, String]()
override protected def doUpdate(): Any = {
val info = new v1.ProcessSummary(
processId,
hostPort,
isActive,
totalCores,
addTime,
Option(removeTime),
processLogs)
new ProcessSummaryWrapper(info)
}
}

@ -79,6 +79,10 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
@GET
@Path("allmiscellaneousprocess")
def allProcessList(): Seq[ProcessSummary] = withUI(_.store.miscellaneousProcessList(false))
@Path("stages")
def stages(): Class[StagesResource] = classOf[StagesResource]

@ -486,3 +486,12 @@ case class ThreadStackTrace(
val blockedByThreadId: Option[Long],
val blockedByLock: String,
val holdingLocks: Seq[String])
class ProcessSummary private[spark](
val id: String,
val hostPort: String,
val isActive: Boolean,
val totalCores: Int,
val addTime: Date,
val removeTime: Option[Date],
val processLogs: Map[String, String])

@ -514,3 +514,16 @@ private[spark] class CachedQuantile(
def stage: Array[Int] = Array(stageId, stageAttemptId)
}
private[spark] class ProcessSummaryWrapper(val info: ProcessSummary) {
@JsonIgnore @KVIndex
private def id: String = info.id
@JsonIgnore @KVIndex("active")
private def active: Boolean = info.isActive
@JsonIgnore @KVIndex("host")
val host: String = Utils.parseHostPort(info.hostPort)._1
}

@ -0,0 +1,11 @@
[ {
"id" : "yarn-am",
"hostPort" : "192.168.1.19:8042",
"isActive" : true,
"totalCores" : 1,
"addTime" : "2021-04-19T15:35:50.218GMT",
"processLogs" : {
"stdout" : "http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stdout?start=-4096",
"stderr" : "http://192.168.1.19:8042/node/containerlogs/container_1555004656427_0144_01_000001/test/stderr?start=-4096"
}
} ]

File diff suppressed because one or more lines are too long

@ -191,7 +191,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
// Enable "spark.eventLog.logBlockUpdates.enabled", to get the storage information
// in the history server.
"one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0"
"one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0",
"miscellaneous process" -> "applications/application_1555004656427_0144/allmiscellaneousprocess"
)
// run a bunch of characterization tests -- just verify the behavior is the same as what is saved

@ -1796,6 +1796,44 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
test("SPARK-34877 - check YarnAmInfoEvent is populated correctly") {
def checkInfoPopulated(listener: AppStatusListener,
logUrlMap: Map[String, String], processId: String): Unit = {
val yarnAmInfo = listener.liveMiscellaneousProcess.get(processId)
assert(yarnAmInfo.isDefined)
yarnAmInfo.foreach { info =>
assert(info.processId == processId)
assert(info.isActive)
assert(info.processLogs == logUrlMap)
}
check[ProcessSummaryWrapper](processId) { process =>
assert(process.info.id === processId)
assert(process.info.isActive)
assert(process.info.processLogs == logUrlMap)
}
}
val processId = "yarn-am"
val listener = new AppStatusListener(store, conf, true)
var stdout = "http:yarnAmHost:2453/con1/stdout"
var stderr = "http:yarnAmHost:2453/con2/stderr"
var logUrlMap: Map[String, String] = Map("stdout" -> stdout,
"stderr" -> stderr)
var hostport = "yarnAmHost:2453"
var info = new MiscellaneousProcessDetails(hostport, 1, logUrlMap)
listener.onOtherEvent(SparkListenerMiscellaneousProcessAdded(123678L, processId, info))
checkInfoPopulated(listener, logUrlMap, processId)
// Launch new AM in case of failure
// New container entry will be updated in this scenario
stdout = "http:yarnAmHost:2451/con1/stdout"
stderr = "http:yarnAmHost:2451/con2/stderr"
logUrlMap = Map("stdout" -> stdout,
"stderr" -> stderr)
hostport = "yarnAmHost:2451"
info = new MiscellaneousProcessDetails(hostport, 1, logUrlMap)
listener.onOtherEvent(SparkListenerMiscellaneousProcessAdded(123678L, processId, info))
checkInfoPopulated(listener, logUrlMap, processId)
}
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)

@ -50,6 +50,7 @@ import org.apache.spark.internal.config.UI._
import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler.MiscellaneousProcessDetails
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util._
@ -65,6 +66,11 @@ private[spark] class ApplicationMaster(
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private def extractLogUrls: Map[String, String] = {
YarnContainerInfoHelper.getLogUrls(SparkHadoopUtil.
newConfiguration(sparkConf), None).getOrElse(Map())
}
private val appAttemptId =
if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
@ -776,6 +782,15 @@ private[spark] class ApplicationMaster(
override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
// if deployment mode for yarn Application is client
// then send the AM Log Info to spark driver
if (!isClusterMode) {
val hostPort = YarnContainerInfoHelper.getNodeManagerHttpAddress(None)
val yarnAMID = "yarn-am"
val info = new MiscellaneousProcessDetails(hostPort,
sparkConf.get(AM_CORES), extractLogUrls)
driver.send(MiscellaneousProcessAdded(System.currentTimeMillis(), yarnAMID, info))
}
}
override def receive: PartialFunction[Any, Unit] = {

@ -333,6 +333,12 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning(s"Requesting driver to remove executor $executorId for reason $reason")
driverEndpoint.send(r)
}
// In case of yarn Miscellaneous Process is Spark AM Container
// Launched for the deploy mode client
case processInfo @ MiscellaneousProcessAdded(_, _, _) =>
logDebug(s"Sending the Spark AM info for yarn client mode")
driverEndpoint.send(processInfo)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
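
Taken together, the flow in client mode is: the YARN `ApplicationMaster` sends `MiscellaneousProcessAdded` to the `YarnSchedulerBackend` endpoint, which forwards it to the driver endpoint; `CoarseGrainedSchedulerBackend` posts `SparkListenerMiscellaneousProcessAdded` on the listener bus; `AppStatusListener` records it as a `LiveMiscellaneousProcess` in the app status store; and the executors page reads it back through the new `allmiscellaneousprocess` REST endpoint to render the Miscellaneous Process table with the AM log links.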