From d6c095c92c739eadd7f336e2a812ffee324cb1ef Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 28 Aug 2020 10:32:23 +0900
Subject: [PATCH] [SPARK-32693][SQL] Compare two dataframes with same schema
 except nullable property

### What changes were proposed in this pull request?

This PR changes the key data type check in `HashJoin` to use `sameType`.

### Why are the changes needed?

Looking at the resolution condition of `SetOperation`, it only requires each left data type to be `sameType` as the corresponding right one. Logically, the `EqualTo` expression in an equi-join also only requires the left data type to be `sameType` as the right data type. `HashJoin`, however, requires the left key data types to be exactly the same as the right key data types, which seems unreasonable and leads to inconsistent results when doing `except` between two dataframes.

If two dataframes have no nested fields, `HashJoin` passes the key type check even when their fields differ in nullability, because it checks each field individually, so the nullable property is ignored. If the dataframes have nested fields such as structs, `HashJoin` fails the key type check, because it then compares two struct types and the nullable property does matter.

### Does this PR introduce _any_ user-facing change?

Yes. The `except` operation between dataframes now behaves consistently.

### How was this patch tested?

Unit test.

Closes #29555 from viirya/SPARK-32693.

Authored-by: Liang-Chi Hsieh
Signed-off-by: Takeshi Yamamuro
---
 .../spark/sql/execution/joins/HashJoin.scala  |  7 +++-
 .../apache/spark/sql/DataFrameJoinSuite.scala | 39 +++++++++++++++++++
 2 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 085cc29289..0c75eda7a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -106,8 +106,11 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
   }
 
   protected lazy val (buildKeys, streamedKeys) = {
-    require(leftKeys.map(_.dataType) == rightKeys.map(_.dataType),
-      "Join keys from two sides should have same types")
+    require(leftKeys.length == rightKeys.length &&
+      leftKeys.map(_.dataType)
+        .zip(rightKeys.map(_.dataType))
+        .forall(types => types._1.sameType(types._2)),
+      "Join keys from two sides should have same length and types")
     buildSide match {
       case BuildLeft => (leftKeys, rightKeys)
       case BuildRight => (rightKeys, leftKeys)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 0b4f43b723..b463a76a74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.{Inner, InnerLike, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Filter, HintInfo, Join, JoinHint, LogicalPlan, Project}
@@ -29,6 +31,7 @@
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
 
 class DataFrameJoinSuite extends QueryTest with SharedSparkSession
@@ -418,4 +421,40 @@
       }
     }
   }
+
+  test("SPARK-32693: Compare two dataframes with same schema except nullable property") {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+      StructField("b", IntegerType, false) ::
+      StructField("c", IntegerType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(10, 1, 1), Row(10, 50, 2))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+      StructField("b", IntegerType) ::
+      StructField("c", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(10, 1, 1))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    checkAnswer(df1.except(df2), Row(10, 50, 2))
+
+    val schema3 = StructType(
+      StructField("a", IntegerType, false) ::
+      StructField("b", IntegerType, false) ::
+      StructField("c", IntegerType, false) ::
+      StructField("d", schema1, false) :: Nil)
+    val rowSeq3: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)), Row(10, 50, 2, Row(10, 50, 2)))
+    val df3 = spark.createDataFrame(rowSeq3.asJava, schema3)
+
+    val schema4 = StructType(
+      StructField("a", IntegerType) ::
+      StructField("b", IntegerType) ::
+      StructField("c", IntegerType) ::
+      StructField("d", schema2) :: Nil)
+    val rowSeq4: List[Row] = List(Row(10, 1, 1, Row(10, 1, 1)))
+    val df4 = spark.createDataFrame(rowSeq4.asJava, schema4)
+
+    checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
+  }
 }
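
For reference, the user-facing behavior can also be reproduced outside the test suite. Below is a minimal sketch condensed from the new unit test, assuming a spark-shell session (so `spark` is the active SparkSession); the value names (`inner1`, `schema1`, `df1`, ...) are illustrative only and not part of the patch.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Two schemas that differ only in nullability, including a nested struct
// column: the case that used to fail HashJoin's key type check, because
// StructType equality takes the nullable property into account.
val inner1 = StructType(Seq(StructField("x", IntegerType, nullable = false)))
val inner2 = StructType(Seq(StructField("x", IntegerType)))
val schema1 = StructType(Seq(StructField("s", inner1, nullable = false)))
val schema2 = StructType(Seq(StructField("s", inner2)))

val df1 = spark.createDataFrame(List(Row(Row(1)), Row(Row(2))).asJava, schema1)
val df2 = spark.createDataFrame(List(Row(Row(1))).asJava, schema2)

// `except` is planned as a left-anti hash join, so the key check applies.
// With this patch the nested case behaves like the flat-column case and
// returns the row containing struct(2).
df1.except(df2).show()
```

The design choice mirrors analysis-time behavior: `SetOperation` resolution and `EqualTo` only require `sameType` (type equality ignoring nullability), so the `HashJoin` key check should not be stricter than what the analyzer already accepts.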