[SPARK-25621][SPARK-25622][TEST] Reduce test time of BucketedReadWithHiveSupportSuite

## What changes were proposed in this pull request?

Replace exhaustive loops over every possible bucket value with a single randomly chosen value per run (see the sketch after this list):
- `read partitioning bucketed tables with bucket pruning filters`: reduced from 55s to 7s
- `read partitioning bucketed tables having composite filters`: reduced from 54s to 8s
- total suite time: reduced from 288s to 192s
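
The refactoring follows the same pattern in every touched test. A minimal self-contained sketch of the idea (`expensiveCheck` is illustrative, not one of the suite's actual helpers):

```scala
import scala.util.Random

object LoopVsRandomValue {
  // Stand-in for a costly per-value assertion; in the real suite this runs a
  // full Spark query against a bucketed table.
  def expensiveCheck(bucketValue: Int): Unit =
    assert(bucketValue >= 0 && bucketValue < 13)

  def main(args: Array[String]): Unit = {
    val maxJ = 13

    // Before: exhaustive loop, pays the query cost maxJ times per test.
    for (j <- 0 until maxJ) expensiveCheck(j)

    // After: one randomly chosen value per run. Full per-run coverage is
    // traded for speed; repeated CI runs still sample the whole domain.
    expensiveCheck(Random.nextInt(maxJ))
  }
}
```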

## How was this patch tested?

Unit test

Closes #22640 from gengliangwang/fastenBucketedReadSuite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>

@@ -20,6 +20,8 @@ package org.apache.spark.sql.sources
 import java.io.File
 import java.net.URI
 
+import scala.util.Random
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions
@@ -47,11 +49,13 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
 abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
-  private lazy val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
+  private val maxI = 5
+  private val maxJ = 13
+  private lazy val df = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString)).toDF("i", "j", "k")
   private lazy val nullDF = (for {
     i <- 0 to 50
     s <- Seq(null, "a", "b", "c", "d", "e", "f", null, "g")
-  } yield (i % 5, s, i % 13)).toDF("i", "j", "k")
+  } yield (i % maxI, s, i % maxJ)).toDF("i", "j", "k")
 
   // number of buckets that doesn't yield empty buckets when bucketing on column j on df/nullDF
   // empty buckets before filtering might hide bugs in pruning logic
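
Note on the new constants: `maxI` and `maxJ` mirror the moduli used to generate the test data (`i % 5`, `i % 13`), so `Random.nextInt(maxI)` and `Random.nextInt(maxJ)` can only pick values that actually occur in `df`. A small standalone check of that invariant (illustrative, not part of the patch):

```scala
import scala.util.Random

val maxI = 5
val maxJ = 13

// Same shape as the suite's data: 50 rows, so every residue modulo 5 and
// modulo 13 appears at least once.
val rows = (0 until 50).map(i => (i % maxI, i % maxJ, i.toString))

val sampleI = Random.nextInt(maxI) // uniform over 0 until 5
val sampleJ = Random.nextInt(maxJ) // uniform over 0 until 13
assert(rows.exists(_._1 == sampleI))
assert(rows.exists(_._2 == sampleJ))
```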
@@ -66,23 +70,22 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(8, "j", "k")
         .saveAsTable("bucketed_table")
 
-      for (i <- 0 until 5) {
-        val table = spark.table("bucketed_table").filter($"i" === i)
-        val query = table.queryExecution
-        val output = query.analyzed.output
-        val rdd = query.toRdd
+      val bucketValue = Random.nextInt(maxI)
+      val table = spark.table("bucketed_table").filter($"i" === bucketValue)
+      val query = table.queryExecution
+      val output = query.analyzed.output
+      val rdd = query.toRdd
 
-        assert(rdd.partitions.length == 8)
+      assert(rdd.partitions.length == 8)
 
-        val attrs = table.select("j", "k").queryExecution.analyzed.output
-        val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
-          val getBucketId = UnsafeProjection.create(
-            HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
-            output)
-          rows.map(row => getBucketId(row).getInt(0) -> index)
-        })
-        checkBucketId.collect().foreach(r => assert(r._1 == r._2))
-      }
+      val attrs = table.select("j", "k").queryExecution.analyzed.output
+      val checkBucketId = rdd.mapPartitionsWithIndex((index, rows) => {
+        val getBucketId = UnsafeProjection.create(
+          HashPartitioning(attrs, 8).partitionIdExpression :: Nil,
+          output)
+        rows.map(row => getBucketId(row).getInt(0) -> index)
+      })
+      checkBucketId.collect().foreach(r => assert(r._1 == r._2))
     }
   }
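
The per-row assertion in this test holds because `HashPartitioning.partitionIdExpression` recomputes the same function the writer used to route rows into bucket files; in Spark sources of this era it is essentially `Pmod(new Murmur3Hash(bucketColumns), Literal(numBuckets))`. A minimal sketch of constructing that expression (assumes `spark-catalyst` on the classpath; API names as in Spark 2.4-era code):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.types.IntegerType

// A free-standing attribute standing in for the bucket column "j".
val j = AttributeReference("j", IntegerType)()

// Semantically Pmod(new Murmur3Hash(Seq(j)), Literal(8)): hash the bucket
// columns, then take the result modulo the bucket count. Re-evaluating this
// per row must reproduce the index of the bucket file the row was read from.
val bucketIdExpr = HashPartitioning(j :: Nil, 8).partitionIdExpression
println(bucketIdExpr)
```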
@@ -145,36 +148,36 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        // Case 1: EqualTo
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
+      val bucketValue = Random.nextInt(maxJ)
+      // Case 1: EqualTo
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
 
-        // Case 2: EqualNullSafe
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" <=> j,
-          df)
+      // Case 2: EqualNullSafe
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" <=> bucketValue,
+        df)
 
-        // Case 3: In
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = $"j".isin(j, j + 1, j + 2, j + 3),
-          df)
+      // Case 3: In
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = $"j".isin(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        df)
 
-        // Case 4: InSet
-        val inSetExpr = expressions.InSet($"j".expr, Set(j, j + 1, j + 2, j + 3).map(lit(_).expr))
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1, j + 2, j + 3),
-          filterCondition = Column(inSetExpr),
-          df)
-      }
+      // Case 4: InSet
+      val inSetExpr = expressions.InSet($"j".expr,
+        Set(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3).map(lit(_).expr))
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1, bucketValue + 2, bucketValue + 3),
+        filterCondition = Column(inSetExpr),
+        df)
     }
   }
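
The four cases exercise the predicate shapes that bucket pruning recognizes on the bucket column: `EqualTo`, `EqualNullSafe`, `In`, and the optimized `InSet` form. For orientation, the first three expressed through the public DataFrame API (illustrative; assumes a `SparkSession` named `spark` and an existing table bucketed by `j`):

```scala
import org.apache.spark.sql.functions.col

val v = 3 // any value in the bucket column's domain
val byEqualTo       = spark.table("bucketed_table").filter(col("j") === v)
val byEqualNullSafe = spark.table("bucketed_table").filter(col("j") <=> v)
val byIn            = spark.table("bucketed_table").filter(col("j").isin(v, v + 1))
// InSet has no direct DataFrame syntax; the optimizer may rewrite a large
// `isin` into it, which is why the test builds it via catalyst directly.
```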
@@ -188,13 +191,12 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j,
-          df)
-      }
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue,
+        df)
     }
   }
@@ -236,40 +238,39 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         .bucketBy(numBuckets, "j")
         .saveAsTable("bucketed_table")
 
-      for (j <- 0 until 13) {
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"k" > $"j",
-          df)
+      val bucketValue = Random.nextInt(maxJ)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"k" > $"j",
+        df)
 
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = j :: Nil,
-          filterCondition = $"j" === j && $"i" > j % 5,
-          df)
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = bucketValue :: Nil,
+        filterCondition = $"j" === bucketValue && $"i" > bucketValue % 5,
+        df)
 
-        // check multiple bucket values OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j, j + 1),
-          filterCondition = $"j" === j || $"j" === (j + 1),
-          df)
+      // check multiple bucket values OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue, bucketValue + 1),
+        filterCondition = $"j" === bucketValue || $"j" === (bucketValue + 1),
+        df)
 
-        // check bucket value and none bucket value OR condition
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Nil,
-          filterCondition = $"j" === j || $"i" === 0,
-          df)
+      // check bucket value and none bucket value OR condition
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Nil,
+        filterCondition = $"j" === bucketValue || $"i" === 0,
+        df)
 
-        // check AND condition in complex expression
-        checkPrunedAnswers(
-          bucketSpec,
-          bucketValues = Seq(j),
-          filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === j,
-          df)
-      }
+      // check AND condition in complex expression
+      checkPrunedAnswers(
+        bucketSpec,
+        bucketValues = Seq(bucketValue),
+        filterCondition = ($"i" === 0 || $"k" > $"j") && $"j" === bucketValue,
+        df)
     }
   }
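
One trade-off of randomized inputs is reproducibility: when a run fails, the chosen value must be recoverable. Not part of this patch, but a common pattern is to derive the seed once, log it, and allow overriding it (the `TEST_SEED` variable below is hypothetical):

```scala
import scala.util.Random

// Take the seed from the environment when replaying a failure; otherwise
// pick one and print it so a CI failure can be reproduced exactly.
val seed = sys.env.get("TEST_SEED").map(_.toLong).getOrElse(Random.nextLong())
println(s"BucketedReadSuite random seed: $seed")
val rng = new Random(seed)
val bucketValue = rng.nextInt(13) // deterministic given the seed
```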