[SPARK-33026][SQL][FOLLOWUP] metrics name should be numOutputRows
### What changes were proposed in this pull request? Follow the convention and rename the metrics `numRows` to `numOutputRows` ### Why are the changes needed? `FilterExec`, `HashAggregateExec`, etc. all use `numOutputRows` ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #30039 from cloud-fan/minor. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
8e5cb1d276
commit
f3ad32f4b6
|
@ -78,7 +78,7 @@ case class BroadcastExchangeExec(
|
|||
|
||||
override lazy val metrics = Map(
|
||||
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
|
||||
"numRows" -> SQLMetrics.createMetric(sparkContext, "number of rows"),
|
||||
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
|
||||
"collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
|
||||
"buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
|
||||
"broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
|
||||
|
@ -91,8 +91,8 @@ case class BroadcastExchangeExec(
|
|||
|
||||
override def runtimeStatistics: Statistics = {
|
||||
val dataSize = metrics("dataSize").value
|
||||
val numRows = metrics("numRows").value
|
||||
Statistics(dataSize, Some(numRows))
|
||||
val rowCount = metrics("numOutputRows").value
|
||||
Statistics(dataSize, Some(rowCount))
|
||||
}
|
||||
|
||||
@transient
|
||||
|
@ -116,11 +116,11 @@ case class BroadcastExchangeExec(
|
|||
val beforeCollect = System.nanoTime()
|
||||
// Use executeCollect/executeCollectIterator to avoid conversion to Scala types
|
||||
val (numRows, input) = child.executeCollectIterator()
|
||||
longMetric("numOutputRows") += numRows
|
||||
if (numRows >= MAX_BROADCAST_TABLE_ROWS) {
|
||||
throw new SparkException(
|
||||
s"Cannot broadcast the table over $MAX_BROADCAST_TABLE_ROWS rows: $numRows rows")
|
||||
}
|
||||
longMetric("numRows") += numRows
|
||||
|
||||
val beforeBuild = System.nanoTime()
|
||||
longMetric("collectTime") += NANOSECONDS.toMillis(beforeBuild - beforeCollect)
|
||||
|
|
|
@ -751,7 +751,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
|
|||
}
|
||||
|
||||
assert(exchanges.size === 1)
|
||||
testMetricsInSparkPlanOperator(exchanges.head, Map("numRows" -> 2))
|
||||
testMetricsInSparkPlanOperator(exchanges.head, Map("numOutputRows" -> 2))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue