diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
index f2f81b11fc..6925c3f65c 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -165,7 +165,7 @@ private[r] class RBackendHandler(server: RBackend)

         // Write status bit
         writeInt(dos, 0)
-        writeObject(dos, ret.asInstanceOf[AnyRef], server.jvmObjectTracker)
+        writeObject(dos, ret, server.jvmObjectTracker)
       } else if (methodName == "<init>") {
         // methodName should be "<init>" for constructor
         val ctors = cls.getConstructors
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 61108bb61a..4c39d178d3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -393,7 +393,7 @@ abstract class RDD[T: ClassTag](
         }
       // Need to compute the block.
       case Right(iter) =>
-        new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
+        new InterruptibleIterator(context, iter)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 2d26a314e7..5696ef1377 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -99,7 +99,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
     FileInputFormat.setInputPaths(conf, path)

     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
-      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
+      ReflectionUtils.newInstance(inputFormatClazz, conf).asInstanceOf[
        org.apache.hadoop.mapreduce.InputFormat[_, _]]

     val job = Job.getInstance(conf)
@@ -119,7 +119,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
     FileInputFormat.setInputPaths(jobConf, path)

     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
-      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
+      ReflectionUtils.newInstance(inputFormatClazz, jobConf).asInstanceOf[
        org.apache.hadoop.mapred.InputFormat[_, _]]

     val retval = new ArrayBuffer[SplitInfo]()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 117c710839..8bd768e9c6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2820,7 +2820,7 @@ private[spark] object Utils extends Logging {
             klass.getConstructor().newInstance()
           }

-          Some(ext.asInstanceOf[T])
+          Some(ext)
         } catch {
           case _: NoSuchMethodException =>
             throw new SparkException(
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0c067136a7..ccef00c8e9 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -238,7 +238,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   test("aggregate") {
     val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = scala.collection.mutable.Map[String, Int]
-    val emptyMap = HashMap[String, Int]().withDefaultValue(0).asInstanceOf[StringMap]
+    val emptyMap = HashMap[String, Int]().withDefaultValue(0)
     val mergeElement: (StringMap, (String, Int)) => StringMap = (map, pair) => {
       map(pair._1) += pair._2
       map
diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 8ed009882b..3bf74d46f0 100644
--- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -315,7 +315,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     hostLocalBlocks.foreach { case (blockId, buf) =>
       doReturn(buf)
         .when(blockManager)
-        .getHostLocalShuffleData(meq(blockId.asInstanceOf[ShuffleBlockId]), any())
+        .getHostLocalShuffleData(meq(blockId), any())
     }
     val hostLocalDirs = Map("test-host-local-client-1" -> Array("local-dir"))
     // returning local dir for hostLocalBmId
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index ac36e537c7..d907fe1a27 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -118,7 +118,7 @@ class ThreadUtilsSuite extends SparkFunSuite {
     val exception = intercept[IllegalArgumentException] {
       runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
     }
-    assert(exception.asInstanceOf[IllegalArgumentException].getMessage === uniqueExceptionMessage)
+    assert(exception.getMessage === uniqueExceptionMessage)
     assert(exception.getStackTrace.mkString("\n").contains(
       "... run in separate thread using org.apache.spark.util.ThreadUtils ..."),
       "stack trace does not contain expected place holder"
diff --git a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
index 647f17859d..43c4118d8f 100644
--- a/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
+++ b/external/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala
@@ -95,7 +95,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
   }

   test("RDD generation") {
-    val inputStream = KinesisInputDStream.builder.
+    val kinesisStream = KinesisInputDStream.builder.
       streamingContext(ssc).
       checkpointAppName(appName).
       streamName("dummyStream").
@@ -105,9 +105,8 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
       storageLevel(StorageLevel.MEMORY_AND_DISK_2).
       kinesisCredentials(BasicCredentials(dummyAWSAccessKey, dummyAWSSecretKey)).
       build()
-    assert(inputStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])
+    assert(kinesisStream.isInstanceOf[KinesisInputDStream[Array[Byte]]])

-    val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream[Array[Byte]]]
     val time = Time(1000)

     // Generate block info data for testing
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
index ce177666f3..7665f43f8b 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/BLASSuite.scala
@@ -267,7 +267,7 @@ class BLASSuite extends SparkMLFunSuite {

     val dATT = dATman.transpose
     val sATT = sATman.transpose
-    val BTT = BTman.transpose.asInstanceOf[DenseMatrix]
+    val BTT = BTman.transpose

     assert(dATT.multiply(B) ~== expected absTol 1e-15)
     assert(sATT.multiply(B) ~== expected absTol 1e-15)
diff --git a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
index 7d29d6dcea..458cdb174c 100644
--- a/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
+++ b/mllib-local/src/test/scala/org/apache/spark/ml/linalg/MatricesSuite.scala
@@ -596,8 +596,8 @@ class MatricesSuite extends SparkMLFunSuite {
       new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0))
     val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0))

-    val dAT = dA.transpose.asInstanceOf[DenseMatrix]
-    val sAT = sA.transpose.asInstanceOf[SparseMatrix]
+    val dAT = dA.transpose
+    val sAT = sA.transpose
     val dATexpected =
       new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0))
     val sATexpected =
@@ -611,7 +611,7 @@ class MatricesSuite extends SparkMLFunSuite {
     assert(sA(2, 1) === sAT(1, 2))

     assert(!dA.toArray.eq(dAT.toArray), "has to have a new array")
-    assert(dA.values.eq(dAT.transpose.asInstanceOf[DenseMatrix].values), "should not copy array")
+    assert(dA.values.eq(dAT.transpose.values), "should not copy array")

     assert(dAT.toSparse.asBreeze === sATexpected.asBreeze)
     assert(sAT.toDense.asBreeze === dATexpected.asBreeze)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 92ae6b5bf8..efbeac928d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -398,7 +398,7 @@ private[python] class PythonMLLibAPI extends Serializable {

     if (initialModelWeights != null && initialModelMu != null && initialModelSigma != null) {
       val gaussians = initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map {
-        case (x, y) => new MultivariateGaussian(x.asInstanceOf[Vector], y.asInstanceOf[Matrix])
+        case (x, y) => new MultivariateGaussian(x, y)
       }
       val initialModel = new GaussianMixtureModel(
         initialModelWeights.asScala.toArray, gaussians.toArray)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
index 91d1e9a447..aadf535e13 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BLASSuite.scala
@@ -267,7 +267,7 @@ class BLASSuite extends SparkFunSuite {

     val dATT = dATman.transpose
     val sATT = sATman.transpose
-    val BTT = BTman.transpose.asInstanceOf[DenseMatrix]
+    val BTT = BTman.transpose

     assert(dATT.multiply(B) ~== expected absTol 1e-15)
     assert(sATT.multiply(B) ~== expected absTol 1e-15)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index 184c89c9ea..a057c238a2 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -231,8 +231,8 @@ class MatricesSuite extends SparkFunSuite {
       new DenseMatrix(4, 3, Array(0.0, 1.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 3.0))
     val sA = new SparseMatrix(4, 3, Array(0, 1, 3, 4), Array(1, 0, 2, 3), Array(1.0, 2.0, 1.0, 3.0))

-    val dAT = dA.transpose.asInstanceOf[DenseMatrix]
-    val sAT = sA.transpose.asInstanceOf[SparseMatrix]
+    val dAT = dA.transpose
+    val sAT = sA.transpose
     val dATexpected =
       new DenseMatrix(3, 4, Array(0.0, 2.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 3.0))
     val sATexpected =
@@ -246,7 +246,7 @@ class MatricesSuite extends SparkFunSuite {
     assert(sA(2, 1) === sAT(1, 2))

     assert(!dA.toArray.eq(dAT.toArray), "has to have a new array")
-    assert(dA.values.eq(dAT.transpose.asInstanceOf[DenseMatrix].values), "should not copy array")
+    assert(dA.values.eq(dAT.transpose.values), "should not copy array")

     assert(dAT.toSparse.asBreeze === sATexpected.asBreeze)
     assert(sAT.toDense.asBreeze === dATexpected.asBreeze)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/BinaryClassificationPMMLModelExportSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/BinaryClassificationPMMLModelExportSuite.scala
index 08c581cd47..60a2781994 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/BinaryClassificationPMMLModelExportSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/BinaryClassificationPMMLModelExportSuite.scala
@@ -35,7 +35,7 @@ class BinaryClassificationPMMLModelExportSuite extends SparkFunSuite {

     // assert that the PMML format is as expected
     assert(logisticModelExport.isInstanceOf[PMMLModelExport])
-    val pmml = logisticModelExport.asInstanceOf[PMMLModelExport].getPmml
+    val pmml = logisticModelExport.getPmml
     assert(pmml.getHeader.getDescription === "logistic regression")
     // check that the number of fields match the weights size
     assert(pmml.getDataDictionary.getNumberOfFields === logisticRegressionModel.weights.size + 1)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/KMeansPMMLModelExportSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/KMeansPMMLModelExportSuite.scala
index b61c6225e9..0460b8a0b8 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/KMeansPMMLModelExportSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/pmml/export/KMeansPMMLModelExportSuite.scala
@@ -36,7 +36,7 @@ class KMeansPMMLModelExportSuite extends SparkFunSuite {

     // assert that the PMML format is as expected
     assert(modelExport.isInstanceOf[PMMLModelExport])
-    val pmml = modelExport.asInstanceOf[PMMLModelExport].getPmml
+    val pmml = modelExport.getPmml
     assert(pmml.getHeader.getDescription === "k-means clustering")
     // check that the number of fields match the single vector size
     assert(pmml.getDataDictionary.getNumberOfFields === clusterCenters(0).size)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bab407b431..0fcf267b1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -809,7 +809,7 @@ object ScalaReflection extends ScalaReflection {
    */
   def findConstructor[T](cls: Class[T], paramTypes: Seq[Class[_]]): Option[Seq[AnyRef] => T] = {
     Option(ConstructorUtils.getMatchingAccessibleConstructor(cls, paramTypes: _*)) match {
-      case Some(c) => Some(x => c.newInstance(x: _*).asInstanceOf[T])
+      case Some(c) => Some(x => c.newInstance(x: _*))
       case None =>
         val companion = mirror.staticClass(cls.getName).companion
         val moduleMirror = mirror.reflectModule(companion.asModule)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
index dfad737124..fb6ebc899d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificInternalRow.scala
@@ -175,7 +175,7 @@ final class MutableAny extends MutableValue {
   override def boxed: Any = if (isNull) null else value
   override def update(v: Any): Unit = {
     isNull = false
-    value = v.asInstanceOf[Any]
+    value = v
   }
   override def copy(): MutableAny = {
     val newCopy = new MutableAny
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index e5e4e3415c..05308a9cd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -597,7 +597,7 @@ case class IntegralDivide(
       if (res == null) {
         null
       } else {
-        integral.asInstanceOf[Integral[Any]].toLong(res)
+        integral.toLong(res)
       }
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 26d9815780..8219af7ddc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -114,7 +114,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB
       case (result: Array[Byte], expected: Array[Byte]) =>
         java.util.Arrays.equals(result, expected)
       case (result: Double, expected: Spread[Double @unchecked]) =>
-        expected.asInstanceOf[Spread[Double]].isWithin(result)
+        expected.isWithin(result)
       case (result: InternalRow, expected: InternalRow) =>
         val st = dataType.asInstanceOf[StructType]
         assert(result.numFields == st.length && expected.numFields == st.length)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
index e06d209c47..92ef8e7d22 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/ReusableStringReaderSuite.scala
@@ -49,7 +49,7 @@ class ReusableStringReaderSuite extends SparkFunSuite {
     val reader = new ReusableStringReader

     if (reader.markSupported()) {
-      reader.asInstanceOf[ReusableStringReader].set(fox)
+      reader.set(fox)
       assert(reader.ready())

       val cc = new Array[Char](6)
@@ -73,14 +73,14 @@ class ReusableStringReaderSuite extends SparkFunSuite {

   test("skip") {
     val reader = new ReusableStringReader
-    reader.asInstanceOf[ReusableStringReader].set(fox)
+    reader.set(fox)

     // skip entire the data:
     var skipped = reader.skip(fox.length() + 1)
     assert(fox.length() == skipped)
     assert(-1 == reader.read())

-    reader.asInstanceOf[ReusableStringReader].set(fox) // reset the data
+    reader.set(fox) // reset the data
     val cc = new Array[Char](6)
     var read = reader.read(cc)
     assert(read == 6)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
index 8de972f25f..a8dc2b20f5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/xml/UDFXPathUtilSuite.scala
@@ -116,6 +116,6 @@ class UDFXPathUtilSuite extends SparkFunSuite {
   test("node list eval") {
     val ret = util.evalNodeList("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><d>-77</d></a>", "a/*")
     assert(ret != null && ret.isInstanceOf[NodeList])
-    assert(ret.asInstanceOf[NodeList].getLength == 5)
+    assert(ret.getLength == 5)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala
index 9c3aaea0f7..56d2af7cb7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayDataIndexedSeqSuite.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.RandomDataGenerator
 import org.apache.spark.sql.catalyst.encoders.{ExamplePointUDT, RowEncoder}
-import org.apache.spark.sql.catalyst.expressions.{SafeProjection, UnsafeArrayData, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{SafeProjection, UnsafeProjection}
 import org.apache.spark.sql.types._

 class ArrayDataIndexedSeqSuite extends SparkFunSuite {
@@ -83,7 +83,7 @@ class ArrayDataIndexedSeqSuite extends SparkFunSuite {
     val safeRow = safeRowConverter(unsafeRow)

     val genericArrayData = safeRow.getArray(0).asInstanceOf[GenericArrayData]
-    val unsafeArrayData = unsafeRow.getArray(0).asInstanceOf[UnsafeArrayData]
+    val unsafeArrayData = unsafeRow.getArray(0)

     val elementType = dt.elementType
     test("ArrayDataIndexedSeq - UnsafeArrayData - " + dt.toString) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala
index db8b7403cb..40848619cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala
@@ -48,7 +48,7 @@ class CaseInsensitiveMapSuite extends SparkFunSuite {
   test("SPARK-32377: CaseInsensitiveMap should be deterministic for addition") {
     var m = CaseInsensitiveMap(Map.empty[String, String])
     Seq(("paTh", "1"), ("PATH", "2"), ("Path", "3"), ("patH", "4"), ("path", "5")).foreach { kv =>
-      m = (m + kv).asInstanceOf[CaseInsensitiveMap[String]]
+      m = (m + kv)
       assert(m.get("path").contains(kv._2))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index a50752c8a5..befaea24e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -190,7 +190,7 @@ private[sql] object SQLUtils extends Logging {
     dataType match {
       case 's' =>
         // Read StructType for DataFrame
-        val fields = SerDe.readList(dis, jvmObjectTracker = null).asInstanceOf[Array[Object]]
+        val fields = SerDe.readList(dis, jvmObjectTracker = null)
         Row.fromSeq(fields)
       case _ => null
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
index 42c46a038a..b19db8b0e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ContinuousScanExec.scala
@@ -58,7 +58,7 @@ case class ContinuousScanExec(
       conf.continuousStreamingExecutorPollIntervalMs,
       partitions,
       schema,
-      readerFactory.asInstanceOf[ContinuousPartitionReaderFactory],
+      readerFactory,
       customMetrics)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index c4bef706bf..5101bdf46e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, PartitionOffset, ReadLimit}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
@@ -335,7 +335,7 @@ class ContinuousExecution(
       val offset =
         sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
       committedOffsets ++= Seq(sources(0) -> offset)
-      sources(0).commit(offset.asInstanceOf[OffsetV2])
+      sources(0).commit(offset)
     } else {
       return
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 8469216901..31569d82b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -208,7 +208,7 @@ abstract class QueryTest extends PlanTest {
   def assertCached(query: Dataset[_], cachedName: String, storageLevel: StorageLevel): Unit = {
     val planWithCaching = query.queryExecution.withCachedData
     val matched = planWithCaching.collectFirst { case cached: InMemoryRelation =>
-      val cacheBuilder = cached.asInstanceOf[InMemoryRelation].cacheBuilder
+      val cacheBuilder = cached.cacheBuilder
       cachedName == cacheBuilder.tableName.get &&
         (storageLevel == cacheBuilder.storageLevel)
     }.getOrElse(false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 84f62993eb..b8ffc47d6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -94,7 +94,7 @@ class HashedRelationSuite extends SharedSparkSession {

     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
-    hashed2.asInstanceOf[UnsafeHashedRelation].writeExternal(out2)
+    hashed2.writeExternal(out2)
     out2.flush()
     // This depends on that the order of items in BytesToBytesMap.iterator() is exactly the same
     // as they are inserted
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e60f7068ba..e89197b5ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -310,7 +310,7 @@ class StreamSuite extends StreamTest {
     // For each batch, we would log the state change during the execution
     // This checks whether the key of the state change log is the expected batch id
     def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
-      AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId == expectedId,
+      AssertOnQuery(_.lastExecution.currentBatchId == expectedId,
         s"lastExecution's currentBatchId should be $expectedId")

     // For each batch, we would log the sink change after the execution
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 156528776d..cf1b6bcd03 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -64,7 +64,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     extends AssertOnQuery(q => {
       eventually(Timeout(streamingTimeout)) {
         if (q.exception.isEmpty) {
-          assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis))
         }
       }
       if (q.exception.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index c360ec8e67..f0b12e3f62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -353,7 +353,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       Trigger.Once()) { v2Query =>
       val sink = v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
       assert(sink.isInstanceOf[Table])
-      assert(sink.asInstanceOf[Table].schema() == StructType(Nil))
+      assert(sink.schema() == StructType(Nil))
     }
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index a04802681e..6a6ff6ca94 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -35,7 +35,6 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass, JobConf}
 import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}

-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
@@ -352,7 +351,7 @@ class HadoopTableReader(
       initializeJobConfFunc: JobConf => Unit): RDD[Writable] = {
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
-      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      _broadcastedHadoopConf,
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 742eae50e1..648fa66898 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{HashPartitioner, SparkConf, SparkException}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream.{DStream, WindowedDStream}
-import org.apache.spark.util.{Clock, ManualClock}
+import org.apache.spark.util.ManualClock

 class BasicOperationsSuite extends TestSuiteBase {
   test("map") {
@@ -807,7 +807,7 @@ class BasicOperationsSuite extends TestSuiteBase {
         cleanupTestInput.size,
         numExpectedOutput,
         () => assertCleanup(operatedStream))
-      val clock = ssc.scheduler.clock.asInstanceOf[Clock]
+      val clock = ssc.scheduler.clock
       assert(clock.getTimeMillis() === Seconds(10).milliseconds)
       assert(output.size === numExpectedOutput)
       operatedStream