[SPARK-32693][SQL] Compare two dataframes with same schema except nullable property

### What changes were proposed in this pull request?

This PR changes key data types check in `HashJoin` to use `sameType`.

### Why are the changes needed?

Looks at the resolving condition of `SetOperation`, it requires only each left data types should be `sameType` as the right ones. Logically the `EqualTo` expression in equi-join, also requires only left data type `sameType` as right data type. Then `HashJoin` requires left keys data type exactly the same as right keys data type, looks not reasonable.

It makes inconsistent results when doing `except` between two dataframes.

If two dataframes don't have nested fields, even their field nullable property different, `HashJoin` passes the key type check because it checks field individually so field nullable property is ignored.

If two dataframes have nested fields like struct, `HashJoin` fails the key type check because now it compare two struct types and nullable property now affects.

### Does this PR introduce _any_ user-facing change?

Yes. Making consistent `except` operation between dataframes.

### How was this patch tested?

Unit test.

Closes #29555 from viirya/SPARK-32693.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
Liang-Chi Hsieh 2020-08-28 10:32:23 +09:00 committed by Takeshi Yamamuro
parent 182727d90f
commit d6c095c92c
2 changed files with 44 additions and 2 deletions

View file

@ -106,8 +106,11 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
}
protected lazy val (buildKeys, streamedKeys) = {
require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
"Join keys from two sides should have same types")
require(leftKeys.length == rightKeys.length &&
leftKeys.map(_.dataType)
.zip(rightKeys.map(_.dataType))
.forall(types => types._1.sameType(types._2)),
"Join keys from two sides should have same length and types")
buildSide match {
case BuildLeft => (leftKeys, rightKeys)
case BuildRight => (rightKeys, leftKeys)

View file

@ -17,6 +17,8 @@
package org.apache.spark.sql
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Filter, HintInfo, Join, JoinHint, LogicalPlan, Project}
@ -29,6 +31,7 @@ import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
class DataFrameJoinSuite extends QueryTest
with SharedSparkSession
@ -418,4 +421,40 @@ class DataFrameJoinSuite extends QueryTest
}
}
}
test("SPARK-32693: Compare two dataframes with same schema except nullable property") {
val schema1 = StructType(
StructField("a", IntegerType, false) ::
StructField("b", IntegerType, false) ::
StructField("c", IntegerType, false) :: Nil)
val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2))
val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
val schema2 = StructType(
StructField("a", IntegerType) ::
StructField("b", IntegerType) ::
StructField("c", IntegerType) :: Nil)
val rowSeq2: List[Row] = List(Row(10, 1, 1))
val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
checkAnswer(df1.except(df2), Row(10, 50, 2))
val schema3 = StructType(
StructField("a", IntegerType, false) ::
StructField("b", IntegerType, false) ::
StructField("c", IntegerType, false) ::
StructField("d", schema1, false) :: Nil)
val rowSeq3: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)), Row(10, 50, 2, Row(10, 50, 2)))
val df3 = spark.createDataFrame(rowSeq3.asJava, schema3)
val schema4 = StructType(
StructField("a", IntegerType) ::
StructField("b", IntegerType) ::
StructField("b", IntegerType) ::
StructField("d", schema2) :: Nil)
val rowSeq4: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)))
val df4 = spark.createDataFrame(rowSeq4.asJava, schema4)
checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
}
}