From 3ce4ab545bfc28db7df2c559726b887b0c8c33b7 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 23 Nov 2020 16:28:43 -0800
Subject: [PATCH] [SPARK-33513][BUILD] Upgrade to Scala 2.13.4 to improve exhaustivity

### What changes were proposed in this pull request?

This PR aims to do the following.
1. Upgrade from Scala 2.13.3 to 2.13.4 for Apache Spark 3.1.
2. Fix exhaustivity issues in both Scala 2.12 and 2.13 (Scala 2.13.4 requires this for compilation).
3. Enforce the improved exhaustivity check by using the existing Scala 2.13 GitHub Action compilation job.

### Why are the changes needed?

Scala 2.13.4 is a maintenance release for the 2.13 line and improves JDK 15 support.
- https://github.com/scala/scala/releases/tag/v2.13.4

It also improves the exhaustivity check of pattern matches.
- https://github.com/scala/scala/pull/9140 (Check exhaustivity of pattern matches with "if" guards and custom extractors)
- https://github.com/scala/scala/pull/9147 (Check all bindings exhaustively, e.g. tuple components)

Minimal illustrative sketches of the kinds of matches the improved check now flags are included after the relevant hunks below.

### Does this PR introduce _any_ user-facing change?

Yep. Although it is only a maintenance version change, it is still a Scala version change.

### How was this patch tested?

Pass the CIs and do manual testing.
- Scala 2.12 CI jobs (GitHub Action / Jenkins UT / Jenkins K8s IT) to check the validity of the code change.
- Scala 2.13 compilation job to check the compilation.

Closes #30455 from dongjoon-hyun/SCALA_3.13.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/storage/StorageUtils.scala     | 2 +-
 .../main/scala/org/apache/spark/util/JsonProtocol.scala   | 8 ++++----
 .../src/main/scala/org/apache/spark/ml/linalg/BLAS.scala  | 2 ++
 .../org/apache/spark/ml/feature/RFormulaParser.scala      | 6 +++++-
 .../org/apache/spark/ml/feature/StandardScaler.scala      | 2 ++
 .../org/apache/spark/ml/linalg/JsonMatrixConverter.scala  | 2 ++
 .../org/apache/spark/ml/linalg/JsonVectorConverter.scala  | 2 ++
 .../main/scala/org/apache/spark/ml/linalg/VectorUDT.scala | 2 ++
 .../spark/ml/optim/aggregator/HingeAggregator.scala       | 3 +++
 .../spark/ml/optim/aggregator/LogisticAggregator.scala    | 3 +++
 .../scala/org/apache/spark/ml/util/Instrumentation.scala  | 2 ++
 .../org/apache/spark/mllib/feature/StandardScaler.scala   | 2 ++
 .../main/scala/org/apache/spark/mllib/linalg/BLAS.scala   | 2 ++
 .../scala/org/apache/spark/mllib/linalg/Vectors.scala     | 2 ++
 .../spark/mllib/linalg/distributed/IndexedRowMatrix.scala | 4 ++++
 .../apache/spark/mllib/linalg/distributed/RowMatrix.scala | 2 ++
 pom.xml                                                   | 2 +-
 .../scheduler/cluster/mesos/MesosSchedulerUtils.scala     | 2 +-
 .../mesos/MesosFineGrainedSchedulerBackendSuite.scala     | 2 +-
 .../spark/sql/catalyst/expressions/jsonExpressions.scala  | 2 +-
 .../apache/spark/sql/catalyst/expressions/literals.scala  | 4 +++-
 .../spark/sql/catalyst/expressions/objects/objects.scala  | 2 +-
 .../apache/spark/sql/catalyst/json/JsonInferSchema.scala  | 3 +++
 .../sql/catalyst/optimizer/StarSchemaDetection.scala      | 6 +++---
 .../apache/spark/sql/catalyst/optimizer/expressions.scala | 1 +
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 2 ++
 .../catalyst/plans/logical/basicLogicalOperators.scala    | 2 +-
 .../apache/spark/sql/catalyst/util/GenericArrayData.scala | 2 +-
 .../spark/sql/catalyst/planning/ScanOperationSuite.scala  | 5 +++++
 .../sql/catalyst/util/ArrayDataIndexedSeqSuite.scala      | 2 +-
 .../org/apache/spark/sql/execution/SparkSqlParser.scala   | 6 +++---
 .../spark/sql/execution/aggregate/BaseAggregateExec.scala | 2 +-
 .../spark/sql/execution/window/WindowExecBase.scala       | 6 ++++++
.../scala/org/apache/spark/sql/hive/HiveInspectors.scala | 1 + .../spark/streaming/util/FileBasedWriteAheadLog.scala | 2 +- 35 files changed, 77 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 147731a0fb..c607fb28b2 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -169,7 +169,7 @@ private[spark] class StorageStatus( .getOrElse((0L, 0L)) case _ if !level.useOffHeap => (_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage) - case _ if level.useOffHeap => + case _ => (_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage) } val newMem = math.max(oldMem + changeInMem, 0L) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 13f7cb4533..103965e486 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -757,7 +757,7 @@ private[spark] object JsonProtocol { def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = { val jsonFields = json.asInstanceOf[JObject].obj - jsonFields.map { case JField(k, v) => + jsonFields.collect { case JField(k, v) => val req = taskResourceRequestFromJson(v) (k, req) }.toMap @@ -765,7 +765,7 @@ private[spark] object JsonProtocol { def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = { val jsonFields = json.asInstanceOf[JObject].obj - jsonFields.map { case JField(k, v) => + jsonFields.collect { case JField(k, v) => val req = executorResourceRequestFromJson(v) (k, req) }.toMap @@ -1229,7 +1229,7 @@ private[spark] object JsonProtocol { def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = { val jsonFields = json.asInstanceOf[JObject].obj - jsonFields.map { case JField(k, v) => + jsonFields.collect { case JField(k, v) => val resourceInfo = ResourceInformation.parseJson(v) (k, resourceInfo) }.toMap @@ -1241,7 +1241,7 @@ private[spark] object JsonProtocol { def mapFromJson(json: JValue): Map[String, String] = { val jsonFields = json.asInstanceOf[JObject].obj - jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap + jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap } def propertiesFromJson(json: JValue): Properties = { diff --git a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala index 368f177cda..b6c1b011f0 100644 --- a/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala +++ b/mllib-local/src/main/scala/org/apache/spark/ml/linalg/BLAS.scala @@ -302,6 +302,8 @@ private[spark] object BLAS extends Serializable { j += 1 prevCol = col } + case _ => + throw new IllegalArgumentException(s"spr doesn't support vector type ${v.getClass}.") } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala index dbbfd8f329..c5b28c95eb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormulaParser.scala @@ -286,6 +286,7 @@ private[ml] object RFormulaParser extends RegexParsers { private val pow: Parser[Term] = term ~ "^" ~ "^[1-9]\\d*".r ^^ { case base ~ "^" ~ degree => power(base, 
degree.toInt) + case t => throw new IllegalArgumentException(s"Invalid term: $t") } | term private val interaction: Parser[Term] = pow * (":" ^^^ { interact _ }) @@ -298,7 +299,10 @@ private[ml] object RFormulaParser extends RegexParsers { private val expr = (sum | term) private val formula: Parser[ParsedRFormula] = - (label ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) } + (label ~ "~" ~ expr) ^^ { + case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) + case t => throw new IllegalArgumentException(s"Invalid term: $t") + } def parse(value: String): ParsedRFormula = parseAll(formula, value) match { case Success(result, _) => result diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 7434b1adb2..92dee46ad0 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -314,6 +314,8 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] { case SparseVector(size, indices, values) => val newValues = transformSparseWithScale(scale, indices, values.clone()) Vectors.sparse(size, indices, newValues) + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } case (false, false) => diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala index 0bee643412..8f03a29eb9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonMatrixConverter.scala @@ -74,6 +74,8 @@ private[ml] object JsonMatrixConverter { ("values" -> values.toSeq) ~ ("isTransposed" -> isTransposed) compact(render(jValue)) + case _ => + throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.") } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala index 781e69f8d6..1b949d75ee 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/JsonVectorConverter.scala @@ -57,6 +57,8 @@ private[ml] object JsonVectorConverter { case DenseVector(values) => val jValue = ("type" -> 1) ~ ("values" -> values.toSeq) compact(render(jValue)) + case _ => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala index 37f173bc20..35bbaf5aa1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/linalg/VectorUDT.scala @@ -45,6 +45,8 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] { row.setNullAt(2) row.update(3, UnsafeArrayData.fromPrimitiveArray(values)) row + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala index 3d72512563..0fe1ed231a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala @@ 
-200,6 +200,9 @@ private[ml] class BlockHingeAggregator( case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) + + case m => + throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.") } if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala index 2496c789f8..5a516940b9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala @@ -504,6 +504,9 @@ private[ml] class BlockLogisticAggregator( case sm: SparseMatrix if !fitIntercept => val gradSumVec = new DenseVector(gradientSumArray) BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec) + + case m => + throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.") } if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala index d4b39e11fd..2215c2b071 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/Instrumentation.scala @@ -192,6 +192,8 @@ private[spark] object Instrumentation { case Failure(NonFatal(e)) => instr.logFailure(e) throw e + case Failure(e) => + throw e case Success(result) => instr.logSuccess() result diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 8f9d6d07a4..12a5a0f2b2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -167,6 +167,8 @@ class StandardScalerModel @Since("1.3.0") ( val newValues = NewStandardScalerModel .transformSparseWithScale(localScale, indices, values.clone()) Vectors.sparse(size, indices, newValues) + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } case _ => vector diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index da486010cf..bd60364326 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -285,6 +285,8 @@ private[spark] object BLAS extends Serializable with Logging { j += 1 prevCol = col } + case _ => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala index 2fe415f140..9ed9dd0c88 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala @@ -289,6 +289,8 @@ class VectorUDT extends UserDefinedType[Vector] { row.setNullAt(2) row.update(3, UnsafeArrayData.fromPrimitiveArray(values)) row + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala index ad79230c75..da5d165069 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala @@ -145,6 +145,8 @@ class IndexedRowMatrix @Since("1.0.0") ( .map { case (values, blockColumn) => ((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex)) } + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } }.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map { case ((blockRow, blockColumn), itr) => @@ -187,6 +189,8 @@ class IndexedRowMatrix @Since("1.0.0") ( Iterator.tabulate(indices.length)(i => MatrixEntry(rowIndex, indices(i), values(i))) case DenseVector(values) => Iterator.tabulate(values.length)(i => MatrixEntry(rowIndex, i, values(i))) + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } new CoordinateMatrix(entries, numRows(), numCols()) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 07b9d91c1f..c618b71ddc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -748,6 +748,8 @@ class RowMatrix @Since("1.0.0") ( } buf }.flatten + case v => + throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.") } } }.reduceByKey(_ + _).map { case ((i, j), sim) => diff --git a/pom.xml b/pom.xml index 0ab5a8c5b3..e5b1f30edd 100644 --- a/pom.xml +++ b/pom.xml @@ -3264,7 +3264,7 @@ scala-2.13 - 2.13.3 + 2.13.4 2.13 diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index b5a3601676..4620bdb005 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -313,7 +313,6 @@ trait MesosSchedulerUtils extends Logging { // offer has the required attribute and subsumes the required values for that attribute case (name, requiredValues) => offerAttributes.get(name) match { - case None => false case Some(_) if requiredValues.isEmpty => true // empty value matches presence case Some(scalarValue: Value.Scalar) => // check if provided values is less than equal to the offered values @@ -332,6 +331,7 @@ trait MesosSchedulerUtils extends Logging { // check if the specified value is equal, if multiple values are specified // we succeed if any of them match. 
requiredValues.contains(textValue.getValue) + case _ => false } } } diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 67ecf3242f..6a6514569c 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -178,7 +178,7 @@ class MesosFineGrainedSchedulerBackendSuite val (execInfo, _) = backend.createExecutorInfo( Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) - assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) + assert(execInfo.getContainer.getDocker.getForcePullImage) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) assert(portmaps.get(0).getContainerPort.equals(8080)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 39d9eb5a36..a363615d3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -94,7 +94,7 @@ private[this] object JsonPathParser extends RegexParsers { case Success(result, _) => Some(result) - case NoSuccess(msg, next) => + case _ => None } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 1e69814673..810cecff37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -322,7 +322,9 @@ case class Literal (value: Any, dataType: DataType) extends LeafExpression { case (a: Array[Byte], b: Array[Byte]) => util.Arrays.equals(a, b) case (a: ArrayBasedMapData, b: ArrayBasedMapData) => a.keyArray == b.keyArray && a.valueArray == b.valueArray - case (a, b) => a != null && a.equals(b) + case (a: Double, b: Double) if a.isNaN && b.isNaN => true + case (a: Float, b: Float) if a.isNaN && b.isNaN => true + case (a, b) => a != null && a == b } case _ => false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 9701420e65..9303df75af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -981,7 +981,7 @@ case class MapObjects private( (genValue: String) => s"$builder.add($genValue);", s"$builder;" ) - case None => + case _ => // array ( s""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index de396a4c63..a39f06628b 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -190,6 +190,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { } case VALUE_TRUE | VALUE_FALSE => BooleanType + + case _ => + throw new SparkException("Malformed JSON") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala index b65fc7f7e2..bf3fced0ae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -197,9 +197,9 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper { } else { false } - case None => false + case _ => false } - case None => false + case _ => false } case _ => false } @@ -239,7 +239,7 @@ object StarSchemaDetection extends PredicateHelper with SQLConfHelper { case Some(col) if t.outputSet.contains(col) => val stats = t.stats stats.attributeStats.nonEmpty && stats.attributeStats.contains(col) - case None => false + case _ => false } case _ => false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 55a45f4410..d1eb3b07d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -685,6 +685,7 @@ object FoldablePropagation extends Rule[LogicalPlan] { case LeftOuter => newJoin.right.output case RightOuter => newJoin.left.output case FullOuter => newJoin.left.output ++ newJoin.right.output + case _ => Nil }) val newFoldableMap = AttributeMap(foldableMap.baseMap.values.filterNot { case (attr, _) => missDerivedAttrsSet.contains(attr) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index ea4baafbac..50580b8e33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -967,6 +967,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg (UsingJoin(baseJoinType, visitIdentifierList(c.identifierList)), None) case Some(c) if c.booleanExpression != null => (baseJoinType, Option(expression(c.booleanExpression))) + case Some(c) => + throw new ParseException(s"Unimplemented joinCriteria: $c", ctx) case None if join.NATURAL != null => if (baseJoinType == Cross) { throw new ParseException("NATURAL CROSS JOIN is not supported", ctx) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index f96e07863f..c7108ea8ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -362,7 +362,7 @@ case class Join( left.constraints case RightOuter => right.constraints - case FullOuter => + 
case _ => ExpressionSet() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala index 81f412c143..e46d730afb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala @@ -120,7 +120,7 @@ class GenericArrayData(val array: Array[Any]) extends ArrayData { if (!o2.isInstanceOf[Double] || ! java.lang.Double.isNaN(o2.asInstanceOf[Double])) { return false } - case _ => if (!o1.equals(o2)) { + case _ => if (o1.getClass != o2.getClass || o1 != o2) { return false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala index 7790f467a8..1290f77034 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/ScanOperationSuite.scala @@ -39,6 +39,7 @@ class ScanOperationSuite extends SparkFunSuite { assert(projects(0) === colB) assert(projects(1) === aliasR) assert(filters.size === 1) + case _ => assert(false) } } @@ -50,6 +51,7 @@ class ScanOperationSuite extends SparkFunSuite { assert(projects(0) === colA) assert(projects(1) === colB) assert(filters.size === 1) + case _ => assert(false) } } @@ -65,6 +67,7 @@ class ScanOperationSuite extends SparkFunSuite { assert(projects.size === 2) assert(projects(0) === colA) assert(projects(1) === aliasId) + case _ => assert(false) } } @@ -81,6 +84,7 @@ class ScanOperationSuite extends SparkFunSuite { assert(projects(0) === colA) assert(projects(1) === aliasR) assert(filters.size === 1) + case _ => assert(false) } } @@ -93,6 +97,7 @@ class ScanOperationSuite extends SparkFunSuite { assert(projects(0) === colA) assert(projects(1) === aliasR) assert(filters.size === 1) + case _ => assert(false) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala index 1e430351b5..9c3aaea0f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala @@ -45,7 +45,7 @@ class ArrayDataIndexedSeqSuite extends SparkFunSuite { if (e != null) { elementDt match { // For Nan, etc. 
- case FloatType | DoubleType => assert(seq(i).equals(e)) + case FloatType | DoubleType => assert(seq(i) == e) case _ => assert(seq(i) === e) } } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 85476bcd21..01522257c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -868,12 +868,12 @@ class SparkSqlAstBuilder extends AstBuilder { // assert if directory is local when LOCAL keyword is mentioned val scheme = Option(storage.locationUri.get.getScheme) scheme match { - case None => + case Some(pathScheme) if (!pathScheme.equals("file")) => + throw new ParseException("LOCAL is supported only with file: scheme", ctx) + case _ => // force scheme to be file rather than fs.default.name val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build()) storage = storage.copy(locationUri = loc) - case Some(pathScheme) if (!pathScheme.equals("file")) => - throw new ParseException("LOCAL is supported only with file: scheme", ctx) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala index efba51706c..c676609bc3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/BaseAggregateExec.scala @@ -91,7 +91,7 @@ trait BaseAggregateExec extends UnaryExecNode with AliasAwareOutputPartitioning override def requiredChildDistribution: List[Distribution] = { requiredChildDistributionExpressions match { case Some(exprs) if exprs.isEmpty => AllTuples :: Nil - case Some(exprs) if exprs.nonEmpty => ClusteredDistribution(exprs) :: Nil + case Some(exprs) => ClusteredDistribution(exprs) :: Nil case None => UnspecifiedDistribution :: Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala index c6b98d48d7..9832e5cd74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala @@ -71,6 +71,9 @@ trait WindowExecBase extends UnaryExecNode { case (RowFrame, IntegerLiteral(offset)) => RowBoundOrdering(offset) + case (RowFrame, _) => + sys.error(s"Unhandled bound in windows expressions: $bound") + case (RangeFrame, CurrentRow) => val ordering = RowOrdering.create(orderSpec, child.output) RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) @@ -249,6 +252,9 @@ trait WindowExecBase extends UnaryExecNode { createBoundOrdering(frameType, lower, timeZone), createBoundOrdering(frameType, upper, timeZone)) } + + case _ => + sys.error(s"Unsupported factory: $key") } // Keep track of the number of expressions. This is a side-effect in a map... 
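Aside (illustrative only, not part of this patch): the BaseAggregateExec and SparkSqlParser hunks above work by removing or reordering guarded cases so the compiler can prove the match exhaustive. The sketch below uses made-up names and values to show why Scala 2.13.4 reports a guarded match as potentially non-exhaustive and how dropping the redundant guard satisfies the check.

```scala
// Hypothetical example, not Spark code: names and values are illustrative only.
object GuardedMatchDemo {
  // Scala 2.13.4 treats guarded cases as possibly failing, so it may warn that
  // this match is not exhaustive (that it "would fail on Some(_)") even though
  // the two guards are logically complementary.
  def describe(exprs: Option[Seq[String]]): String = exprs match {
    case Some(es) if es.isEmpty  => "all tuples"
    case Some(es) if es.nonEmpty => "clustered: " + es.mkString(", ")
    case None                    => "unspecified"
  }

  // The shape used in BaseAggregateExec above: keep the guard only where it is
  // needed and let an unguarded case absorb the remaining Some values.
  def describeFixed(exprs: Option[Seq[String]]): String = exprs match {
    case Some(es) if es.isEmpty => "all tuples"
    case Some(es)               => "clustered: " + es.mkString(", ")
    case None                   => "unspecified"
  }
}
```

If warnings are escalated to errors in the CI compilation job, the first version would fail the build, which is presumably why the patch removes such guards outright rather than suppressing the warning.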
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 8ab6e28366..9213173bbc 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -1039,6 +1039,7 @@ private[hive] trait HiveInspectors { private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match { case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale) + case dt => throw new AnalysisException(s"${dt.catalogString} is not supported.") } def toTypeInfo: TypeInfo = dt match { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 2e5000159b..d1f9dfb791 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -293,7 +293,7 @@ private[streaming] object FileBasedWriteAheadLog { val startTime = startTimeStr.toLong val stopTime = stopTimeStr.toLong Some(LogInfo(startTime, stopTime, file.toString)) - case None => + case None | Some(_) => None } }.sortBy { _.startTime }
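Appendix (illustrative only, not part of this patch): most hunks above add a defensive `case _ => throw ...` because Scala 2.13.4 no longer assumes that a chain of custom extractors (such as `DenseVector(...)` / `SparseVector(...)`, whose `unapply` returns an `Option`) covers every input. The self-contained sketch below uses hypothetical `Dense`/`Sparse` stand-ins for Spark's vector classes to show the pattern and the catch-all fix.

```scala
// Hypothetical stand-ins for Spark's vector classes; everything here is illustrative.
sealed trait Vec { def values: Array[Double] }
final class Dense(val values: Array[Double]) extends Vec
final class Sparse(val size: Int, val indices: Array[Int], val values: Array[Double]) extends Vec

object Dense {
  def unapply(v: Dense): Option[Array[Double]] = Some(v.values)
}
object Sparse {
  def unapply(v: Sparse): Option[(Int, Array[Int], Array[Double])] =
    Some((v.size, v.indices, v.values))
}

object ExtractorMatchDemo {
  def nnz(v: Vec): Int = v match {
    case Dense(values)        => values.count(_ != 0.0)
    case Sparse(_, _, values) => values.count(_ != 0.0)
    // Without this catch-all, Scala 2.13.4 may warn that the match is not
    // exhaustive, because each unapply above could in principle return None.
    // This mirrors the `case v => throw new IllegalArgumentException(...)`
    // cases added in BLAS, VectorUDT, StandardScaler and friends.
    case _ =>
      throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
  }
}
```

Throwing on the unexpected branch keeps runtime behaviour unchanged for all known vector types while satisfying the stricter checker.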