[SPARK-26142][SQL] Implement shuffle read metrics in SQL

## What changes were proposed in this pull request?

Implement `SQLShuffleMetricsReporter` on the SQL side as a customized shuffle metrics reporter: it extends `TempShuffleReadMetrics` and additionally updates the corresponding `SQLMetrics`, so that shuffle read metrics can be reported in the SQL UI.
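
In short, each task wraps the `TempShuffleReadMetrics` it gets from the `TaskContext` and forwards every increment both to that wrapper and to the per-operator `SQLMetric` accumulators, so the existing executor-level shuffle metrics stay intact while the SQL UI gains per-exchange numbers. A minimal standalone sketch of this forwarding pattern (plain Scala; `Counter` and `TempMetrics` are hypothetical stand-ins for Spark's `SQLMetric` and `TempShuffleReadMetrics`, not the real classes):

```scala
// Hypothetical stand-ins that mirror the shape of SQLMetric and TempShuffleReadMetrics,
// used only to illustrate the double-reporting pattern introduced by this patch.
class Counter { private var v = 0L; def add(x: Long): Unit = v += x; def value: Long = v }
class TempMetrics { var recordsRead = 0L; def incRecordsRead(x: Long): Unit = recordsRead += x }

// The SQL-side reporter updates its own named metric (surfaced in the SQL UI)
// and also keeps the task-level temp metrics up to date.
class SqlShuffleReadReporter(temp: TempMetrics, metrics: Map[String, Counter]) {
  private val recordsRead = metrics("recordsRead")
  def incRecordsRead(x: Long): Unit = {
    recordsRead.add(x)     // feeds the SQL UI accumulator
    temp.incRecordsRead(x) // keeps the existing task-level shuffle metrics unchanged
  }
}

object ReporterDemo extends App {
  val temp = new TempMetrics
  val sqlMetrics = Map("recordsRead" -> new Counter)
  val reporter = new SqlShuffleReadReporter(temp, sqlMetrics)
  reporter.incRecordsRead(4)
  // Both sinks observe the same increments: prints "4 / 4".
  println(s"${temp.recordsRead} / ${sqlMetrics("recordsRead").value}")
}
```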

## How was this patch tested?

Add a UT in SQLMetricsSuite.
Also tested manually in a local run. Before:
![image](https://user-images.githubusercontent.com/4833765/48960517-30f97880-efa8-11e8-982c-92d05938fd1d.png)
After:
![image](https://user-images.githubusercontent.com/4833765/48960587-b54bfb80-efa8-11e8-8e95-7a3c8c74cc5c.png)
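
Beyond the screenshots, one quick way to see that the metrics are registered is to inspect the `Exchange` node of an executed plan: after this patch its `metrics` map contains the shuffle read entries in addition to `dataSize`. A rough verification sketch (assumes a local `SparkSession`; it touches the internal `ShuffleExchangeExec` API, so it is tied to this Spark version and is not part of the patch itself):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

val spark = SparkSession.builder().master("local[2]").appName("shuffle-read-metrics").getOrCreate()
import spark.implicits._

// A query with one shuffle: groupBy forces an Exchange between the two aggregates.
val df = Seq((1, "a"), (2, "b"), (1, "c")).toDF("key", "value").groupBy("key").count()
df.collect()  // run the query so the shuffle read metrics actually get values

// With this patch, the Exchange node now carries "records read",
// "local blocks fetched", "remote blocks fetched", etc. alongside "data size".
df.queryExecution.executedPlan.foreach {
  case e: ShuffleExchangeExec =>
    e.metrics.foreach { case (key, metric) => println(s"$key = ${metric.value}") }
  case _ => // not an exchange node, nothing to print
}
```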

Closes #23128 from xuanyuanking/SPARK-26142.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: liyuanjian <liyuanjian@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Yuanjian Li 2018-11-28 20:18:13 +08:00 committed by Wenchen Fan
parent 09a91d98bd
commit 93112e6930
7 changed files with 126 additions and 11 deletions


@@ -22,6 +22,7 @@ import java.util.Arrays
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
/**
* The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -112,6 +113,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A
*/
class ShuffledRowRDD(
var dependency: ShuffleDependency[Int, InternalRow, InternalRow],
metrics: Map[String, SQLMetric],
specifiedPartitionStartIndices: Option[Array[Int]] = None)
extends RDD[InternalRow](dependency.rdd.context, Nil) {
@@ -154,7 +156,10 @@ class ShuffledRowRDD(
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
val metrics = context.taskMetrics().createTempShuffleReadMetrics()
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
// `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
// The range of pre-shuffle partitions that we are fetching at here is
// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
val reader =
@@ -163,7 +168,7 @@ class ShuffledRowRDD(
shuffledRowPartition.startPreShufflePartitionIndex,
shuffledRowPartition.endPreShufflePartitionIndex,
context,
metrics)
sqlMetricsReporter)
reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2)
}


@@ -48,7 +48,8 @@ case class ShuffleExchangeExec(
// e.g. it can be null on the Executor side
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ SQLMetrics.getShuffleReadMetrics(sparkContext)
override def nodeName: String = {
val extraInfo = coordinator match {
@@ -108,7 +109,7 @@ case class ShuffleExchangeExec(
assert(newPartitioning.isInstanceOf[HashPartitioning])
newPartitioning = UnknownPartitioning(indices.length)
}
new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
new ShuffledRowRDD(shuffleDependency, metrics, specifiedPartitionStartIndices)
}
/**


@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* Take the first `limit` elements and collect them to a single partition.
@@ -37,11 +38,13 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
override def outputPartitioning: Partitioning = SinglePartition
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
locallyLimited, child.output, SinglePartition, serializer))
locallyLimited, child.output, SinglePartition, serializer),
metrics)
shuffled.mapPartitionsInternal(_.take(limit))
}
}
@@ -151,6 +154,8 @@ case class TakeOrderedAndProjectExec(
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
override lazy val metrics = SQLMetrics.getShuffleReadMetrics(sparkContext)
protected override def doExecute(): RDD[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val localTopK: RDD[InternalRow] = {
@@ -160,7 +165,8 @@
}
val shuffled = new ShuffledRowRDD(
ShuffleExchangeExec.prepareShuffleDependency(
localTopK, child.output, SinglePartition, serializer))
localTopK, child.output, SinglePartition, serializer),
metrics)
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
if (projectList != child.output) {


@@ -82,6 +82,14 @@ object SQLMetrics {
private val baseForAvgMetric: Int = 10
val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
val REMOTE_BYTES_READ = "remoteBytesRead"
val REMOTE_BYTES_READ_TO_DISK = "remoteBytesReadToDisk"
val LOCAL_BYTES_READ = "localBytesRead"
val FETCH_WAIT_TIME = "fetchWaitTime"
val RECORDS_READ = "recordsRead"
/**
* Converts a double value to long value by multiplying a base integer, so we can store it in
* `SQLMetrics`. It only works for average metrics. When showing the metrics on UI, we restore
@@ -194,4 +202,16 @@ object SQLMetrics {
SparkListenerDriverAccumUpdates(executionId.toLong, metrics.map(m => m.id -> m.value)))
}
}
/**
* Create all shuffle-read-related metrics and return them as a Map.
*/
def getShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
REMOTE_BLOCKS_FETCHED -> createMetric(sc, "remote blocks fetched"),
LOCAL_BLOCKS_FETCHED -> createMetric(sc, "local blocks fetched"),
REMOTE_BYTES_READ -> createSizeMetric(sc, "remote bytes read"),
REMOTE_BYTES_READ_TO_DISK -> createSizeMetric(sc, "remote bytes read to disk"),
LOCAL_BYTES_READ -> createSizeMetric(sc, "local bytes read"),
FETCH_WAIT_TIME -> createTimingMetric(sc, "fetch wait time"),
RECORDS_READ -> createMetric(sc, "records read"))
}


@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.metric
import org.apache.spark.executor.TempShuffleReadMetrics
/**
* A shuffle metrics reporter for SQL exchange operators.
* @param tempMetrics [[TempShuffleReadMetrics]] created in TaskContext.
* @param metrics All metrics in the current SparkPlan. This param should not be empty and
* should contain all shuffle metrics defined in [[SQLMetrics.getShuffleReadMetrics]].
*/
private[spark] class SQLShuffleMetricsReporter(
tempMetrics: TempShuffleReadMetrics,
metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
private[this] val _remoteBlocksFetched = metrics(SQLMetrics.REMOTE_BLOCKS_FETCHED)
private[this] val _localBlocksFetched = metrics(SQLMetrics.LOCAL_BLOCKS_FETCHED)
private[this] val _remoteBytesRead = metrics(SQLMetrics.REMOTE_BYTES_READ)
private[this] val _remoteBytesReadToDisk = metrics(SQLMetrics.REMOTE_BYTES_READ_TO_DISK)
private[this] val _localBytesRead = metrics(SQLMetrics.LOCAL_BYTES_READ)
private[this] val _fetchWaitTime = metrics(SQLMetrics.FETCH_WAIT_TIME)
private[this] val _recordsRead = metrics(SQLMetrics.RECORDS_READ)
override def incRemoteBlocksFetched(v: Long): Unit = {
_remoteBlocksFetched.add(v)
tempMetrics.incRemoteBlocksFetched(v)
}
override def incLocalBlocksFetched(v: Long): Unit = {
_localBlocksFetched.add(v)
tempMetrics.incLocalBlocksFetched(v)
}
override def incRemoteBytesRead(v: Long): Unit = {
_remoteBytesRead.add(v)
tempMetrics.incRemoteBytesRead(v)
}
override def incRemoteBytesReadToDisk(v: Long): Unit = {
_remoteBytesReadToDisk.add(v)
tempMetrics.incRemoteBytesReadToDisk(v)
}
override def incLocalBytesRead(v: Long): Unit = {
_localBytesRead.add(v)
tempMetrics.incLocalBytesRead(v)
}
override def incFetchWaitTime(v: Long): Unit = {
_fetchWaitTime.add(v)
tempMetrics.incFetchWaitTime(v)
}
override def incRecordsRead(v: Long): Unit = {
_recordsRead.add(v)
tempMetrics.incRecordsRead(v)
}
}


@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -137,7 +138,9 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
rowsRDD,
new PartitionIdPassthrough(2),
new UnsafeRowSerializer(2))
val shuffled = new ShuffledRowRDD(dependency)
val shuffled = new ShuffledRowRDD(
dependency,
SQLMetrics.getShuffleReadMetrics(spark.sparkContext))
shuffled.count()
}
}


@@ -94,8 +94,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
Map("number of output rows" -> 1L,
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected1 = Map(
"records read" -> 2L,
"local blocks fetched" -> 2L,
"remote blocks fetched" -> 0L)
testSparkPlanMetrics(df, 1, Map(
2L -> (("HashAggregate", expected1(0))),
1L -> (("Exchange", shuffleExpected1)),
0L -> (("HashAggregate", expected1(1))))
)
@@ -106,8 +111,13 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
Map("number of output rows" -> 3L,
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected2 = Map(
"records read" -> 4L,
"local blocks fetched" -> 4L,
"remote blocks fetched" -> 0L)
testSparkPlanMetrics(df2, 1, Map(
2L -> (("HashAggregate", expected2(0))),
1L -> (("Exchange", shuffleExpected2)),
0L -> (("HashAggregate", expected2(1))))
)
}
@@ -191,7 +201,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
"number of output rows" -> 4L))))
"number of output rows" -> 4L))),
2L -> (("Exchange", Map(
"records read" -> 4L,
"local blocks fetched" -> 2L,
"remote blocks fetched" -> 0L))))
)
}
}
@@ -208,7 +222,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
// It's 8 because we read 6 rows on the left side and 2 rows on the right one
"number of output rows" -> 8L))))
)
@@ -216,7 +230,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
testSparkPlanMetrics(df2, 1, Map(
0L -> (("SortMergeJoin", Map(
// It's 4 because we only read 3 rows in the first partition and 1 row in the second one
// It's 8 because we read 6 rows on the left side and 2 rows on the right one
"number of output rows" -> 8L))))
)
}
@@ -287,7 +301,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
// Assume the execution plan is
// ... -> ShuffledHashJoin(nodeId = 1) -> Project(nodeId = 0)
val df = df1.join(df2, "key")
val metrics = getSparkPlanMetrics(df, 1, Set(1L))
testSparkPlanMetrics(df, 1, Map(
1L -> (("ShuffledHashJoin", Map(
"number of output rows" -> 2L,