[SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries in TPCDSQueryBenchmark
## What changes were proposed in this pull request? This pr added the TPCDS v2.7 (latest) queries in `TPCDSQueryBenchmark`. These query files have been added in `SPARK-23167`. ## How was this patch tested? Manually checked. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #21177 from maropu/AddTpcdsV2_7InBenchmark.
This commit is contained in:
parent
5be8aab144
commit
e4c91c089a
|
@ -58,10 +58,13 @@ object TPCDSQueryBenchmark extends Logging {
|
|||
}.toMap
|
||||
}
|
||||
|
||||
def tpcdsAll(dataLocation: String, queries: Seq[String]): Unit = {
|
||||
val tableSizes = setupTables(dataLocation)
|
||||
def runTpcdsQueries(
|
||||
queryLocation: String,
|
||||
queries: Seq[String],
|
||||
tableSizes: Map[String, Long],
|
||||
nameSuffix: String = ""): Unit = {
|
||||
queries.foreach { name =>
|
||||
val queryString = resourceToString(s"tpcds/$name.sql",
|
||||
val queryString = resourceToString(s"$queryLocation/$name.sql",
|
||||
classLoader = Thread.currentThread().getContextClassLoader)
|
||||
|
||||
// This is an indirect hack to estimate the size of each query's input by traversing the
|
||||
|
@ -78,7 +81,7 @@ object TPCDSQueryBenchmark extends Logging {
|
|||
}
|
||||
val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
|
||||
val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5)
|
||||
benchmark.addCase(name) { i =>
|
||||
benchmark.addCase(s"$name$nameSuffix") { _ =>
|
||||
spark.sql(queryString).collect()
|
||||
}
|
||||
logInfo(s"\n\n===== TPCDS QUERY BENCHMARK OUTPUT FOR $name =====\n")
|
||||
|
@ -87,10 +90,20 @@ object TPCDSQueryBenchmark extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def filterQueries(
|
||||
origQueries: Seq[String],
|
||||
args: TPCDSQueryBenchmarkArguments): Seq[String] = {
|
||||
if (args.queryFilter.nonEmpty) {
|
||||
origQueries.filter(args.queryFilter.contains)
|
||||
} else {
|
||||
origQueries
|
||||
}
|
||||
}
|
||||
|
||||
def main(args: Array[String]): Unit = {
|
||||
val benchmarkArgs = new TPCDSQueryBenchmarkArguments(args)
|
||||
|
||||
// List of all TPC-DS queries
|
||||
// List of all TPC-DS v1.4 queries
|
||||
val tpcdsQueries = Seq(
|
||||
"q1", "q2", "q3", "q4", "q5", "q6", "q7", "q8", "q9", "q10", "q11",
|
||||
"q12", "q13", "q14a", "q14b", "q15", "q16", "q17", "q18", "q19", "q20",
|
||||
|
@ -103,20 +116,25 @@ object TPCDSQueryBenchmark extends Logging {
|
|||
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
|
||||
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
|
||||
|
||||
// This list only includes TPC-DS v2.7 queries that are different from v1.4 ones
|
||||
val tpcdsQueriesV2_7 = Seq(
|
||||
"q5a", "q6", "q10a", "q11", "q12", "q14", "q14a", "q18a",
|
||||
"q20", "q22", "q22a", "q24", "q27a", "q34", "q35", "q35a", "q36a", "q47", "q49",
|
||||
"q51a", "q57", "q64", "q67a", "q70a", "q72", "q74", "q75", "q77a", "q78",
|
||||
"q80a", "q86a", "q98")
|
||||
|
||||
// If `--query-filter` defined, filters the queries that this option selects
|
||||
val queriesToRun = if (benchmarkArgs.queryFilter.nonEmpty) {
|
||||
val queries = tpcdsQueries.filter { case queryName =>
|
||||
benchmarkArgs.queryFilter.contains(queryName)
|
||||
}
|
||||
if (queries.isEmpty) {
|
||||
val queriesV1_4ToRun = filterQueries(tpcdsQueries, benchmarkArgs)
|
||||
val queriesV2_7ToRun = filterQueries(tpcdsQueriesV2_7, benchmarkArgs)
|
||||
|
||||
if ((queriesV1_4ToRun ++ queriesV2_7ToRun).isEmpty) {
|
||||
throw new RuntimeException(
|
||||
s"Empty queries to run. Bad query name filter: ${benchmarkArgs.queryFilter}")
|
||||
}
|
||||
queries
|
||||
} else {
|
||||
tpcdsQueries
|
||||
}
|
||||
|
||||
tpcdsAll(benchmarkArgs.dataLocation, queries = queriesToRun)
|
||||
val tableSizes = setupTables(benchmarkArgs.dataLocation)
|
||||
runTpcdsQueries(queryLocation = "tpcds", queries = queriesV1_4ToRun, tableSizes)
|
||||
runTpcdsQueries(queryLocation = "tpcds-v2.7.0", queries = queriesV2_7ToRun, tableSizes,
|
||||
nameSuffix = "-v2.7")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue