[SPARK-21335][SQL] support un-aliased subquery

## What changes were proposed in this pull request?

Un-aliased subqueries have been supported by Spark SQL for a long time. Their semantics were never well defined and led to confusing behaviors, and they are not standard SQL syntax, so we disallowed them in https://issues.apache.org/jira/browse/SPARK-20690 .

However, that was a breaking change, and existing user queries do rely on un-aliased subqueries. We should add the support back and fix their semantics.

This PR restores support for un-aliased subqueries by assigning each one a default alias name.

After this PR there is no syntax change from branch 2.2 to master, but one odd use case becomes invalid:
`SELECT v.i FROM (SELECT i FROM v)`. This query now throws an analysis exception, because the outer query should not be able to use a qualifier that is only visible inside the subquery.
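
For illustration, a minimal sketch of the resulting behavior, assuming an existing `SparkSession` named `spark` (the view setup mirrors the regression test added at the bottom of this diff):

```scala
import org.apache.spark.sql.AnalysisException

spark.range(1).selectExpr("id AS i").createOrReplaceTempView("v")

// Un-aliased subqueries parse and resolve again.
spark.sql("SELECT i FROM (SELECT i FROM v)").show()

// The qualifier of the table nested inside the subquery is no longer visible outside.
try {
  spark.sql("SELECT v.i FROM (SELECT i FROM v)")
} catch {
  case e: AnalysisException =>
    // cannot resolve '`v.i`' given input columns: [__auto_generated_subquery_name.i]
    println(e.getMessage)
}

// The auto-generated qualifier can still be used by those who know it (not recommended).
spark.sql("SELECT __auto_generated_subquery_name.i FROM (SELECT i FROM v)").show()
```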

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18559 from cloud-fan/sub-query.
Wenchen Fan 2017-07-07 20:04:30 +08:00
parent 56536e9992
commit fef081309f
22 changed files with 83 additions and 123 deletions


@ -751,15 +751,17 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
* hooks.
*/
override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
// The unaliased subqueries in the FROM clause are disallowed. Instead of rejecting it in
// parser rules, we handle it here in order to provide better error message.
if (ctx.strictIdentifier == null) {
throw new ParseException("The unaliased subqueries in the FROM clause are not supported.",
ctx)
val alias = if (ctx.strictIdentifier == null) {
// For un-aliased subqueries, use a default alias name that is not likely to conflict with
// normal subquery names, so that parent operators can only access the columns in subquery by
// unqualified names. Users can still use this special qualifier to access columns if they
// know it, but that's not recommended.
"__auto_generated_subquery_name"
} else {
ctx.strictIdentifier.getText
}
aliasPlan(ctx.strictIdentifier,
plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample))
SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample))
}
/**
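
Pieced together from the hunk above, the new method body reads roughly as follows (a sketch reconstructed from the diff, not a verbatim copy of the file):

```scala
override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
  val alias = if (ctx.strictIdentifier == null) {
    // For un-aliased subqueries, use a default alias that is unlikely to clash with normal
    // subquery names, so that parent operators can only reach the subquery's columns by
    // unqualified names. Users who know this special qualifier can still use it, but that
    // is not recommended.
    "__auto_generated_subquery_name"
  } else {
    ctx.strictIdentifier.getText
  }

  SubqueryAlias(alias, plan(ctx.queryNoWith).optionalMap(ctx.sample)(withSample))
}
```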


@ -253,7 +253,7 @@ abstract class LogicalPlan
// More than one match.
case ambiguousReferences =>
val referenceNames = ambiguousReferences.map(_._1).mkString(", ")
val referenceNames = ambiguousReferences.map(_._1.qualifiedName).mkString(", ")
throw new AnalysisException(
s"Reference '$name' is ambiguous, could be: $referenceNames.")
}
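
A small illustration of the effect, grounded in the updated golden files further down (two tables named `t1`, each with a column `i1`, are assumed to be in scope):

```scala
spark.sql("SELECT i1 FROM t1, mydb1.t1")
// before: Reference 'i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
// after:  Reference 'i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
```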


@ -450,19 +450,6 @@ class PlanParserSuite extends AnalysisTest {
| (select id from t0)) as u_1
""".stripMargin,
plan.union(plan).union(plan).as("u_1").select('id))
}
test("aliased subquery") {
val errMsg = "The unaliased subqueries in the FROM clause are not supported"
assertEqual("select a from (select id as a from t0) tt",
table("t0").select('id.as("a")).as("tt").select('a))
intercept("select a from (select id as a from t0)", errMsg)
assertEqual("from (select id as a from t0) tt select a",
table("t0").select('id.as("a")).as("tt").select('a))
intercept("from (select id as a from t0) select a", errMsg)
}
test("scalar sub-query") {


@ -34,7 +34,7 @@ SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a), AVG(a), VARIANCE(a), STDDEV(a),
FROM testData;
-- Aggregate with foldable input and multiple distinct groups.
SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) t GROUP BY a;
SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a;
-- Aliases in SELECT could be used in GROUP BY
SELECT a AS k, COUNT(b) FROM testData GROUP BY k;


@ -21,7 +21,7 @@ SELECT * FROM testdata LIMIT true;
SELECT * FROM testdata LIMIT 'a';
-- limit within a subquery
SELECT * FROM (SELECT * FROM range(10) LIMIT 5) t WHERE id > 3;
SELECT * FROM (SELECT * FROM range(10) LIMIT 5) WHERE id > 3;
-- limit ALL
SELECT * FROM testdata WHERE key < 3 LIMIT ALL;


@ -7,7 +7,7 @@ select 'a' || 'b' || 'c';
-- Check if catalyst combine nested `Concat`s
EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) t;
FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10));
-- replace function
select replace('abc', 'b', '123');


@ -394,7 +394,7 @@ FROM (SELECT *
FROM t1)) t4
WHERE t4.t2b IN (SELECT Min(t3b)
FROM t3
WHERE t4.t2a = t3a)) T;
WHERE t4.t2a = t3a));
-- UNION, UNION ALL, UNION DISTINCT, INTERSECT and EXCEPT for NOT IN
-- TC 01.12


@ -23,7 +23,7 @@ AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
) T
)
)
;


@ -19,7 +19,7 @@ AND c.cv = (SELECT max(avg)
FROM (SELECT c1.cv, avg(c1.cv) avg
FROM c c1
WHERE c1.ck = p.pk
GROUP BY c1.cv) T);
GROUP BY c1.cv));
create temporary view t1 as select * from values
('val1a', 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 00:00:00.000', date '2014-04-04'),


@ -5,7 +5,7 @@ CREATE OR REPLACE TEMPORARY VIEW t2 AS VALUES (1.0, 1), (2.0, 4) tbl(c1, c2);
SELECT *
FROM (SELECT * FROM t1
UNION ALL
SELECT * FROM t1) T;
SELECT * FROM t1);
-- Type Coerced Union
SELECT *
@ -13,7 +13,7 @@ FROM (SELECT * FROM t1
UNION ALL
SELECT * FROM t2
UNION ALL
SELECT * FROM t2) T;
SELECT * FROM t2);
-- Regression test for SPARK-18622
SELECT a


@ -72,7 +72,7 @@ SELECT i1 FROM t1, mydb1.t1
struct<>
-- !query 8 output
org.apache.spark.sql.AnalysisException
Reference 'i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 'i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 9
@ -81,7 +81,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
struct<>
-- !query 9 output
org.apache.spark.sql.AnalysisException
Reference 't1.i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 't1.i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 10
@ -99,7 +99,7 @@ SELECT i1 FROM t1, mydb2.t1
struct<>
-- !query 11 output
org.apache.spark.sql.AnalysisException
Reference 'i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 'i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 12
@ -108,7 +108,7 @@ SELECT t1.i1 FROM t1, mydb2.t1
struct<>
-- !query 12 output
org.apache.spark.sql.AnalysisException
Reference 't1.i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 't1.i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 13
@ -125,7 +125,7 @@ SELECT i1 FROM t1, mydb1.t1
struct<>
-- !query 14 output
org.apache.spark.sql.AnalysisException
Reference 'i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 'i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 15
@ -134,7 +134,7 @@ SELECT t1.i1 FROM t1, mydb1.t1
struct<>
-- !query 15 output
org.apache.spark.sql.AnalysisException
Reference 't1.i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 't1.i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 16
@ -143,7 +143,7 @@ SELECT i1 FROM t1, mydb2.t1
struct<>
-- !query 16 output
org.apache.spark.sql.AnalysisException
Reference 'i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 'i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 17
@ -152,7 +152,7 @@ SELECT t1.i1 FROM t1, mydb2.t1
struct<>
-- !query 17 output
org.apache.spark.sql.AnalysisException
Reference 't1.i1' is ambiguous, could be: i1#x, i1#x.; line 1 pos 7
Reference 't1.i1' is ambiguous, could be: t1.i1, t1.i1.; line 1 pos 7
-- !query 18


@ -134,7 +134,7 @@ struct<skewness(CAST(a AS DOUBLE)):double,kurtosis(CAST(a AS DOUBLE)):double,min
-- !query 14
SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) t GROUP BY a
SELECT COUNT(DISTINCT b), COUNT(DISTINCT b, c) FROM (SELECT 1 AS a, 2 AS b, 3 AS c) GROUP BY a
-- !query 14 schema
struct<count(DISTINCT b):bigint,count(DISTINCT b, c):bigint>
-- !query 14 output


@ -93,7 +93,7 @@ The limit expression must be integer type, but got string;
-- !query 10
SELECT * FROM (SELECT * FROM range(10) LIMIT 5) t WHERE id > 3
SELECT * FROM (SELECT * FROM range(10) LIMIT 5) WHERE id > 3
-- !query 10 schema
struct<id:bigint>
-- !query 10 output


@ -30,20 +30,20 @@ abc
-- !query 3
EXPLAIN EXTENDED SELECT (col1 || col2 || col3 || col4) col
FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10)) t
FROM (SELECT id col1, id col2, id col3, id col4 FROM range(10))
-- !query 3 schema
struct<plan:string>
-- !query 3 output
== Parsed Logical Plan ==
'Project [concat(concat(concat('col1, 'col2), 'col3), 'col4) AS col#x]
+- 'SubqueryAlias t
+- 'SubqueryAlias __auto_generated_subquery_name
+- 'Project ['id AS col1#x, 'id AS col2#x, 'id AS col3#x, 'id AS col4#x]
+- 'UnresolvedTableValuedFunction range, [10]
== Analyzed Logical Plan ==
col: string
Project [concat(concat(concat(cast(col1#xL as string), cast(col2#xL as string)), cast(col3#xL as string)), cast(col4#xL as string)) AS col#x]
+- SubqueryAlias t
+- SubqueryAlias __auto_generated_subquery_name
+- Project [id#xL AS col1#xL, id#xL AS col2#xL, id#xL AS col3#xL, id#xL AS col4#xL]
+- Range (0, 10, step=1, splits=None)
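
As the golden output above shows, the auto-generated alias now appears in query plans; a quick way to observe it (a sketch, again assuming a `SparkSession` named `spark`):

```scala
spark.sql("SELECT id FROM (SELECT id FROM range(10))").explain(true)
// Both the parsed and analyzed plans contain:
//   +- SubqueryAlias __auto_generated_subquery_name
```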


@ -496,7 +496,7 @@ FROM (SELECT *
FROM t1)) t4
WHERE t4.t2b IN (SELECT Min(t3b)
FROM t3
WHERE t4.t2a = t3a)) T
WHERE t4.t2a = t3a))
-- !query 13 schema
struct<t2a:string,t2b:smallint,t2c:int,t2d:bigint,t2e:float,t2f:double,t2g:decimal(2,-2),t2h:timestamp,t2i:date>
-- !query 13 output


@ -40,7 +40,7 @@ AND t2b = (SELECT max(avg)
FROM (SELECT t2b, avg(t2b) avg
FROM t2
WHERE t2a = t1.t1b
) T
)
)
-- !query 3 schema
struct<>


@ -39,7 +39,7 @@ AND c.cv = (SELECT max(avg)
FROM (SELECT c1.cv, avg(c1.cv) avg
FROM c c1
WHERE c1.ck = p.pk
GROUP BY c1.cv) T)
GROUP BY c1.cv))
-- !query 3 schema
struct<pk:int,cv:int>
-- !query 3 output


@ -37,26 +37,14 @@ struct<key:int,value:string>
-- !query 4
SELECT * FROM (SELECT * FROM testData) WHERE key = 1
-- !query 4 schema
struct<>
struct<key:int,value:string>
-- !query 4 output
org.apache.spark.sql.catalyst.parser.ParseException
The unaliased subqueries in the FROM clause are not supported.(line 1, pos 14)
== SQL ==
SELECT * FROM (SELECT * FROM testData) WHERE key = 1
--------------^^^
1 1
-- !query 5
FROM (SELECT * FROM testData WHERE key = 1) SELECT *
-- !query 5 schema
struct<>
struct<key:int,value:string>
-- !query 5 output
org.apache.spark.sql.catalyst.parser.ParseException
The unaliased subqueries in the FROM clause are not supported.(line 1, pos 5)
== SQL ==
FROM (SELECT * FROM testData WHERE key = 1) SELECT *
-----^^^
1 1


@ -22,7 +22,7 @@ struct<>
SELECT *
FROM (SELECT * FROM t1
UNION ALL
SELECT * FROM t1) T
SELECT * FROM t1)
-- !query 2 schema
struct<c1:int,c2:string>
-- !query 2 output
@ -38,7 +38,7 @@ FROM (SELECT * FROM t1
UNION ALL
SELECT * FROM t2
UNION ALL
SELECT * FROM t2) T
SELECT * FROM t2)
-- !query 3 schema
struct<c1:decimal(11,1),c2:string>
-- !query 3 output


@ -631,13 +631,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
val ds2 =
sql(
"""
|SELECT * FROM (SELECT max(c1) as c1 FROM t1 GROUP BY c1) tt
|SELECT * FROM (SELECT c1, max(c1) FROM t1 GROUP BY c1)
|WHERE
|tt.c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
|c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
|OR
|EXISTS (SELECT c1 FROM t3)
|OR
|tt.c1 IN (SELECT c1 FROM t4)
|c1 IN (SELECT c1 FROM t4)
""".stripMargin)
assert(getNumInMemoryRelations(ds2) == 4)
}
@ -683,20 +683,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
Seq(1).toDF("c1").createOrReplaceTempView("t1")
Seq(2).toDF("c1").createOrReplaceTempView("t2")
sql(
val sql1 =
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t2)
""".stripMargin).cache()
""".stripMargin
sql(sql1).cache()
val cachedDs =
sql(
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t2)
""".stripMargin)
val cachedDs = sql(sql1)
assert(getNumInMemoryRelations(cachedDs) == 1)
// Additional predicate in the subquery plan should cause a cache miss
@ -717,20 +712,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
Seq(1).toDF("c1").createOrReplaceTempView("t2")
// Simple correlated predicate in subquery
sql(
val sqlText =
"""
|SELECT * FROM t1
|WHERE
|t1.c1 in (SELECT t2.c1 FROM t2 where t1.c1 = t2.c1)
""".stripMargin).cache()
""".stripMargin
sql(sqlText).cache()
val cachedDs =
sql(
"""
|SELECT * FROM t1
|WHERE
|t1.c1 in (SELECT t2.c1 FROM t2 where t1.c1 = t2.c1)
""".stripMargin)
val cachedDs = sql(sqlText)
assert(getNumInMemoryRelations(cachedDs) == 1)
}
}
@ -741,22 +731,16 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.catalog.cacheTable("t1")
// underlying table t1 is cached as well as the query that refers to it.
val ds =
sql(
val sqlText =
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin)
""".stripMargin
val ds = sql(sqlText)
assert(getNumInMemoryRelations(ds) == 2)
val cachedDs =
sql(
"""
|SELECT * FROM t1
|WHERE
|NOT EXISTS (SELECT * FROM t1)
""".stripMargin).cache()
val cachedDs = sql(sqlText).cache()
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3)
}
}
@ -769,45 +753,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
Seq(1).toDF("c1").createOrReplaceTempView("t4")
// Nested predicate subquery
sql(
val sql1 =
"""
|SELECT * FROM t1
|WHERE
|c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
""".stripMargin).cache()
""".stripMargin
sql(sql1).cache()
val cachedDs =
sql(
"""
|SELECT * FROM t1
|WHERE
|c1 IN (SELECT c1 FROM t2 WHERE c1 IN (SELECT c1 FROM t3 WHERE c1 = 1))
""".stripMargin)
val cachedDs = sql(sql1)
assert(getNumInMemoryRelations(cachedDs) == 1)
// Scalar subquery and predicate subquery
sql(
val sql2 =
"""
|SELECT * FROM (SELECT max(c1) as c1 FROM t1 GROUP BY c1) tt
|SELECT * FROM (SELECT c1, max(c1) FROM t1 GROUP BY c1)
|WHERE
|tt.c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
|c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
|OR
|EXISTS (SELECT c1 FROM t3)
|OR
|tt.c1 IN (SELECT c1 FROM t4)
""".stripMargin).cache()
|c1 IN (SELECT c1 FROM t4)
""".stripMargin
sql(sql2).cache()
val cachedDs2 =
sql(
"""
|SELECT * FROM (SELECT max(c1) as c1 FROM t1 GROUP BY c1) tt
|WHERE
|tt.c1 = (SELECT max(c1) FROM t2 GROUP BY c1)
|OR
|EXISTS (SELECT c1 FROM t3)
|OR
|tt.c1 IN (SELECT c1 FROM t4)
""".stripMargin)
val cachedDs2 = sql(sql2)
assert(getNumInMemoryRelations(cachedDs2) == 1)
}
}


@ -2638,4 +2638,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}
test("SPARK-21335: support un-aliased subquery") {
withTempView("v") {
Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("v")
checkAnswer(sql("SELECT i from (SELECT i FROM v)"), Row(1))
val e = intercept[AnalysisException](sql("SELECT v.i from (SELECT i FROM v)"))
assert(e.message ==
"cannot resolve '`v.i`' given input columns: [__auto_generated_subquery_name.i]")
checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
}
}
}


@ -112,7 +112,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
| with t4 as (select 1 as d, 3 as e)
| select * from t4 cross join t2 where t2.b = t4.d
| )
| select a from (select 1 as a union all select 2 as a) t
| select a from (select 1 as a union all select 2 as a)
| where a = (select max(d) from t3)
""".stripMargin),
Array(Row(1))
@ -606,8 +606,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
| select cntPlusOne + 1 as cntPlusTwo from (
| select cnt + 1 as cntPlusOne from (
| select sum(r.c) s, count(*) cnt from r where l.a = r.c having cnt = 0
| ) t1
| ) t2
| )
| )
|) = 2""".stripMargin),
Row(1) :: Row(1) :: Row(null) :: Row(null) :: Nil)
}
@ -655,7 +655,7 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
"""
| select c1 from onerow t1
| where exists (select 1
| from (select 1 as c1 from onerow t2 LIMIT 1) t2
| from (select c1 from onerow t2 LIMIT 1) t2
| where t1.c1=t2.c1)""".stripMargin),
Row(1) :: Nil)
}