[SPARK-21368][SQL] TPCDSQueryBenchmark can't refer to query files.
## What changes were proposed in this pull request? TPCDSQueryBenchmark packaged into a jar doesn't work with spark-submit. This is because the query files inside the jar cannot be referenced. ## How was this patch tested? Ran the benchmark. Author: sarutak <sarutak@oss.nttdata.co.jp> Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #18592 from sarutak/fix-tpcds-benchmark.
This commit is contained in:
parent
720c94fe77
commit
b9b54b1c88
|
@ -17,8 +17,6 @@
|
||||||
|
|
||||||
package org.apache.spark.sql.execution.benchmark
|
package org.apache.spark.sql.execution.benchmark
|
||||||
|
|
||||||
import java.io.File
|
|
||||||
|
|
||||||
import org.apache.spark.SparkConf
|
import org.apache.spark.SparkConf
|
||||||
import org.apache.spark.sql.SparkSession
|
import org.apache.spark.sql.SparkSession
|
||||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||||
|
@ -31,7 +29,7 @@ import org.apache.spark.util.Benchmark
|
||||||
/**
|
/**
|
||||||
* Benchmark to measure TPCDS query performance.
|
* Benchmark to measure TPCDS query performance.
|
||||||
* To run this:
|
* To run this:
|
||||||
* spark-submit --class <this class> --jars <spark sql test jar>
|
* spark-submit --class <this class> <spark sql test jar> <TPCDS data location>
|
||||||
*/
|
*/
|
||||||
object TPCDSQueryBenchmark {
|
object TPCDSQueryBenchmark {
|
||||||
val conf =
|
val conf =
|
||||||
|
@ -61,12 +59,10 @@ object TPCDSQueryBenchmark {
|
||||||
}
|
}
|
||||||
|
|
||||||
def tpcdsAll(dataLocation: String, queries: Seq[String]): Unit = {
|
def tpcdsAll(dataLocation: String, queries: Seq[String]): Unit = {
|
||||||
require(dataLocation.nonEmpty,
|
|
||||||
"please modify the value of dataLocation to point to your local TPCDS data")
|
|
||||||
val tableSizes = setupTables(dataLocation)
|
val tableSizes = setupTables(dataLocation)
|
||||||
queries.foreach { name =>
|
queries.foreach { name =>
|
||||||
val queryString = fileToString(new File(Thread.currentThread().getContextClassLoader
|
val queryString = resourceToString(s"tpcds/$name.sql",
|
||||||
.getResource(s"tpcds/$name.sql").getFile))
|
classLoader = Thread.currentThread().getContextClassLoader)
|
||||||
|
|
||||||
// This is an indirect hack to estimate the size of each query's input by traversing the
|
// This is an indirect hack to estimate the size of each query's input by traversing the
|
||||||
// logical plan and adding up the sizes of all tables that appear in the plan. Note that this
|
// logical plan and adding up the sizes of all tables that appear in the plan. Note that this
|
||||||
|
@ -99,6 +95,7 @@ object TPCDSQueryBenchmark {
|
||||||
}
|
}
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = {
|
def main(args: Array[String]): Unit = {
|
||||||
|
val benchmarkArgs = new TPCDSQueryBenchmarkArguments(args)
|
||||||
|
|
||||||
// List of all TPC-DS queries
|
// List of all TPC-DS queries
|
||||||
val tpcdsQueries = Seq(
|
val tpcdsQueries = Seq(
|
||||||
|
@ -113,12 +110,6 @@ object TPCDSQueryBenchmark {
|
||||||
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
|
"q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
|
||||||
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
|
"q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
|
||||||
|
|
||||||
// In order to run this benchmark, please follow the instructions at
|
tpcdsAll(benchmarkArgs.dataLocation, queries = tpcdsQueries)
|
||||||
// https://github.com/databricks/spark-sql-perf/blob/master/README.md to generate the TPCDS data
|
|
||||||
// locally (preferably with a scale factor of 5 for benchmarking). Thereafter, the value of
|
|
||||||
// dataLocation below needs to be set to the location where the generated data is stored.
|
|
||||||
val dataLocation = ""
|
|
||||||
|
|
||||||
tpcdsAll(dataLocation, queries = tpcdsQueries)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,69 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.execution.benchmark
|
||||||
|
|
||||||
|
class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
|
||||||
|
var dataLocation: String = null
|
||||||
|
|
||||||
|
parseArgs(args.toList)
|
||||||
|
validateArguments()
|
||||||
|
|
||||||
|
private def parseArgs(inputArgs: List[String]): Unit = {
|
||||||
|
var args = inputArgs
|
||||||
|
|
||||||
|
while(args.nonEmpty) {
|
||||||
|
args match {
|
||||||
|
case ("--data-location") :: value :: tail =>
|
||||||
|
dataLocation = value
|
||||||
|
args = tail
|
||||||
|
|
||||||
|
case _ =>
|
||||||
|
// scalastyle:off println
|
||||||
|
System.err.println("Unknown/unsupported param " + args)
|
||||||
|
// scalastyle:on println
|
||||||
|
printUsageAndExit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def printUsageAndExit(exitCode: Int): Unit = {
|
||||||
|
// scalastyle:off
|
||||||
|
System.err.println("""
|
||||||
|
|Usage: spark-submit --class <this class> <spark sql test jar> [Options]
|
||||||
|
|Options:
|
||||||
|
| --data-location Path to TPCDS data
|
||||||
|
|
|
||||||
|
|------------------------------------------------------------------------------------------------------------------
|
||||||
|
|In order to run this benchmark, please follow the instructions at
|
||||||
|
|https://github.com/databricks/spark-sql-perf/blob/master/README.md
|
||||||
|
|to generate the TPCDS data locally (preferably with a scale factor of 5 for benchmarking).
|
||||||
|
|Thereafter, the value of <TPCDS data location> needs to be set to the location where the generated data is stored.
|
||||||
|
""".stripMargin)
|
||||||
|
// scalastyle:on
|
||||||
|
System.exit(exitCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def validateArguments(): Unit = {
|
||||||
|
if (dataLocation == null) {
|
||||||
|
// scalastyle:off println
|
||||||
|
System.err.println("Must specify a data location")
|
||||||
|
// scalastyle:on println
|
||||||
|
printUsageAndExit(-1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue