[SPARK-27489][WEBUI] UI updates to show executor resource information

### What changes were proposed in this pull request?
We are adding support for other resource types (GPUs, FPGAs, etc.) to executors and Spark, so we should show the resource information for each executor on the UI Executors page.
This change also adds a toggle checkbox to show or hide the Resources column; it is off by default.

![executorui1](https://user-images.githubusercontent.com/4563792/63891432-c815b580-c9aa-11e9-9f41-62975649efbc.png)

![Screenshot from 2019-08-28 14-56-26](https://user-images.githubusercontent.com/4563792/63891516-fd220800-c9aa-11e9-9fe4-89fcdca37306.png)

### Why are the changes needed?
To show the user what resources the executors have, such as GPUs, FPGAs, etc.

### Does this PR introduce any user-facing change?
Yes. It introduces UI and REST API changes to show the resources.
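
For example, the new field shows up on the existing executor list endpoint. A minimal sketch of reading it (hypothetical: assumes a live application UI on localhost:4040; the app id is borrowed from the expectation files below):

```scala
import scala.io.Source

// Each ExecutorSummary in the response now carries a "resources" map, e.g.
// "resources" : { "gpu" : { "name" : "gpu", "addresses" : [ "0", "1" ] } }
val appId = "application_1555004656427_0144"  // hypothetical running app id
val executorsJson = Source.fromURL(
  s"http://localhost:4040/api/v1/applications/$appId/executors").mkString
println(executorsJson)
```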

### How was this patch tested?
Unit tests and manual UI tests in YARN and standalone modes.

Closes #25613 from tgravescs/SPARK-27489-gpu-ui-latest.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
commit 4c8f114783 (parent 56f2887dc8)
Thomas Graves, 2019-09-04 09:45:44 +08:00; committed by Gengliang Wang
29 changed files with 394 additions and 82 deletions

@@ -92,6 +92,7 @@ limitations under the License.
Off Heap Storage Memory</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Resources">Resources</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Failed Tasks">Failed Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Complete Tasks">Complete Tasks</span></th>

@@ -39,6 +39,19 @@ function formatStatus(status, type, row) {
return "Dead"
}
function formatResourceCells(resources) {
var result = ""
var count = 0
$.each(resources, function (name, resInfo) {
if (count > 0) {
result += ", "
}
result += name + ': [' + resInfo.addresses.join(", ") + ']'
count += 1
});
return result
}
jQuery.extend(jQuery.fn.dataTableExt.oSort, {
"title-numeric-pre": function (a) {
var x = a.match(/title="*(-?[0-9\.]+)/)[1];
@@ -106,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}
var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6];
var execOptionalColumns = [5, 6, 9];
var execDataTable;
var sumDataTable;
@@ -401,6 +414,7 @@ $(document).ready(function () {
},
{data: 'diskUsed', render: formatBytes},
{data: 'totalCores'},
{name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
{
data: 'activeTasks',
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@@ -446,7 +460,8 @@ $(document).ready(function () {
"order": [[0, "asc"]],
"columnDefs": [
{"visible": false, "targets": 5},
{"visible": false, "targets": 6}
{"visible": false, "targets": 6},
{"visible": false, "targets": 9}
]
};
@@ -553,6 +568,7 @@ $(document).ready(function () {
"<div><input type='checkbox' class='toggle-vis' id='select-all-box'>Select All</div>" +
"<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
"<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
"</div>");
reselectCheckboxesBasedOnTaskTableState();
@@ -584,8 +600,10 @@ $(document).ready(function () {
var execCol = execDataTable.column(execColIdx);
execCol.visible(!execCol.visible());
var sumColIdx = thisBox.attr("data-sum-col-idx");
var sumCol = sumDataTable.column(sumColIdx);
sumCol.visible(!sumCol.visible());
if (sumColIdx) {
var sumCol = sumDataTable.column(sumColIdx);
sumCol.visible(!sumCol.visible());
}
}
});
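
The new Resources cell renders each resource name with its address list, comma-separated. A sketch of the same formatting in Scala (a hedged mirror of the JavaScript `formatResourceCells` above, not code from this PR):

```scala
// Sketch: Map("gpu" -> Seq("0", "1")) renders as "gpu: [0, 1]";
// multiple resources are joined with ", ".
def formatResourceCells(resources: Map[String, Seq[String]]): String =
  resources
    .map { case (name, addresses) => s"$name: [${addresses.mkString(", ")}]" }
    .mkString(", ")
```

The `if (sumColIdx)` guard in the toggle handler exists because the Resources checkbox carries an empty `data-sum-col-idx`: the summary table has no matching column to toggle.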

@@ -72,7 +72,7 @@ private[spark] class HistoryAppStatusStore(
source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes)
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
}
}

@@ -26,12 +26,15 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils
private[spark] case class WorkerResourceInfo(name: String, addresses: Seq[String])
extends ResourceAllocator(name, addresses) {
extends ResourceAllocator {
override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
def acquire(amount: Int): ResourceInformation = {
val allocated = availableAddrs.take(amount)
acquire(allocated)
new ResourceInformation(name, allocated.toArray)
new ResourceInformation(resourceName, allocated.toArray)
}
}
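
With the allocation bookkeeping moved into a trait (next diff), a worker still hands out a slice of its addresses to a launching executor via `acquire(amount)`. A sketch of the behavior (`WorkerResourceInfo` is `private[spark]`, so this is illustrative only):

```scala
// Illustrative: a worker tracking four GPU addresses grants two of them.
// The returned ResourceInformation carries the allocated slice, and the
// worker's availableAddrs shrinks accordingly (ordering is map-internal).
val gpus = WorkerResourceInfo("gpu", Seq("0", "1", "2", "3"))
val granted = gpus.acquire(2)  // e.g. ResourceInformation("gpu", Array("0", "1"))
```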

@@ -23,18 +23,20 @@ import org.apache.spark.SparkException
import org.apache.spark.util.collection.OpenHashMap
/**
* Class used to help executor/worker allocate resources
* Please note that this class is intended to be used in a single thread.
* @param name Resource name, e.g. gpu/fpga
* @param addresses Resource addresses provided by the executor/worker
* Trait used to help executor/worker allocate resources.
* Please note that this is intended to be used in a single thread.
*/
class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializable {
trait ResourceAllocator {
protected def resourceName: String
protected def resourceAddresses: Seq[String]
/**
* Map from an address to its availability, the value `true` means the address is available,
* while value `false` means the address is assigned.
* TODO Use [[OpenHashMap]] instead to gain better performance.
*/
private val addressAvailabilityMap = mutable.HashMap(addresses.map(_ -> true): _*)
private lazy val addressAvailabilityMap = mutable.HashMap(resourceAddresses.map(_ -> true): _*)
/**
* Sequence of currently available resource addresses.
@@ -59,15 +61,15 @@ class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializab
def acquire(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
if (!addressAvailabilityMap.contains(address)) {
throw new SparkException(s"Try to acquire an address that doesn't exist. $name address " +
s"$address doesn't exist.")
throw new SparkException(s"Try to acquire an address that doesn't exist. $resourceName " +
s"address $address doesn't exist.")
}
val isAvailable = addressAvailabilityMap(address)
if (isAvailable) {
addressAvailabilityMap(address) = false
} else {
throw new SparkException(s"Try to acquire an address that is not available. $name " +
s"address $address is not available.")
throw new SparkException("Try to acquire an address that is not available. " +
s"$resourceName address $address is not available.")
}
}
}
@@ -80,14 +82,14 @@ class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializab
def release(addrs: Seq[String]): Unit = {
addrs.foreach { address =>
if (!addressAvailabilityMap.contains(address)) {
throw new SparkException(s"Try to release an address that doesn't exist. $name address " +
s"$address doesn't exist.")
throw new SparkException(s"Try to release an address that doesn't exist. $resourceName " +
s"address $address doesn't exist.")
}
val isAvailable = addressAvailabilityMap(address)
if (!isAvailable) {
addressAvailabilityMap(address) = true
} else {
throw new SparkException(s"Try to release an address that is not assigned. $name " +
throw new SparkException(s"Try to release an address that is not assigned. $resourceName " +
s"address $address is not assigned.")
}
}
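
Because `ResourceAllocator` is now a trait, each implementor supplies the name and addresses itself. A minimal sketch of mixing it in (the same pattern `ExecutorResourceInfo` uses below):

```scala
// Minimal implementor: provide resourceName/resourceAddresses and inherit
// the acquire/release address bookkeeping from the trait.
class DummyAllocator(name: String, addrs: Seq[String]) extends ResourceAllocator {
  override protected def resourceName: String = name
  override protected def resourceAddresses: Seq[String] = addrs
}
```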

@@ -74,6 +74,16 @@ private[spark] object ResourceInformation {
s"Here is a correct example: $exampleJson.", e)
}
}
def parseJson(json: JValue): ResourceInformation = {
implicit val formats = DefaultFormats
try {
json.extract[ResourceInformationJson].toResourceInformation
} catch {
case NonFatal(e) =>
throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n", e)
}
}
}
/** A case class to simplify JSON serialization of [[ResourceInformation]]. */
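
The new `parseJson(JValue)` overload accepts the same `{name, addresses}` shape this PR writes into event logs and REST responses. A usage sketch (assumes json4s on the classpath, as in Spark core):

```scala
import org.json4s.jackson.JsonMethods.parse

// Sketch: parse the JSON form used throughout this PR.
val ri = ResourceInformation.parseJson(
  parse("""{ "name" : "gpu", "addresses" : [ "0", "1" ] }"""))
// ri.name == "gpu"; ri.addresses sameElements Array("0", "1")
```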

@@ -17,7 +17,7 @@
package org.apache.spark.scheduler
import org.apache.spark.resource.ResourceAllocator
import org.apache.spark.resource.{ResourceAllocator, ResourceInformation}
/**
* Class to hold information about a type of Resource on an Executor. This information is managed
@@ -27,4 +27,8 @@ import org.apache.spark.resource.ResourceAllocator
* @param addresses Resource addresses provided by the executor
*/
private[spark] class ExecutorResourceInfo(name: String, addresses: Seq[String])
extends ResourceAllocator(name, addresses)
extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
override protected def resourceName = this.name
override protected def resourceAddresses = this.addresses
}

@@ -31,12 +31,12 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
* @param resourcesInfo The information of the currently available resources on the executor
*/
private[cluster] class ExecutorData(
val executorEndpoint: RpcEndpointRef,
val executorAddress: RpcAddress,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String],
override val attributes: Map[String, String],
val resourcesInfo: Map[String, ExecutorResourceInfo]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes)
val executorEndpoint: RpcEndpointRef,
val executorAddress: RpcAddress,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int,
override val logUrlMap: Map[String, String],
override val attributes: Map[String, String],
override val resourcesInfo: Map[String, ExecutorResourceInfo]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)

@@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.resource.ResourceInformation
/**
* :: DeveloperApi ::
@@ -27,10 +28,19 @@ class ExecutorInfo(
val executorHost: String,
val totalCores: Int,
val logUrlMap: Map[String, String],
val attributes: Map[String, String]) {
val attributes: Map[String, String],
val resourcesInfo: Map[String, ResourceInformation]) {
def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = {
this(executorHost, totalCores, logUrlMap, Map.empty)
this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty)
}
def this(
executorHost: String,
totalCores: Int,
logUrlMap: Map[String, String],
attributes: Map[String, String]) = {
this(executorHost, totalCores, logUrlMap, attributes, Map.empty)
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -41,12 +51,13 @@ class ExecutorInfo(
executorHost == that.executorHost &&
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap &&
attributes == that.attributes
attributes == that.attributes &&
resourcesInfo == that.resourcesInfo
case _ => false
}
override def hashCode(): Int = {
val state = Seq(executorHost, totalCores, logUrlMap, attributes)
val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
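
Existing call sites keep compiling through the auxiliary constructors; a sketch of the new five-argument form (values hypothetical, patterned on the `JsonProtocolSuite` change below):

```scala
// Hypothetical executor on host1 with 8 cores and two GPU addresses.
val resources = Map("gpu" -> new ResourceInformation("gpu", Array("0", "1")))
val info = new ExecutorInfo("host1", 8, Map.empty, Map.empty, resources)
// Pre-existing callers still work: new ExecutorInfo("host1", 8, Map.empty)
```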

@@ -199,6 +199,7 @@ private[spark] class AppStatusListener(
exec.totalCores = event.executorInfo.totalCores
exec.maxTasks = event.executorInfo.totalCores / coresPerTask
exec.executorLogs = event.executorInfo.logUrlMap
exec.resources = event.executorInfo.resourcesInfo
exec.attributes = event.executorInfo.attributes
liveUpdate(exec, System.nanoTime())
}

@@ -27,6 +27,7 @@ import com.google.common.collect.Interners
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.RDDInfo
@@ -259,6 +260,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
var executorLogs = Map[String, String]()
var attributes = Map[String, String]()
var resources = Map[String, ResourceInformation]()
// Memory metrics. They may not be recorded (e.g. old event logs) so if totalOnHeap is not
// initialized, the store will not contain this information.
@@ -308,7 +310,8 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
memoryMetrics,
blacklistedInStages,
Some(peakExecutorMetrics).filter(_.isSet),
attributes)
attributes,
resources)
new ExecutorSummaryWrapper(info)
}
}

@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.resource.ResourceInformation
case class ApplicationInfo private[spark](
id: String,
@@ -107,7 +108,8 @@ class ExecutorSummary private[spark](
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val attributes: Map[String, String])
val attributes: Map[String, String],
val resources: Map[String, ResourceInformation])
class MemoryMetrics private[spark](
val usedOnHeapStorageMemory: Long,

@@ -33,6 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
@@ -495,7 +496,15 @@ private[spark] object JsonProtocol {
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
("Attributes" -> mapToJson(executorInfo.attributes))
("Attributes" -> mapToJson(executorInfo.attributes)) ~
("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo))
}
def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
val jsonFields = m.map {
case (k, v) => JField(k, v.toJson)
}
JObject(jsonFields.toList)
}
def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -1086,7 +1095,11 @@ private[spark] object JsonProtocol {
case Some(attr) => mapFromJson(attr).toMap
case None => Map.empty[String, String]
}
new ExecutorInfo(executorHost, totalCores, logUrls, attributes)
val resources = jsonOption(json \ "Resources") match {
case Some(resources) => resourcesMapFromJson(resources).toMap
case None => Map.empty[String, ResourceInformation]
}
new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources)
}
def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
@@ -1098,6 +1111,14 @@ private[spark] object JsonProtocol {
BlockUpdatedInfo(blockManagerId, blockId, storageLevel, memorySize, diskSize)
}
def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
val resourceInfo = ResourceInformation.parseJson(v)
(k, resourceInfo)
}.toMap
}
/** -------------------------------- *
* Util JSON deserialization methods |
* --------------------------------- */
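
The two new helpers are inverses; a round-trip sketch (`JsonProtocol` is `private[spark]`, so this is illustrative):

```scala
// resourcesMapToJson emits {"gpu": {"name": "gpu", "addresses": ["0", "1"]}};
// resourcesMapFromJson rebuilds the map via ResourceInformation.parseJson.
val resources = Map("gpu" -> new ResourceInformation("gpu", Array("0", "1")))
val roundTripped = JsonProtocol.resourcesMapFromJson(
  JsonProtocol.resourcesMapToJson(resources))
assert(roundTripped.keySet == resources.keySet)
```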

@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {

@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {

@@ -21,5 +21,6 @@
"addTime" : "2015-02-03T16:43:00.906GMT",
"executorLogs" : { },
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ]

@@ -49,7 +49,8 @@
"MajorGCCount" : 3,
"MajorGCTime" : 144
},
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "3",
"hostPort" : "test-3.vpc.company.com:37641",
@@ -114,7 +115,8 @@
"HTTP_SCHEME" : "http://",
"NM_HOST" : "test-3.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000004"
}
},
"resources" : { }
}, {
"id" : "2",
"hostPort" : "test-4.vpc.company.com:33179",
@@ -179,7 +181,8 @@
"HTTP_SCHEME" : "http://",
"NM_HOST" : "test-4.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000003"
}
},
"resources" : { }
}, {
"id" : "1",
"hostPort" : "test-2.vpc.company.com:43764",
@@ -244,5 +247,6 @@
"HTTP_SCHEME" : "http://",
"NM_HOST" : "test-2.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000002"
}
},
"resources" : { }
} ]

@@ -27,7 +27,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@@ -60,7 +61,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ,{
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@@ -93,7 +95,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@@ -126,7 +129,8 @@
"totalOffHeapStorageMemory": 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@@ -159,5 +163,6 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ]

@@ -27,7 +27,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@@ -60,7 +61,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@@ -93,7 +95,8 @@
"totalOffHeapStorageMemory" : 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@@ -126,7 +129,8 @@
"totalOffHeapStorageMemory": 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@@ -159,5 +163,6 @@
"totalOffHeapStorageMemory": 524288000
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ]

@@ -21,7 +21,8 @@
"addTime" : "2016-11-15T23:20:38.836GMT",
"executorLogs" : { },
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "3",
"hostPort" : "172.22.0.111:64543",
@@ -48,7 +49,8 @@
"stderr" : "http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr"
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "2",
"hostPort" : "172.22.0.111:64539",
@@ -75,7 +77,8 @@
"stderr" : "http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr"
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "1",
"hostPort" : "172.22.0.111:64541",
@@ -102,7 +105,8 @@
"stderr" : "http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr"
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "0",
"hostPort" : "172.22.0.111:64540",
@@ -129,5 +133,6 @@
"stderr" : "http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr"
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ]

@@ -0,0 +1,130 @@
[ {
"id" : "driver",
"hostPort" : "10.28.9.112:37319",
"isActive" : true,
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"totalCores" : 0,
"maxTasks" : 0,
"activeTasks" : 0,
"failedTasks" : 0,
"completedTasks" : 0,
"totalTasks" : 0,
"totalDuration" : 0,
"totalGCTime" : 0,
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : false,
"maxMemory" : 384093388,
"addTime" : "2019-07-02T21:02:25.595GMT",
"executorLogs" : { },
"memoryMetrics" : {
"usedOnHeapStorageMemory" : 0,
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 0
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
}, {
"id" : "2",
"hostPort" : "tomg-test:46005",
"isActive" : true,
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"totalCores" : 1,
"maxTasks" : 1,
"activeTasks" : 0,
"failedTasks" : 0,
"completedTasks" : 0,
"totalTasks" : 0,
"totalDuration" : 0,
"totalGCTime" : 0,
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : false,
"maxMemory" : 384093388,
"addTime" : "2019-07-02T21:02:29.256GMT",
"executorLogs" : {
"stdout" : "http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000003/tgraves/stdout?start=-4096",
"stderr" : "http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000003/tgraves/stderr?start=-4096"
},
"memoryMetrics" : {
"usedOnHeapStorageMemory" : 0,
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 0
},
"blacklistedInStages" : [ ],
"attributes" : {
"NM_HTTP_ADDRESS" : "tomg-test:8042",
"USER" : "tgraves",
"LOG_FILES" : "stderr,stdout",
"NM_HTTP_PORT" : "8042",
"CLUSTER_ID" : "",
"NM_PORT" : "43125",
"HTTP_SCHEME" : "http://",
"NM_HOST" : "tomg-test",
"CONTAINER_ID" : "container_1555004656427_0144_01_000003"
},
"resources" : {
"gpu" : {
"name" : "gpu",
"addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
}
}
}, {
"id" : "1",
"hostPort" : "tomg-test:44873",
"isActive" : true,
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"totalCores" : 1,
"maxTasks" : 1,
"activeTasks" : 0,
"failedTasks" : 0,
"completedTasks" : 0,
"totalTasks" : 0,
"totalDuration" : 0,
"totalGCTime" : 0,
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"isBlacklisted" : false,
"maxMemory" : 384093388,
"addTime" : "2019-07-02T21:02:28.551GMT",
"executorLogs" : {
"stdout" : "http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000002/tgraves/stdout?start=-4096",
"stderr" : "http://tomg-test:8042/node/containerlogs/container_1555004656427_0144_01_000002/tgraves/stderr?start=-4096"
},
"memoryMetrics" : {
"usedOnHeapStorageMemory" : 0,
"usedOffHeapStorageMemory" : 0,
"totalOnHeapStorageMemory" : 384093388,
"totalOffHeapStorageMemory" : 0
},
"blacklistedInStages" : [ ],
"attributes" : {
"NM_HTTP_ADDRESS" : "tomg-test:8042",
"USER" : "tgraves",
"LOG_FILES" : "stderr,stdout",
"NM_HTTP_PORT" : "8042",
"CLUSTER_ID" : "",
"NM_PORT" : "43125",
"HTTP_SCHEME" : "http://",
"NM_HOST" : "tomg-test",
"CONTAINER_ID" : "container_1555004656427_0144_01_000002"
},
"resources" : {
"gpu" : {
"name" : "gpu",
"addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
}
}
} ]

@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {
@@ -28,19 +43,4 @@
"startTimeEpoch" : 1516300235119,
"endTimeEpoch" : 1516300707938
} ]
}, {
"id" : "app-20180109111548-0000",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2018-01-09T10:15:42.372GMT",
"endTime" : "2018-01-09T10:24:37.606GMT",
"lastUpdated" : "",
"duration" : 535234,
"sparkUser" : "attilapiros",
"completed" : true,
"appSparkVersion" : "2.3.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1515492942372,
"endTimeEpoch" : 1515493477606
} ]
} ]

@@ -1,4 +1,22 @@
[ {
[
{
"id": "application_1555004656427_0144",
"name": "Spark shell",
"attempts": [
{
"startTime": "2019-07-02T21:02:17.180GMT",
"endTime": "2019-07-02T21:02:35.974GMT",
"lastUpdated": "",
"duration": 18794,
"sparkUser": "tgraves",
"completed": true,
"appSparkVersion": "3.0.0-SNAPSHOT",
"startTimeEpoch": 1562101337180,
"lastUpdatedEpoch": 0,
"endTimeEpoch": 1562101355974
}
]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {

@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id" : "application_1553914137147_0018",
"name" : "LargeBlocks",
"attempts" : [ {

File diff suppressed because one or more lines are too long

@@ -783,8 +783,9 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
.set(DRIVER_RESOURCES_FILE, resourcesFile)
.setMaster("local-cluster[1, 1, 1024]")
.setAppName("test-cluster")
conf.set(DRIVER_GPU_ID.amountConf, "3")
conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
.set(DRIVER_GPU_ID.amountConf, "3")
.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
.set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)
@@ -855,10 +856,11 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
val conf = new SparkConf()
.setMaster("local-cluster[3, 3, 1024]")
.setAppName("test-cluster")
conf.set(WORKER_GPU_ID.amountConf, "3")
conf.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
conf.set(TASK_GPU_ID.amountConf, "3")
conf.set(EXECUTOR_GPU_ID.amountConf, "3")
.set(WORKER_GPU_ID.amountConf, "3")
.set(WORKER_GPU_ID.discoveryScriptConf, discoveryScript)
.set(TASK_GPU_ID.amountConf, "3")
.set(EXECUTOR_GPU_ID.amountConf, "3")
.set(SPARK_RESOURCES_DIR, dir.getName())
sc = new SparkContext(conf)

@@ -170,6 +170,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"executor node blacklisting" -> "applications/app-20161116163331-0000/executors",
"executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
"executor memory usage" -> "applications/app-20161116163331-0000/executors",
"executor resource information" -> "applications/application_1555004656427_0144/executors",
"app environment" -> "applications/app-20161116163331-0000/environment",

@@ -32,6 +32,8 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.shuffle.MetadataFetchFailedException
@@ -83,13 +85,15 @@ class JsonProtocolSuite extends SparkFunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap
val resources = Map(ResourceUtils.GPU ->
new ResourceInformation(ResourceUtils.GPU, Array("0", "1")))
val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
42L, "Garfield", Some("appAttempt"))
val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
val applicationEnd = SparkListenerApplicationEnd(42L)
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, resources.toMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
val executorUnblacklisted =
@@ -1947,6 +1951,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Attributes" : {
| "ContainerId" : "ct1",
| "User" : "spark"
| },
| "Resources" : {
| "gpu" : {
| "name" : "gpu",
| "addresses" : [ "0", "1" ]
| }
| }
| }
|}

@@ -83,6 +83,7 @@ app-20161115172038-0000
app-20161116163331-0000
application_1516285256255_0012
application_1553914137147_0018
application_1555004656427_0144
stat
local-1422981759269
local-1422981780767