[SPARK-34199][SQL] Block table.* inside function to follow ANSI standard and other SQL engines

### What changes were proposed in this pull request?
In spark, the `count(table.*)` may cause very weird result, for example:
```
select count(*) from (select 1 as a, null as b) t;
output: 1
select count(t.*) from (select 1 as a, null as b) t;
output: 0
```
 This is because Spark expands `t.*` while converting `*` to `count(1)`, which will confuse
users. After checking the ANSI standard, `count(*)` should always be `count(1)` while `count(t.*)`
is not allowed. What's more, this is also not allowed by common databases, e.g. MySQL and Oracle.

So, this PR proposes to block the ambiguous behavior and print a clear error message for users.

### Why are the changes needed?
to avoid ambiguous behavior and follow ANSI standard and other SQL engines

### Does this PR introduce _any_ user-facing change?
Yes, the `count(table.*)` usage will be blocked and a clear error message will be reported.

### How was this patch tested?
newly added and existing tests

Closes #31286 from linhongliu-db/fix-table-star.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Linhong Liu 2021-02-02 07:49:50 +00:00 committed by Wenchen Fan
parent e9362c2571
commit bb9bf66bb6
14 changed files with 132 additions and 27 deletions

View file

@ -55,6 +55,8 @@ license: |
* and the method `spark.catalog.refreshTable`
In Spark 3.1 and earlier, table refreshing leaves dependents uncached.
- In Spark 3.2, the usage of `count(tblName.*)` is blocked to avoid producing ambiguous results. Because `count(*)` and `count(tblName.*)` will output differently if there are any null values. To restore the behavior before Spark 3.2, you can set `spark.sql.legacy.allowStarWithSingleTableIdentifierInCount` to `true`.
## Upgrading from Spark SQL 3.0 to 3.1
- In Spark 3.1, statistical aggregation functions, including `std`, `stddev`, `stddev_samp`, `variance`, `var_samp`, `skewness`, `kurtosis`, `covar_samp`, and `corr`, will return `NULL` instead of `Double.NaN` when `DivideByZero` occurs during expression evaluation, for example, when `stddev_samp` is applied on a single element set. In Spark version 3.0 and earlier, it will return `Double.NaN` in such a case. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.statisticalAggregate` to `true`.

View file

@ -1774,6 +1774,21 @@ class Analyzer(override val catalogManager: CatalogManager)
def expandStarExpression(expr: Expression, child: LogicalPlan): Expression = {
expr.transformUp {
case f1: UnresolvedFunction if containsStar(f1.arguments) =>
// SPECIAL CASE: We want to block count(tblName.*) because in spark, count(tblName.*) will
// be expanded while count(*) will be converted to count(1). They will produce different
// results and confuse users if there are any null values. For count(t1.*, t2.*), it is
// still allowed, since it's well-defined in spark.
if (!conf.allowStarWithSingleTableIdentifierInCount &&
f1.name.database.isEmpty &&
f1.name.funcName == "count" &&
f1.arguments.length == 1) {
f1.arguments.foreach {
case u: UnresolvedStar if u.isQualifiedByTable(child, resolver) =>
throw QueryCompilationErrors
.singleTableStarInCountNotAllowedError(u.target.get.mkString("."))
case _ => // do nothing
}
}
f1.copy(arguments = f1.arguments.flatMap {
case s: Star => s.expand(child, resolver)
case o => o :: Nil

View file

@ -336,6 +336,10 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
nameParts.corresponds(qualifierList)(resolver)
}
def isQualifiedByTable(input: LogicalPlan, resolver: Resolver): Boolean = {
target.exists(nameParts => input.output.exists(matchedQualifier(_, nameParts, resolver)))
}
override def expand(
input: LogicalPlan,
resolver: Resolver): Seq[NamedExpression] = {

View file

@ -251,6 +251,11 @@ private[spark] object QueryCompilationErrors {
new AnalysisException(s"Invalid usage of '*' in $prettyName")
}
def singleTableStarInCountNotAllowedError(targetString: String): Throwable = {
new AnalysisException(s"count($targetString.*) is not allowed. " +
"Please use count(*) or expand the columns manually, e.g. count(col1, col2)")
}
def orderByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = {
new AnalysisException(s"ORDER BY position $index is not in select list " +
s"(valid range is [1, $size])", t.origin.line, t.origin.startPosition)

View file

@ -1559,6 +1559,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
val ALLOW_STAR_WITH_SINGLE_TABLE_IDENTIFIER_IN_COUNT =
buildConf("spark.sql.legacy.allowStarWithSingleTableIdentifierInCount")
.internal()
.doc("When true, the SQL function 'count' is allowed to take single 'tblName.*' as parameter")
.version("3.2")
.booleanConf
.createWithDefault(false)
val USE_CURRENT_SQL_CONFIGS_FOR_VIEW =
buildConf("spark.sql.legacy.useCurrentConfigsForView")
.internal()
@ -3577,6 +3585,9 @@ class SQLConf extends Serializable with Logging {
def storeAnalyzedPlanForView: Boolean = getConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW)
def allowStarWithSingleTableIdentifierInCount: Boolean =
getConf(SQLConf.ALLOW_STAR_WITH_SINGLE_TABLE_IDENTIFIER_IN_COUNT)
def starSchemaDetection: Boolean = getConf(STARSCHEMA_DETECTION)
def starSchemaFTRatio: Double = getConf(STARSCHEMA_FACT_TABLE_RATIO)

View file

@ -19,11 +19,11 @@ SELECT
FROM testData;
-- count with multiple expressions
SELECT count(a, b), count(b, a), count(testData.*) FROM testData;
SELECT count(a, b), count(b, a), count(testData.*, testData.*) FROM testData;
-- distinct count with multiple expressions
SELECT
count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*)
count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*, testData.*)
FROM testData;
-- distinct count with multiple literals
@ -43,3 +43,11 @@ SELECT count() FROM testData;
-- count without expressions
set spark.sql.legacy.allowParameterlessCount=false;
SELECT count() FROM testData;
-- legacy behavior: allow count(testData.*)
set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount=true;
SELECT count(testData.*) FROM testData;
-- count with a single tblName.* as parameter
set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount=false;
SELECT count(testData.*) FROM testData;

View file

@ -743,20 +743,25 @@ select * from a left join b on i = x and i = y and x = i;
--
-- test NULL behavior of whole-row Vars, per bug #5025
--
select t1.q2, count(t2.*)
--- [SPARK-34199] changed the `count(t2.*)` to `count(t2.q1, t2.q2)` since we have
--- blocked `count(tblName.*)`. Besides this, in pgsql, `count(t2.*)` of an outer join
--- means how many matching rows are produced by t2, while Spark SQL doesn't have this semantic.
--- So here we use `count(t2.q1, t2.q2)` instead of `count(1)` to keep the query output
--- unchanged.
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join int8_tbl t2 on (t1.q2 = t2.q1)
group by t1.q2 order by 1;
select t1.q2, count(t2.*)
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join (select * from int8_tbl) t2 on (t1.q2 = t2.q1)
group by t1.q2 order by 1;
-- [SPARK-28330] Enhance query limit
-- select t1.q2, count(t2.*)
-- select t1.q2, count(t2.q1, t2.q2)
-- from int8_tbl t1 left join (select * from int8_tbl offset 0) t2 on (t1.q2 = t2.q1)
-- group by t1.q2 order by 1;
select t1.q2, count(t2.*)
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join
(select q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
on (t1.q2 = t2.q1)

View file

@ -735,20 +735,20 @@ select * from a left join b on udf(i) = x and i = udf(y) and udf(x) = udf(i);
--
-- test NULL behavior of whole-row Vars, per bug #5025
--
select udf(t1.q2), udf(count(t2.*))
select udf(t1.q2), udf(count(t2.q1, t2.q2))
from int8_tbl t1 left join int8_tbl t2 on (udf(udf(t1.q2)) = t2.q1)
group by udf(t1.q2) order by 1;
select udf(udf(t1.q2)), udf(count(t2.*))
select udf(udf(t1.q2)), udf(count(t2.q1, t2.q2))
from int8_tbl t1 left join (select * from int8_tbl) t2 on (udf(udf(t1.q2)) = udf(t2.q1))
group by udf(udf(t1.q2)) order by 1;
-- [SPARK-28330] Enhance query limit
-- select t1.q2, count(t2.*)
-- select t1.q2, count(t2.q1, t2.q2)
-- from int8_tbl t1 left join (select * from int8_tbl offset 0) t2 on (t1.q2 = t2.q1)
-- group by t1.q2 order by 1;
select udf(t1.q2) as q2, udf(udf(count(t2.*)))
select udf(t1.q2) as q2, udf(udf(count(t2.q1, t2.q2)))
from int8_tbl t1 left join
(select udf(q1) as q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
on (udf(t1.q2) = udf(t2.q1))

View file

@ -20,9 +20,9 @@ SELECT
FROM testData;
-- count with multiple expressions
SELECT udf(count(a, b)), udf(count(b, a)), udf(count(testData.*)) FROM testData;
SELECT udf(count(a, b)), udf(count(b, a)), udf(count(testData.*, testData.*)) FROM testData;
-- distinct count with multiple expressions
SELECT
udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*))
udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*, testData.*))
FROM testData;

View file

@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 17
-- Number of queries: 21
-- !query
@ -38,19 +38,19 @@ struct<count(DISTINCT 1):bigint,count(DISTINCT NULL):bigint,count(DISTINCT a):bi
-- !query
SELECT count(a, b), count(b, a), count(testData.*) FROM testData
SELECT count(a, b), count(b, a), count(testData.*, testData.*) FROM testData
-- !query schema
struct<count(a, b):bigint,count(b, a):bigint,count(a, b):bigint>
struct<count(a, b):bigint,count(b, a):bigint,count(a, b, a, b):bigint>
-- !query output
4 4 4
-- !query
SELECT
count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*)
count(DISTINCT a, b), count(DISTINCT b, a), count(DISTINCT *), count(DISTINCT testData.*, testData.*)
FROM testData
-- !query schema
struct<count(DISTINCT a, b):bigint,count(DISTINCT b, a):bigint,count(DISTINCT a, b):bigint,count(DISTINCT a, b):bigint>
struct<count(DISTINCT a, b):bigint,count(DISTINCT b, a):bigint,count(DISTINCT a, b):bigint,count(DISTINCT a, b, a, b):bigint>
-- !query output
3 3 3 3
@ -150,3 +150,36 @@ struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'count()' due to data type mismatch: count requires at least one argument. If you have to call the function count without arguments, set the legacy configuration `spark.sql.legacy.allowParameterlessCount` as true; line 1 pos 7
-- !query
set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount=true
-- !query schema
struct<key:string,value:string>
-- !query output
spark.sql.legacy.allowStarWithSingleTableIdentifierInCount true
-- !query
SELECT count(testData.*) FROM testData
-- !query schema
struct<count(a, b):bigint>
-- !query output
4
-- !query
set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount=false
-- !query schema
struct<key:string,value:string>
-- !query output
spark.sql.legacy.allowStarWithSingleTableIdentifierInCount false
-- !query
SELECT count(testData.*) FROM testData
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
count(testData.*) is not allowed. Please use count(*) or expand the columns manually, e.g. count(col1, col2)

View file

@ -2729,7 +2729,7 @@ struct<i:int,x:int,y:int>
-- !query
select t1.q2, count(t2.*)
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join int8_tbl t2 on (t1.q2 = t2.q1)
group by t1.q2 order by 1
-- !query schema
@ -2742,7 +2742,7 @@ struct<q2:bigint,count(q1, q2):bigint>
-- !query
select t1.q2, count(t2.*)
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join (select * from int8_tbl) t2 on (t1.q2 = t2.q1)
group by t1.q2 order by 1
-- !query schema
@ -2755,7 +2755,7 @@ struct<q2:bigint,count(q1, q2):bigint>
-- !query
select t1.q2, count(t2.*)
select t1.q2, count(t2.q1, t2.q2)
from int8_tbl t1 left join
(select q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
on (t1.q2 = t2.q1)

View file

@ -2757,7 +2757,7 @@ struct<i:int,x:int,y:int>
-- !query
select udf(t1.q2), udf(count(t2.*))
select udf(t1.q2), udf(count(t2.q1, t2.q2))
from int8_tbl t1 left join int8_tbl t2 on (udf(udf(t1.q2)) = t2.q1)
group by udf(t1.q2) order by 1
-- !query schema
@ -2770,7 +2770,7 @@ struct<udf(q2):bigint,udf(count(q1, q2)):bigint>
-- !query
select udf(udf(t1.q2)), udf(count(t2.*))
select udf(udf(t1.q2)), udf(count(t2.q1, t2.q2))
from int8_tbl t1 left join (select * from int8_tbl) t2 on (udf(udf(t1.q2)) = udf(t2.q1))
group by udf(udf(t1.q2)) order by 1
-- !query schema
@ -2783,7 +2783,7 @@ struct<udf(udf(q2)):bigint,udf(count(q1, q2)):bigint>
-- !query
select udf(t1.q2) as q2, udf(udf(count(t2.*)))
select udf(t1.q2) as q2, udf(udf(count(t2.q1, t2.q2)))
from int8_tbl t1 left join
(select udf(q1) as q1, case when q2=1 then 1 else q2 end as q2 from int8_tbl) t2
on (udf(t1.q2) = udf(t2.q1))

View file

@ -38,18 +38,18 @@ struct<udf(count(DISTINCT 1)):bigint,udf(count(DISTINCT NULL)):bigint,udf(count(
-- !query
SELECT udf(count(a, b)), udf(count(b, a)), udf(count(testData.*)) FROM testData
SELECT udf(count(a, b)), udf(count(b, a)), udf(count(testData.*, testData.*)) FROM testData
-- !query schema
struct<udf(count(a, b)):bigint,udf(count(b, a)):bigint,udf(count(a, b)):bigint>
struct<udf(count(a, b)):bigint,udf(count(b, a)):bigint,udf(count(a, b, a, b)):bigint>
-- !query output
4 4 4
-- !query
SELECT
udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*))
udf(count(DISTINCT a, b)), udf(count(DISTINCT b, a)), udf(count(DISTINCT *)), udf(count(DISTINCT testData.*, testData.*))
FROM testData
-- !query schema
struct<udf(count(DISTINCT a, b)):bigint,udf(count(DISTINCT b, a)):bigint,udf(count(DISTINCT a, b)):bigint,udf(count(DISTINCT a, b)):bigint>
struct<udf(count(DISTINCT a, b)):bigint,udf(count(DISTINCT b, a)):bigint,udf(count(DISTINCT a, b)):bigint,udf(count(DISTINCT a, b, a, b)):bigint>
-- !query output
3 3 3 3

View file

@ -155,6 +155,28 @@ class ColumnExpressionSuite extends QueryTest with SharedSparkSession {
checkAnswer(testData.as("testData").select($"testData.*"), testData.collect().toSeq)
}
test("SPARK-34199: star can be qualified by table name inside a non-count function") {
checkAnswer(
testData.as("testData").selectExpr("hash(testData.*)"),
testData.as("testData").selectExpr("hash(testData.key, testData.value)")
)
}
test("SPARK-34199: star cannot be qualified by table name inside a count function") {
val e = intercept[AnalysisException] {
testData.as("testData").selectExpr("count(testData.*)").collect()
}
assert(e.getMessage.contains(
"count(testData.*) is not allowed. Please use count(*) or expand the columns manually"))
}
test("SPARK-34199: table star can be qualified inside a count function with multiple arguments") {
checkAnswer(
testData.as("testData").selectExpr("count(testData.*, testData.key)"),
testData.as("testData").selectExpr("count(testData.key, testData.value, testData.key)")
)
}
test("+") {
checkAnswer(
testData2.select($"a" + 1),