[SPARK-35192][SQL][TESTS] Port minimal TPC-DS datagen code from databricks/spark-sql-perf

### What changes were proposed in this pull request?

This PR proposes to port minimal code to generate TPC-DS data from [databricks/spark-sql-perf](https://github.com/databricks/spark-sql-perf). The classes in a new file, `tpcdsDatagen.scala`, are basically copied from the `databricks/spark-sql-perf` codebase.
Note that I've modified them a bit to follow the Spark code style and removed unnecessary parts.

The code authors of these classes are:
 - juliuszsompolski
 - npoggi
 - wangyum

### Why are the changes needed?

We frequently use TPC-DS data for benchmarks/tests, but the TPC-DS schema definitions for data generation and for benchmarks/tests are currently managed separately, e.g.,
 - https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala
 - https://github.com/databricks/spark-sql-perf/blob/master/src/main/scala/com/databricks/spark/sql/perf/tpcds/TPCDSTables.scala

I think this causes some inconvenience, e.g., we need to update both files in the separate repositories whenever we update the TPC-DS schema (#32037). So, it would be useful for the Spark codebase to generate the data by referring to a single schema definition.
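
As a minimal sketch of the sharing this enables (the `Demo*` names are hypothetical stand-ins; only the `tableColumns` shape and the `StructType.fromDDL` usage mirror this PR):

```scala
import org.apache.spark.sql.types.StructType

// Hypothetical stand-in for the shared TPCDSSchema trait added by this PR.
trait DemoTPCDSSchema {
  // table name -> column DDL string
  protected val tableColumns: Map[String, String] = Map(
    "inventory" ->
      "`inv_date_sk` INT, `inv_item_sk` INT, `inv_warehouse_sk` INT, `inv_quantity_on_hand` INT")
}

// Datagen side: derives a StructType for parsing dsdgen output.
object DemoDatagen extends DemoTPCDSSchema {
  def schemaOf(table: String): StructType = StructType.fromDDL(tableColumns(table))
}

// Test side: derives CREATE TABLE DDL from the same definition.
object DemoTestBase extends DemoTPCDSSchema {
  def createTableSql(table: String): String =
    s"CREATE TABLE `$table` (${tableColumns(table)}) USING parquet"
}
```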

### Does this PR introduce _any_ user-facing change?

dev only.

### How was this patch tested?

Manually checked, and the GitHub Actions (GA) workflow passed.

Closes #32243 from maropu/tpcdsDatagen.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
Takeshi Yamamuro 2021-05-03 12:04:42 +09:00
parent caa46ce0b6
commit cd689c942c
4 changed files with 1019 additions and 549 deletions


@@ -493,19 +493,6 @@ jobs:
steps:
- name: Checkout Spark repository
  uses: actions/checkout@v2
- name: Cache TPC-DS generated data
  id: cache-tpcds-sf-1
  uses: actions/cache@v2
  with:
    path: ./tpcds-sf-1
    key: tpcds-556111e35d400f56cb0625dc16e9063d54628320
- name: Checkout TPC-DS (SF=1) generated data repository
  if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
  uses: actions/checkout@v2
  with:
    repository: maropu/spark-tpcds-sf-1
    ref: 556111e35d400f56cb0625dc16e9063d54628320
    path: ./tpcds-sf-1
- name: Cache Scala, SBT and Maven
  uses: actions/cache@v2
  with:
@@ -528,6 +515,24 @@ jobs:
  uses: actions/setup-java@v1
  with:
    java-version: 8
- name: Cache TPC-DS generated data
  id: cache-tpcds-sf-1
  uses: actions/cache@v2
  with:
    path: ./tpcds-sf-1
    key: tpcds-${{ hashFiles('sql/core/src/test/scala/org/apache/spark/sql/TPCDSSchema.scala') }}
- name: Checkout tpcds-kit repository
  if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
  uses: actions/checkout@v2
  with:
    repository: maropu/spark-tpcds-datagen
    path: ./tpcds-kit
- name: Build tpcds-kit
  if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
  run: cd tpcds-kit/thirdparty/tpcds-kit/tools && make OS=LINUX
- name: Generate TPC-DS (SF=1) table data
  if: steps.cache-tpcds-sf-1.outputs.cache-hit != 'true'
  run: build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir `pwd`/tpcds-kit/thirdparty/tpcds-kit/tools --location `pwd`/tpcds-sf-1 --scaleFactor 1 --numPartitions 1 --overwrite"
- name: Run TPC-DS queries
  run: |
    SPARK_TPCDS_DATA=`pwd`/tpcds-sf-1 build/sbt "sql/testOnly org.apache.spark.sql.TPCDSQueryTestSuite"
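
For reference, the generation step above can also be driven programmatically rather than through sbt; a sketch (the paths are placeholders):

```scala
// Mirrors the workflow's sbt invocation of GenTPCDSData; assumes the Spark SQL test
// classes are on the classpath and dsdgen has been built as in the step above.
org.apache.spark.sql.GenTPCDSData.main(Array(
  "--dsdgenDir", "/path/to/tpcds-kit/tools",  // directory containing the dsdgen binary
  "--location", "/tmp/tpcds-sf-1",            // output root; one subdirectory per table
  "--scaleFactor", "1",
  "--numPartitions", "1",
  "--overwrite"))
```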


@@ -0,0 +1,445 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.util.concurrent.LinkedBlockingQueue

import scala.collection.immutable.Stream
import scala.sys.process._
import scala.util.Try

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{col, rpad}
import org.apache.spark.sql.types.{CharType, StringType, StructField, StructType, VarcharType}

// The classes in this file are basically moved from https://github.com/databricks/spark-sql-perf
/**
* Using ProcessBuilder.lineStream produces a stream that uses
* a LinkedBlockingQueue with a default capacity of Integer.MAX_VALUE.
*
* This causes OOM if the consumer cannot keep up with the producer.
*
* See scala.sys.process.ProcessBuilderImpl.lineStream
*/
object BlockingLineStream {
// See scala.sys.process.Streamed
private final class BlockingStreamed[T](
val process: T => Unit,
val done: Int => Unit,
val stream: () => Stream[T])
// See scala.sys.process.Streamed
private object BlockingStreamed {
// scala.sys.process.Streamed uses a default capacity of Integer.MAX_VALUE,
// which causes OOMs if the consumer cannot keep up with the producer.
val maxQueueSize = 65536
def apply[T](nonzeroException: Boolean): BlockingStreamed[T] = {
val q = new LinkedBlockingQueue[Either[Int, T]](maxQueueSize)
def next(): Stream[T] = q.take match {
case Left(0) => Stream.empty
case Left(code) =>
if (nonzeroException) scala.sys.error("Nonzero exit code: " + code) else Stream.empty
case Right(s) => Stream.cons(s, next())
}
new BlockingStreamed((s: T) => q put Right(s), code => q put Left(code), () => next())
}
}
// See scala.sys.process.ProcessImpl.Spawn
private object Spawn {
def apply(f: => Unit): Thread = apply(f, daemon = false)
def apply(f: => Unit, daemon: Boolean): Thread = {
val thread = new Thread() { override def run() = { f } }
thread.setDaemon(daemon)
thread.start()
thread
}
}
def apply(command: Seq[String]): Stream[String] = {
val streamed = BlockingStreamed[String](true)
val process = command.run(BasicIO(false, streamed.process, None))
Spawn(streamed.done(process.exitValue()))
streamed.stream()
}
}
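// Example usage (illustrative, not part of this file): the bounded queue above
// (maxQueueSize = 65536) makes the producer block instead of buffering unbounded
// process output in memory:
//   val lines = BlockingLineStream(Seq("bash", "-c", "seq 1 100000000"))
//   lines.take(3).foreach(println)  // consumed lazily; dsdgen output is read the same way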
class Dsdgen(dsdgenDir: String) extends Serializable {
private val dsdgen = s"$dsdgenDir/dsdgen"
def generate(
sparkContext: SparkContext,
tableName: String,
partitions: Int,
scaleFactor: Int): RDD[String] = {
val generatedData = {
sparkContext.parallelize(1 to partitions, partitions).flatMap { i =>
val localToolsDir = if (new java.io.File(dsdgen).exists) {
dsdgenDir
} else if (new java.io.File(s"/$dsdgen").exists) {
s"/$dsdgenDir"
} else {
throw new IllegalStateException(
s"Could not find dsdgen at $dsdgen or /$dsdgen. Run install")
}
// NOTE: RNGSEED is the RNG seed used by the data generator. Right now, it is fixed to
// 19620718, the seed used to generate `https://github.com/maropu/spark-tpcds-sf-1`.
val parallel = if (partitions > 1) s"-parallel $partitions -child $i" else ""
val commands = Seq(
"bash", "-c",
s"cd $localToolsDir && ./dsdgen -table $tableName -filter Y -scale $scaleFactor " +
s"-RNGSEED 19620718 $parallel")
BlockingLineStream(commands)
}
}
generatedData.setName(s"$tableName, sf=$scaleFactor, strings")
generatedData
}
}
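// Example usage (illustrative, not part of this file): each of the 4 tasks below runs
// `dsdgen -table store_sales -parallel 4 -child <i>` and streams its '|'-delimited rows:
//   val rdd = new Dsdgen("/path/to/tpcds-kit/tools")
//     .generate(spark.sparkContext, "store_sales", partitions = 4, scaleFactor = 1)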
class TPCDSTables(sqlContext: SQLContext, dsdgenDir: String, scaleFactor: Int)
extends TPCDSSchema with Logging with Serializable {
private val dataGenerator = new Dsdgen(dsdgenDir)
private def tables: Seq[Table] = tableColumns.map { case (tableName, schemaString) =>
val partitionColumns = tablePartitionColumns.getOrElse(tableName, Nil)
.map(_.stripPrefix("`").stripSuffix("`"))
Table(tableName, partitionColumns, StructType.fromDDL(schemaString))
}.toSeq
private case class Table(name: String, partitionColumns: Seq[String], schema: StructType) {
def nonPartitioned: Table = {
Table(name, Nil, schema)
}
private def df(numPartition: Int) = {
val generatedData = dataGenerator.generate(
sqlContext.sparkContext, name, numPartition, scaleFactor)
val rows = generatedData.mapPartitions { iter =>
iter.map { l =>
val values = l.split("\\|", -1).dropRight(1).map { v =>
if (v.equals("")) {
// If the string value is an empty string, we turn it to a null
null
} else {
v
}
}
Row.fromSeq(values)
}
}
val stringData =
sqlContext.createDataFrame(
rows,
StructType(schema.fields.map(f => StructField(f.name, StringType))))
val convertedData = {
val columns = schema.fields.map { f =>
val c = f.dataType match {
// Needs right-padding for char types
case CharType(n) => rpad(Column(f.name), n, " ")
// Don't need a cast for varchar types
case _: VarcharType => col(f.name)
case _ => col(f.name).cast(f.dataType)
}
c.as(f.name)
}
stringData.select(columns: _*)
}
convertedData
}
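// Note on the conversion above: CHAR(n) columns are right-padded with spaces
// (e.g., a CHAR(3) value "AB" is stored as "AB "), matching CHAR semantics,
// while VARCHAR columns are kept as the raw generated strings.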
def genData(
location: String,
format: String,
overwrite: Boolean,
clusterByPartitionColumns: Boolean,
filterOutNullPartitionValues: Boolean,
numPartitions: Int): Unit = {
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Ignore
val data = df(numPartitions)
val tempTableName = s"${name}_text"
data.createOrReplaceTempView(tempTableName)
val writer = if (partitionColumns.nonEmpty) {
if (clusterByPartitionColumns) {
val columnString = data.schema.fields.map { field =>
field.name
}.mkString(",")
val partitionColumnString = partitionColumns.mkString(",")
val predicates = if (filterOutNullPartitionValues) {
partitionColumns.map(col => s"$col IS NOT NULL").mkString("WHERE ", " AND ", "")
} else {
""
}
val query =
s"""
|SELECT
| $columnString
|FROM
| $tempTableName
|$predicates
|DISTRIBUTE BY
| $partitionColumnString
""".stripMargin
val grouped = sqlContext.sql(query)
logInfo(s"Pre-clustering with partitioning columns with query $query.")
grouped.write
} else {
data.write
}
} else {
// treat non-partitioned tables as "one partition" that we want to coalesce
if (clusterByPartitionColumns) {
// In case the data has more than maxRecordsPerFile records, split it into multiple writers
// to improve datagen speed. Files will be truncated to the maxRecordsPerFile value, so the
// final result will be the same.
val numRows = data.count
val maxRecordPerFile = Try {
sqlContext.getConf("spark.sql.files.maxRecordsPerFile").toInt
}.getOrElse(0)
if (maxRecordPerFile > 0 && numRows > maxRecordPerFile) {
val numFiles = (numRows.toDouble / maxRecordPerFile).ceil.toInt
logInfo(s"Coalescing into $numFiles files")
data.coalesce(numFiles).write
} else {
data.coalesce(1).write
}
} else {
data.write
}
}
writer.format(format).mode(mode)
if (partitionColumns.nonEmpty) {
writer.partitionBy(partitionColumns: _*)
}
logInfo(s"Generating table $name in database to $location with save mode $mode.")
writer.save(location)
sqlContext.dropTempTable(tempTableName)
}
}
def genData(
location: String,
format: String,
overwrite: Boolean,
partitionTables: Boolean,
clusterByPartitionColumns: Boolean,
filterOutNullPartitionValues: Boolean,
tableFilter: String = "",
numPartitions: Int = 100): Unit = {
var tablesToBeGenerated = if (partitionTables) {
tables
} else {
tables.map(_.nonPartitioned)
}
if (tableFilter.nonEmpty) {
tablesToBeGenerated = tablesToBeGenerated.filter(_.name == tableFilter)
if (tablesToBeGenerated.isEmpty) {
throw new RuntimeException("Bad table name filter: " + tableFilter)
}
}
tablesToBeGenerated.foreach { table =>
val tableLocation = s"$location/${table.name}"
table.genData(tableLocation, format, overwrite, clusterByPartitionColumns,
filterOutNullPartitionValues, numPartitions)
}
}
}
class GenTPCDSDataConfig(args: Array[String]) {
var master: String = "local[*]"
var dsdgenDir: String = null
var location: String = null
var scaleFactor: Int = 1
var format: String = "parquet"
var overwrite: Boolean = false
var partitionTables: Boolean = false
var clusterByPartitionColumns: Boolean = false
var filterOutNullPartitionValues: Boolean = false
var tableFilter: String = ""
var numPartitions: Int = 100
parseArgs(args.toList)
private def parseArgs(inputArgs: List[String]): Unit = {
var args = inputArgs
while (args.nonEmpty) {
args match {
case "--master" :: value :: tail =>
master = value
args = tail
case "--dsdgenDir" :: value :: tail =>
dsdgenDir = value
args = tail
case "--location" :: value :: tail =>
location = value
args = tail
case "--scaleFactor" :: value :: tail =>
scaleFactor = toPositiveIntValue("Scale factor", value)
args = tail
case "--format" :: value :: tail =>
format = value
args = tail
case "--overwrite" :: tail =>
overwrite = true
args = tail
case "--partitionTables" :: tail =>
partitionTables = true
args = tail
case "--clusterByPartitionColumns" :: tail =>
clusterByPartitionColumns = true
args = tail
case "--filterOutNullPartitionValues" :: tail =>
filterOutNullPartitionValues = true
args = tail
case "--tableFilter" :: value :: tail =>
tableFilter = value
args = tail
case "--numPartitions" :: value :: tail =>
numPartitions = toPositiveIntValue("Number of partitions", value)
args = tail
case "--help" :: tail =>
printUsageAndExit(0)
case _ =>
// scalastyle:off println
System.err.println("Unknown/unsupported param " + args)
// scalastyle:on println
printUsageAndExit(1)
}
}
checkRequiredArguments()
}
private def printUsageAndExit(exitCode: Int): Unit = {
// scalastyle:off
System.err.println("""
|build/sbt "test:runMain <this class> [Options]"
|Options:
| --master the Spark master to use, default to local[*]
| --dsdgenDir location of dsdgen
| --location root directory of location to generate data in
| --scaleFactor size of the dataset to generate (in GB)
| --format generated data format, Parquet, ORC ...
| --overwrite whether to overwrite the data that is already there
| --partitionTables whether to create the partitioned fact tables
| --clusterByPartitionColumns whether to shuffle to get partitions coalesced into single files
| --filterOutNullPartitionValues whether to filter out the partition with NULL key value
| --tableFilter name of a single table to generate (e.g., store_sales);
| all the tables are generated by default
| --numPartitions how many dsdgen partitions to run - number of input tasks
""".stripMargin)
// scalastyle:on
System.exit(exitCode)
}
private def toPositiveIntValue(name: String, v: String): Int = {
if (Try(v.toInt).getOrElse(-1) <= 0) {
// scalastyle:off println
System.err.println(s"$name must be a positive number")
// scalastyle:on println
printUsageAndExit(-1)
}
v.toInt
}
private def checkRequiredArguments(): Unit = {
if (dsdgenDir == null) {
// scalastyle:off println
System.err.println("Must specify a dsdgen path")
// scalastyle:on println
printUsageAndExit(-1)
}
if (location == null) {
// scalastyle:off println
System.err.println("Must specify an output location")
// scalastyle:on println
printUsageAndExit(-1)
}
}
}
/**
* This class generates TPCDS table data by using tpcds-kit:
* - https://github.com/databricks/tpcds-kit
*
* To run this:
* {{{
* build/sbt "sql/test:runMain <this class> --dsdgenDir <path> --location <path> --scaleFactor 1"
* }}}
*/
object GenTPCDSData {
def main(args: Array[String]): Unit = {
val config = new GenTPCDSDataConfig(args)
val spark = SparkSession
.builder()
.appName(getClass.getName)
.master(config.master)
.getOrCreate()
val tables = new TPCDSTables(
spark.sqlContext,
dsdgenDir = config.dsdgenDir,
scaleFactor = config.scaleFactor)
tables.genData(
location = config.location,
format = config.format,
overwrite = config.overwrite,
partitionTables = config.partitionTables,
clusterByPartitionColumns = config.clusterByPartitionColumns,
filterOutNullPartitionValues = config.filterOutNullPartitionValues,
tableFilter = config.tableFilter,
numPartitions = config.numPartitions)
spark.stop()
}
}
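// A fuller invocation (illustrative), generating partitioned fact tables clustered
// by their partition columns:
//   build/sbt "sql/test:runMain org.apache.spark.sql.GenTPCDSData --dsdgenDir <path>
//     --location <path> --scaleFactor 1 --partitionTables --clusterByPartitionColumns --overwrite"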


@@ -21,32 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
/**
* Base trait for TPC-DS related tests.
*
* Datatype mapping for TPC-DS and Spark SQL, fully matching schemas defined in `tpcds.sql` of the
* official tpcds toolkit
* see more at:
* http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.9.0.pdf
*
* |---------------|---------------|
* | TPC-DS | Spark SQL |
* |---------------|---------------|
* | Identifier | INT |
* |---------------|---------------|
* | Integer | INT |
* |---------------|---------------|
* | Decimal(d, f) | Decimal(d, f) |
* |---------------|---------------|
* | Char(N) | Char(N) |
* |---------------|---------------|
* | Varchar(N) | Varchar(N) |
* |---------------|---------------|
* | Date | Date |
* |---------------|---------------|
*/
trait TPCDSBase extends SharedSparkSession {
trait TPCDSBase extends SharedSparkSession with TPCDSSchema {
// The TPCDS queries below are based on v1.4
val tpcdsQueries = Seq(
@@ -73,516 +48,6 @@ trait TPCDSBase extends SharedSparkSession {
"q3", "q7", "q10", "q19", "q27", "q34", "q42", "q43", "q46", "q52", "q53", "q55", "q59",
"q63", "q65", "q68", "q73", "q79", "q89", "q98", "ss_max")
protected val tableColumns: Map[String, String] = Map(
"store_sales" ->
"""
|`ss_sold_date_sk` INT,
|`ss_sold_time_sk` INT,
|`ss_item_sk` INT,
|`ss_customer_sk` INT,
|`ss_cdemo_sk` INT,
|`ss_hdemo_sk` INT,
|`ss_addr_sk` INT,
|`ss_store_sk` INT,
|`ss_promo_sk` INT,
|`ss_ticket_number` INT,
|`ss_quantity` INT,
|`ss_wholesale_cost` DECIMAL(7,2),
|`ss_list_price` DECIMAL(7,2),
|`ss_sales_price` DECIMAL(7,2),
|`ss_ext_discount_amt` DECIMAL(7,2),
|`ss_ext_sales_price` DECIMAL(7,2),
|`ss_ext_wholesale_cost` DECIMAL(7,2),
|`ss_ext_list_price` DECIMAL(7,2),
|`ss_ext_tax` DECIMAL(7,2),
|`ss_coupon_amt` DECIMAL(7,2),
|`ss_net_paid` DECIMAL(7,2),
|`ss_net_paid_inc_tax` DECIMAL(7,2),
|`ss_net_profit` DECIMAL(7,2)
""".stripMargin,
"store_returns" ->
"""
|`sr_returned_date_sk` INT,
|`sr_return_time_sk` INT,
|`sr_item_sk` INT,
|`sr_customer_sk` INT,
|`sr_cdemo_sk` INT,
|`sr_hdemo_sk` INT,
|`sr_addr_sk` INT,
|`sr_store_sk` INT,
|`sr_reason_sk` INT,
|`sr_ticket_number` INT,
|`sr_return_quantity` INT,
|`sr_return_amt` DECIMAL(7,2),
|`sr_return_tax` DECIMAL(7,2),
|`sr_return_amt_inc_tax` DECIMAL(7,2),
|`sr_fee` DECIMAL(7,2),
|`sr_return_ship_cost` DECIMAL(7,2),
|`sr_refunded_cash` DECIMAL(7,2),
|`sr_reversed_charge` DECIMAL(7,2),
|`sr_store_credit` DECIMAL(7,2),
|`sr_net_loss` DECIMAL(7,2)
""".stripMargin,
"catalog_sales" ->
"""
|`cs_sold_date_sk` INT,
|`cs_sold_time_sk` INT,
|`cs_ship_date_sk` INT,
|`cs_bill_customer_sk` INT,
|`cs_bill_cdemo_sk` INT,
|`cs_bill_hdemo_sk` INT,
|`cs_bill_addr_sk` INT,
|`cs_ship_customer_sk` INT,
|`cs_ship_cdemo_sk` INT,
|`cs_ship_hdemo_sk` INT,
|`cs_ship_addr_sk` INT,
|`cs_call_center_sk` INT,
|`cs_catalog_page_sk` INT,
|`cs_ship_mode_sk` INT,
|`cs_warehouse_sk` INT,
|`cs_item_sk` INT,
|`cs_promo_sk` INT,
|`cs_order_number` INT,
|`cs_quantity` INT,
|`cs_wholesale_cost` DECIMAL(7,2),
|`cs_list_price` DECIMAL(7,2),
|`cs_sales_price` DECIMAL(7,2),
|`cs_ext_discount_amt` DECIMAL(7,2),
|`cs_ext_sales_price` DECIMAL(7,2),
|`cs_ext_wholesale_cost` DECIMAL(7,2),
|`cs_ext_list_price` DECIMAL(7,2),
|`cs_ext_tax` DECIMAL(7,2),
|`cs_coupon_amt` DECIMAL(7,2),
|`cs_ext_ship_cost` DECIMAL(7,2),
|`cs_net_paid` DECIMAL(7,2),
|`cs_net_paid_inc_tax` DECIMAL(7,2),
|`cs_net_paid_inc_ship` DECIMAL(7,2),
|`cs_net_paid_inc_ship_tax` DECIMAL(7,2),
|`cs_net_profit` DECIMAL(7,2)
""".stripMargin,
"catalog_returns" ->
"""
|`cr_returned_date_sk` INT,
|`cr_returned_time_sk` INT,
|`cr_item_sk` INT,
|`cr_refunded_customer_sk` INT,
|`cr_refunded_cdemo_sk` INT,
|`cr_refunded_hdemo_sk` INT,
|`cr_refunded_addr_sk` INT,
|`cr_returning_customer_sk` INT,
|`cr_returning_cdemo_sk` INT,
|`cr_returning_hdemo_sk` INT,
|`cr_returning_addr_sk` INT,
|`cr_call_center_sk` INT,
|`cr_catalog_page_sk` INT,
|`cr_ship_mode_sk` INT,
|`cr_warehouse_sk` INT,
|`cr_reason_sk` INT,
|`cr_order_number` INT,
|`cr_return_quantity` INT,
|`cr_return_amount` DECIMAL(7,2),
|`cr_return_tax` DECIMAL(7,2),
|`cr_return_amt_inc_tax` DECIMAL(7,2),
|`cr_fee` DECIMAL(7,2),
|`cr_return_ship_cost` DECIMAL(7,2),
|`cr_refunded_cash` DECIMAL(7,2),
|`cr_reversed_charge` DECIMAL(7,2),
|`cr_store_credit` DECIMAL(7,2),
|`cr_net_loss` DECIMAL(7,2)
""".stripMargin,
"web_sales" ->
"""
|`ws_sold_date_sk` INT,
|`ws_sold_time_sk` INT,
|`ws_ship_date_sk` INT,
|`ws_item_sk` INT,
|`ws_bill_customer_sk` INT,
|`ws_bill_cdemo_sk` INT,
|`ws_bill_hdemo_sk` INT,
|`ws_bill_addr_sk` INT,
|`ws_ship_customer_sk` INT,
|`ws_ship_cdemo_sk` INT,
|`ws_ship_hdemo_sk` INT,
|`ws_ship_addr_sk` INT,
|`ws_web_page_sk` INT,
|`ws_web_site_sk` INT,
|`ws_ship_mode_sk` INT,
|`ws_warehouse_sk` INT,
|`ws_promo_sk` INT,
|`ws_order_number` INT,
|`ws_quantity` INT,
|`ws_wholesale_cost` DECIMAL(7,2),
|`ws_list_price` DECIMAL(7,2),
|`ws_sales_price` DECIMAL(7,2),
|`ws_ext_discount_amt` DECIMAL(7,2),
|`ws_ext_sales_price` DECIMAL(7,2),
|`ws_ext_wholesale_cost` DECIMAL(7,2),
|`ws_ext_list_price` DECIMAL(7,2),
|`ws_ext_tax` DECIMAL(7,2),
|`ws_coupon_amt` DECIMAL(7,2),
|`ws_ext_ship_cost` DECIMAL(7,2),
|`ws_net_paid` DECIMAL(7,2),
|`ws_net_paid_inc_tax` DECIMAL(7,2),
|`ws_net_paid_inc_ship` DECIMAL(7,2),
|`ws_net_paid_inc_ship_tax` DECIMAL(7,2),
|`ws_net_profit` DECIMAL(7,2)
""".stripMargin,
"web_returns" ->
"""
|`wr_returned_date_sk` INT,
|`wr_returned_time_sk` INT,
|`wr_item_sk` INT,
|`wr_refunded_customer_sk` INT,
|`wr_refunded_cdemo_sk` INT,
|`wr_refunded_hdemo_sk` INT,
|`wr_refunded_addr_sk` INT,
|`wr_returning_customer_sk` INT,
|`wr_returning_cdemo_sk` INT,
|`wr_returning_hdemo_sk` INT,
|`wr_returning_addr_sk` INT,
|`wr_web_page_sk` INT,
|`wr_reason_sk` INT,
|`wr_order_number` INT,
|`wr_return_quantity` INT,
|`wr_return_amt` DECIMAL(7,2),
|`wr_return_tax` DECIMAL(7,2),
|`wr_return_amt_inc_tax` DECIMAL(7,2),
|`wr_fee` DECIMAL(7,2),
|`wr_return_ship_cost` DECIMAL(7,2),
|`wr_refunded_cash` DECIMAL(7,2),
|`wr_reversed_charge` DECIMAL(7,2),
|`wr_account_credit` DECIMAL(7,2),
|`wr_net_loss` DECIMAL(7,2)
""".stripMargin,
"inventory" ->
"""
|`inv_date_sk` INT,
|`inv_item_sk` INT,
|`inv_warehouse_sk` INT,
|`inv_quantity_on_hand` INT
""".stripMargin,
"store" ->
"""
|`s_store_sk` INT,
|`s_store_id` CHAR(16),
|`s_rec_start_date` DATE,
|`s_rec_end_date` DATE,
|`s_closed_date_sk` INT,
|`s_store_name` VARCHAR(50),
|`s_number_employees` INT,
|`s_floor_space` INT,
|`s_hours` CHAR(20),
|`s_manager` VARCHAR(40),
|`s_market_id` INT,
|`s_geography_class` VARCHAR(100),
|`s_market_desc` VARCHAR(100),
|`s_market_manager` VARCHAR(40),
|`s_division_id` INT,
|`s_division_name` VARCHAR(50),
|`s_company_id` INT,
|`s_company_name` VARCHAR(50),
|`s_street_number` VARCHAR(10),
|`s_street_name` VARCHAR(60),
|`s_street_type` CHAR(15),
|`s_suite_number` CHAR(10),
|`s_city` VARCHAR(60),
|`s_county` VARCHAR(30),
|`s_state` CHAR(2),
|`s_zip` CHAR(10),
|`s_country` VARCHAR(20),
|`s_gmt_offset` DECIMAL(5,2),
|`s_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"call_center" ->
"""
|`cc_call_center_sk` INT,
|`cc_call_center_id` CHAR(16),
|`cc_rec_start_date` DATE,
|`cc_rec_end_date` DATE,
|`cc_closed_date_sk` INT,
|`cc_open_date_sk` INT,
|`cc_name` VARCHAR(50),
|`cc_class` VARCHAR(50),
|`cc_employees` INT,
|`cc_sq_ft` INT,
|`cc_hours` CHAR(20),
|`cc_manager` VARCHAR(40),
|`cc_mkt_id` INT,
|`cc_mkt_class` CHAR(50),
|`cc_mkt_desc` VARCHAR(100),
|`cc_market_manager` VARCHAR(40),
|`cc_division` INT,
|`cc_division_name` VARCHAR(50),
|`cc_company` INT,
|`cc_company_name` CHAR(50),
|`cc_street_number` CHAR(10),
|`cc_street_name` VARCHAR(60),
|`cc_street_type` CHAR(15),
|`cc_suite_number` CHAR(10),
|`cc_city` VARCHAR(60),
|`cc_county` VARCHAR(30),
|`cc_state` CHAR(2),
|`cc_zip` CHAR(10),
|`cc_country` VARCHAR(20),
|`cc_gmt_offset` DECIMAL(5,2),
|`cc_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"catalog_page" ->
"""
|`cp_catalog_page_sk` INT,
|`cp_catalog_page_id` CHAR(16),
|`cp_start_date_sk` INT,
|`cp_end_date_sk` INT,
|`cp_department` VARCHAR(50),
|`cp_catalog_number` INT,
|`cp_catalog_page_number` INT,
|`cp_description` VARCHAR(100),
|`cp_type` VARCHAR(100)
""".stripMargin,
"web_site" ->
"""
|`web_site_sk` INT,
|`web_site_id` CHAR(16),
|`web_rec_start_date` DATE,
|`web_rec_end_date` DATE,
|`web_name` VARCHAR(50),
|`web_open_date_sk` INT,
|`web_close_date_sk` INT,
|`web_class` VARCHAR(50),
|`web_manager` VARCHAR(40),
|`web_mkt_id` INT,
|`web_mkt_class` VARCHAR(50),
|`web_mkt_desc` VARCHAR(100),
|`web_market_manager` VARCHAR(40),
|`web_company_id` INT,
|`web_company_name` CHAR(50),
|`web_street_number` CHAR(10),
|`web_street_name` VARCHAR(60),
|`web_street_type` CHAR(15),
|`web_suite_number` CHAR(10),
|`web_city` VARCHAR(60),
|`web_county` VARCHAR(30),
|`web_state` CHAR(2),
|`web_zip` CHAR(10),
|`web_country` VARCHAR(20),
|`web_gmt_offset` DECIMAL(5,2),
|`web_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"web_page" ->
"""
|`wp_web_page_sk` INT,
|`wp_web_page_id` CHAR(16),
|`wp_rec_start_date` DATE,
|`wp_rec_end_date` DATE,
|`wp_creation_date_sk` INT,
|`wp_access_date_sk` INT,
|`wp_autogen_flag` CHAR(1),
|`wp_customer_sk` INT,
|`wp_url` VARCHAR(100),
|`wp_type` CHAR(50),
|`wp_char_count` INT,
|`wp_link_count` INT,
|`wp_image_count` INT,
|`wp_max_ad_count` INT
""".stripMargin,
"warehouse" ->
"""
|`w_warehouse_sk` INT,
|`w_warehouse_id` CHAR(16),
|`w_warehouse_name` VARCHAR(20),
|`w_warehouse_sq_ft` INT,
|`w_street_number` CHAR(10),
|`w_street_name` VARCHAR(20),
|`w_street_type` CHAR(15),
|`w_suite_number` CHAR(10),
|`w_city` VARCHAR(60),
|`w_county` VARCHAR(30),
|`w_state` CHAR(2),
|`w_zip` CHAR(10),
|`w_country` VARCHAR(20),
|`w_gmt_offset` DECIMAL(5,2)
""".stripMargin,
"customer" ->
"""
|`c_customer_sk` INT,
|`c_customer_id` CHAR(16),
|`c_current_cdemo_sk` INT,
|`c_current_hdemo_sk` INT,
|`c_current_addr_sk` INT,
|`c_first_shipto_date_sk` INT,
|`c_first_sales_date_sk` INT,
|`c_salutation` CHAR(10),
|`c_first_name` CHAR(20),
|`c_last_name` CHAR(30),
|`c_preferred_cust_flag` CHAR(1),
|`c_birth_day` INT,
|`c_birth_month` INT,
|`c_birth_year` INT,
|`c_birth_country` VARCHAR(20),
|`c_login` CHAR(13),
|`c_email_address` CHAR(50),
|`c_last_review_date` INT
""".stripMargin,
"customer_address" ->
"""
|`ca_address_sk` INT,
|`ca_address_id` CHAR(16),
|`ca_street_number` CHAR(10),
|`ca_street_name` VARCHAR(60),
|`ca_street_type` CHAR(15),
|`ca_suite_number` CHAR(10),
|`ca_city` VARCHAR(60),
|`ca_county` VARCHAR(30),
|`ca_state` CHAR(2),
|`ca_zip` CHAR(10),
|`ca_country` VARCHAR(20),
|`ca_gmt_offset` DECIMAL(5,2),
|`ca_location_type` CHAR(20)
""".stripMargin,
"customer_demographics" ->
"""
|`cd_demo_sk` INT,
|`cd_gender` CHAR(1),
|`cd_marital_status` CHAR(1),
|`cd_education_status` CHAR(20),
|`cd_purchase_estimate` INT,
|`cd_credit_rating` CHAR(10),
|`cd_dep_count` INT,
|`cd_dep_employed_count` INT,
|`cd_dep_college_count` INT
""".stripMargin,
"date_dim" ->
"""
|`d_date_sk` INT,
|`d_date_id` CHAR(16),
|`d_date` DATE,
|`d_month_seq` INT,
|`d_week_seq` INT,
|`d_quarter_seq` INT,
|`d_year` INT,
|`d_dow` INT,
|`d_moy` INT,
|`d_dom` INT,
|`d_qoy` INT,
|`d_fy_year` INT,
|`d_fy_quarter_seq` INT,
|`d_fy_week_seq` INT,
|`d_day_name` CHAR(9),
|`d_quarter_name` CHAR(6),
|`d_holiday` CHAR(1),
|`d_weekend` CHAR(1),
|`d_following_holiday` CHAR(1),
|`d_first_dom` INT,
|`d_last_dom` INT,
|`d_same_day_ly` INT,
|`d_same_day_lq` INT,
|`d_current_day` CHAR(1),
|`d_current_week` CHAR(1),
|`d_current_month` CHAR(1),
|`d_current_quarter` CHAR(1),
|`d_current_year` CHAR(1)
""".stripMargin,
"household_demographics" ->
"""
|`hd_demo_sk` INT,
|`hd_income_band_sk` INT,
|`hd_buy_potential` CHAR(15),
|`hd_dep_count` INT,
|`hd_vehicle_count` INT
""".stripMargin,
"item" ->
"""
|`i_item_sk` INT,
|`i_item_id` CHAR(16),
|`i_rec_start_date` DATE,
|`i_rec_end_date` DATE,
|`i_item_desc` VARCHAR(200),
|`i_current_price` DECIMAL(7,2),
|`i_wholesale_cost` DECIMAL(7,2),
|`i_brand_id` INT,
|`i_brand` CHAR(50),
|`i_class_id` INT,
|`i_class` CHAR(50),
|`i_category_id` INT,
|`i_category` CHAR(50),
|`i_manufact_id` INT,
|`i_manufact` CHAR(50),
|`i_size` CHAR(20),
|`i_formulation` CHAR(20),
|`i_color` CHAR(20),
|`i_units` CHAR(10),
|`i_container` CHAR(10),
|`i_manager_id` INT,
|`i_product_name` CHAR(50)
""".stripMargin,
"income_band" ->
"""
|`ib_income_band_sk` INT,
|`ib_lower_bound` INT,
|`ib_upper_bound` INT
""".stripMargin,
"promotion" ->
"""
|`p_promo_sk` INT,
|`p_promo_id` CHAR(16),
|`p_start_date_sk` INT,
|`p_end_date_sk` INT,
|`p_item_sk` INT,
|`p_cost` DECIMAL(15,2),
|`p_response_target` INT,
|`p_promo_name` CHAR(50),
|`p_channel_dmail` CHAR(1),
|`p_channel_email` CHAR(1),
|`p_channel_catalog` CHAR(1),
|`p_channel_tv` CHAR(1),
|`p_channel_radio` CHAR(1),
|`p_channel_press` CHAR(1),
|`p_channel_event` CHAR(1),
|`p_channel_demo` CHAR(1),
|`p_channel_details` VARCHAR(100),
|`p_purpose` CHAR(15),
|`p_discount_active` CHAR(1)
""".stripMargin,
"reason" ->
"""
|`r_reason_sk` INT,
|`r_reason_id` CHAR(16),
|`r_reason_desc` CHAR(100)
""".stripMargin,
"ship_mode" ->
"""
|`sm_ship_mode_sk` INT,
|`sm_ship_mode_id` CHAR(16),
|`sm_type` CHAR(30),
|`sm_code` CHAR(10),
|`sm_carrier` CHAR(20),
|`sm_contract` CHAR(20)
""".stripMargin,
"time_dim" ->
"""
|`t_time_sk` INT,
|`t_time_id` CHAR(16),
|`t_time` INT,
|`t_hour` INT,
|`t_minute` INT,
|`t_second` INT,
|`t_am_pm` CHAR(2),
|`t_shift` CHAR(20),
|`t_sub_shift` CHAR(20),
|`t_meal_time` CHAR(20)
""".stripMargin
)
// The partition columns are consistent with the databricks/spark-sql-perf project.
private val tablePartitionColumns = Map(
"catalog_sales" -> Seq("`cs_sold_date_sk`"),
"catalog_returns" -> Seq("`cr_returned_date_sk`"),
"inventory" -> Seq("`inv_date_sk`"),
"store_sales" -> Seq("`ss_sold_date_sk`"),
"store_returns" -> Seq("`sr_returned_date_sk`"),
"web_sales" -> Seq("`ws_sold_date_sk`"),
"web_returns" -> Seq("`wr_returned_date_sk`")
)
protected def partitionedByClause(tableName: String): String = {
tablePartitionColumns.get(tableName) match {
case Some(cols) if cols.nonEmpty => s"PARTITIONED BY (${cols.mkString(", ")})"


@@ -0,0 +1,555 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

/**
* Base trait for TPC-DS related tests.
*
* Datatype mapping for TPC-DS and Spark SQL, fully matching schemas defined in `tpcds.sql` of the
* official tpcds toolkit
* see more at:
* http://www.tpc.org/tpc_documents_current_versions/pdf/tpc-ds_v2.9.0.pdf
*
* |---------------|---------------|
* | TPC-DS | Spark SQL |
* |---------------|---------------|
* | Identifier | INT |
* |---------------|---------------|
* | Integer | INT |
* |---------------|---------------|
* | Decimal(d, f) | Decimal(d, f) |
* |---------------|---------------|
* | Char(N) | Char(N) |
* |---------------|---------------|
* | Varchar(N) | Varchar(N) |
* |---------------|---------------|
* | Date | Date |
* |---------------|---------------|
*/
trait TPCDSSchema {
protected val tableColumns: Map[String, String] = Map(
"store_sales" ->
"""
|`ss_sold_date_sk` INT,
|`ss_sold_time_sk` INT,
|`ss_item_sk` INT,
|`ss_customer_sk` INT,
|`ss_cdemo_sk` INT,
|`ss_hdemo_sk` INT,
|`ss_addr_sk` INT,
|`ss_store_sk` INT,
|`ss_promo_sk` INT,
|`ss_ticket_number` INT,
|`ss_quantity` INT,
|`ss_wholesale_cost` DECIMAL(7,2),
|`ss_list_price` DECIMAL(7,2),
|`ss_sales_price` DECIMAL(7,2),
|`ss_ext_discount_amt` DECIMAL(7,2),
|`ss_ext_sales_price` DECIMAL(7,2),
|`ss_ext_wholesale_cost` DECIMAL(7,2),
|`ss_ext_list_price` DECIMAL(7,2),
|`ss_ext_tax` DECIMAL(7,2),
|`ss_coupon_amt` DECIMAL(7,2),
|`ss_net_paid` DECIMAL(7,2),
|`ss_net_paid_inc_tax` DECIMAL(7,2),
|`ss_net_profit` DECIMAL(7,2)
""".stripMargin,
"store_returns" ->
"""
|`sr_returned_date_sk` INT,
|`sr_return_time_sk` INT,
|`sr_item_sk` INT,
|`sr_customer_sk` INT,
|`sr_cdemo_sk` INT,
|`sr_hdemo_sk` INT,
|`sr_addr_sk` INT,
|`sr_store_sk` INT,
|`sr_reason_sk` INT,
|`sr_ticket_number` INT,
|`sr_return_quantity` INT,
|`sr_return_amt` DECIMAL(7,2),
|`sr_return_tax` DECIMAL(7,2),
|`sr_return_amt_inc_tax` DECIMAL(7,2),
|`sr_fee` DECIMAL(7,2),
|`sr_return_ship_cost` DECIMAL(7,2),
|`sr_refunded_cash` DECIMAL(7,2),
|`sr_reversed_charge` DECIMAL(7,2),
|`sr_store_credit` DECIMAL(7,2),
|`sr_net_loss` DECIMAL(7,2)
""".stripMargin,
"catalog_sales" ->
"""
|`cs_sold_date_sk` INT,
|`cs_sold_time_sk` INT,
|`cs_ship_date_sk` INT,
|`cs_bill_customer_sk` INT,
|`cs_bill_cdemo_sk` INT,
|`cs_bill_hdemo_sk` INT,
|`cs_bill_addr_sk` INT,
|`cs_ship_customer_sk` INT,
|`cs_ship_cdemo_sk` INT,
|`cs_ship_hdemo_sk` INT,
|`cs_ship_addr_sk` INT,
|`cs_call_center_sk` INT,
|`cs_catalog_page_sk` INT,
|`cs_ship_mode_sk` INT,
|`cs_warehouse_sk` INT,
|`cs_item_sk` INT,
|`cs_promo_sk` INT,
|`cs_order_number` INT,
|`cs_quantity` INT,
|`cs_wholesale_cost` DECIMAL(7,2),
|`cs_list_price` DECIMAL(7,2),
|`cs_sales_price` DECIMAL(7,2),
|`cs_ext_discount_amt` DECIMAL(7,2),
|`cs_ext_sales_price` DECIMAL(7,2),
|`cs_ext_wholesale_cost` DECIMAL(7,2),
|`cs_ext_list_price` DECIMAL(7,2),
|`cs_ext_tax` DECIMAL(7,2),
|`cs_coupon_amt` DECIMAL(7,2),
|`cs_ext_ship_cost` DECIMAL(7,2),
|`cs_net_paid` DECIMAL(7,2),
|`cs_net_paid_inc_tax` DECIMAL(7,2),
|`cs_net_paid_inc_ship` DECIMAL(7,2),
|`cs_net_paid_inc_ship_tax` DECIMAL(7,2),
|`cs_net_profit` DECIMAL(7,2)
""".stripMargin,
"catalog_returns" ->
"""
|`cr_returned_date_sk` INT,
|`cr_returned_time_sk` INT,
|`cr_item_sk` INT,
|`cr_refunded_customer_sk` INT,
|`cr_refunded_cdemo_sk` INT,
|`cr_refunded_hdemo_sk` INT,
|`cr_refunded_addr_sk` INT,
|`cr_returning_customer_sk` INT,
|`cr_returning_cdemo_sk` INT,
|`cr_returning_hdemo_sk` INT,
|`cr_returning_addr_sk` INT,
|`cr_call_center_sk` INT,
|`cr_catalog_page_sk` INT,
|`cr_ship_mode_sk` INT,
|`cr_warehouse_sk` INT,
|`cr_reason_sk` INT,
|`cr_order_number` INT,
|`cr_return_quantity` INT,
|`cr_return_amount` DECIMAL(7,2),
|`cr_return_tax` DECIMAL(7,2),
|`cr_return_amt_inc_tax` DECIMAL(7,2),
|`cr_fee` DECIMAL(7,2),
|`cr_return_ship_cost` DECIMAL(7,2),
|`cr_refunded_cash` DECIMAL(7,2),
|`cr_reversed_charge` DECIMAL(7,2),
|`cr_store_credit` DECIMAL(7,2),
|`cr_net_loss` DECIMAL(7,2)
""".stripMargin,
"web_sales" ->
"""
|`ws_sold_date_sk` INT,
|`ws_sold_time_sk` INT,
|`ws_ship_date_sk` INT,
|`ws_item_sk` INT,
|`ws_bill_customer_sk` INT,
|`ws_bill_cdemo_sk` INT,
|`ws_bill_hdemo_sk` INT,
|`ws_bill_addr_sk` INT,
|`ws_ship_customer_sk` INT,
|`ws_ship_cdemo_sk` INT,
|`ws_ship_hdemo_sk` INT,
|`ws_ship_addr_sk` INT,
|`ws_web_page_sk` INT,
|`ws_web_site_sk` INT,
|`ws_ship_mode_sk` INT,
|`ws_warehouse_sk` INT,
|`ws_promo_sk` INT,
|`ws_order_number` INT,
|`ws_quantity` INT,
|`ws_wholesale_cost` DECIMAL(7,2),
|`ws_list_price` DECIMAL(7,2),
|`ws_sales_price` DECIMAL(7,2),
|`ws_ext_discount_amt` DECIMAL(7,2),
|`ws_ext_sales_price` DECIMAL(7,2),
|`ws_ext_wholesale_cost` DECIMAL(7,2),
|`ws_ext_list_price` DECIMAL(7,2),
|`ws_ext_tax` DECIMAL(7,2),
|`ws_coupon_amt` DECIMAL(7,2),
|`ws_ext_ship_cost` DECIMAL(7,2),
|`ws_net_paid` DECIMAL(7,2),
|`ws_net_paid_inc_tax` DECIMAL(7,2),
|`ws_net_paid_inc_ship` DECIMAL(7,2),
|`ws_net_paid_inc_ship_tax` DECIMAL(7,2),
|`ws_net_profit` DECIMAL(7,2)
""".stripMargin,
"web_returns" ->
"""
|`wr_returned_date_sk` INT,
|`wr_returned_time_sk` INT,
|`wr_item_sk` INT,
|`wr_refunded_customer_sk` INT,
|`wr_refunded_cdemo_sk` INT,
|`wr_refunded_hdemo_sk` INT,
|`wr_refunded_addr_sk` INT,
|`wr_returning_customer_sk` INT,
|`wr_returning_cdemo_sk` INT,
|`wr_returning_hdemo_sk` INT,
|`wr_returning_addr_sk` INT,
|`wr_web_page_sk` INT,
|`wr_reason_sk` INT,
|`wr_order_number` INT,
|`wr_return_quantity` INT,
|`wr_return_amt` DECIMAL(7,2),
|`wr_return_tax` DECIMAL(7,2),
|`wr_return_amt_inc_tax` DECIMAL(7,2),
|`wr_fee` DECIMAL(7,2),
|`wr_return_ship_cost` DECIMAL(7,2),
|`wr_refunded_cash` DECIMAL(7,2),
|`wr_reversed_charge` DECIMAL(7,2),
|`wr_account_credit` DECIMAL(7,2),
|`wr_net_loss` DECIMAL(7,2)
""".stripMargin,
"inventory" ->
"""
|`inv_date_sk` INT,
|`inv_item_sk` INT,
|`inv_warehouse_sk` INT,
|`inv_quantity_on_hand` INT
""".stripMargin,
"store" ->
"""
|`s_store_sk` INT,
|`s_store_id` CHAR(16),
|`s_rec_start_date` DATE,
|`s_rec_end_date` DATE,
|`s_closed_date_sk` INT,
|`s_store_name` VARCHAR(50),
|`s_number_employees` INT,
|`s_floor_space` INT,
|`s_hours` CHAR(20),
|`s_manager` VARCHAR(40),
|`s_market_id` INT,
|`s_geography_class` VARCHAR(100),
|`s_market_desc` VARCHAR(100),
|`s_market_manager` VARCHAR(40),
|`s_division_id` INT,
|`s_division_name` VARCHAR(50),
|`s_company_id` INT,
|`s_company_name` VARCHAR(50),
|`s_street_number` VARCHAR(10),
|`s_street_name` VARCHAR(60),
|`s_street_type` CHAR(15),
|`s_suite_number` CHAR(10),
|`s_city` VARCHAR(60),
|`s_county` VARCHAR(30),
|`s_state` CHAR(2),
|`s_zip` CHAR(10),
|`s_country` VARCHAR(20),
|`s_gmt_offset` DECIMAL(5,2),
|`s_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"call_center" ->
"""
|`cc_call_center_sk` INT,
|`cc_call_center_id` CHAR(16),
|`cc_rec_start_date` DATE,
|`cc_rec_end_date` DATE,
|`cc_closed_date_sk` INT,
|`cc_open_date_sk` INT,
|`cc_name` VARCHAR(50),
|`cc_class` VARCHAR(50),
|`cc_employees` INT,
|`cc_sq_ft` INT,
|`cc_hours` CHAR(20),
|`cc_manager` VARCHAR(40),
|`cc_mkt_id` INT,
|`cc_mkt_class` CHAR(50),
|`cc_mkt_desc` VARCHAR(100),
|`cc_market_manager` VARCHAR(40),
|`cc_division` INT,
|`cc_division_name` VARCHAR(50),
|`cc_company` INT,
|`cc_company_name` CHAR(50),
|`cc_street_number` CHAR(10),
|`cc_street_name` VARCHAR(60),
|`cc_street_type` CHAR(15),
|`cc_suite_number` CHAR(10),
|`cc_city` VARCHAR(60),
|`cc_county` VARCHAR(30),
|`cc_state` CHAR(2),
|`cc_zip` CHAR(10),
|`cc_country` VARCHAR(20),
|`cc_gmt_offset` DECIMAL(5,2),
|`cc_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"catalog_page" ->
"""
|`cp_catalog_page_sk` INT,
|`cp_catalog_page_id` CHAR(16),
|`cp_start_date_sk` INT,
|`cp_end_date_sk` INT,
|`cp_department` VARCHAR(50),
|`cp_catalog_number` INT,
|`cp_catalog_page_number` INT,
|`cp_description` VARCHAR(100),
|`cp_type` VARCHAR(100)
""".stripMargin,
"web_site" ->
"""
|`web_site_sk` INT,
|`web_site_id` CHAR(16),
|`web_rec_start_date` DATE,
|`web_rec_end_date` DATE,
|`web_name` VARCHAR(50),
|`web_open_date_sk` INT,
|`web_close_date_sk` INT,
|`web_class` VARCHAR(50),
|`web_manager` VARCHAR(40),
|`web_mkt_id` INT,
|`web_mkt_class` VARCHAR(50),
|`web_mkt_desc` VARCHAR(100),
|`web_market_manager` VARCHAR(40),
|`web_company_id` INT,
|`web_company_name` CHAR(50),
|`web_street_number` CHAR(10),
|`web_street_name` VARCHAR(60),
|`web_street_type` CHAR(15),
|`web_suite_number` CHAR(10),
|`web_city` VARCHAR(60),
|`web_county` VARCHAR(30),
|`web_state` CHAR(2),
|`web_zip` CHAR(10),
|`web_country` VARCHAR(20),
|`web_gmt_offset` DECIMAL(5,2),
|`web_tax_percentage` DECIMAL(5,2)
""".stripMargin,
"web_page" ->
"""
|`wp_web_page_sk` INT,
|`wp_web_page_id` CHAR(16),
|`wp_rec_start_date` DATE,
|`wp_rec_end_date` DATE,
|`wp_creation_date_sk` INT,
|`wp_access_date_sk` INT,
|`wp_autogen_flag` CHAR(1),
|`wp_customer_sk` INT,
|`wp_url` VARCHAR(100),
|`wp_type` CHAR(50),
|`wp_char_count` INT,
|`wp_link_count` INT,
|`wp_image_count` INT,
|`wp_max_ad_count` INT
""".stripMargin,
"warehouse" ->
"""
|`w_warehouse_sk` INT,
|`w_warehouse_id` CHAR(16),
|`w_warehouse_name` VARCHAR(20),
|`w_warehouse_sq_ft` INT,
|`w_street_number` CHAR(10),
|`w_street_name` VARCHAR(20),
|`w_street_type` CHAR(15),
|`w_suite_number` CHAR(10),
|`w_city` VARCHAR(60),
|`w_county` VARCHAR(30),
|`w_state` CHAR(2),
|`w_zip` CHAR(10),
|`w_country` VARCHAR(20),
|`w_gmt_offset` DECIMAL(5,2)
""".stripMargin,
"customer" ->
"""
|`c_customer_sk` INT,
|`c_customer_id` CHAR(16),
|`c_current_cdemo_sk` INT,
|`c_current_hdemo_sk` INT,
|`c_current_addr_sk` INT,
|`c_first_shipto_date_sk` INT,
|`c_first_sales_date_sk` INT,
|`c_salutation` CHAR(10),
|`c_first_name` CHAR(20),
|`c_last_name` CHAR(30),
|`c_preferred_cust_flag` CHAR(1),
|`c_birth_day` INT,
|`c_birth_month` INT,
|`c_birth_year` INT,
|`c_birth_country` VARCHAR(20),
|`c_login` CHAR(13),
|`c_email_address` CHAR(50),
|`c_last_review_date` INT
""".stripMargin,
"customer_address" ->
"""
|`ca_address_sk` INT,
|`ca_address_id` CHAR(16),
|`ca_street_number` CHAR(10),
|`ca_street_name` VARCHAR(60),
|`ca_street_type` CHAR(15),
|`ca_suite_number` CHAR(10),
|`ca_city` VARCHAR(60),
|`ca_county` VARCHAR(30),
|`ca_state` CHAR(2),
|`ca_zip` CHAR(10),
|`ca_country` VARCHAR(20),
|`ca_gmt_offset` DECIMAL(5,2),
|`ca_location_type` CHAR(20)
""".stripMargin,
"customer_demographics" ->
"""
|`cd_demo_sk` INT,
|`cd_gender` CHAR(1),
|`cd_marital_status` CHAR(1),
|`cd_education_status` CHAR(20),
|`cd_purchase_estimate` INT,
|`cd_credit_rating` CHAR(10),
|`cd_dep_count` INT,
|`cd_dep_employed_count` INT,
|`cd_dep_college_count` INT
""".stripMargin,
"date_dim" ->
"""
|`d_date_sk` INT,
|`d_date_id` CHAR(16),
|`d_date` DATE,
|`d_month_seq` INT,
|`d_week_seq` INT,
|`d_quarter_seq` INT,
|`d_year` INT,
|`d_dow` INT,
|`d_moy` INT,
|`d_dom` INT,
|`d_qoy` INT,
|`d_fy_year` INT,
|`d_fy_quarter_seq` INT,
|`d_fy_week_seq` INT,
|`d_day_name` CHAR(9),
|`d_quarter_name` CHAR(6),
|`d_holiday` CHAR(1),
|`d_weekend` CHAR(1),
|`d_following_holiday` CHAR(1),
|`d_first_dom` INT,
|`d_last_dom` INT,
|`d_same_day_ly` INT,
|`d_same_day_lq` INT,
|`d_current_day` CHAR(1),
|`d_current_week` CHAR(1),
|`d_current_month` CHAR(1),
|`d_current_quarter` CHAR(1),
|`d_current_year` CHAR(1)
""".stripMargin,
"household_demographics" ->
"""
|`hd_demo_sk` INT,
|`hd_income_band_sk` INT,
|`hd_buy_potential` CHAR(15),
|`hd_dep_count` INT,
|`hd_vehicle_count` INT
""".stripMargin,
"item" ->
"""
|`i_item_sk` INT,
|`i_item_id` CHAR(16),
|`i_rec_start_date` DATE,
|`i_rec_end_date` DATE,
|`i_item_desc` VARCHAR(200),
|`i_current_price` DECIMAL(7,2),
|`i_wholesale_cost` DECIMAL(7,2),
|`i_brand_id` INT,
|`i_brand` CHAR(50),
|`i_class_id` INT,
|`i_class` CHAR(50),
|`i_category_id` INT,
|`i_category` CHAR(50),
|`i_manufact_id` INT,
|`i_manufact` CHAR(50),
|`i_size` CHAR(20),
|`i_formulation` CHAR(20),
|`i_color` CHAR(20),
|`i_units` CHAR(10),
|`i_container` CHAR(10),
|`i_manager_id` INT,
|`i_product_name` CHAR(50)
""".stripMargin,
"income_band" ->
"""
|`ib_income_band_sk` INT,
|`ib_lower_bound` INT,
|`ib_upper_bound` INT
""".stripMargin,
"promotion" ->
"""
|`p_promo_sk` INT,
|`p_promo_id` CHAR(16),
|`p_start_date_sk` INT,
|`p_end_date_sk` INT,
|`p_item_sk` INT,
|`p_cost` DECIMAL(15,2),
|`p_response_target` INT,
|`p_promo_name` CHAR(50),
|`p_channel_dmail` CHAR(1),
|`p_channel_email` CHAR(1),
|`p_channel_catalog` CHAR(1),
|`p_channel_tv` CHAR(1),
|`p_channel_radio` CHAR(1),
|`p_channel_press` CHAR(1),
|`p_channel_event` CHAR(1),
|`p_channel_demo` CHAR(1),
|`p_channel_details` VARCHAR(100),
|`p_purpose` CHAR(15),
|`p_discount_active` CHAR(1)
""".stripMargin,
"reason" ->
"""
|`r_reason_sk` INT,
|`r_reason_id` CHAR(16),
|`r_reason_desc` CHAR(100)
""".stripMargin,
"ship_mode" ->
"""
|`sm_ship_mode_sk` INT,
|`sm_ship_mode_id` CHAR(16),
|`sm_type` CHAR(30),
|`sm_code` CHAR(10),
|`sm_carrier` CHAR(20),
|`sm_contract` CHAR(20)
""".stripMargin,
"time_dim" ->
"""
|`t_time_sk` INT,
|`t_time_id` CHAR(16),
|`t_time` INT,
|`t_hour` INT,
|`t_minute` INT,
|`t_second` INT,
|`t_am_pm` CHAR(2),
|`t_shift` CHAR(20),
|`t_sub_shift` CHAR(20),
|`t_meal_time` CHAR(20)
""".stripMargin
)
// The partition columns are consistent with the databricks/spark-sql-perf project.
protected val tablePartitionColumns = Map(
"catalog_sales" -> Seq("`cs_sold_date_sk`"),
"catalog_returns" -> Seq("`cr_returned_date_sk`"),
"inventory" -> Seq("`inv_date_sk`"),
"store_sales" -> Seq("`ss_sold_date_sk`"),
"store_returns" -> Seq("`sr_returned_date_sk`"),
"web_sales" -> Seq("`ws_sold_date_sk`"),
"web_returns" -> Seq("`wr_returned_date_sk`")
)
}
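// Example usage (illustrative, not part of this file): materializing a Spark schema
// from the shared DDL strings, as TPCDSTables does on the datagen side:
//   object InventorySchema extends TPCDSSchema {
//     val schema = org.apache.spark.sql.types.StructType.fromDDL(tableColumns("inventory"))
//   }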