diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
index f62e85d435..e3efc92c4a 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java
@@ -196,6 +196,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
    * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
    * the iterator will eventually be released.
    */
+  @SuppressWarnings("deprecation")
   @Override
   protected void finalize() throws Throwable {
     db.closeIterator(this);
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 7e4b08217f..93a4f67fd2 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -89,6 +89,11 @@
       <artifactId>commons-lang3</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-text</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
index 9656951810..fdb81a06d4 100644
--- a/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
+++ b/common/unsafe/src/test/scala/org/apache/spark/unsafe/types/UTF8StringPropertyCheckSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.unsafe.types
 
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.text.similarity.LevenshteinDistance
 import org.scalacheck.{Arbitrary, Gen}
 import org.scalatest.prop.GeneratorDrivenPropertyChecks
 // scalastyle:off
@@ -232,7 +232,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
   test("levenshteinDistance") {
     forAll { (one: String, another: String) =>
       assert(toUTF8(one).levenshteinDistance(toUTF8(another)) ===
-        StringUtils.getLevenshteinDistance(one, another))
+        LevenshteinDistance.getDefaultInstance.apply(one, another))
     }
   }
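Note on the hunks above: `StringUtils.getLevenshteinDistance` was deprecated in commons-lang3 in favor of the commons-text similarity package, which is why the test dependency is added. A minimal sketch of the replacement API, assuming commons-text 1.6 (the version pinned in the root pom later in this patch):

```scala
import org.apache.commons.text.similarity.LevenshteinDistance

// getDefaultInstance returns a shared instance with no distance threshold;
// its apply() matches the behavior of the deprecated StringUtils method.
val distance = LevenshteinDistance.getDefaultInstance
assert(distance.apply("kitten", "sitting") == 3)
```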
diff --git a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
index f6d1288cb2..92bf0ecc1b 100644
--- a/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
+++ b/core/src/main/java/org/apache/spark/io/NioBufferedFileInputStream.java
@@ -27,7 +27,7 @@ import java.nio.file.StandardOpenOption;
 * to read a file to avoid extra copy of data between Java and
 * native memory which happens when using {@link java.io.BufferedInputStream}.
 * Unfortunately, this is not something already available in JDK,
-* {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
+* {@code sun.nio.ch.ChannelInputStream} supports reading a file using nio,
 * but does not support buffering.
 */
 public final class NioBufferedFileInputStream extends InputStream {
@@ -130,6 +130,7 @@ public final class NioBufferedFileInputStream extends InputStream {
     StorageUtils.dispose(byteBuffer);
   }
 
+  @SuppressWarnings("deprecation")
   @Override
   protected void finalize() throws IOException {
     close();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cb91717dfa..845a3d5f6d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -502,7 +502,9 @@ class SparkContext(config: SparkConf) extends Logging {
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
     // create and start the heartbeater for collecting memory metrics
-    _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
+    _heartbeater = new Heartbeater(env.memoryManager,
+      () => SparkContext.this.reportHeartBeat(),
+      "driver-heartbeater",
       conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
     _heartbeater.start()
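The `Heartbeater` change above (and the identical ones in `Executor` and `HadoopDelegationTokenManager` below) works around Scala 2.12's deprecation of eta-expanding zero-argument methods. A simplified sketch with a hypothetical stand-in class, not Spark's real `Heartbeater` signature:

```scala
class Heartbeater(reportHeartbeat: () => Unit)  // hypothetical stand-in

class Driver {
  private def reportHeartBeat(): Unit = println("heartbeat")

  // `new Heartbeater(reportHeartBeat)` compiles, but Scala 2.12 warns that
  // eta-expansion of zero-argument methods is deprecated; the explicit
  // function literal is unambiguous and warning-free.
  val heartbeater = new Heartbeater(() => Driver.this.reportHeartBeat())
}
```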
diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
index 7ce2581555..50c8fdf531 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.r
 
-import java.io.{DataInputStream, DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, IOException}
 import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}
 import java.util.concurrent.TimeUnit
 
@@ -32,8 +32,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.util.Utils
 
 /**
  * Netty-based backend server that is used to communicate between R and Java.
@@ -99,7 +97,7 @@ private[spark] class RBackend {
     if (bootstrap != null && bootstrap.config().group() != null) {
       bootstrap.config().group().shutdownGracefully()
     }
-    if (bootstrap != null && bootstrap.childGroup() != null) {
+    if (bootstrap != null && bootstrap.config().childGroup() != null) {
       bootstrap.config().childGroup().shutdownGracefully()
     }
     bootstrap = null
@@ -147,7 +145,7 @@ private[spark] object RBackend extends Logging {
       new Thread("wait for socket to close") {
         setDaemon(true)
         override def run(): Unit = {
-          // any un-catched exception will also shutdown JVM
+          // any uncaught exception will also shutdown JVM
           val buf = new Array[Byte](1024)
           // shutdown JVM if R does not connect back in 10 seconds
           serverSocket.setSoTimeout(10000)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 10cd8742f2..1169b2878e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -270,7 +270,9 @@ private[spark] class HadoopDelegationTokenManager(
   }
 
   private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
-    val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
+    val providers = Seq(
+      new HadoopFSDelegationTokenProvider(
+        () => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
       safeCreateProvider(new HiveDelegationTokenProvider) ++
       safeCreateProvider(new HBaseDelegationTokenProvider)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 61deb543d8..a30a501e5d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -190,8 +190,11 @@ private[spark] class Executor(
   private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
 
   // Executor for the heartbeat task.
-  private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
-    "executor-heartbeater", HEARTBEAT_INTERVAL_MS)
+  private val heartbeater = new Heartbeater(
+    env.memoryManager,
+    () => Executor.this.reportHeartBeat(),
+    "executor-heartbeater",
+    HEARTBEAT_INTERVAL_MS)
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 903e25b798..33a68f24bd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
 @DeveloperApi
 class StageInfo(
     val stageId: Int,
-    @deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
+    private val attemptId: Int,
     val name: String,
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
@@ -56,6 +56,8 @@ class StageInfo(
     completionTime = Some(System.currentTimeMillis)
   }
 
+  // This would just be the second constructor arg, except we need to maintain this method
+  // with parentheses for compatibility
   def attemptNumber(): Int = attemptId
 
   private[spark] def getStatusString: String = {
diff --git a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
index 31ce9483cf..424d9f825c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -215,7 +215,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
   }
 
   test("exclusive ranges of doubles") {
-    val data = 1.0 until 100.0 by 1.0
+    val data = Range.BigDecimal(1, 100, 1)
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).sum === 99)
@@ -223,7 +223,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
   }
 
   test("inclusive ranges of doubles") {
-    val data = 1.0 to 100.0 by 1.0
+    val data = Range.BigDecimal.inclusive(1, 100, 1)
     val slices = ParallelCollectionRDD.slice(data, 3)
     assert(slices.size === 3)
     assert(slices.map(_.size).sum === 100)
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index 84af73b08d..e413fe3b77 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -202,7 +202,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     def check[T: ClassTag](t: T) {
       assert(ser.deserialize[T](ser.serialize(t)) === t)
       // Check that very long ranges don't get written one element at a time
-      assert(ser.serialize(t).limit() < 100)
+      assert(ser.serialize(t).limit() < 200)
     }
     check(1 to 1000000)
     check(1 to 1000000 by 2)
@@ -212,10 +212,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     check(1L to 1000000L by 2L)
     check(1L until 1000000L)
     check(1L until 1000000L by 2L)
-    check(1.0 to 1000000.0 by 1.0)
-    check(1.0 to 1000000.0 by 2.0)
-    check(1.0 until 1000000.0 by 1.0)
-    check(1.0 until 1000000.0 by 2.0)
+    check(Range.BigDecimal.inclusive(1, 1000000, 1))
+    check(Range.BigDecimal.inclusive(1, 1000000, 2))
+    check(Range.BigDecimal(1, 1000000, 1))
+    check(Range.BigDecimal(1, 1000000, 2))
   }
 
   test("asJavaIterable") {
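For reference, `Range.BigDecimal` is the stock replacement for fractional `Double` ranges, which Scala 2.12 deprecates because repeated `Double` addition accumulates rounding error. A quick sketch:

```scala
// Exclusive and inclusive forms mirror `until` and `to`.
val exclusive = Range.BigDecimal(1, 100, 1)           // 1, 2, ..., 99
val inclusive = Range.BigDecimal.inclusive(1, 100, 1) // 1, 2, ..., 100
assert(exclusive.size == 99)
assert(inclusive.size == 100)
assert(exclusive.map(_.toDouble).sum == 4950.0)
```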
test("asJavaIterable") { diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index bfd73069fb..5f757b757a 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -18,8 +18,7 @@ package org.apache.spark.status import java.io.File -import java.lang.{Integer => JInteger, Long => JLong} -import java.util.{Arrays, Date, Properties} +import java.util.{Date, Properties} import scala.collection.JavaConverters._ import scala.collection.immutable.Map @@ -1171,12 +1170,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Stop task 2 before task 1 time += 1 tasks(1).markFinished(TaskState.FINISHED, time) - listener.onTaskEnd( - SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null)) + listener.onTaskEnd(SparkListenerTaskEnd( + stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(1), null)) time += 1 tasks(0).markFinished(TaskState.FINISHED, time) - listener.onTaskEnd( - SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null)) + listener.onTaskEnd(SparkListenerTaskEnd( + stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null)) // Start task 3 and task 2 should be evicted. listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2))) @@ -1241,8 +1240,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { // Task 1 Finished time += 1 tasks(0).markFinished(TaskState.FINISHED, time) - listener.onTaskEnd( - SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null)) + listener.onTaskEnd(SparkListenerTaskEnd( + stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null)) // Stage 1 Completed stage1.failureReason = Some("Failed") @@ -1256,7 +1255,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { time += 1 tasks(1).markFinished(TaskState.FINISHED, time) listener.onTaskEnd( - SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", + SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType", TaskKilled(reason = "Killed"), tasks(1), null)) // Ensure killed task metrics are updated diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index cd25265784..35fba1a3b7 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ +import scala.language.postfixOps import scala.ref.WeakReference import org.scalatest.Matchers diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index 8d6cca8e48..207c54ce75 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -138,7 +138,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("test NULL avro type") { withTempPath { dir => val fields = - Seq(new 
Field("null", Schema.create(Type.NULL), "doc", null)).asJava + Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) val datumWriter = new GenericDatumWriter[GenericRecord](schema) @@ -161,7 +161,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val avroSchema: Schema = { val union = Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava) - val fields = Seq(new Field("field1", union, "doc", null)).asJava + val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) schema @@ -189,7 +189,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val avroSchema: Schema = { val union = Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava) - val fields = Seq(new Field("field1", union, "doc", null)).asJava + val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) schema @@ -221,7 +221,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { Schema.create(Type.NULL) ).asJava ) - val fields = Seq(new Field("field1", union, "doc", null)).asJava + val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) schema @@ -247,7 +247,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { test("Union of a single type") { withTempPath { dir => val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava) - val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava + val fields = Seq(new Field("field1", UnionOfOne, "doc", null.asInstanceOf[AnyVal])).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) @@ -274,10 +274,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val complexUnionType = Schema.createUnion( List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava) val fields = Seq( - new Field("field1", complexUnionType, "doc", null), - new Field("field2", complexUnionType, "doc", null), - new Field("field3", complexUnionType, "doc", null), - new Field("field4", complexUnionType, "doc", null) + new Field("field1", complexUnionType, "doc", null.asInstanceOf[AnyVal]), + new Field("field2", complexUnionType, "doc", null.asInstanceOf[AnyVal]), + new Field("field3", complexUnionType, "doc", null.asInstanceOf[AnyVal]), + new Field("field4", complexUnionType, "doc", null.asInstanceOf[AnyVal]) ).asJava val schema = Schema.createRecord("name", "docs", "namespace", false) schema.setFields(fields) @@ -941,7 +941,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val avroArrayType = resolveNullable(Schema.createArray(avroType), nullable) val avroMapType = resolveNullable(Schema.createMap(avroType), nullable) val name = "foo" - val avroField = new Field(name, avroType, "", null) + val avroField = new Field(name, avroType, "", null.asInstanceOf[AnyVal]) val recordSchema = Schema.createRecord("name", "doc", "space", true, Seq(avroField).asJava) val avroRecordType = resolveNullable(recordSchema, nullable) diff 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa6bdc20bd..aa21f1271b 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -56,7 +56,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
   }
 
   // Continuous processing tasks end asynchronously, so test that they actually end.
-  private val tasksEndedListener = new SparkListener() {
+  private class TasksEndedListener extends SparkListener {
     val activeTaskIdCount = new AtomicInteger(0)
 
     override def onTaskStart(start: SparkListenerTaskStart): Unit = {
@@ -68,6 +68,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     }
   }
 
+  private val tasksEndedListener = new TasksEndedListener()
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     spark.sparkContext.addSparkListener(tasksEndedListener)
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index cf283a5c3e..07960d14b0 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -228,7 +228,7 @@ object ConsumerStrategies {
     new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
   }
 
   /**
@@ -307,7 +307,7 @@ object ConsumerStrategies {
     new SubscribePattern[K, V](
       pattern,
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
   }
 
   /**
@@ -391,7 +391,7 @@ object ConsumerStrategies {
     new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
+      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
   }
 
   /**
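`new jl.Long(...)` is deprecated as of Java 9; `Long.valueOf` is the drop-in replacement and can reuse cached boxed values. A sketch of the pattern used in the three `ConsumerStrategy` hunks above, under Scala 2.12 where `mapValues` still returns a strict-typed `Map`:

```scala
import java.{lang => jl}

val offsets = Map("topic-0" -> 42L)
// Eta-expansion of the static factory replaces the lambda `l => new jl.Long(l)`.
val boxed: Map[String, jl.Long] = offsets.mapValues(jl.Long.valueOf)
assert(boxed("topic-0") == 42L)
```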
*/ @Since("2.0.0") -@BeanInfo case class LabeledPoint(@Since("2.0.0") label: Double, @Since("2.0.0") features: Vector) { + + def getLabel: Double = label + + def getFeatures: Vector = features + override def toString: String = { s"($label,$features)" } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala index 56e2c543d1..5bfaa3b7f3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala @@ -17,10 +17,6 @@ package org.apache.spark.ml.feature -import org.json4s.JsonDSL._ -import org.json4s.JValue -import org.json4s.jackson.JsonMethods._ - import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.ml._ @@ -209,7 +205,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui if (isSet(inputCols)) { val splitsArray = if (isSet(numBucketsArray)) { val probArrayPerCol = $(numBucketsArray).map { numOfBuckets => - (0.0 to 1.0 by 1.0 / numOfBuckets).toArray + (0 to numOfBuckets).map(_.toDouble / numOfBuckets).toArray } val probabilityArray = probArrayPerCol.flatten.sorted.distinct @@ -229,12 +225,12 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui } } else { dataset.stat.approxQuantile($(inputCols), - (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError)) + (0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError)) } bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits)) } else { val splits = dataset.stat.approxQuantile($(inputCol), - (0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError)) + (0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError)) bucketizer.setSplits(getDistinctSplits(splits)) } copyValues(bucketizer.setParent(this)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala index 4381d6ab20..b320057b25 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.regression -import scala.beans.BeanInfo - import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint} @@ -32,10 +30,14 @@ import org.apache.spark.mllib.util.NumericParser * @param features List of features for this data point. 
*/ @Since("0.8.0") -@BeanInfo case class LabeledPoint @Since("1.0.0") ( @Since("0.8.0") label: Double, @Since("1.0.0") features: Vector) { + + def getLabel: Double = label + + def getFeatures: Vector = features + override def toString: String = { s"($label,$features)" } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala index 80c6ef0ea1..85ed11d655 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/StreamingTest.scala @@ -17,8 +17,6 @@ package org.apache.spark.mllib.stat.test -import scala.beans.BeanInfo - import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging import org.apache.spark.streaming.api.java.JavaDStream @@ -32,10 +30,11 @@ import org.apache.spark.util.StatCounter * @param value numeric value of the observation. */ @Since("1.6.0") -@BeanInfo case class BinarySample @Since("1.6.0") ( @Since("1.6.0") isExperiment: Boolean, @Since("1.6.0") value: Double) { + def getIsExperiment: Boolean = isExperiment + def getValue: Double = value override def toString: String = { s"($isExperiment, $value)" } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala index 6734336aac..985e396000 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala @@ -17,16 +17,16 @@ package org.apache.spark.ml.feature -import scala.beans.BeanInfo - import edu.emory.mathcs.jtransforms.dct.DoubleDCT_1D import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} import org.apache.spark.sql.Row -@BeanInfo -case class DCTTestData(vec: Vector, wantedVec: Vector) +case class DCTTestData(vec: Vector, wantedVec: Vector) { + def getVec: Vector = vec + def getWantedVec: Vector = wantedVec +} class DCTSuite extends MLTest with DefaultReadWriteTest { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala index 201a335e0d..1483d5df4d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala @@ -17,14 +17,13 @@ package org.apache.spark.ml.feature -import scala.beans.BeanInfo - import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} import org.apache.spark.sql.{DataFrame, Row} - -@BeanInfo -case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) +case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) { + def getInputTokens: Array[String] = inputTokens + def getWantedNGrams: Array[String] = wantedNGrams +} class NGramSuite extends MLTest with DefaultReadWriteTest { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala index b009038bbd..82af050396 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala @@ -31,7 +31,7 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest { val datasetSize = 100000 val numBuckets = 5 - val df = sc.parallelize(1.0 to datasetSize by 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index b009038bbd..82af050396 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -31,7 +31,7 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
 
     val datasetSize = 100000
     val numBuckets = 5
-    val df = sc.parallelize(1.0 to datasetSize by 1.0).map(Tuple1.apply).toDF("input")
+    val df = sc.parallelize(1 to datasetSize).map(_.toDouble).map(Tuple1.apply).toDF("input")
     val discretizer = new QuantileDiscretizer()
       .setInputCol("input")
       .setOutputCol("result")
@@ -114,8 +114,8 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
     val spark = this.spark
     import spark.implicits._
 
-    val trainDF = sc.parallelize(1.0 to 100.0 by 1.0).map(Tuple1.apply).toDF("input")
-    val testDF = sc.parallelize(-10.0 to 110.0 by 1.0).map(Tuple1.apply).toDF("input")
+    val trainDF = sc.parallelize((1 to 100).map(_.toDouble)).map(Tuple1.apply).toDF("input")
+    val testDF = sc.parallelize((-10 to 110).map(_.toDouble)).map(Tuple1.apply).toDF("input")
     val discretizer = new QuantileDiscretizer()
       .setInputCol("input")
       .setOutputCol("result")
@@ -276,10 +276,10 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
       1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
     val data2 = Array.range(1, 40, 2).map(_.toDouble)
     val expected2 = Array (0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0,
-      2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
+      2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
     val data3 = Array.range(1, 60, 3).map(_.toDouble)
-    val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0,
-      5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
+    val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 5.0,
+      5.0, 6.0, 6.0, 7.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
     val data = (0 until 20).map { idx =>
       (data1(idx), data2(idx), data3(idx), expected1(idx), expected2(idx), expected3(idx))
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
index be59b0af2c..ba8e79f14d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.ml.feature
 
-import scala.beans.BeanInfo
-
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
 import org.apache.spark.sql.{DataFrame, Row}
 
-@BeanInfo
-case class TokenizerTestData(rawText: String, wantedTokens: Array[String])
+case class TokenizerTestData(rawText: String, wantedTokens: Array[String]) {
+  def getRawText: String = rawText
+  def getWantedTokens: Array[String] = wantedTokens
+}
 
 class TokenizerSuite extends MLTest with DefaultReadWriteTest {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
index fb5789f945..44b0f8f8ae 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.ml.feature
 
-import scala.beans.{BeanInfo, BeanProperty}
-
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.attribute._
@@ -26,7 +24,7 @@ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.DataFrame
 
 class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
 
@@ -339,6 +337,7 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
 }
 
 private[feature] object VectorIndexerSuite {
-  @BeanInfo
-  case class FeatureData(@BeanProperty features: Vector)
+  case class FeatureData(features: Vector) {
+    def getFeatures: Vector = features
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 9a59c41740..2fc9754ecf 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -601,7 +601,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
     val df = maybeDf.get._2
 
     val expected = estimator.fit(df)
-    val actuals = dfs.filter(_ != baseType).map(t => (t, estimator.fit(t._2)))
+    val actuals = dfs.map(t => (t, estimator.fit(t._2)))
     actuals.foreach { case (_, actual) => check(expected, actual) }
     actuals.foreach { case (t, actual) => check2(expected, actual, t._2, t._1.encoder) }
 
diff --git a/pom.xml b/pom.xml
index fcec295eee..9130773cb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -407,6 +407,11 @@
         <artifactId>commons-lang3</artifactId>
         <version>${commons-lang3.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-text</artifactId>
+        <version>1.6</version>
+      </dependency>
       <dependency>
         <groupId>commons-lang</groupId>
         <artifactId>commons-lang</artifactId>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a8d2b5d1d9..e35e74aa33 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-26090] Resolve most miscellaneous deprecation and build warnings for Spark 3
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.stat.test.BinarySampleBeanInfo"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.regression.LabeledPointBeanInfo"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.LabeledPointBeanInfo"),
+
     // [SPARK-25959] GBTClassifier picks wrong impurity stats on loading
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.GBTClassificationModel.setImpurity"),
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.tree.HasVarianceImpurity.org$apache$spark$ml$tree$HasVarianceImpurity$_setter_$impurity_="),
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
index be4daec3b1..167fb402cd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala
@@ -55,7 +55,7 @@ private[spark] class KubernetesDriverBuilder(
     providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
       => PodTemplateConfigMapStep) =
     new PodTemplateConfigMapStep(_),
-    provideInitialPod: () => SparkPod = SparkPod.initialPod) {
+    provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 089f84dec2..fc41a4770b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -53,7 +53,7 @@ private[spark] class KubernetesExecutorBuilder(
       KubernetesConf[KubernetesExecutorSpecificConf]
         => HadoopSparkUserExecutorFeatureStep) =
     new HadoopSparkUserExecutorFeatureStep(_),
-    provideInitialPod: () => SparkPod = SparkPod.initialPod) {
+    provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
 
   def buildFromFeatures(
     kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 81e3822389..08f28758ef 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
 
 import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.Mockito.{doReturn, verify, when}
 import org.scalatest.BeforeAndAfter
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
-import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
 
 class ClientSuite extends SparkFunSuite with BeforeAndAfter {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index b336774838..2f984e5d89 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -157,7 +157,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
     Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
       override def matches(argument: scala.Any): Boolean = {
-        if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
+        if (!argument.isInstanceOf[KubernetesConf[_]]) {
          false
         } else {
           val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]
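The `KubernetesConf[_]` change above silences an "unchecked" warning that was pointing at a real limitation: type arguments are erased on the JVM, so the original test could never actually verify the `KubernetesExecutorSpecificConf` parameter. A sketch with a hypothetical `Box` class:

```scala
case class Box[T](value: T)

val any: Any = Box(42)
// Only the class itself is checkable at runtime; the element type is erased,
// so isInstanceOf[Box[String]] would be an unchecked (and misleading) test.
assert(any.isInstanceOf[Box[_]])
```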
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 35299166d9..c3070de3d1 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -116,8 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
 
   def createContainer(host: String, resource: Resource = containerResource): Container = {
-    // When YARN 2.6+ is required, avoid deprecation by using version with long second arg
-    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+    val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
     Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
index 9bacd3b925..ea619c6a76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/HyperLogLogPlusPlusHelper.scala
@@ -199,7 +199,7 @@ class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable {
       var shift = 0
       while (idx < m && i < REGISTERS_PER_WORD) {
         val Midx = (word >>> shift) & REGISTER_WORD_MASK
-        zInverse += 1.0 / (1 << Midx)
+        zInverse += 1.0 / (1L << Midx)
         if (Midx == 0) {
           V += 1.0d
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 94778840d7..117e96175e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.beans.{BeanInfo, BeanProperty}
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -30,8 +28,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
 import org.apache.spark.sql.types._
 
-@BeanInfo
-private[sql] case class GroupableData(@BeanProperty data: Int)
+private[sql] case class GroupableData(data: Int) {
+  def getData: Int = data
+}
 
 private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
 
@@ -50,8 +49,9 @@ private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
   private[spark] override def asNullable: GroupableUDT = this
 }
 
-@BeanInfo
-private[sql] case class UngroupableData(@BeanProperty data: Map[Int, Int])
+private[sql] case class UngroupableData(data: Map[Int, Int]) {
+  def getData: Map[Int, Int] = data
+}
 
 private[sql] class UngroupableUDT extends UserDefinedType[UngroupableData] {
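The one-character `1L` fix in the `HyperLogLogPlusPlusHelper` hunk above matters because JVM `Int` shifts use only the low five bits of the shift count, so large register values silently wrap:

```scala
assert((1 << 35) == 8)              // 35 & 31 == 3, so this is really 1 << 3
assert((1L << 35) == 34359738368L)  // Long shifts use six bits of the count
val Midx = 35
assert(1.0 / (1L << Midx) != 1.0 / (1 << Midx))  // the old code divided by 8
```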
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index d9ea8dc9d4..d9fe1a992a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -311,7 +311,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       outputMode: OutputMode,
       useTempCheckpointLocation: Boolean = false,
       recoverFromCheckpointLocation: Boolean = true,
-      trigger: Trigger = ProcessingTime(0),
+      trigger: Trigger = Trigger.ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
     val query = createQuery(
       userSpecifiedName,
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
index 7f975a647c..8f35abeb57 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanDeserializationSuite.java
@@ -143,11 +143,16 @@ public class JavaBeanDeserializationSuite implements Serializable {
       this.intervals = intervals;
     }
 
+    @Override
+    public int hashCode() {
+      return id ^ Objects.hashCode(intervals);
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (!(obj instanceof ArrayRecord)) return false;
       ArrayRecord other = (ArrayRecord) obj;
-      return (other.id == this.id) && other.intervals.equals(this.intervals);
+      return (other.id == this.id) && Objects.equals(other.intervals, this.intervals);
     }
 
     @Override
@@ -184,6 +189,11 @@ public class JavaBeanDeserializationSuite implements Serializable {
       this.intervals = intervals;
     }
 
+    @Override
+    public int hashCode() {
+      return id ^ Objects.hashCode(intervals);
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (!(obj instanceof MapRecord)) return false;
@@ -225,6 +235,11 @@ public class JavaBeanDeserializationSuite implements Serializable {
       this.endTime = endTime;
     }
 
+    @Override
+    public int hashCode() {
+      return Long.hashCode(startTime) ^ Long.hashCode(endTime);
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (!(obj instanceof Interval)) return false;
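In the `StreamingQueryManager` hunk above, `org.apache.spark.sql.streaming.ProcessingTime` has been deprecated since Spark 2.2; the `Trigger` factory methods are the supported spelling:

```scala
import org.apache.spark.sql.streaming.Trigger

val asap = Trigger.ProcessingTime(0)  // fire micro-batches as fast as possible
val everyTenSeconds = Trigger.ProcessingTime("10 seconds")
```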
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java
new file mode 100644
index 0000000000..438f489a3e
--- /dev/null
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaRangeInputPartition.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.sql.sources.v2.reader.InputPartition;
+
+class JavaRangeInputPartition implements InputPartition {
+  int start;
+  int end;
+
+  JavaRangeInputPartition(int start, int end) {
+    this.start = start;
+    this.end = end;
+  }
+}
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
index 685f9b9747..ced51dde69 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleReadSupport.java
@@ -88,12 +88,3 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
   }
 }
 
-class JavaRangeInputPartition implements InputPartition {
-  int start;
-  int end;
-
-  JavaRangeInputPartition(int start, int end) {
-    this.start = start;
-    this.end = end;
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index cc8b600efa..cf95631605 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import scala.beans.{BeanInfo, BeanProperty}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, GenericInternalRow, Literal}
@@ -28,10 +26,10 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
-@BeanInfo
-private[sql] case class MyLabeledPoint(
-    @BeanProperty label: Double,
-    @BeanProperty features: UDT.MyDenseVector)
+private[sql] case class MyLabeledPoint(label: Double, features: UDT.MyDenseVector) {
+  def getLabel: Double = label
+  def getFeatures: UDT.MyDenseVector = features
+}
 
 // Wrapped in an object to check Scala compatibility. See SPARK-13929
 object UDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
index 0d9f1fb0c0..fb3388452e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala
@@ -46,6 +46,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
       (input.tail, input.init).zipped.map {
         case (x: Int, y: Int) => (x - y).toLong
         case (x: Long, y: Long) => x - y
+        case other => fail(s"Unexpected input $other")
       }
     }
 
@@ -116,7 +117,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
     val row = new GenericInternalRow(1)
     val nullRow = new GenericInternalRow(1)
     nullRow.setNullAt(0)
-    input.map { value =>
+    input.foreach { value =>
       if (value == nullValue) {
         builder.appendFrom(nullRow, 0)
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 80c76915e4..2d338ab922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.streaming
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.mutable
-
-import org.eclipse.jetty.util.ConcurrentHashSet
 import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
@@ -48,7 +45,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
   }
 
   test("trigger timing") {
-    val triggerTimes = new ConcurrentHashSet[Int]
+    val triggerTimes = ConcurrentHashMap.newKeySet[Int]()
     val clock = new StreamManualClock()
     @volatile var continueExecuting = true
     @volatile var clockIncrementInTrigger = 0L
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index 635ea6fca6..7db31f1f8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -382,10 +382,9 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
     tasks.foreach {
       case t: TextSocketContinuousInputPartition =>
         val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
-        for (i <- 0 until numRecords / 2) {
+        for (_ <- 0 until numRecords / 2) {
           r.next()
-          assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP)
-            .isInstanceOf[(String, Timestamp)])
+          assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP).isInstanceOf[(_, _)])
         }
       case _ => throw new IllegalStateException("Unexpected task type")
     }
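The `ProcessingTimeExecutorSuite` hunk above drops Jetty's `ConcurrentHashSet`, which no longer exists in the Jetty versions Spark 3 builds against. The JDK's `ConcurrentHashMap`-backed key-set view is the standard substitute:

```scala
import java.util.concurrent.ConcurrentHashMap

val triggerTimes = ConcurrentHashMap.newKeySet[Int]()
triggerTimes.add(1)
triggerTimes.add(2)
assert(triggerTimes.contains(2) && triggerTimes.size == 2)
```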
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index e8710aeb40..ddc5dbb148 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -150,7 +150,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
 
     def getPeakExecutionMemory(stageId: Int): Long = {
       val peakMemoryAccumulator = sparkListener.getCompletedStageInfos(stageId).accumulables
-        .filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
+        .filter(_._2.name == Some(InternalAccumulator.PEAK_EXECUTION_MEMORY))
 
       assert(peakMemoryAccumulator.size == 1)
       peakMemoryAccumulator.head._2.value.get.asInstanceOf[Long]
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
index 5f9ea4d267..035b71a37a 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveCliSessionStateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.HiveUtils
 class HiveCliSessionStateSuite extends SparkFunSuite {
 
   def withSessionClear(f: () => Unit): Unit = {
-    try f finally SessionState.detachSession()
+    try f() finally SessionState.detachSession()
   }
 
   test("CliSessionState will be reused") {
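The `DataFrameCallbackSuite` change above is a correctness fix, not just a warning fix: `AccumulableInfo.name` is an `Option[String]`, and an `Option` compared to a bare `String` is never equal, so the old filter predicate could never match. Comparing like with like:

```scala
val name: Option[String] = Some("internal.metrics.peakExecutionMemory")
assert(name == Some("internal.metrics.peakExecutionMemory"))
assert(name.contains("internal.metrics.peakExecutionMemory"))  // equivalent form
```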