Compare commits

...

5 Commits

Author SHA1 Message Date
Bruce Robbins 497d17f38a [SPARK-39496][SQL] Handle null struct in `Inline.eval`
### What changes were proposed in this pull request?

Change `Inline.eval` to return a row of null values rather than a null row in the case of a null input struct.

### Why are the changes needed?

Consider the following query:
```
set spark.sql.codegen.wholeStage=false;
select inline(array(named_struct('a', 1, 'b', 2), null));
```
This query fails with a `NullPointerException`:
```
22/06/16 15:10:06 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$11(GenerateExec.scala:122)
```
(In Spark 3.1.3, you don't need to set `spark.sql.codegen.wholeStage` to false to reproduce the error, since Spark 3.1.3 has no codegen path for `Inline`).

This query fails regardless of the setting of `spark.sql.codegen.wholeStage`:
```
val dfWide = (Seq((1))
  .toDF("col0")
  .selectExpr(Seq.tabulate(99)(x => s"$x as col${x + 1}"): _*))

val df = (dfWide
  .selectExpr("*", "array(named_struct('a', 1, 'b', 2), null) as struct_array"))

df.selectExpr("*", "inline(struct_array)").collect
```
It fails with
```
22/06/16 15:18:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.JoinedRow.isNullAt(JoinedRow.scala:80)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_8$(Unknown Source)
```
When `Inline.eval` returns a null row in the collection, `GenerateExec` throws a `NullPointerException`, either when joining the null row with the required child output or when projecting the null row.
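
To see why a bare null row blows up, here is a minimal sketch (my own illustration, not code from the PR) of the join step; `InternalRow(1)` stands in for the required child output:
```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow

val childRow: InternalRow = InternalRow(1)  // the required child output row
val generatedRow: InternalRow = null        // what Inline.eval used to yield for a null struct

// JoinedRow delegates to the second row for field indexes past the first row's
// arity, so touching any such field dereferences null and throws, matching the
// JoinedRow.isNullAt frame in the stack trace above.
new JoinedRow(childRow, generatedRow).isNullAt(1)  // => NullPointerException
```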

This PR avoids producing the null row and produces a row of null values instead:
```
spark-sql> set spark.sql.codegen.wholeStage=false;
spark.sql.codegen.wholeStage	false
Time taken: 3.095 seconds, Fetched 1 row(s)
spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null));
1	2
NULL	NULL
Time taken: 1.214 seconds, Fetched 2 row(s)
spark-sql>
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes #36903 from bersprockets/inline_eval_null_struct_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c4d5390dd0)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2022-06-18 09:27:07 +09:00
Sean Owen 07edae9734 [SPARK-39505][UI] Escape log content rendered in UI
### What changes were proposed in this pull request?

Escape log content rendered to the UI.

### Why are the changes needed?

Log content may contain reserved characters or other markup and could be misinterpreted by the UI as HTML.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #36902 from srowen/LogViewEscape.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2022-06-17 13:32:14 -05:00
wangguangxin.cn 380177d0f1 [SPARK-39476][SQL] Disable Unwrap cast optimize when casting from Long to Float/Double or from Integer to Float
### What changes were proposed in this pull request?

Casting from Integer to Float, or from Long to Double/Float, may lose precision when the Integer/Long value exceeds the **significant digits** of a Double (which holds 15 or 16 digits) or a Float (which holds 7 or 8 digits).

For example, `select *, cast(a as int) from (select cast(33554435 as float) a)` gives `33554436` instead of `33554435`.

When the optimization rule `UnwrapCastInBinaryComparison` comes into play, this can produce incorrect (and confusing) results.
We can reproduce the problem with the following script:
```
spark.range(10).map(i => 64707595868612313L).createOrReplaceTempView("tbl")
val df = sql("select * from tbl where cast(value as double) = cast('64707595868612313' as double)")
df.explain(true)
df.show()
```

If we disable this optimization rule, the query returns 10 records.
But if we enable it, the query returns an empty result, since the SQL is optimized to
```
select * from tbl where value = 64707595868612312L
```
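
The underlying precision loss is easy to check directly. A minimal sketch (mine, not from the PR) in the Scala REPL:
```
// Float keeps ~24 significand bits, so integers past 2^24 start rounding:
33554435.toFloat.toInt              // 33554436, not 33554435

// Double keeps 53 significand bits, so this 17-digit Long collapses to the
// nearest representable value, exactly the constant the rule produced above:
64707595868612313L.toDouble.toLong  // 64707595868612312
```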

### Why are the changes needed?

It fixes behavior that may confuse users (or that may even be a bug).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new UT.

Closes #36873 from WangGuangxin/SPARK-24994-followup.

Authored-by: wangguangxin.cn <wangguangxin.cn@bytedance.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 9612db3fc9)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2022-06-16 09:33:18 +08:00
Bruce Robbins f23a5441d4 [SPARK-39061][SQL] Set nullable correctly for `Inline` output attributes
### What changes were proposed in this pull request?

Change `Inline#elementSchema` to make each struct field nullable when the containing array has a null element.

### Why are the changes needed?

This query returns incorrect results (the last row should be `NULL NULL`):
```
spark-sql> select inline(array(named_struct('a', 1, 'b', 2), null));
1	2
-1	-1
Time taken: 4.053 seconds, Fetched 2 row(s)
spark-sql>
```
And this query gets a NullPointerException:
```
spark-sql> select inline(array(named_struct('a', '1', 'b', '2'), null));
22/04/28 16:51:54 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.lang.NullPointerException: null
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110) ~[spark-catalyst_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source) ~[?:?]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) ~[?:?]
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(Buffere
```
When an array of structs is created by `CreateArray`, and no struct field contains a literal null value, the schema for the struct will have non-nullable fields, even if the array itself has a null entry (as in the example above). As a result, the output attributes for the generator will be non-nullable.
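
This is easy to observe in `spark-shell`; a quick check (my illustration, not part of the PR):
```
val df = spark.sql("select array(named_struct('a', 1, 'b', 2), null) as arr")
df.schema.printTreeString()
// root
//  |-- arr: array (nullable = false)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- a: integer (nullable = false)   <- non-nullable despite the null element
//  |    |    |-- b: integer (nullable = false)
```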

When the output attributes for `Inline` are non-nullable, `GenerateUnsafeProjection#writeExpressionsToBuffer` generates incorrect code for null structs.

In more detail, the issue is this: `GenerateExec#codeGenCollection` generates code that will check if the struct instance (i.e., array element) is null and, if so, set a boolean for each struct field to indicate that the field contains a null. However, unless the generator's output attributes are nullable, `GenerateUnsafeProjection#writeExpressionsToBuffer` will not generate any code to check those booleans. Instead it will generate code to write out whatever is in the variables that normally hold the struct values (which will be garbage if the array element is null).

Arrays of structs from file sources do not have this issue. In that case, each `StructField` will have nullable=true due to [this](fe85d7912f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (L417)).

(Note: the eval path for `Inline` has a different bug with null array elements that occurs even when `nullable` is set correctly in the schema, but I will address that in a separate PR).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes #36883 from bersprockets/inline_struct_nullability_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit fc385dafab)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
2022-06-16 09:39:42 +09:00
sychen 333df85894 [SPARK-39355][SQL] Single column uses quoted to construct UnresolvedAttribute
### What changes were proposed in this pull request?

Use `UnresolvedAttribute.quoted` in `Alias.toAttribute` to avoid a `ParseException` caused by `UnresolvedAttribute.apply`.

### Why are the changes needed?

Without this change, the following query fails:

```sql
SELECT *
FROM (
    SELECT '2022-06-01' AS c1
) a
WHERE c1 IN (
    SELECT date_add('2022-06-01', 0)
);
```
```
Error in query:
mismatched input '(' expecting {<EOF>, '.', '-'}(line 1, pos 8)
== SQL ==
date_add(2022-06-01, 0)
--------^^^
```
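
The difference between the two constructors can be sketched as follows (my illustration of the assumed behavior, not code from the PR): `UnresolvedAttribute.apply` parses its argument as a possibly-qualified, dot-separated name, while `UnresolvedAttribute.quoted` takes the whole string verbatim as a single name part:
```
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

UnresolvedAttribute.quoted("date_add(2022-06-01, 0)").nameParts
// List(date_add(2022-06-01, 0))   -- one opaque name part, nothing parsed

UnresolvedAttribute("date_add(2022-06-01, 0)")
// throws the ParseException shown above: mismatched input '(' expecting {<EOF>, '.', '-'}
```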

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a UT.

Closes #36740 from cxzl25/SPARK-39355.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2022-06-15 16:04:51 +08:00
8 changed files with 118 additions and 8 deletions

View File

@@ -85,7 +85,7 @@ function loadMore() {
       if (retStartByte == 0) {
         disableMoreButton();
       }
-      $("pre", ".log-content").prepend(cleanData);
+      $("pre", ".log-content").prepend(document.createTextNode(cleanData));
       curLogLength = curLogLength + (startByte - retStartByte);
       startByte = retStartByte;
@@ -115,7 +115,7 @@ function loadNew() {
     var retLogLength = dataInfo[2];
     var cleanData = data.substring(newlineIndex + 1);
-    $("pre", ".log-content").append(cleanData);
+    $("pre", ".log-content").append(document.createTextNode(cleanData));
     curLogLength = curLogLength + (retEndByte - retStartByte);
     endByte = retEndByte;

View File

@@ -117,7 +117,7 @@ case class AggregateExpression(
     // This is a bit of a hack. Really we should not be constructing this container and reasoning
     // about datatypes / aggregation mode until after we have finished analysis and made it to
     // planning.
-    UnresolvedAttribute(aggregateFunction.toString)
+    UnresolvedAttribute.quoted(aggregateFunction.toString)
   }
 
   def filterAttributes: AttributeSet = filter.map(_.references).getOrElse(AttributeSet.empty)

View File

@@ -444,20 +444,25 @@ case class Inline(child: Expression) extends UnaryExpression with CollectionGene
   }
 
   override def elementSchema: StructType = child.dataType match {
-    case ArrayType(st: StructType, _) => st
+    case ArrayType(st: StructType, false) => st
+    case ArrayType(st: StructType, true) => st.asNullable
   }
 
   override def collectionType: DataType = child.dataType
 
   private lazy val numFields = elementSchema.fields.length
 
+  private lazy val generatorNullRow = new GenericInternalRow(elementSchema.length)
+
   override def eval(input: InternalRow): TraversableOnce[InternalRow] = {
     val inputArray = child.eval(input).asInstanceOf[ArrayData]
     if (inputArray == null) {
       Nil
     } else {
-      for (i <- 0 until inputArray.numElements())
-        yield inputArray.getStruct(i, numFields)
+      for (i <- 0 until inputArray.numElements()) yield {
+        val s = inputArray.getStruct(i, numFields)
+        if (s == null) generatorNullRow else s
+      }
     }
   }

View File

@@ -206,7 +206,7 @@ case class Alias(child: Expression, name: String)(
     if (resolved) {
       AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)
     } else {
-      UnresolvedAttribute(name)
+      UnresolvedAttribute.quoted(name)
     }
   }

View File

@@ -359,7 +359,17 @@ object UnwrapCastInBinaryComparison extends Rule[LogicalPlan] {
     !fromExp.foldable &&
       fromExp.dataType.isInstanceOf[NumericType] &&
       toType.isInstanceOf[NumericType] &&
-      Cast.canUpCast(fromExp.dataType, toType)
+      canUnwrapCast(fromExp.dataType, toType)
   }
 
+  private def canUnwrapCast(from: DataType, to: DataType): Boolean = (from, to) match {
+    // SPARK-39476: It's not safe to unwrap cast from Integer to Float or from Long to Float/Double,
+    // since the length of Integer/Long may exceed the significant digits of Float/Double.
+    case (IntegerType, FloatType) => false
+    case (LongType, FloatType) => false
+    case (LongType, DoubleType) => false
+    case _ if from.isInstanceOf[NumericType] => Cast.canUpCast(from, to)
+    case _ => false
+  }
+
   private[optimizer] def getRange(dt: DataType): Option[(Any, Any)] = dt match {

View File

@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.trees.LeafLike
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -364,6 +365,41 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
       df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
       Row(1, 6) :: Row(3, 6) :: Nil)
   }
 
+  def testNullStruct(): Unit = {
+    val df = sql(
+      """select * from values
+        |(
+        |  1,
+        |  array(
+        |    named_struct('c1', 0, 'c2', 1),
+        |    null,
+        |    named_struct('c1', 2, 'c2', 3),
+        |    null
+        |  )
+        |)
+        |as tbl(a, b)
+      """.stripMargin)
+    df.createOrReplaceTempView("t1")
+
+    checkAnswer(
+      sql("select inline(b) from t1"),
+      Row(0, 1) :: Row(null, null) :: Row(2, 3) :: Row(null, null) :: Nil)
+
+    checkAnswer(
+      sql("select a, inline(b) from t1"),
+      Row(1, 0, 1) :: Row(1, null, null) :: Row(1, 2, 3) :: Row(1, null, null) :: Nil)
+  }
+
+  test("SPARK-39061: inline should handle null struct") {
+    testNullStruct
+  }
+
+  test("SPARK-39496: inline eval path should handle null struct") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+      testNullStruct
+    }
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {

View File

@@ -1921,4 +1921,32 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }.getMessage.contains("Correlated column is not allowed in predicate"))
     }
   }
 
+  test("SPARK-39355: Single column uses quoted to construct UnresolvedAttribute") {
+    checkAnswer(
+      sql("""
+          |SELECT *
+          |FROM (
+          |  SELECT '2022-06-01' AS c1
+          |) a
+          |WHERE c1 IN (
+          |  SELECT date_add('2022-06-01', 0)
+          |)
+          |""".stripMargin),
+      Row("2022-06-01"))
+    checkAnswer(
+      sql("""
+          |SELECT *
+          |FROM (
+          |  SELECT '2022-06-01' AS c1
+          |) a
+          |WHERE c1 IN (
+          |  SELECT date_add(a.c1.k1, 0)
+          |  FROM (
+          |    SELECT named_struct('k1', '2022-06-01') AS c1
+          |  ) a
+          |)
+          |""".stripMargin),
+      Row("2022-06-01"))
+  }
 }

View File

@@ -190,5 +190,36 @@ class UnwrapCastInComparisonEndToEndSuite extends QueryTest with SharedSparkSess
     }
   }
 
+  test("SPARK-39476: Should not unwrap cast from Long to Double/Float") {
+    withTable(t) {
+      Seq((6470759586864300301L))
+        .toDF("c1").write.saveAsTable(t)
+      val df = spark.table(t)
+
+      checkAnswer(
+        df.where("cast(c1 as double) == cast(6470759586864300301L as double)")
+          .select("c1"),
+        Row(6470759586864300301L))
+
+      checkAnswer(
+        df.where("cast(c1 as float) == cast(6470759586864300301L as float)")
+          .select("c1"),
+        Row(6470759586864300301L))
+    }
+  }
+
+  test("SPARK-39476: Should not unwrap cast from Integer to Float") {
+    withTable(t) {
+      Seq((33554435))
+        .toDF("c1").write.saveAsTable(t)
+      val df = spark.table(t)
+
+      checkAnswer(
+        df.where("cast(c1 as float) == cast(33554435 as float)")
+          .select("c1"),
+        Row(33554435))
+    }
+  }
+
   private def decimal(v: BigDecimal): Decimal = Decimal(v, 5, 2)
 }