[SPARK-29303][WEB UI] Add UI support for stage level scheduling

### What changes were proposed in this pull request?

This adds UI updates to support stage level scheduling and ResourceProfiles. Three main things have been added: the ResourceProfile id is shown on the Stage page, the Executors page now has an optional selectable column showing the ResourceProfile id of each executor, and the Environment page now has a section listing the ResourceProfiles. Along with this, the REST API for the environment page was updated to include the resource profile information.
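
For anyone wanting to consume the new event programmatically, here is a minimal sketch of a user-side listener built on the new `onResourceProfileAdded` callback (the listener class name is just an example, not part of this PR):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

// Example-only listener: logs each ResourceProfile as it is registered.
class ResourceProfileLoggingListener extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    val rp = event.resourceProfile
    println(s"ResourceProfile ${rp.id} added: " +
      s"executor reqs=${rp.executorResources}, task reqs=${rp.taskResources}")
  }
}

// Register on an existing SparkContext, e.g.:
// sc.addSparkListener(new ResourceProfileLoggingListener)
```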

I debated splitting the resource profile information into its own page, but I wasn't sure it called for a completely separate page. Open to people's thoughts on this.

Screenshots:
![Screen Shot 2020-04-01 at 3 07 46 PM](https://user-images.githubusercontent.com/4563792/78185169-469a7000-7430-11ea-8b0c-d9ede2d41df8.png)
![Screen Shot 2020-04-01 at 3 08 14 PM](https://user-images.githubusercontent.com/4563792/78185175-48fcca00-7430-11ea-8d1d-6b9333700f32.png)
![Screen Shot 2020-04-01 at 3 09 03 PM](https://user-images.githubusercontent.com/4563792/78185176-4a2df700-7430-11ea-92d9-73c382bb0f32.png)
![Screen Shot 2020-04-01 at 11 05 48 AM](https://user-images.githubusercontent.com/4563792/78185186-4dc17e00-7430-11ea-8962-f749dd47ea60.png)

### Why are the changes needed?

So that users can tell which resource profile was used with which stage and executors. The resource profile contents are also exposed so that a user debugging an application can see exactly what resources were requested with that profile.
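
For example (hypothetical driver address; the app id below is just the one from this PR's test expectation files), the resource profile contents can now be pulled from the environment REST endpoint:

```scala
import scala.io.Source

// Hypothetical driver UI address and application id; the "resourceProfiles"
// section in the response is new with this change.
val appId = "application_1578436911597_0052"
val url = s"http://localhost:4040/api/v1/applications/$appId/environment"
println(Source.fromURL(url).mkString)
```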

### Does this PR introduce any user-facing change?

Yes, UI updates.

### How was this patch tested?

Unit tests, plus manual testing on YARN with both active applications and the history server.

Closes #28094 from tgravescs/SPARK-29303-pr.

Lead-authored-by: Thomas Graves <tgraves@nvidia.com>
Co-authored-by: Thomas Graves <tgraves@apache.org>
Signed-off-by: Thomas Graves <tgraves@apache.org>
Thomas Graves 2020-05-21 13:11:35 -05:00 committed by Thomas Graves
parent f1495c5bc0
commit b64688ebba
49 changed files with 657 additions and 103 deletions


@ -162,6 +162,11 @@ public class SparkFirehoseListener implements SparkListenerInterface {
onEvent(speculativeTask);
}
@Override
public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
onEvent(event);
}
@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);


@ -89,6 +89,7 @@ limitations under the License.
<th>Disk Used</th>
<th>Cores</th>
<th>Resources</th>
<th>Resource Profile Id</th>
<th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>
<th>Complete Tasks</th>


@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}
var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6, 9];
var execOptionalColumns = [5, 6, 9, 10];
var execDataTable;
var sumDataTable;
@ -415,6 +415,7 @@ $(document).ready(function () {
{data: 'diskUsed', render: formatBytes},
{data: 'totalCores'},
{name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
{name: 'resourceProfileIdCol', data: 'resourceProfileId'},
{
data: 'activeTasks',
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@ -461,7 +462,8 @@ $(document).ready(function () {
"columnDefs": [
{"visible": false, "targets": 5},
{"visible": false, "targets": 6},
{"visible": false, "targets": 9}
{"visible": false, "targets": 9},
{"visible": false, "targets": 10}
],
"deferRender": true
};
@ -570,6 +572,7 @@ $(document).ready(function () {
"<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
"<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
"<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'>Resource Profile Id</div>" +
"</div>");
reselectCheckboxesBasedOnTaskTableState();


@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
_listenerBus = new LiveListenerBus(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus)
// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.


@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore(
source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
source.resourceProfileId)
}
}


@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests._
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded}
import org.apache.spark.util.Utils
import org.apache.spark.util.Utils.isTesting
/**
* Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles
* and everywhere else we can use the ResourceProfile Id to save on space.
* Note we never remove a resource profile at this point. Its expected this number if small
* Note we never remove a resource profile at this point. Its expected this number is small
* so this shouldn't be much overhead.
*/
@Evolving
private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
private[spark] class ResourceProfileManager(sparkConf: SparkConf,
listenerBus: LiveListenerBus) extends Logging {
private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
private val (readLock, writeLock) = {
@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
// force the computation of maxTasks and limitingResource now so we don't have cost later
rp.limitingResource(sparkConf)
logInfo(s"Added ResourceProfile id: ${rp.id}")
listenerBus.post(SparkListenerResourceProfileAdded(rp))
}
}


@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
}
}
override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
logEvent(event, flushLogger = true)
}
override def onOtherEvent(event: SparkListenerEvent): Unit = {
if (event.logEvent) {
logEvent(event, flushLogger = true)


@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.TaskEndReason
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
extends SparkListenerEvent
/**
* Interface for listening to events from the Spark scheduler. Most applications should probably
* extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent): Unit
/**
* Called when a Resource Profile is added to the manager.
*/
def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
}
@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface {
speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { }
}


@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
listener.onResourceProfileAdded(resourceProfileAdded)
case _ => listener.onOtherEvent(event)
}
}


@ -28,6 +28,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CPUS_PER_TASK
import org.apache.spark.internal.config.Status._
import org.apache.spark.resource.ResourceProfile.CPUS
import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
@ -51,7 +52,7 @@ private[spark] class AppStatusListener(
private var sparkVersion = SPARK_VERSION
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1
private var defaultCpusPerTask: Int = 1
// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
@ -76,6 +77,7 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()
private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
// Keep the active executor count as a separate variable to avoid having to do synchronization
@ -145,6 +147,20 @@ private[spark] class AppStatusListener(
}
}
override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
val maxTasks = if (event.resourceProfile.isCoresLimitKnown) {
Some(event.resourceProfile.maxTasksPerExecutor(conf))
} else {
None
}
val liveRP = new LiveResourceProfile(event.resourceProfile.id,
event.resourceProfile.executorResources, event.resourceProfile.taskResources, maxTasks)
liveResourceProfiles(event.resourceProfile.id) = liveRP
val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId,
liveRP.executorResources, liveRP.taskResources)
kvstore.write(new ResourceProfileWrapper(rpInfo))
}
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val details = event.environmentDetails
@ -159,10 +175,11 @@ private[spark] class AppStatusListener(
details.getOrElse("Spark Properties", Nil),
details.getOrElse("Hadoop Properties", Nil),
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))
details.getOrElse("Classpath Entries", Nil),
Nil)
coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
.getOrElse(coresPerTask)
defaultCpusPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
.getOrElse(defaultCpusPerTask)
kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
}
@ -197,10 +214,16 @@ private[spark] class AppStatusListener(
exec.host = event.executorInfo.executorHost
exec.isActive = true
exec.totalCores = event.executorInfo.totalCores
exec.maxTasks = event.executorInfo.totalCores / coresPerTask
val rpId = event.executorInfo.resourceProfileId
val liveRP = liveResourceProfiles.get(rpId)
val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS))
.map(_.amount.toInt).getOrElse(defaultCpusPerTask)
val maxTasksPerExec = liveRP.flatMap(_.maxTasksPerExecutor)
exec.maxTasks = maxTasksPerExec.getOrElse(event.executorInfo.totalCores / cpusPerTask)
exec.executorLogs = event.executorInfo.logUrlMap
exec.resources = event.executorInfo.resourcesInfo
exec.attributes = event.executorInfo.attributes
exec.resourceProfileId = rpId
liveUpdate(exec, System.nanoTime())
}


@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException}
import org.apache.spark.resource.ResourceProfileManager
import org.apache.spark.status.api.v1
import org.apache.spark.ui.scope._
import org.apache.spark.util.Utils
@ -51,6 +52,10 @@ private[spark] class AppStatusStore(
store.read(klass, klass.getName()).info
}
def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
}
def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
if (statuses != null && !statuses.isEmpty()) {
@ -486,7 +491,8 @@ private[spark] class AppStatusStore(
accumulatorUpdates = stage.accumulatorUpdates,
tasks = Some(tasks),
executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
killedTasksSummary = stage.killedTasksSummary)
killedTasksSummary = stage.killedTasksSummary,
resourceProfileId = stage.resourceProfileId)
}
def rdd(rddId: Int): v1.RDDStorageInfo = {


@ -28,7 +28,7 @@ import com.google.common.collect.Interners
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.{RDDInfo, StorageLevel}
@ -245,6 +245,21 @@ private class LiveTask(
}
private class LiveResourceProfile(
val resourceProfileId: Int,
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest],
val maxTasksPerExecutor: Option[Int]) extends LiveEntity {
def toApi(): v1.ResourceProfileInfo = {
new v1.ResourceProfileInfo(resourceProfileId, executorResources, taskResources)
}
override protected def doUpdate(): Any = {
new ResourceProfileWrapper(toApi())
}
}
private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
var hostPort: String = null
@ -285,6 +300,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
var usedOnHeap = 0L
var usedOffHeap = 0L
var resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
def hasMemoryInfo: Boolean = totalOnHeap >= 0L
// peak values for executor level metrics
@ -327,7 +344,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
blacklistedInStages,
Some(peakExecutorMetrics).filter(_.isSet),
attributes,
resources)
resources,
resourceProfileId)
new ExecutorSummaryWrapper(info)
}
}
@ -465,7 +483,8 @@ private class LiveStage extends LiveEntity {
accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
tasks = None,
executorSummary = None,
killedTasksSummary = killedSummary)
killedTasksSummary = killedSummary,
resourceProfileId = info.resourceProfileId)
}
override protected def doUpdate(): Any = {


@ -101,12 +101,14 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
@Path("environment")
def environmentInfo(): ApplicationEnvironmentInfo = withUI { ui =>
val envInfo = ui.store.environmentInfo()
val resourceProfileInfo = ui.store.resourceProfileInfo()
new v1.ApplicationEnvironmentInfo(
envInfo.runtime,
Utils.redact(ui.conf, envInfo.sparkProperties),
Utils.redact(ui.conf, envInfo.hadoopProperties),
Utils.redact(ui.conf, envInfo.systemProperties),
envInfo.classpathEntries)
envInfo.classpathEntries,
resourceProfileInfo)
}
@GET


@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
case class ApplicationInfo private[spark](
id: String,
@ -62,6 +62,11 @@ case class ApplicationAttemptInfo private[spark](
}
class ResourceProfileInfo private[spark](
val id: Int,
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest])
class ExecutorStageSummary private[spark](
val taskTime : Long,
val failedTasks : Int,
@ -109,7 +114,8 @@ class ExecutorSummary private[spark](
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val attributes: Map[String, String],
val resources: Map[String, ResourceInformation])
val resources: Map[String, ResourceInformation],
val resourceProfileId: Int)
class MemoryMetrics private[spark](
val usedOnHeapStorageMemory: Long,
@ -252,7 +258,8 @@ class StageData private[spark](
val accumulatorUpdates: Seq[AccumulableInfo],
val tasks: Option[Map[Long, TaskData]],
val executorSummary: Option[Map[String, ExecutorStageSummary]],
val killedTasksSummary: Map[String, Int])
val killedTasksSummary: Map[String, Int],
val resourceProfileId: Int)
class TaskData private[spark](
val taskId: Long,
@ -365,12 +372,15 @@ class AccumulableInfo private[spark](
class VersionInfo private[spark](
val spark: String)
// Note the resourceProfiles information are only added here on return from the
// REST call, they are not stored with it.
class ApplicationEnvironmentInfo private[spark] (
val runtime: RuntimeInfo,
val sparkProperties: Seq[(String, String)],
val hadoopProperties: Seq[(String, String)],
val systemProperties: Seq[(String, String)],
val classpathEntries: Seq[(String, String)])
val classpathEntries: Seq[(String, String)],
val resourceProfiles: Seq[ResourceProfileInfo])
class RuntimeInfo private[spark](
val javaVersion: String,


@ -374,6 +374,13 @@ private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
}
private[spark] class ResourceProfileWrapper(val rpInfo: ResourceProfileInfo) {
@JsonIgnore @KVIndex
def id: Int = rpInfo.id
}
private[spark] class ExecutorStageSummaryWrapper(
val stageId: Int,
val stageAttemptId: Int,


@ -19,9 +19,11 @@ package org.apache.spark.ui.env
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.StringBuilder
import scala.xml.Node
import org.apache.spark.SparkConf
import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
import org.apache.spark.status.AppStatusStore
import org.apache.spark.ui._
import org.apache.spark.util.Utils
@ -38,6 +40,37 @@ private[ui] class EnvironmentPage(
"Java Home" -> appEnv.runtime.javaHome,
"Scala Version" -> appEnv.runtime.scalaVersion)
def constructExecutorRequestString(execReqs: Map[String, ExecutorResourceRequest]): String = {
execReqs.map {
case (_, execReq) =>
val execStr = new StringBuilder(s"\t${execReq.resourceName}: [amount: ${execReq.amount}")
if (execReq.discoveryScript.nonEmpty) {
execStr ++= s", discovery: ${execReq.discoveryScript}"
}
if (execReq.vendor.nonEmpty) {
execStr ++= s", vendor: ${execReq.vendor}"
}
execStr ++= "]"
execStr.toString()
}.mkString("\n")
}
def constructTaskRequestString(taskReqs: Map[String, TaskResourceRequest]): String = {
taskReqs.map {
case (_, taskReq) => s"\t${taskReq.resourceName}: [amount: ${taskReq.amount}]"
}.mkString("\n")
}
val resourceProfileInfo = store.resourceProfileInfo().map { rinfo =>
val einfo = constructExecutorRequestString(rinfo.executorResources)
val tinfo = constructTaskRequestString(rinfo.taskResources)
val res = s"Executor Reqs:\n$einfo\nTask Reqs:\n$tinfo"
(rinfo.id.toString, res)
}.toMap
val resourceProfileInformationTable = UIUtils.listingTable(resourceProfileHeader,
jvmRowDataPre, resourceProfileInfo.toSeq.sortWith(_._1.toInt < _._1.toInt),
fixedWidth = true, headerClasses = headerClassesNoSortValues)
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, jvmInformation.toSeq.sorted, fixedWidth = true,
headerClasses = headerClasses)
@ -77,6 +110,17 @@ private[ui] class EnvironmentPage(
<div class="aggregated-sparkProperties collapsible-table">
{sparkPropertiesTable}
</div>
<span class="collapse-aggregated-execResourceProfileInformation collapse-table"
onClick="collapseTable('collapse-aggregated-execResourceProfileInformation',
'aggregated-execResourceProfileInformation')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Resource Profiles</a>
</h4>
</span>
<div class="aggregated-execResourceProfileInformation collapsible-table">
{resourceProfileInformationTable}
</div>
<span class="collapse-aggregated-hadoopProperties collapse-table"
onClick="collapseTable('collapse-aggregated-hadoopProperties',
'aggregated-hadoopProperties')">
@ -115,10 +159,14 @@ private[ui] class EnvironmentPage(
UIUtils.headerSparkPage(request, "Environment", content, parent)
}
private def resourceProfileHeader = Seq("Resource Profile Id", "Resource Profile Contents")
private def propertyHeader = Seq("Name", "Value")
private def classPathHeader = Seq("Resource", "Source")
private def headerClasses = Seq("sorttable_alpha", "sorttable_alpha")
private def headerClassesNoSortValues = Seq("sorttable_numeric", "sorttable_nosort")
private def jvmRowDataPre(kv: (String, String)) =
<tr><td>{kv._1}</td><td><pre>{kv._2}</pre></td></tr>
private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>


@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility}
import org.apache.commons.text.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1
import org.apache.spark.ui._
@ -253,7 +254,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
accumulatorUpdates = Nil,
tasks = None,
executorSummary = None,
killedTasksSummary = Map())
killedTasksSummary = Map(),
ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
}
}


@ -142,6 +142,10 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val summary =
<div>
<ul class="list-unstyled">
<li>
<strong>Resource Profile Id: </strong>
{stageData.resourceProfileId}
</li>
<li>
<strong>Total Time Across All Tasks: </strong>
{UIUtils.formatDuration(stageData.executorRunTime)}


@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
@ -105,6 +105,8 @@ private[spark] object JsonProtocol {
stageExecutorMetricsToJson(stageExecutorMetrics)
case blockUpdate: SparkListenerBlockUpdated =>
blockUpdateToJson(blockUpdate)
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
resourceProfileAddedToJson(resourceProfileAdded)
case _ => parse(mapper.writeValueAsString(event))
}
}
@ -224,6 +226,15 @@ private[spark] object JsonProtocol {
("Timestamp" -> applicationEnd.time)
}
def resourceProfileAddedToJson(profileAdded: SparkListenerResourceProfileAdded): JValue = {
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.resourceProfileAdded) ~
("Resource Profile Id" -> profileAdded.resourceProfile.id) ~
("Executor Resource Requests" ->
executorResourceRequestMapToJson(profileAdded.resourceProfile.executorResources)) ~
("Task Resource Requests" ->
taskResourceRequestMapToJson(profileAdded.resourceProfile.taskResources))
}
def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
("Timestamp" -> executorAdded.time) ~
@ -297,7 +308,8 @@ private[spark] object JsonProtocol {
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
("Failure Reason" -> failureReason) ~
("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values))
("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values)) ~
("Resource Profile Id" -> stageInfo.resourceProfileId)
}
def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@ -500,7 +512,8 @@ private[spark] object JsonProtocol {
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
("Attributes" -> mapToJson(executorInfo.attributes)) ~
("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo))
("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
("Resource Profile Id" -> executorInfo.resourceProfileId)
}
def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
@ -518,6 +531,34 @@ private[spark] object JsonProtocol {
("Disk Size" -> blockUpdatedInfo.diskSize)
}
def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue = {
("Resource Name" -> execReq.resourceName) ~
("Amount" -> execReq.amount) ~
("Discovery Script" -> execReq.discoveryScript) ~
("Vendor" -> execReq.vendor)
}
def executorResourceRequestMapToJson(m: Map[String, ExecutorResourceRequest]): JValue = {
val jsonFields = m.map {
case (k, execReq) =>
JField(k, executorResourceRequestToJson(execReq))
}
JObject(jsonFields.toList)
}
def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
("Resource Name" -> taskReq.resourceName) ~
("Amount" -> taskReq.amount)
}
def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]): JValue = {
val jsonFields = m.map {
case (k, taskReq) =>
JField(k, taskResourceRequestToJson(taskReq))
}
JObject(jsonFields.toList)
}
/** ------------------------------ *
* Util JSON serialization methods |
* ------------------------------- */
@ -577,6 +618,7 @@ private[spark] object JsonProtocol {
val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
val stageExecutorMetrics = Utils.getFormattedClassName(SparkListenerStageExecutorMetrics)
val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
val resourceProfileAdded = Utils.getFormattedClassName(SparkListenerResourceProfileAdded)
}
def sparkEventFromJson(json: JValue): SparkListenerEvent = {
@ -602,6 +644,7 @@ private[spark] object JsonProtocol {
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
case `blockUpdate` => blockUpdateFromJson(json)
case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
@ -678,6 +721,45 @@ private[spark] object JsonProtocol {
SparkListenerJobEnd(jobId, completionTime, jobResult)
}
def resourceProfileAddedFromJson(json: JValue): SparkListenerResourceProfileAdded = {
val profId = (json \ "Resource Profile Id").extract[Int]
val executorReqs = executorResourceRequestMapFromJson(json \ "Executor Resource Requests")
val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource Requests")
val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
rp.setResourceProfileId(profId)
SparkListenerResourceProfileAdded(rp)
}
def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
val discoveryScript = (json \ "Discovery Script").extract[String]
val vendor = (json \ "Vendor").extract[String]
new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
}
def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
val amount = (json \ "Amount").extract[Int]
new TaskResourceRequest(rName, amount)
}
def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
val req = taskResourceRequestFromJson(v)
(k, req)
}.toMap
}
def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
val req = executorResourceRequestFromJson(v)
(k, req)
}.toMap
}
def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
// For compatible with previous event logs
val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
@ -804,9 +886,10 @@ private[spark] object JsonProtocol {
}
}
val stageInfo = new StageInfo(
stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details,
resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val rpId = jsonOption(json \ "Resource Profile Id").map(_.extract[Int])
val stageProf = rpId.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos,
parentIds, details, resourceProfileId = stageProf)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
@ -1109,7 +1192,11 @@ private[spark] object JsonProtocol {
case Some(resources) => resourcesMapFromJson(resources).toMap
case None => Map.empty[String, ResourceInformation]
}
new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources)
val resourceProfileId = jsonOption(json \ "Resource Profile Id") match {
case Some(id) => id.extract[Int]
case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
}
new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources, resourceProfileId)
}
def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {


@ -282,5 +282,6 @@
[ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/hadoop-mapreduce-client-jobclient-2.2.0.jar", "System Classpath" ],
[ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/spark-sketch_2.11-2.1.0-SNAPSHOT.jar", "System Classpath" ],
[ "/Users/Jose/IdeaProjects/spark/assembly/target/scala-2.11/jars/pmml-model-1.2.15.jar", "System Classpath" ]
]
],
"resourceProfiles" : [ ]
}


@ -1,4 +1,19 @@
[ {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-01-11T17:44:22.851GMT",
"endTime" : "2020-01-11T17:46:42.615GMT",
"lastUpdated" : "",
"duration" : 139764,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1578764802615,
"startTimeEpoch" : 1578764662851,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {


@ -717,5 +717,6 @@
"isBlacklistedForStage" : false
}
},
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}


@ -876,5 +876,6 @@
"isBlacklistedForStage" : true
}
},
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}


@ -41,7 +41,8 @@
"schedulingPool" : "default",
"rddIds" : [ 6, 5 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}, {
"status" : "COMPLETE",
"stageId" : 1,
@ -85,7 +86,8 @@
"schedulingPool" : "default",
"rddIds" : [ 1, 0 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}, {
"status" : "COMPLETE",
"stageId" : 0,
@ -129,5 +131,6 @@
"schedulingPool" : "default",
"rddIds" : [ 0 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
} ]


@ -1,4 +1,19 @@
[ {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-01-11T17:44:22.851GMT",
"endTime" : "2020-01-11T17:46:42.615GMT",
"lastUpdated" : "",
"duration" : 139764,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1578764802615,
"startTimeEpoch" : 1578764662851,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {


@ -22,5 +22,6 @@
"executorLogs" : { },
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ]


@ -50,7 +50,8 @@
"MajorGCTime" : 144
},
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "3",
"hostPort" : "test-3.vpc.company.com:37641",
@ -116,7 +117,8 @@
"NM_HOST" : "test-3.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000004"
},
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "2",
"hostPort" : "test-4.vpc.company.com:33179",
@ -182,7 +184,8 @@
"NM_HOST" : "test-4.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000003"
},
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "1",
"hostPort" : "test-2.vpc.company.com:43764",
@ -248,5 +251,6 @@
"NM_HOST" : "test-2.vpc.company.com",
"CONTAINER_ID" : "container_1553914137147_0018_01_000002"
},
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ]


@ -28,7 +28,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@ -62,7 +63,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ,{
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@ -96,7 +98,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@ -130,7 +133,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@ -164,5 +168,6 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ]


@ -28,7 +28,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "3",
"hostPort" : "172.22.0.167:51485",
@ -62,7 +63,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "2",
"hostPort" : "172.22.0.167:51487",
@ -96,7 +98,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "1",
"hostPort" : "172.22.0.167:51490",
@ -130,7 +133,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "0",
"hostPort" : "172.22.0.167:51491",
@ -164,5 +168,6 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ]


@ -22,7 +22,8 @@
"executorLogs" : { },
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "3",
"hostPort" : "172.22.0.111:64543",
@ -50,7 +51,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "2",
"hostPort" : "172.22.0.111:64539",
@ -78,7 +80,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "1",
"hostPort" : "172.22.0.111:64541",
@ -106,7 +109,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "0",
"hostPort" : "172.22.0.111:64540",
@ -134,5 +138,6 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
} ]


@ -28,7 +28,8 @@
},
"blacklistedInStages" : [ ],
"attributes" : { },
"resources" : { }
"resources" : { },
"resourceProfileId" : 0
}, {
"id" : "2",
"hostPort" : "tomg-test:46005",
@ -77,7 +78,8 @@
"name" : "gpu",
"addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
}
}
},
"resourceProfileId" : 0
}, {
"id" : "1",
"hostPort" : "tomg-test:44873",
@ -126,5 +128,6 @@
"name" : "gpu",
"addresses" : [ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12" ]
}
}
},
"resourceProfileId" : 0
} ]


@ -42,5 +42,6 @@
"schedulingPool" : "default",
"rddIds" : [ 3, 2 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
} ]


@ -1,4 +1,19 @@
[ {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-01-11T17:44:22.851GMT",
"endTime" : "2020-01-11T17:46:42.615GMT",
"lastUpdated" : "",
"duration" : 139764,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1578764802615,
"startTimeEpoch" : 1578764662851,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
@ -28,19 +43,4 @@
"endTimeEpoch" : 1554756046454,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1516285256255_0012",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2018-01-18T18:30:35.119GMT",
"endTime" : "2018-01-18T18:38:27.938GMT",
"lastUpdated" : "",
"duration" : 472819,
"sparkUser" : "attilapiros",
"completed" : true,
"appSparkVersion" : "2.3.0-SNAPSHOT",
"lastUpdatedEpoch" : 0,
"startTimeEpoch" : 1516300235119,
"endTimeEpoch" : 1516300707938
} ]
} ]


@ -1,5 +1,19 @@
[
{
[ {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-01-11T17:44:22.851GMT",
"endTime" : "2020-01-11T17:46:42.615GMT",
"lastUpdated" : "",
"duration" : 139764,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1578764802615,
"startTimeEpoch" : 1578764662851,
"lastUpdatedEpoch" : 0
} ]
}, {
"id": "application_1555004656427_0144",
"name": "Spark shell",
"attempts": [


@ -1,4 +1,19 @@
[ {
"id" : "application_1578436911597_0052",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2020-01-11T17:44:22.851GMT",
"endTime" : "2020-01-11T17:46:42.615GMT",
"lastUpdated" : "",
"duration" : 139764,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"endTimeEpoch" : 1578764802615,
"startTimeEpoch" : 1578764662851,
"lastUpdatedEpoch" : 0
} ]
}, {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {


@ -0,0 +1,112 @@
{
"runtime" : {
"javaVersion" : "1.8.0_232 (Private Build)",
"javaHome" : "/usr/lib/jvm/java-8-openjdk-amd64/jre",
"scalaVersion" : "version 2.12.10"
},
"sparkProperties" : [ ],
"hadoopProperties" : [ ],
"systemProperties" : [ ],
"classpathEntries" : [ ],
"resourceProfiles" : [ {
"id" : 0,
"executorResources" : {
"cores" : {
"resourceName" : "cores",
"amount" : 1,
"discoveryScript" : "",
"vendor" : ""
},
"memory" : {
"resourceName" : "memory",
"amount" : 1024,
"discoveryScript" : "",
"vendor" : ""
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1,
"discoveryScript" : "/home/tgraves/getGpus",
"vendor" : ""
}
},
"taskResources" : {
"cpus" : {
"resourceName" : "cpus",
"amount" : 1.0
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1.0
}
}
}, {
"id" : 1,
"executorResources" : {
"cores" : {
"resourceName" : "cores",
"amount" : 4,
"discoveryScript" : "",
"vendor" : ""
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1,
"discoveryScript" : "./getGpus",
"vendor" : ""
}
},
"taskResources" : {
"cpus" : {
"resourceName" : "cpus",
"amount" : 1.0
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1.0
}
}
}, {
"id" : 2,
"executorResources" : {
"cores" : {
"resourceName" : "cores",
"amount" : 2,
"discoveryScript" : "",
"vendor" : ""
}
},
"taskResources" : {
"cpus" : {
"resourceName" : "cpus",
"amount" : 2.0
}
}
}, {
"id" : 3,
"executorResources" : {
"cores" : {
"resourceName" : "cores",
"amount" : 4,
"discoveryScript" : "",
"vendor" : ""
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1,
"discoveryScript" : "./getGpus",
"vendor" : ""
}
},
"taskResources" : {
"cpus" : {
"resourceName" : "cpus",
"amount" : 2.0
},
"gpu" : {
"resourceName" : "gpu",
"amount" : 1.0
}
}
} ]
}


@ -462,5 +462,6 @@
"isBlacklistedForStage" : false
}
},
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}


@ -462,5 +462,6 @@
"isBlacklistedForStage" : false
}
},
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
} ]


@ -41,7 +41,8 @@
"schedulingPool" : "default",
"rddIds" : [ 6, 5 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}, {
"status" : "FAILED",
"stageId" : 2,
@ -86,7 +87,8 @@
"schedulingPool" : "default",
"rddIds" : [ 3, 2 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}, {
"status" : "COMPLETE",
"stageId" : 1,
@ -130,7 +132,8 @@
"schedulingPool" : "default",
"rddIds" : [ 1, 0 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}, {
"status" : "COMPLETE",
"stageId" : 0,
@ -174,5 +177,6 @@
"schedulingPool" : "default",
"rddIds" : [ 0 ],
"accumulatorUpdates" : [ ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
} ]


@ -45,5 +45,6 @@
"name" : "my counter",
"value" : "5050"
} ],
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
} ]


@ -506,5 +506,6 @@
"isBlacklistedForStage" : false
}
},
"killedTasksSummary" : { }
"killedTasksSummary" : { },
"resourceProfileId" : 0
}


@ -0,0 +1,27 @@
{"Event":"SparkListenerLogStart","Spark Version":"3.0.0-SNAPSHOT"}
{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":1024,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"/home/tgraves/getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.10.10.10","Port":32957},"Maximum Memory":428762726,"Timestamp":1578764671818,"Maximum Onheap Memory":428762726,"Maximum Offheap Memory":0}
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/lib/jvm/java-8-openjdk-amd64/jre","Java Version":"1.8.0_232 (Private Build)","Scala Version":"version 2.12.10"},"Spark Properties":{},"Hadoop Properties":{},"System Properties":{}, "Classpath Entries": {}}
{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"application_1578436911597_0052","Timestamp":1578764662851,"User":"tgraves"}
{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":1,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":2,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":2,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":2.0}}}
{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":3,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":4,"Discovery Script":"","Vendor":""},"gpu":{"Resource Name":"gpu","Amount":1,"Discovery Script":"./getGpus","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":2.0},"gpu":{"Resource Name":"gpu","Amount":1.0}}}
{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1578764765274,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:31","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.collect(RDD.scala:1004)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line37.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line37.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)\n$line37.$read$$iw$$iw$$iw.<init>(<console>:45)\n$line37.$read$$iw$$iw.<init>(<console>:47)\n$line37.$read$$iw.<init>(<console>:49)\n$line37.$read.<init>(<console>:51)\n$line37.$read$.<init>(<console>:55)\n$line37.$read$.<clinit>(<console>)\n$line37.$eval$.$print$lzycompute(<console>:7)\n$line37.$eval$.$print(<console>:6)\n$line37.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)","Accumulables":[],"Resource Profile Id":3}],"Stage IDs":[0],"Properties":{"spark.rdd.scope":"{\"id\":\"2\",\"name\":\"collect\"}","spark.rdd.scope.noOverride":"true"}}
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:31","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.collect(RDD.scala:1004)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line37.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line37.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)\n$line37.$read$$iw$$iw$$iw.<init>(<console>:45)\n$line37.$read$$iw$$iw.<init>(<console>:47)\n$line37.$read$$iw.<init>(<console>:49)\n$line37.$read.<init>(<console>:51)\n$line37.$read$.<init>(<console>:55)\n$line37.$read$.<clinit>(<console>)\n$line37.$eval$.$print$lzycompute(<console>:7)\n$line37.$eval$.$print(<console>:6)\n$line37.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)","Submission Time":1578764765293,"Accumulables":[],"Resource Profile Id":3},"Properties":{"spark.rdd.scope":"{\"id\":\"2\",\"name\":\"collect\"}","spark.rdd.scope.noOverride":"true"}}
{"Event":"SparkListenerExecutorAdded","Timestamp":1578764769706,"Executor ID":"1","Executor Info":{"Host":"host1","Total Cores":4,"Log Urls":{"stdout":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stdout?start=-4096","stderr":"http://host1:8042/node/containerlogs/container_1578436911597_0052_01_000002/tgraves/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host1:8042","USER":"tgraves","LOG_FILES":"stderr,stdout","NM_HTTP_PORT":"8042","CLUSTER_ID":"","NM_PORT":"37783","HTTP_SCHEME":"http://","NM_HOST":"host1","CONTAINER_ID":"container_1578436911597_0052_01_000002"},"Resources":{"gpu":{"name":"gpu","addresses":["0","1","2"]}},"Resource Profile Id":3}}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"host1","Port":40787},"Maximum Memory":384093388,"Timestamp":1578764769796,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":0}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1578764769858,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770512,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":2,"Internal":true,"Count Failed Values":true},{"ID":5,"Name":"internal.metrics.jvmGCTime","Update":49,"Value":49,"Internal":true,"Count Failed Values":true},{"ID":4,"Name":"internal.metrics.resultSize","Update":3706,"Value":3706,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":20740892,"Value":20740892,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":32,"Value":32,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":250921658,"Value":250921658,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":555,"Value":555,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":555,"Executor Deserialize CPU Time":250921658,"Executor Run Time":32,"Executor CPU Time":20740892,"Peak Execution Memory":0,"Result Size":3706,"JVM GC Time":49,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1578764769877,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770515,"Failed":false,"Killed":false,"Accumulables":[{"ID":6,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":4,"Internal":true,"Count Failed Values":true},{"ID":5,"Name":"internal.metrics.jvmGCTime","Update":49,"Value":98,"Internal":true,"Count Failed Values":true},{"ID":4,"Name":"internal.metrics.resultSize","Update":3722,"Value":7428,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":25185125,"Value":45926017,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":32,"Value":64,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":416274503,"Value":667196161,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":555,"Value":1110,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":555,"Executor Deserialize CPU Time":416274503,"Executor Run Time":32,"Executor CPU Time":25185125,"Peak Execution Memory":0,"Result Size":3722,"JVM GC Time":49,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1578764770507,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770526,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":11064,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":2203515,"Value":48129532,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":2,"Value":66,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":2733237,"Value":669929398,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":2,"Value":1112,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":2,"Executor Deserialize CPU Time":2733237,"Executor Run Time":2,"Executor CPU Time":2203515,"Peak Execution Memory":0,"Result Size":3636,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1578764770509,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770529,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3620,"Value":14684,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":2365599,"Value":50495131,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":2,"Value":68,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":3387884,"Value":673317282,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":3,"Value":1115,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":3,"Executor Deserialize CPU Time":3387884,"Executor Run Time":2,"Executor CPU Time":2365599,"Peak Execution Memory":0,"Result Size":3620,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":4,"Index":4,"Attempt":0,"Launch Time":1578764770525,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":18320,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":2456346,"Value":52951477,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":2,"Value":70,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":3502860,"Value":676820142,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":3,"Value":1118,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":3,"Executor Deserialize CPU Time":3502860,"Executor Run Time":2,"Executor CPU Time":2456346,"Peak Execution Memory":0,"Result Size":3636,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":5,"Index":5,"Attempt":0,"Launch Time":1578764770527,"Executor ID":"1","Host":"host1","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1578764770542,"Failed":false,"Killed":false,"Accumulables":[{"ID":4,"Name":"internal.metrics.resultSize","Update":3636,"Value":21956,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Update":2162370,"Value":55113847,"Internal":true,"Count Failed Values":true},{"ID":2,"Name":"internal.metrics.executorRunTime","Update":2,"Value":72,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Update":3622437,"Value":680442579,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":3,"Value":1121,"Internal":true,"Count Failed Values":true}]},"Task Executor Metrics":{"JVMHeapMemory":0,"JVMOffHeapMemory":0,"OnHeapExecutionMemory":0,"OffHeapExecutionMemory":0,"OnHeapStorageMemory":0,"OffHeapStorageMemory":0,"OnHeapUnifiedMemory":0,"OffHeapUnifiedMemory":0,"DirectPoolMemory":0,"MappedPoolMemory":0,"ProcessTreeJVMVMemory":0,"ProcessTreeJVMRSSMemory":0,"ProcessTreePythonVMemory":0,"ProcessTreePythonRSSMemory":0,"ProcessTreeOtherVMemory":0,"ProcessTreeOtherRSSMemory":0,"MinorGCCount":0,"MinorGCTime":0,"MajorGCCount":0,"MajorGCTime":0},"Task Metrics":{"Executor Deserialize Time":3,"Executor Deserialize CPU Time":3622437,"Executor Run Time":2,"Executor CPU Time":2162370,"Peak Execution Memory":0,"Result Size":3636,"JVM GC Time":0,"Result Serialization Time":0,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write Metrics":{"Shuffle Bytes Written":0,"Shuffle Write Time":0,"Shuffle Records Written":0},"Input Metrics":{"Bytes Read":0,"Records Read":0},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:29","Number of Tasks":6,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:31","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Callsite":"parallelize at <console>:31","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":6,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.collect(RDD.scala:1004)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:29)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:33)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:35)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:37)\n$line37.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)\n$line37.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line37.$read$$iw$$iw$$iw$$iw.<init>(<console>:43)\n$line37.$read$$iw$$iw$$iw.<init>(<console>:45)\n$line37.$read$$iw$$iw.<init>(<console>:47)\n$line37.$read$$iw.<init>(<console>:49)\n$line37.$read.<init>(<console>:51)\n$line37.$read$.<init>(<console>:55)\n$line37.$read$.<clinit>(<console>)\n$line37.$eval$.$print$lzycompute(<console>:7)\n$line37.$eval$.$print(<console>:6)\n$line37.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)","Submission Time":1578764765293,"Completion Time":1578764770543,"Accumulables":[{"ID":2,"Name":"internal.metrics.executorRunTime","Value":72,"Internal":true,"Count Failed Values":true},{"ID":5,"Name":"internal.metrics.jvmGCTime","Value":98,"Internal":true,"Count Failed Values":true},{"ID":4,"Name":"internal.metrics.resultSize","Value":21956,"Internal":true,"Count Failed Values":true},{"ID":1,"Name":"internal.metrics.executorDeserializeCpuTime","Value":680442579,"Internal":true,"Count Failed Values":true},{"ID":3,"Name":"internal.metrics.executorCpuTime","Value":55113847,"Internal":true,"Count Failed Values":true},{"ID":6,"Name":"internal.metrics.resultSerializationTime","Value":4,"Internal":true,"Count Failed Values":true},{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Value":1121,"Internal":true,"Count Failed Values":true}],"Resource Profile Id":3}}
{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1578764770546,"Job Result":{"Result":"JobSucceeded"}}
{"Event":"SparkListenerApplicationEnd","Timestamp":1578764802615}


@ -1442,7 +1442,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
conf: SparkConf,
clock: Clock = new SystemClock()): ExecutorAllocationManager = {
ResourceProfile.reInitDefaultProfile(conf)
rpManager = new ResourceProfileManager(conf)
rpManager = new ResourceProfileManager(conf, listenerBus)
val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock,
resourceProfileManager = rpManager)
managers += manager


@ -171,6 +171,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
"executor node blacklisting unblacklisting" -> "applications/app-20161115172038-0000/executors",
"executor memory usage" -> "applications/app-20161116163331-0000/executors",
"executor resource information" -> "applications/application_1555004656427_0144/executors",
"multiple resource profiles" -> "applications/application_1578436911597_0052/environment",
"app environment" -> "applications/app-20161116163331-0000/environment",


@ -20,6 +20,7 @@ package org.apache.spark.resource
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.scheduler.LiveListenerBus
class ResourceProfileManagerSuite extends SparkFunSuite {
@ -39,9 +40,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
}
}
val listenerBus = new LiveListenerBus(new SparkConf())
test("ResourceProfileManager") {
val conf = new SparkConf().set(EXECUTOR_CORES, 4)
val rpmanager = new ResourceProfileManager(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
val defaultProf = rpmanager.defaultResourceProfile
assert(defaultProf.id === ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
assert(defaultProf.executorResources.size === 2,
@ -53,7 +56,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
test("isSupported yarn no dynamic allocation") {
val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
val rpmanager = new ResourceProfileManager(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
// default profile should always work
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
@ -71,7 +74,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val conf = new SparkConf().setMaster("yarn").set(EXECUTOR_CORES, 4)
conf.set(DYN_ALLOCATION_ENABLED, true)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
val rpmanager = new ResourceProfileManager(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
// default profile should always work
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
@ -84,7 +87,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
test("isSupported yarn with local mode") {
val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
val rpmanager = new ResourceProfileManager(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
// default profile should always work
val defaultProf = rpmanager.defaultResourceProfile
val rprof = new ResourceProfileBuilder()
@ -100,7 +103,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
test("ResourceProfileManager has equivalent profile") {
val conf = new SparkConf().set(EXECUTOR_CORES, 4)
val rpmanager = new ResourceProfileManager(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)
var rpAlreadyExist: Option[ResourceProfile] = None
val checkId = 500
for (i <- 1 to 1000) {
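The tests above construct the manager directly; for context, a sketch of how a non-default profile could be built and registered against it, using the same builder APIs exercised in the JsonProtocolSuite changes below. This is only illustrative: it assumes it runs inside Spark's own test tree (these are Spark-internal classes), and it assumes `addResourceProfile` is what ultimately posts `SparkListenerResourceProfileAdded` to the listener bus passed in here.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder,
  ResourceProfileManager, TaskResourceRequests}
import org.apache.spark.scheduler.LiveListenerBus

// Stage level scheduling currently requires YARN with dynamic allocation.
val conf = new SparkConf()
  .setMaster("yarn")
  .set("spark.dynamicAllocation.enabled", "true")
val listenerBus = new LiveListenerBus(conf)
val rpmanager = new ResourceProfileManager(conf, listenerBus)

// Two cores and one GPU per executor, one CPU and one GPU per task
// (the discovery script path is illustrative).
val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 1, "/opt/getGpus.sh")
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val profile = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

// Assumed to be the call that ends up posting SparkListenerResourceProfileAdded
// to the bus supplied above.
rpmanager.addResourceProfile(profile)
```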


@ -92,7 +92,8 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
tasks = None,
executorSummary = None,
killedTasksSummary = Map.empty
killedTasksSummary = Map.empty,
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
)
val taskTable = new TaskPagedTable(
stageData,


@ -32,7 +32,7 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils}
import org.apache.spark.resource._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.shuffle.MetadataFetchFailedException
@ -92,7 +92,7 @@ class JsonProtocolSuite extends SparkFunSuite {
42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
val applicationEnd = SparkListenerApplicationEnd(42L)
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, resources.toMap))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, attributes, resources.toMap, 4))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
val executorUnblacklisted =
@ -119,6 +119,14 @@ class JsonProtocolSuite extends SparkFunSuite {
SparkListenerStageExecutorMetrics("1", 2, 3,
new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L,
321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L, 30364L, 15182L, 10L, 90L, 2L, 20L)))
val rprofBuilder = new ResourceProfileBuilder()
val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val execReq =
new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
rprofBuilder.require(taskReq).require(execReq)
val resourceProfile = rprofBuilder.build
resourceProfile.setResourceProfileId(21)
val resourceProfileAdded = SparkListenerResourceProfileAdded(resourceProfile)
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
testEvent(taskStart, taskStartJsonString)
@ -144,6 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
testEvent(blockUpdated, blockUpdatedJsonString)
testEvent(stageExecutorMetrics, stageExecutorMetricsJsonString)
testEvent(resourceProfileAdded, resourceProfileJsonString)
}
test("Dependent Classes") {
@ -231,6 +240,20 @@ class JsonProtocolSuite extends SparkFunSuite {
assert(0 === newInfo.accumulables.size)
}
test("StageInfo resourceProfileId") {
val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
val json = JsonProtocol.stageInfoToJson(info)
// Fields added after 1.0.0.
assert(info.details.nonEmpty)
assert(info.resourceProfileId === 5)
val newInfo = JsonProtocol.stageInfoFromJson(json)
assert(info.name === newInfo.name)
assert(5 === newInfo.resourceProfileId)
}
test("InputMetrics backward compatibility") {
// InputMetrics were added after 1.0.1.
val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
@ -865,6 +888,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
assert(ste1.getFileName === ste2.getFileName)
}
private def assertEquals(rp1: ResourceProfile, rp2: ResourceProfile): Unit = {
assert(rp1 === rp2)
}
/** ----------------------------------- *
| Util methods for constructing events |
* ------------------------------------ */
@ -895,10 +922,16 @@ private[spark] object JsonProtocolSuite extends Assertions {
r
}
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
private def makeStageInfo(
a: Int,
b: Int,
c: Int,
d: Long,
e: Long,
rpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = {
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details",
resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
resourceProfileId = rpId)
val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
stageInfo.accumulables(acc1.id) = acc1
stageInfo.accumulables(acc2.id) = acc2
@ -1034,7 +1067,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| },
| "Properties": {
| "France": "Paris",
@ -1091,7 +1125,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| }
|}
""".stripMargin
@ -1613,7 +1648,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| },
| {
| "Stage ID": 2,
@ -1673,7 +1709,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| },
| {
| "Stage ID": 3,
@ -1749,7 +1786,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| },
| {
| "Stage ID": 4,
@ -1841,7 +1879,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Internal": false,
| "Count Failed Values": false
| }
| ]
| ],
| "Resource Profile Id" : 0
| }
| ],
| "Stage IDs": [
@ -1988,7 +2027,8 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "name" : "gpu",
| "addresses" : [ "0", "1" ]
| }
| }
| },
| "Resource Profile Id": 4
| }
|}
""".stripMargin
@ -2334,6 +2374,38 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "hostId" : "node1"
|}
""".stripMargin
private val resourceProfileJsonString =
"""
|{
| "Event":"SparkListenerResourceProfileAdded",
| "Resource Profile Id":21,
| "Executor Resource Requests":{
| "cores" : {
| "Resource Name":"cores",
| "Amount":2,
| "Discovery Script":"",
| "Vendor":""
| },
| "gpu":{
| "Resource Name":"gpu",
| "Amount":2,
| "Discovery Script":"myscript",
| "Vendor":""
| }
| },
| "Task Resource Requests":{
| "cpus":{
| "Resource Name":"cpus",
| "Amount":1.0
| },
| "gpu":{
| "Resource Name":"gpu",
| "Amount":1.0
| }
| }
|}
""".stripMargin
}
case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
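The `resourceProfileJsonString` above is the wire form of `SparkListenerResourceProfileAdded`; on the live side the same event reaches listeners through the new callback. A minimal sketch (the `ProfileAuditListener` name is made up):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

// Hypothetical listener: records each resource profile as it is registered.
class ProfileAuditListener extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    val rp = event.resourceProfile
    println(s"Resource profile ${rp.id} added: " +
      s"executor=${rp.executorResources.keySet}, task=${rp.taskResources.keySet}")
  }
}
```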


@ -124,3 +124,4 @@ vote.tmpl
SessionManager.java
SessionHandler.java
GangliaReporter.java
application_1578436911597_0052


@ -33,7 +33,7 @@ import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.resource.ResourceProfileManager
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{ExecutorKilled, TaskSchedulerImpl}
import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
@ -87,7 +87,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
private val resourceProfileManager = new ResourceProfileManager(sparkConf)
private val listenerBus = new LiveListenerBus(new SparkConf())
private val resourceProfileManager = new ResourceProfileManager(sparkConf, listenerBus)
before {
MockitoAnnotations.initMocks(this)