[SPARK-22462][SQL] Make rdd-based actions in Dataset trackable in SQL UI

## What changes were proposed in this pull request?

For the few Dataset actions such as `foreach`, currently no SQL metrics are visible in the SQL tab of SparkUI. It is because it binds wrongly to Dataset's `QueryExecution`. As the actions directly evaluate on the RDD which has individual `QueryExecution`, to show correct SQL metrics on UI, we should bind to RDD's `QueryExecution`.

## How was this patch tested?

Manually test. Screenshot is attached in the PR.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19689 from viirya/SPARK-22462.
This commit is contained in:
Liang-Chi Hsieh 2017-11-11 12:34:30 +01:00 committed by Wenchen Fan
parent 223d83ee93
commit 154351e6db

View file

@ -2594,7 +2594,7 @@ class Dataset[T] private[sql](
* @group action
* @since 1.6.0
*/
def foreach(f: T => Unit): Unit = withNewExecutionId {
def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
rdd.foreach(f)
}
@ -2613,7 +2613,7 @@ class Dataset[T] private[sql](
* @group action
* @since 1.6.0
*/
def foreachPartition(f: Iterator[T] => Unit): Unit = withNewExecutionId {
def foreachPartition(f: Iterator[T] => Unit): Unit = withNewRDDExecutionId {
rdd.foreachPartition(f)
}
@ -2851,6 +2851,12 @@ class Dataset[T] private[sql](
*/
def unpersist(): this.type = unpersist(blocking = false)
// Represents the `QueryExecution` used to produce the content of the Dataset as an `RDD`.
@transient private lazy val rddQueryExecution: QueryExecution = {
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
/**
* Represents the content of the Dataset as an `RDD` of `T`.
*
@ -2859,8 +2865,7 @@ class Dataset[T] private[sql](
*/
lazy val rdd: RDD[T] = {
val objectType = exprEnc.deserializer.dataType
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
rddQueryExecution.toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
@ -3113,6 +3118,20 @@ class Dataset[T] private[sql](
SQLExecution.withNewExecutionId(sparkSession, queryExecution)(body)
}
/**
* Wrap an action of the Dataset's RDD to track all Spark jobs in the body so that we can connect
* them with an execution. Before performing the action, the metrics of the executed plan will be
* reset.
*/
private def withNewRDDExecutionId[U](body: => U): U = {
SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
rddQueryExecution.executedPlan.foreach { plan =>
plan.resetMetrics()
}
body
}
}
/**
* Wrap a Dataset action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.