[SPARK-27639][SQL] InMemoryTableScan shows the table name on UI if possible

## What changes were proposed in this pull request?
<img src="https://user-images.githubusercontent.com/5399861/57213799-7bccf100-701a-11e9-9872-d90b4a185dc6.png" width="200">

It only shows `InMemoryTableScan` when scanning InMemoryTable.
When there are many InMemoryTables, it is difficult to distinguish which one is what we are looking for. This PR show the table name when scanning InMemoryTable.

## How was this patch tested?

unit tests and manual tests

After this PR:
<img src="https://user-images.githubusercontent.com/5399861/57269120-d3219e80-70b8-11e9-9e56-1b5d4c071660.png" width="200">

Closes #24534 from wangyum/SPARK-27639.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Yuming Wang 2019-05-07 21:00:13 -07:00 committed by Dongjoon Hyun
parent 8329e7debd
commit 3ea44e52e7
5 changed files with 39 additions and 5 deletions

View file

@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
@ -50,6 +51,7 @@ trait CodegenSupport extends SparkPlan {
case _: SortMergeJoinExec => "smj"
case _: RDDScanExec => "rdd"
case _: DataSourceScanExec => "scan"
case _: InMemoryTableScanExec => "memoryScan"
case _ => nodeName.toLowerCase(Locale.ROOT)
}

View file

@ -55,6 +55,9 @@ case class CachedRDDBuilder(
val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
val cachedName = tableName.map(n => s"In-memory table $n")
.getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024))
def cachedColumnBuffers: RDD[CachedBatch] = {
if (_cachedColumnBuffers == null) {
synchronized {
@ -130,9 +133,7 @@ case class CachedRDDBuilder(
}
}.persist(storageLevel)
cached.setName(
tableName.map(n => s"In-memory table $n")
.getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024)))
cached.setName(cachedName)
cached
}
}

View file

@ -36,6 +36,15 @@ case class InMemoryTableScanExec(
@transient relation: InMemoryRelation)
extends LeafExecNode with ColumnarBatchScan {
override val nodeName: String = {
relation.cacheBuilder.tableName match {
case Some(_) =>
"Scan " + relation.cacheBuilder.cachedName
case _ =>
super.nodeName
}
}
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren
override def doCanonicalize(): SparkPlan =

View file

@ -574,4 +574,26 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
)
}
}
test("InMemoryTableScan shows the table name on UI if possible") {
// Show table name on UI
withView("inMemoryTable", "```a``b```") {
sql("CREATE TEMPORARY VIEW inMemoryTable AS SELECT 1 AS c1")
sql("CACHE TABLE inMemoryTable")
testSparkPlanMetrics(spark.table("inMemoryTable"), 1,
Map(0L -> (("Scan In-memory table `inMemoryTable`", Map.empty)))
)
sql("CREATE TEMPORARY VIEW ```a``b``` AS SELECT 2 AS c1")
sql("CACHE TABLE ```a``b```")
testSparkPlanMetrics(spark.table("```a``b```"), 1,
Map(0L -> (("Scan In-memory table ```a``b```", Map.empty)))
)
}
// Show InMemoryTableScan on UI
testSparkPlanMetrics(spark.range(1).cache().select("id"), 1,
Map(0L -> (("InMemoryTableScan", Map.empty)))
)
}
}

View file

@ -295,7 +295,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val plan = statement.executeQuery("explain select * from test_table")
plan.next()
plan.next()
assert(plan.getString(1).contains("InMemoryTableScan"))
assert(plan.getString(1).contains("Scan In-memory table `test_table`"))
val rs1 = statement.executeQuery("SELECT key FROM test_table ORDER BY KEY DESC")
val buf1 = new collection.mutable.ArrayBuffer[Int]()
@ -381,7 +381,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
val plan = statement.executeQuery("explain select key from test_map ORDER BY key DESC")
plan.next()
plan.next()
assert(plan.getString(1).contains("InMemoryTableScan"))
assert(plan.getString(1).contains("Scan In-memory table `test_table`"))
val rs = statement.executeQuery("SELECT key FROM test_map ORDER BY KEY DESC")
val buf = new collection.mutable.ArrayBuffer[Int]()