[SPARK-36272][SQL][TEST] Change shuffled hash join metrics test to check relative value of build size

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/33447, where the unit test was disabled due to a failure after the memory settings changed. The root cause is that after https://github.com/apache/spark/pull/33447, the Spark memory page byte size used in unit tests changed from `67108864` to `33554432` [1], and the shuffled hash join build size changed with it, because the build size depends on the [memory page byte size](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L457). The unit test previously asserted the exact value of the build size, so it no longer passes. This PR changes the unit test to verify the relative value of the build size instead, which is robust to page size changes.

[1]: I printed the memory page byte size explicitly in the unit test; see `org.apache.spark.SparkException: chengsu pageSizeBytes: 33554432!` in https://github.com/c21/spark/runs/3186680616?check_suite_focus=true.
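For illustration, a minimal self-contained sketch of the relative check (not the suite code itself; the local session setup, the AQE-disabling config, and the `buildDataSize` helper are illustrative assumptions) could look like:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

val spark = SparkSession.builder()
  .master("local[2]")
  // Disable AQE so ShuffledHashJoinExec is directly visible in executedPlan.
  .config("spark.sql.adaptive.enabled", "false")
  .getOrCreate()
import spark.implicits._

val leftDf = Seq(("1", "1"), ("11", "11")).toDF("key", "value")
val rightDf = (1 to 10).map(i => (i.toString, i.toString)).toDF("key2", "value")

// Hypothetical helper: run the join, then read the buildDataSize metric
// from the ShuffledHashJoinExec node of the executed plan.
def buildDataSize(df: DataFrame): Long = {
  df.collect()
  val shj = df.queryExecution.executedPlan.collectFirst {
    case s: ShuffledHashJoinExec => s
  }
  assert(shj.isDefined, "The query plan should have shuffled hash join")
  shj.get.metrics("buildDataSize").value
}

val fojSize = buildDataSize(
  leftDf.hint("shuffle_hash").join(rightDf, $"key" === $"key2", "full_outer"))
val rojSize = buildDataSize(
  leftDf.hint("shuffle_hash").join(rightDf, $"key" === $"key2", "right_outer"))

// Full outer join keeps extra BitSet/OpenHashSet state to track matched
// build-side rows, so its build size is strictly larger than right outer's,
// whatever the page size happens to be.
assert(fojSize > rojSize && rojSize > 0)
```

The relative assertion works because the full outer join's extra matched-row tracking always adds to the build size, so the comparison holds regardless of the configured memory page byte size.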

### Why are the changes needed?

Make the previously disabled unit test work again.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed the unit test itself.

Closes #33494 from c21/test.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6a8dd3229a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@@ -396,18 +396,16 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
-  // TODO (SPARK-36272): Reenable this after we figure out why the expected size doesn't
-  // match after we adjust building's memory settings.
-  ignore("SPARK-32629: ShuffledHashJoin(full outer) metrics") {
+  test("SPARK-32629: ShuffledHashJoin(full outer) metrics") {
     val uniqueLeftDf = Seq(("1", "1"), ("11", "11")).toDF("key", "value")
     val nonUniqueLeftDf = Seq(("1", "1"), ("1", "2"), ("11", "11")).toDF("key", "value")
     val rightDf = (1 to 10).map(i => (i.toString, i.toString)).toDF("key2", "value")
     Seq(
       // Test unique key on build side
-      (uniqueLeftDf, rightDf, 11, 134228048, 10, 134221824),
+      (uniqueLeftDf, rightDf, 11, 10),
       // Test non-unique key on build side
-      (nonUniqueLeftDf, rightDf, 12, 134228552, 11, 134221824)
-    ).foreach { case (leftDf, rightDf, fojRows, fojBuildSize, rojRows, rojBuildSize) =>
+      (nonUniqueLeftDf, rightDf, 12, 11)
+    ).foreach { case (leftDf, rightDf, fojRows, rojRows) =>
       val fojDf = leftDf.hint("shuffle_hash").join(
         rightDf, $"key" === $"key2", "full_outer")
       fojDf.collect()
@@ -415,8 +413,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
         case s: ShuffledHashJoinExec => s
       }
       assert(fojPlan.isDefined, "The query plan should have shuffled hash join")
-      testMetricsInSparkPlanOperator(fojPlan.get,
-        Map("numOutputRows" -> fojRows, "buildDataSize" -> fojBuildSize))
+      testMetricsInSparkPlanOperator(fojPlan.get, Map("numOutputRows" -> fojRows))
+      val fojBuildSize = fojPlan.get.metrics("buildDataSize").value
 
       // Test right outer join as well to verify build data size to be different
       // from full outer join. This makes sure we take extra BitSet/OpenHashSet
@@ -428,8 +426,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
         case s: ShuffledHashJoinExec => s
       }
       assert(rojPlan.isDefined, "The query plan should have shuffled hash join")
-      testMetricsInSparkPlanOperator(rojPlan.get,
-        Map("numOutputRows" -> rojRows, "buildDataSize" -> rojBuildSize))
+      testMetricsInSparkPlanOperator(rojPlan.get, Map("numOutputRows" -> rojRows))
+      val rojBuildSize = rojPlan.get.metrics("buildDataSize").value
+      assert(fojBuildSize > rojBuildSize && rojBuildSize > 0,
+        "Build size of full outer join should be larger than the size of right outer join")
     }
   }