[SPARK-32526][SQL] Fix some test cases of sql/catalyst module in scala 2.13

### What changes were proposed in this pull request?
The purpose of this PR is to partially resolve [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526): in total, 88 failed and 2 aborted test cases are fixed. The related suites are as follows:

- `DataSourceV2AnalysisBaseSuite` related test cases (71 FAILED -> Pass)
- `TreeNodeSuite` (1 FAILED -> Pass)
- `MetadataSuite` (1 FAILED -> Pass)
- `InferFiltersFromConstraintsSuite` (3 FAILED -> Pass)
- `StringExpressionsSuite` (1 FAILED -> Pass)
- `JacksonParserSuite` (1 FAILED -> Pass)
- `HigherOrderFunctionsSuite` (1 FAILED -> Pass)
- `ExpressionParserSuite` (1 FAILED -> Pass)
- `CollectionExpressionsSuite` (6 FAILED -> Pass)
- `SchemaUtilsSuite` (2 FAILED -> Pass)
- `ExpressionSetSuite` (ABORTED -> Pass)
- `ArrayDataIndexedSeqSuite` (ABORTED -> Pass)

The main changes in this PR are as follows:

- `Optimizer` and `Analyzer` are changed so that they compile: `ArrayBuffer` is not a `Seq` in Scala 2.13, so `toSeq` is called manually for compatibility with Scala 2.12 (see the `ArrayBuffer` sketch after this list)

- The `m.mapValues().view.force` pattern returns a `Map` in Scala 2.12 but an `IndexedSeq` in Scala 2.13, so `toMap` is called manually to stay compatible with both versions (see the `mapValues` sketch after this list). `TreeNode` is changed accordingly, fixing the `DataSourceV2AnalysisBaseSuite` related test cases and the failed `TreeNodeSuite` case

- `toMap` is called in the `case map` branch of `Metadata#hash` because `map.mapValues` returns a `Map` in Scala 2.12 but a `MapView` in Scala 2.13

- The Scala 2.13 version of `ExpressionSet` now implements `concat`, following the Scala 2.12 implementation, so that `++` conforms to `ExpressionSet` semantics (sketched after this list)

- `GenericArrayData` does not accept `ArrayBuffer` input in Scala 2.13, so `toSeq` is called when constructing a `GenericArrayData` from an `ArrayBuffer`, for Scala version compatibility

- `toSeq` is called in `RandomDataGenerator#randomRow` to ensure the contents of `fields` are `Seq`s, not `ArrayBuffer`s

- `toSeq` is called so that `JacksonParser#parse` still returns a `Seq`, because the check in the `JacksonParserSuite` test "skipping rows using pushdown filters" depends on the `Seq` type

- `toSeq` is called in `AstBuilder#visitFunctionCall`; otherwise `ctx.argument.asScala.map(expression)` is a `Buffer` in Scala 2.13

- A `LongType` case is added to `ArraySetLike.nullValueHolder`

- A `sorted` call is added so that the `duplicateColumns` string in the `SchemaUtils.checkColumnNameDuplication` error message has a deterministic order (see the ordering sketch after this list)
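
For context, a minimal, Spark-independent sketch of the `ArrayBuffer`/`Seq` incompatibility behind most of the `toSeq` calls above (`describe` is a hypothetical helper): in Scala 2.13 the default `Seq` alias points to `scala.collection.immutable.Seq` rather than `scala.collection.Seq`, so a mutable `ArrayBuffer` no longer satisfies it.

```scala
import scala.collection.mutable.ArrayBuffer

object ArrayBufferSeqDemo {
  // `Seq` here means `scala.collection.Seq` on Scala 2.12 but
  // `scala.collection.immutable.Seq` on Scala 2.13.
  def describe(xs: Seq[Int]): String = xs.mkString(", ")

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)
    // describe(buf)             // compiles on 2.12, rejected on 2.13
    println(describe(buf.toSeq)) // compiles on both versions
  }
}
```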
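
Similarly, a minimal sketch of the `mapValues` behavior change behind the `TreeNode` and `Metadata` fixes (`MapValuesDemo` is a hypothetical name; on 2.13 `mapValues` and `force` compile with deprecation warnings):

```scala
object MapValuesDemo {
  def main(args: Array[String]): Unit = {
    val m = Map("a" -> 1, "b" -> 2)

    // Scala 2.12: `mapValues` returns a lazily evaluated Map, and
    //   `.view.force` materializes it back into a Map.
    // Scala 2.13: `mapValues` returns a MapView, and `.view.force`
    //   yields an IndexedSeq of pairs, not a Map.
    // Appending `.toMap` yields an immutable Map on both versions.
    val doubled: Map[String, Int] = m.mapValues(_ * 2).view.force.toMap

    println(doubled) // Map(a -> 2, b -> 4)
  }
}
```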
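
The `ExpressionSet` bullet is about `++` semantics: in Scala 2.13, `Set#++` goes through `concat`, and the default implementation builds a plain `Set`, which would drop `ExpressionSet`'s canonicalization-based deduplication; overriding `concat` restores the Scala 2.12 behavior. Below is a stripped-down sketch of the idea, not Spark's code, with a toy `normalize` standing in for Catalyst's expression canonicalization (Scala 2.13 only, since `IterableOnce` is new in 2.13):

```scala
import scala.collection.mutable

// A toy set that deduplicates by a normalized key, the way ExpressionSet
// deduplicates by canonicalized expression. Without a `concat`-style
// override, a `++` built on plain Set machinery would lose this behavior.
class NormalizedSet private (
    baseSet: mutable.Set[String],
    originals: mutable.Buffer[String]) {

  private def normalize(s: String): String = s.toLowerCase

  def add(elem: String): Unit = {
    if (!baseSet.contains(normalize(elem))) {
      baseSet += normalize(elem)
      originals += elem
    }
  }

  // Mirrors the overridden `concat` in the diff below: clone the internal
  // state, then funnel every new element through the deduplicating `add`.
  def ++(elems: IterableOnce[String]): NormalizedSet = {
    val newSet = new NormalizedSet(baseSet.clone(), originals.clone())
    elems.iterator.foreach(newSet.add)
    newSet
  }

  override def toString: String =
    originals.mkString("NormalizedSet(", ", ", ")")
}

object NormalizedSet {
  def apply(elems: String*): NormalizedSet = {
    val set = new NormalizedSet(mutable.Set.empty, mutable.Buffer.empty)
    elems.foreach(set.add)
    set
  }
}
```

For example, `NormalizedSet("A") ++ Seq("a", "B")` keeps only `A` and `B`, whereas a plain `Set` union would keep both `A` and `a`.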
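
Finally, the `sorted` bullet: `duplicateColumns` comes from a `groupBy`, i.e. an unordered hash map whose iteration order is an implementation detail that can differ between Scala 2.12 and 2.13, so the error message needs a stable order for tests to assert on. A minimal sketch of the pattern, with hypothetical column names:

```scala
object DeterministicMessageDemo {
  def main(args: Array[String]): Unit = {
    val columnNames = Seq("a", "b", "a", "c", "b") // hypothetical input

    // `groupBy` returns an unordered Map; iterating it can yield
    // different orders on Scala 2.12 and Scala 2.13.
    val duplicateColumns = columnNames.groupBy(identity).collect {
      case (x, ys) if ys.length > 1 => s"`$x`"
    }

    // Sorting before mkString makes the message deterministic.
    println("Found duplicate column(s): " +
      duplicateColumns.toSeq.sorted.mkString(", "))
  }
}
```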

### Why are the changes needed?
We need to support a Scala 2.13 build.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Scala 2.12: Passes Jenkins or GitHub Actions

- Scala 2.13: Run the following:

```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/catalyst -Pscala-2.13 -am
mvn test -pl sql/catalyst -Pscala-2.13
```

**Before**
```
Tests: succeeded 3853, failed 103, canceled 0, ignored 6, pending 0
*** 3 SUITES ABORTED ***
*** 103 TESTS FAILED ***
```

**After**

```
Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
*** 1 SUITE ABORTED ***
*** 15 TESTS FAILED ***
```

Closes #29370 from LuciferYang/fix-DataSourceV2AnalysisBaseSuite.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>

Commit 6ae2cb2db3 (parent 0c850c71e7): 13 changed files with 39 additions and 25 deletions.

`ExpressionSet` (Scala 2.13 version):

```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.catalyst.expressions

-import scala.collection.mutable
+import scala.collection.{mutable, View}
 import scala.collection.mutable.ArrayBuffer

 object ExpressionSet {
@@ -86,6 +86,12 @@ class ExpressionSet protected(
     }
   }

+  override def concat(elems: IterableOnce[Expression]): Set[Expression] = {
+    val newSet = new ExpressionSet(baseSet.clone(), originals.clone())
+    elems.iterator.foreach(newSet.add)
+    newSet
+  }
+
   override def iterator: Iterator[Expression] = originals.iterator

   /**
```

`Analyzer`:

```diff
@@ -1281,12 +1281,12 @@ class Analyzer(
       }

       if (attrMapping.isEmpty) {
-        newPlan -> attrMapping
+        newPlan -> attrMapping.toSeq
       } else {
         assert(!attrMapping.groupBy(_._1.exprId)
           .exists(_._2.map(_._2.exprId).distinct.length > 1),
           "Found duplicate rewrite attributes")
-        val attributeRewrites = AttributeMap(attrMapping)
+        val attributeRewrites = AttributeMap(attrMapping.toSeq)
         // Using attrMapping from the children plans to rewrite their parent node.
         // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
         newPlan.transformExpressions {
@@ -1294,7 +1294,7 @@ class Analyzer(
             dedupAttr(a, attributeRewrites)
           case s: SubqueryExpression =>
             s.withNewPlan(dedupOuterReferencesInSubquery(s.plan, attributeRewrites))
-        } -> attrMapping
+        } -> attrMapping.toSeq
       }
     }
   }
```

Collection expressions (`ArraySetLike`, `ArrayDistinct`, `ArrayUnion`, `ArrayIntersect`, `ArrayExcept`):

```diff
@@ -3050,6 +3050,7 @@ trait ArraySetLike {
   @transient protected lazy val nullValueHolder = et match {
     case ByteType => "(byte) 0"
     case ShortType => "(short) 0"
+    case LongType => "(long) 0"
     case _ => "0"
   }
@@ -3155,7 +3156,7 @@ case class ArrayDistinct(child: Expression)
           }
         }
       }
-      new GenericArrayData(arrayBuffer)
+      new GenericArrayData(arrayBuffer.toSeq)
    }
  }
@@ -3313,7 +3314,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
          i += 1
        }
      }
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    } else {
      (array1, array2) =>
        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3344,7 +3345,7 @@ case class ArrayUnion(left: Expression, right: Expression) extends ArrayBinaryLi
          arrayBuffer += elem
        }
      }))
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    }
  }
@@ -3476,7 +3477,7 @@ object ArrayUnion {
          arrayBuffer += elem
        }
      }))
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    }
  }
@@ -3538,7 +3539,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
        }
        i += 1
      }
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    } else {
      new GenericArrayData(Array.emptyObjectArray)
    }
@@ -3586,7 +3587,7 @@ case class ArrayIntersect(left: Expression, right: Expression) extends ArrayBina
        }
        i += 1
      }
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    } else {
      new GenericArrayData(Array.emptyObjectArray)
    }
@@ -3777,7 +3778,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
        }
        i += 1
      }
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    } else {
      (array1, array2) =>
        val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
@@ -3822,7 +3823,7 @@ case class ArrayExcept(left: Expression, right: Expression) extends ArrayBinaryL
        }
        i += 1
      }
-     new GenericArrayData(arrayBuffer)
+     new GenericArrayData(arrayBuffer.toSeq)
    }
  }
```

`ArrayFilter`:

```diff
@@ -501,7 +501,7 @@ case class ArrayFilter(
       }
       i += 1
     }
-    new GenericArrayData(buffer)
+    new GenericArrayData(buffer.toSeq)
   }

   override def prettyName: String = "filter"
```

`Sentences`:

```diff
@@ -2370,8 +2370,8 @@ case class Sentences(
         widx = wi.current
         if (Character.isLetterOrDigit(word.charAt(0))) words += UTF8String.fromString(word)
       }
-      result += new GenericArrayData(words)
+      result += new GenericArrayData(words.toSeq)
     }
-    new GenericArrayData(result)
+    new GenericArrayData(result.toSeq)
   }
 }
```

`JacksonParser`:

```diff
@@ -465,7 +465,7 @@ class JacksonParser(
       case null => None
       case _ => rootConverter.apply(parser) match {
         case null => throw new RuntimeException("Root converter returned null")
-        case rows => rows
+        case rows => rows.toSeq
       }
     }
   }
```

`CombineUnions`:

```diff
@@ -971,7 +971,7 @@ object CombineUnions extends Rule[LogicalPlan] {
         flattened += child
       }
     }
-    union.copy(children = flattened)
+    union.copy(children = flattened.toSeq)
   }
 }
```

`AstBuilder`:

```diff
@@ -1612,13 +1612,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     // Create the function call.
     val name = ctx.functionName.getText
     val isDistinct = Option(ctx.setQuantifier()).exists(_.DISTINCT != null)
-    val arguments = ctx.argument.asScala.map(expression) match {
+    // Call `toSeq`, otherwise `ctx.argument.asScala.map(expression)` is `Buffer` in Scala 2.13
+    val arguments = ctx.argument.asScala.map(expression).toSeq match {
       case Seq(UnresolvedStar(None))
         if name.toLowerCase(Locale.ROOT) == "count" && !isDistinct =>
         // Transform COUNT(*) into COUNT(1).
         Seq(Literal(1))
       case expressions =>
-        expressions.toSeq
+        expressions
     }
     val filter = Option(ctx.where).map(expression(_))
     val function = UnresolvedFunction(
```

`TreeNode`:

```diff
@@ -275,8 +275,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       case s: Seq[_] =>
         s.map(mapChild)
       case m: Map[_, _] =>
+        // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
+        // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
        // `mapValues` is lazy and we need to force it to materialize
-        m.mapValues(mapChild).view.force
+        m.mapValues(mapChild).view.force.toMap
      case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
      case Some(child) => Some(mapChild(child))
      case nonChild: AnyRef => nonChild
@@ -411,6 +413,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
        } else {
          Some(arg)
        }
+      // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
+      // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
      case m: Map[_, _] => m.mapValues {
        case arg: TreeNode[_] if containsChild(arg) =>
          val newChild = f(arg.asInstanceOf[BaseType])
@@ -421,7 +425,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
          arg
        }
        case other => other
-      }.view.force // `mapValues` is lazy and we need to force it to materialize
+      }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
      case d: DataType => d // Avoid unpacking Structs
      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
      case args: Iterable[_] => args.map(mapChild)
```

`Metadata`:

```diff
@@ -202,8 +202,10 @@ object Metadata {
   /** Computes the hash code for the types we support. */
   private def hash(obj: Any): Int = {
     obj match {
+      // `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in Scala 2.13, call
+      // `toMap` for Scala version compatibility.
       case map: Map[_, _] =>
-        map.mapValues(hash).##
+        map.mapValues(hash).toMap.##
       case arr: Array[_] =>
         // Seq.empty[T] has the same hashCode regardless of T.
         arr.toSeq.map(hash).##
```

`SchemaUtils`:

```diff
@@ -118,7 +118,7 @@ private[spark] object SchemaUtils {
        case (x, ys) if ys.length > 1 => s"`$x`"
      }
      throw new AnalysisException(
-       s"Found duplicate column(s) $colType: ${duplicateColumns.mkString(", ")}")
+       s"Found duplicate column(s) $colType: ${duplicateColumns.toSeq.sorted.mkString(", ")}")
    }
  }
```

`RandomDataGenerator`:

```diff
@@ -360,7 +360,7 @@ object RandomDataGenerator {
            arr += gen()
            i += 1
          }
-         arr
+         arr.toSeq
        }
        fields += data
      case StructType(children) =>
```

`SchemaUtilsSuite`:

```diff
@@ -41,7 +41,7 @@ class SchemaUtilsSuite extends SparkFunSuite {
    test(s"Check column name duplication in $testType cases") {
      def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
        val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
-         duplicatedColumns.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
+         duplicatedColumns.sorted.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
        val schema = StructType.fromDDL(schemaStr)
        var msg = intercept[AnalysisException] {
          SchemaUtils.checkSchemaColumnNameDuplication(
```