[SPARK-28332][SQL] Reserve init value -1 only when doing min/max statistics in SQLMetrics

### What changes were proposed in this pull request?
This is an alternative solution to https://github.com/apache/spark/pull/25095.
SQLMetrics uses -1 as the init value as a workaround for [SPARK-11013](https://issues.apache.org/jira/browse/SPARK-11013). However, this may produce bad cases such as the one reported in https://github.com/apache/spark/pull/26726. In fact, we only need to reserve -1 when doing min/max statistics in `SQLMetrics.stringValue`, so that we can filter out accumulators that were never initialized.
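
As a rough illustration of the filtering idea (a minimal sketch only; the helper below is hypothetical and is not `SQLMetrics.stringValue` itself), min/median/max statistics should ignore values that are still at the -1 init value, since those come from accumulators that were never updated:

```scala
// Hypothetical stand-in for the min/median/max aggregation in
// SQLMetrics.stringValue; it only illustrates dropping the -1 sentinel.
object MetricStatsSketch {
  def minMedianMax(values: Seq[Long]): (Long, Long, Long) = {
    val valid = values.filter(_ >= 0).sorted   // -1 means "never updated": drop it
    if (valid.isEmpty) (0L, 0L, 0L)
    else (valid.head, valid(valid.length / 2), valid.last)
  }
}

// e.g. minMedianMax(Seq(-1L, 100L, 300L)) returns (100, 300, 300):
// the uninitialized -1 no longer drags the minimum down to -1.
```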

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing UTs

Closes #26899 from WangGuangxin/sqlmetrics.

Authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wangguangxin.cn 2019-12-23 13:13:35 +08:00 committed by Wenchen Fan
parent e5abbab0ed
commit 640dcc435b
3 changed files with 35 additions and 4 deletions


@@ -50,14 +50,19 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   override def reset(): Unit = _value = _zeroValue
   override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
-    case o: SQLMetric => _value += o.value
+    case o: SQLMetric =>
+      if (_value < 0) _value = 0
+      if (o.value > 0) _value += o.value
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
   override def isZero(): Boolean = _value == _zeroValue
-  override def add(v: Long): Unit = _value += v
+  override def add(v: Long): Unit = {
+    if (_value < 0) _value = 0
+    _value += v
+  }
   // We can set a double value to `SQLMetric` which stores only long value, if it is
   // average metrics.
@@ -65,7 +70,7 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato
   def set(v: Long): Unit = _value = v
-  def +=(v: Long): Unit = _value += v
+  def +=(v: Long): Unit = add(v)
   override def value: Long = _value
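
As a rough usage illustration of the guarded `add`/`merge` above (a standalone sketch under the assumption that it mirrors the intent; `GuardedMetric` is not the real `SQLMetric` class):

```scala
// Simplified standalone model of the guards: the -1 init value is discarded
// the moment a metric is actually updated or merged with an updated metric.
class GuardedMetric(initValue: Long = 0L) {
  private var _value = initValue

  def add(v: Long): Unit = {
    if (_value < 0) _value = 0                   // drop the -1 sentinel first
    _value += v
  }

  def merge(other: GuardedMetric): Unit = {
    if (_value < 0) _value = 0
    if (other.value > 0) _value += other.value   // skip never-updated partners
  }

  def value: Long = _value
}

object GuardedMetricDemo extends App {
  val a = new GuardedMetric(initValue = -1)      // never updated on its own
  val b = new GuardedMetric(initValue = -1)
  b.add(10)
  a.merge(b)
  // An unguarded merge would fold the -1 into the sum and report 9 here.
  assert(a.value == 10)
}
```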


@@ -495,7 +495,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30") {
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         """
           |SELECT * FROM testData t1 join testData2 t2


@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -613,4 +614,29 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
       Map(1L -> (("InMemoryTableScan", Map.empty)))
     )
   }
+
+  test("SPARK-28332: SQLMetric merge should handle -1 properly") {
+    def checkSparkPlanMetrics(plan: SparkPlan, expected: Map[String, Long]): Unit = {
+      expected.foreach { case (metricName: String, metricValue: Long) =>
+        assert(plan.metrics.contains(metricName), s"The query plan should have metric $metricName")
+        val actualMetric = plan.metrics.get(metricName).get
+        assert(actualMetric.value == metricValue,
+          s"The query plan metric $metricName did not match, " +
+            s"expected:$metricValue, actual:${actualMetric.value}")
+      }
+    }
+
+    val df = testData.join(testData2.filter('b === 0), $"key" === $"a", "left_outer")
+    df.collect()
+    val plan = df.queryExecution.executedPlan
+    val exchanges = plan.collect {
+      case s: ShuffleExchangeExec => s
+    }
+    assert(exchanges.size == 2, "The query plan should have two shuffle exchanges")
+    checkSparkPlanMetrics(exchanges(0), Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
+    checkSparkPlanMetrics(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
+  }
 }