[SPARK-17008][SPARK-17009][SQL] Normalization and isolation in SQLQueryTestSuite.

## What changes were proposed in this pull request?
This patch enhances SQLQueryTestSuite in two ways:

1. SPARK-17009: Use a new SparkSession for each test case to provide stronger isolation (e.g. config changes in one test case do not impact another). That said, we do not currently isolate catalog changes.
2. SPARK-17008: Normalize query output using sorting, inspired by HiveComparisonTest.

I also ported a few new test cases over from SQLQuerySuite.

## How was this patch tested?
This is a test harness update.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #14590 from petermaxlee/SPARK-17008.
This commit is contained in:
petermaxlee 2016-08-10 21:05:32 -07:00 committed by Reynold Xin
parent ab648c0004
commit 425c7c2dbd
8 changed files with 180 additions and 65 deletions

View file

@ -0,0 +1,4 @@
-- date time functions
-- [SPARK-16836] current_date and current_timestamp literals
-- Each literal is compared with the result of the matching function call rather than
-- with a fixed value, so the expected output does not go stale as the clock advances.
select current_date = current_date(), current_timestamp = current_timestamp();

View file

@ -0,0 +1,15 @@
-- Test data shared by every query in this file: (k, v) pairs where key "one" appears twice.
create temporary view hav as select * from values
("one", 1),
("two", 2),
("three", 3),
("one", 5)
as hav(k, v);
-- having clause
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2;
-- having condition contains grouping column
SELECT count(k) FROM hav GROUP BY v + 1 HAVING v + 1 = 2;
-- SPARK-11032: resolve having correctly
SELECT MIN(t.v) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(COUNT(1) > 0);

View file

@ -0,0 +1,20 @@
-- Left-hand test data: one row per key.
create temporary view nt1 as select * from values
("one", 1),
("two", 2),
("three", 3)
as nt1(k, v1);
-- Right-hand test data: key "one" appears twice and key "three" is absent.
create temporary view nt2 as select * from values
("one", 1),
("two", 22),
("one", 5)
as nt2(k, v2);
-- natural inner join, filtered on the shared column k
SELECT * FROM nt1 natural join nt2 where k = "one";
-- natural left join, explicitly ordered
SELECT * FROM nt1 natural left join nt2 order by v1, v2;
-- natural right join, explicitly ordered
SELECT * FROM nt1 natural right join nt2 order by v1, v2;
-- natural full outer join, row count only
SELECT count(*) FROM nt1 natural full outer join nt2;

View file

@ -0,0 +1,10 @@
-- Automatically generated by org.apache.spark.sql.SQLQueryTestSuite
-- Number of queries: 1
-- !query 0
select current_date = current_date(), current_timestamp = current_timestamp()
-- !query 0 schema
struct<(current_date() = current_date()):boolean,(current_timestamp() = current_timestamp()):boolean>
-- !query 0 output
true true

View file

@ -0,0 +1,40 @@
-- Automatically generated by org.apache.spark.sql.SQLQueryTestSuite
-- Number of queries: 4
-- !query 0
create temporary view hav as select * from values
("one", 1),
("two", 2),
("three", 3),
("one", 5)
as hav(k, v)
-- !query 0 schema
struct<>
-- !query 0 output
-- !query 1
SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2
-- !query 1 schema
struct<k:string,sum(v):bigint>
-- !query 1 output
one 6
three 3
-- !query 2
SELECT count(k) FROM hav GROUP BY v + 1 HAVING v + 1 = 2
-- !query 2 schema
struct<count(k):bigint>
-- !query 2 output
1
-- !query 3
SELECT MIN(t.v) FROM (SELECT * FROM hav WHERE v > 0) t HAVING(COUNT(1) > 0)
-- !query 3 schema
struct<min(v):int>
-- !query 3 output
1

View file

@ -0,0 +1,64 @@
-- Automatically generated by org.apache.spark.sql.SQLQueryTestSuite
-- Number of queries: 6
-- !query 0
create temporary view nt1 as select * from values
("one", 1),
("two", 2),
("three", 3)
as nt1(k, v1)
-- !query 0 schema
struct<>
-- !query 0 output
-- !query 1
create temporary view nt2 as select * from values
("one", 1),
("two", 22),
("one", 5)
as nt2(k, v2)
-- !query 1 schema
struct<>
-- !query 1 output
-- !query 2
SELECT * FROM nt1 natural join nt2 where k = "one"
-- !query 2 schema
struct<k:string,v1:int,v2:int>
-- !query 2 output
one 1 1
one 1 5
-- !query 3
SELECT * FROM nt1 natural left join nt2 order by v1, v2
-- !query 3 schema
struct<k:string,v1:int,v2:int>
-- !query 3 output
one 1 1
one 1 5
two 2 22
three 3 NULL
-- !query 4
SELECT * FROM nt1 natural right join nt2 order by v1, v2
-- !query 4 schema
struct<k:string,v1:int,v2:int>
-- !query 4 output
one 1 1
one 1 5
two 2 22
-- !query 5
SELECT count(*) FROM nt1 natural full outer join nt2
-- !query 5 schema
struct<count(1):bigint>
-- !query 5 output
4

View file

@ -38,26 +38,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
setupTestData()
// HAVING filters groups on an aggregate computed over the grouped rows.
test("having clause") {
  withTempView("hav") {
    val input = Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5))
    input.toDF("k", "v").createOrReplaceTempView("hav")

    val query = "SELECT k, sum(v) FROM hav GROUP BY k HAVING sum(v) > 2"
    checkAnswer(sql(query), Seq(Row("one", 6), Row("three", 3)))
  }
}
// The HAVING predicate refers to the grouping expression itself rather than to an aggregate.
test("having condition contains grouping column") {
  withTempView("hav") {
    val input = Seq(("one", 1), ("two", 2), ("three", 3), ("one", 5))
    input.toDF("k", "v").createOrReplaceTempView("hav")

    val query = "SELECT count(k) FROM hav GROUP BY v + 1 HAVING v + 1 = 2"
    checkAnswer(sql(query), Seq(Row(1)))
  }
}
test("SPARK-8010: promote numeric to string") {
val df = Seq((1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
@ -1969,15 +1949,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
// HAVING without GROUP BY, applied on top of an aliased subquery.
test("SPARK-11032: resolve having correctly") {
  withTempView("src") {
    val single = Seq((1, "a")).toDF("i", "j")
    single.createOrReplaceTempView("src")

    val query =
      "SELECT MIN(t.i) FROM (SELECT * FROM src WHERE i > 0) t HAVING(COUNT(1) > 0)"
    checkAnswer(sql(query), Row(1))
  }
}
test("SPARK-11303: filter should not be pushed down into sample") {
val df = spark.range(100)
List(true, false).foreach { withReplacement =>
@ -2517,30 +2488,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
// Exercises inner, left, right and full outer NATURAL JOIN over two views sharing column k.
test("natural join") {
  val left = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("k", "v1")
  val right = Seq(("one", 1), ("two", 22), ("one", 5)).toDF("k", "v2")

  withTempView("nt1", "nt2") {
    left.createOrReplaceTempView("nt1")
    right.createOrReplaceTempView("nt2")

    // Each entry pairs a query with the rows it must return; checked in declaration order.
    val expectations: Seq[(String, Seq[Row])] = Seq(
      "SELECT * FROM nt1 natural join nt2 where k = \"one\"" ->
        Seq(Row("one", 1, 1), Row("one", 1, 5)),
      "SELECT * FROM nt1 natural left join nt2 order by v1, v2" ->
        Seq(Row("one", 1, 1), Row("one", 1, 5), Row("two", 2, 22), Row("three", 3, null)),
      "SELECT * FROM nt1 natural right join nt2 order by v1, v2" ->
        Seq(Row("one", 1, 1), Row("one", 1, 5), Row("two", 2, 22)),
      "SELECT count(*) FROM nt1 natural full outer join nt2" ->
        Seq(Row(4)))

    expectations.foreach { case (query, expected) =>
      checkAnswer(sql(query), expected)
    }
  }
}
test("join with using clause") {
val df1 = Seq(("r1c1", "r1c2", "t1r1c3"),
("r2c1", "r2c2", "t1r2c3"), ("r3c1x", "r3c2", "t1r3c3")).toDF("c1", "c2", "c3")
@ -2991,13 +2938,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
data.selectExpr("`part.col1`", "`col.1`"))
}
}
test("current_date and current_timestamp literals") {
  // The literals are compared with the matching function calls instead of with fixed values:
  // a hard-coded timestamp would almost always be stale by the time the assertion runs, and a
  // hard-coded date would occasionally be stale too.
  val query = "select current_date = current_date(), current_timestamp = current_timestamp()"
  checkAnswer(sql(query), Row(true, true) :: Nil)
}
}

View file

@ -20,9 +20,12 @@ package org.apache.spark.sql
import java.io.File
import java.util.{Locale, TimeZone}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
/**
* End-to-end test cases for SQL queries.
@ -126,14 +129,18 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
cleaned.split("(?<=[^\\\\]);").map(_.trim).filter(_ != "").toSeq
}
// Create a local SparkSession to have stronger isolation between different test cases.
// This does not isolate catalog changes.
val localSparkSession = spark.newSession()
// Run the SQL queries preparing them for comparison.
val outputs: Seq[QueryOutput] = queries.map { sql =>
val df = spark.sql(sql)
val (schema, output) = getNormalizedResult(localSparkSession, sql)
// We might need to do some query canonicalization in the future.
QueryOutput(
sql = sql,
schema = df.schema.catalogString,
output = df.queryExecution.hiveResultString().mkString("\n"))
schema = schema.catalogString,
output = output.mkString("\n"))
}
if (regenerateGoldenFiles) {
@ -176,6 +183,23 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
}
}
/**
 * Runs `sql` against `session` and returns the output schema together with the result rows
 * rendered as strings in a normalized order.
 *
 * Rows keep the order the query produced when the analyzed plan is considered sorted;
 * otherwise they are sorted as strings.
 */
private def getNormalizedResult(session: SparkSession, sql: String): (StructType, Seq[String]) = {
  // Decides whether the plan's output order is meaningful and must be left untouched.
  def preservesOrdering(node: LogicalPlan): Boolean = node match {
    // Below any of these operators a sort is not taken as fixing the final row order.
    case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
    case PhysicalOperation(_, _, Sort(_, true, _)) => true
    case other => other.children.iterator.exists(preservesOrdering)
  }

  val result = session.sql(sql)
  val resultSchema = result.schema
  val rows = result.queryExecution.hiveResultString()
  val alreadyOrdered = preservesOrdering(result.queryExecution.analyzed)

  // Impose our own ordering only when the query did not establish one.
  val normalizedRows = if (alreadyOrdered) rows else rows.sorted
  (resultSchema, normalizedRows)
}
private def listTestCases(): Seq[TestCase] = {
listFilesRecursively(new File(inputFilePath)).map { file =>
val resultFile = file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out"