From f3ad32f4b6fc55e89e7fb222ed565ad3e32d47c6 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 14 Oct 2020 16:17:28 +0000
Subject: [PATCH] [SPARK-33026][SQL][FOLLOWUP] metrics name should be
 numOutputRows

### What changes were proposed in this pull request?

Follow the convention and rename the metric `numRows` to `numOutputRows`.

### Why are the changes needed?

`FilterExec`, `HashAggregateExec`, etc. all use `numOutputRows`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #30039 from cloud-fan/minor.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../sql/execution/exchange/BroadcastExchangeExec.scala    | 8 ++++----
 .../spark/sql/execution/metric/SQLMetricsSuite.scala      | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 4b884dfe53..0c5fee2038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -78,7 +78,7 @@ case class BroadcastExchangeExec(
 
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "numRows" -> SQLMetrics.createMetric(sparkContext, "number of rows"),
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
     "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
     "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
@@ -91,8 +91,8 @@ case class BroadcastExchangeExec(
 
   override def runtimeStatistics: Statistics = {
     val dataSize = metrics("dataSize").value
-    val numRows = metrics("numRows").value
-    Statistics(dataSize, Some(numRows))
+    val rowCount = metrics("numOutputRows").value
+    Statistics(dataSize, Some(rowCount))
   }
 
   @transient
@@ -116,11 +116,11 @@ case class BroadcastExchangeExec(
           val beforeCollect = System.nanoTime()
           // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
           val (numRows, input) = child.executeCollectIterator()
+          longMetric("numOutputRows") += numRows
           if (numRows >= MAX_BROADCAST_TABLE_ROWS) {
             throw new SparkException(
               s"Cannot broadcast the table over $MAX_BROADCAST_TABLE_ROWS rows: $numRows rows")
           }
-          longMetric("numRows") += numRows
 
           val beforeBuild = System.nanoTime()
           longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index e404e460fe..4872906dbf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -751,7 +751,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       }
 
       assert(exchanges.size === 1)
-      testMetricsInSparkPlanOperator(exchanges.head, Map("numRows" -> 2))
+      testMetricsInSparkPlanOperator(exchanges.head, Map("numOutputRows" -> 2))
     }
   }
 }
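
For context on the convention the patch enforces, here is a minimal sketch (not part of the patch) of how a physical operator declares and updates a row-count metric through the `SQLMetrics` API. It assumes the Spark 3.0/3.1-era internal API, and `ExamplePassThroughExec` is a hypothetical operator invented for illustration:

```scala
// Hypothetical operator sketch: every row-producing operator exposes its
// row count under the key "numOutputRows", the convention this patch
// applies to BroadcastExchangeExec.
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics

case class ExamplePassThroughExec(child: SparkPlan) extends UnaryExecNode {

  // Declare the metric under the conventional name, as FilterExec,
  // HashAggregateExec, and (after this patch) BroadcastExchangeExec do.
  override lazy val metrics = Map(
    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  override def output: Seq[Attribute] = child.output

  override protected def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    child.execute().mapPartitions { iter =>
      // Bump the accumulator for each row this operator emits.
      iter.map { row =>
        numOutputRows += 1
        row
      }
    }
  }
}
```

Keeping the key identical across operators is what lets helpers such as `testMetricsInSparkPlanOperator` in the test change above look metrics up by name uniformly.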