[SPARK-26090][CORE][SQL][ML] Resolve most miscellaneous deprecation and build warnings for Spark 3

## What changes were proposed in this pull request?

The build has a lot of deprecation warnings, some of them new with Scala 2.12 and Java 11. We've fixed some already, but I wanted to take a pass at fixing lots of the easy miscellaneous ones here.

They're too numerous and small to list here; see the pull request. Some highlights (a couple of illustrative sketches follow the list):

- `BeanInfo` is deprecated in 2.12, and `BeanInfo` classes are pretty ancient in Java. Instead, case classes can explicitly declare getters
- Eta expansion of zero-arg methods is deprecated; `foo()` becomes `() => foo()` in many cases
- Floating-point `Range` is inexact and deprecated, like `0.0 to 100.0 by 1.0`
- `finalize()` is finally deprecated (the warning just needs to be suppressed)
- `StageInfo.attemptId` was deprecated and easiest to remove here
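
As an example of the `BeanInfo` change, the pattern in this PR replaces the generated bean accessors with explicit getters on the case class. A minimal, self-contained sketch of the before/after shape (`Seq[Double]` stands in for Spark's `Vector` so it compiles on its own):

```scala
// Before (deprecated in Scala 2.12): rely on @BeanInfo to generate bean-style accessors.
//   import scala.beans.BeanInfo
//   @BeanInfo
//   case class LabeledPoint(label: Double, features: Seq[Double])

// After: declare the getters explicitly; no BeanInfo class is generated.
case class LabeledPoint(label: Double, features: Seq[Double]) {
  def getLabel: Double = label
  def getFeatures: Seq[Double] = features
  override def toString: String = s"($label,$features)"
}

object BeanInfoSketch {
  def main(args: Array[String]): Unit = {
    val p = LabeledPoint(1.0, Seq(0.5, 2.0))
    println(p.getLabel)    // 1.0, visible to Java callers as an ordinary getter
    println(p.getFeatures) // List(0.5, 2.0)
  }
}
```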

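The eta-expansion and floating-point `Range` fixes follow the same shape everywhere; a small sketch with a hypothetical zero-arg `reportHeartBeat()` standing in for the real methods:

```scala
object DeprecationSketch {
  // Hypothetical stand-in for zero-arg methods like Executor.reportHeartBeat().
  private def reportHeartBeat(): Unit = println("heartbeat")

  def main(args: Array[String]): Unit = {
    // Eta expansion: passing the bare method name where a () => Unit is expected
    // is deprecated in 2.12, so the function literal is written out explicitly.
    val heartbeatTask: () => Unit = () => reportHeartBeat()
    heartbeatTask()

    // Floating-point ranges like 1.0 to 100.0 by 1.0 are inexact and deprecated;
    // Range.BigDecimal produces the same element counts exactly.
    val inclusive = Range.BigDecimal.inclusive(1, 100, 1) // 1, 2, ..., 100
    val exclusive = Range.BigDecimal(1, 100, 1)           // 1, 2, ..., 99
    println(inclusive.size) // 100
    println(exclusive.size) // 99
  }
}
```
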
I'm not going to touch some chunks of deprecation warnings for now:

- Parquet deprecations
- Hive deprecations (particularly serde2 classes)
- Deprecations in generated code (mostly Thriftserver CLI)
- ProcessingTime deprecations (we may need to revive this class as internal)
- many MLlib deprecations because they concern methods that may be removed anyway
- a few Kinesis deprecations I couldn't figure out
- Mesos get/setRole, which I don't know well
- Kafka/ZK deprecations (e.g. `poll()`)
- a few other ones that will probably resolve by deleting a deprecated method

## How was this patch tested?

Existing tests, including manual testing with the 2.11 build and Java 11.

Closes #23065 from srowen/SPARK-26090.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2018-11-19 09:16:42 -06:00
parent 219b037f05
commit 32365f8177
45 changed files with 177 additions and 125 deletions

View file

@ -196,6 +196,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
* when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
* the iterator will eventually be released.
*/
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws Throwable {
db.closeIterator(this);

View file

@ -89,6 +89,11 @@
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

View file

@ -17,7 +17,7 @@
package org.apache.spark.unsafe.types
import org.apache.commons.lang3.StringUtils
import org.apache.commons.text.similarity.LevenshteinDistance
import org.scalacheck.{Arbitrary, Gen}
import org.scalatest.prop.GeneratorDrivenPropertyChecks
// scalastyle:off
@ -232,7 +232,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
test("levenshteinDistance") {
forAll { (one: String, another: String) =>
assert(toUTF8(one).levenshteinDistance(toUTF8(another)) ===
StringUtils.getLevenshteinDistance(one, another))
LevenshteinDistance.getDefaultInstance.apply(one, another))
}
}

View file

@ -27,7 +27,7 @@ import java.nio.file.StandardOpenOption;
* to read a file to avoid extra copy of data between Java and
* native memory which happens when using {@link java.io.BufferedInputStream}.
* Unfortunately, this is not something already available in JDK,
* {@link sun.nio.ch.ChannelInputStream} supports reading a file using nio,
* {@code sun.nio.ch.ChannelInputStream} supports reading a file using nio,
* but does not support buffering.
*/
public final class NioBufferedFileInputStream extends InputStream {
@ -130,6 +130,7 @@ public final class NioBufferedFileInputStream extends InputStream {
StorageUtils.dispose(byteBuffer);
}
@SuppressWarnings("deprecation")
@Override
protected void finalize() throws IOException {
close();

View file

@ -502,7 +502,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// create and start the heartbeater for collecting memory metrics
_heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
_heartbeater = new Heartbeater(env.memoryManager,
() => SparkContext.this.reportHeartBeat(),
"driver-heartbeater",
conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
_heartbeater.start()

View file

@ -17,7 +17,7 @@
package org.apache.spark.api.r
import java.io.{DataInputStream, DataOutputStream, File, FileOutputStream, IOException}
import java.io.{DataOutputStream, File, FileOutputStream, IOException}
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket}
import java.util.concurrent.TimeUnit
@ -32,8 +32,6 @@ import io.netty.handler.timeout.ReadTimeoutHandler
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
/**
* Netty-based backend server that is used to communicate between R and Java.
@ -99,7 +97,7 @@ private[spark] class RBackend {
if (bootstrap != null && bootstrap.config().group() != null) {
bootstrap.config().group().shutdownGracefully()
}
if (bootstrap != null && bootstrap.childGroup() != null) {
if (bootstrap != null && bootstrap.config().childGroup() != null) {
bootstrap.config().childGroup().shutdownGracefully()
}
bootstrap = null
@ -147,7 +145,7 @@ private[spark] object RBackend extends Logging {
new Thread("wait for socket to close") {
setDaemon(true)
override def run(): Unit = {
// any un-catched exception will also shutdown JVM
// any uncaught exception will also shutdown JVM
val buf = new Array[Byte](1024)
// shutdown JVM if R does not connect back in 10 seconds
serverSocket.setSoTimeout(10000)

View file

@ -270,7 +270,9 @@ private[spark] class HadoopDelegationTokenManager(
}
private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystemsToAccess)) ++
val providers = Seq(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)

View file

@ -190,8 +190,11 @@ private[spark] class Executor(
private val HEARTBEAT_INTERVAL_MS = conf.get(EXECUTOR_HEARTBEAT_INTERVAL)
// Executor for the heartbeat task.
private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
"executor-heartbeater", HEARTBEAT_INTERVAL_MS)
private val heartbeater = new Heartbeater(
env.memoryManager,
() => Executor.this.reportHeartBeat(),
"executor-heartbeater",
HEARTBEAT_INTERVAL_MS)
// must be initialized before running startDriverHeartbeat()
private val heartbeatReceiverRef =

View file

@ -30,7 +30,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
@deprecated("Use attemptNumber instead", "2.3.0") val attemptId: Int,
private val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@ -56,6 +56,8 @@ class StageInfo(
completionTime = Some(System.currentTimeMillis)
}
// This would just be the second constructor arg, except we need to maintain this method
// with parentheses for compatibility
def attemptNumber(): Int = attemptId
private[spark] def getStatusString: String = {

View file

@ -215,7 +215,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
}
test("exclusive ranges of doubles") {
val data = 1.0 until 100.0 by 1.0
val data = Range.BigDecimal(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 99)
@ -223,7 +223,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
}
test("inclusive ranges of doubles") {
val data = 1.0 to 100.0 by 1.0
val data = Range.BigDecimal.inclusive(1, 100, 1)
val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).sum === 100)

View file

@ -202,7 +202,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit() < 100)
assert(ser.serialize(t).limit() < 200)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
@ -212,10 +212,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
check(1.0 to 1000000.0 by 1.0)
check(1.0 to 1000000.0 by 2.0)
check(1.0 until 1000000.0 by 1.0)
check(1.0 until 1000000.0 by 2.0)
check(Range.BigDecimal.inclusive(1, 1000000, 1))
check(Range.BigDecimal.inclusive(1, 1000000, 2))
check(Range.BigDecimal(1, 1000000, 1))
check(Range.BigDecimal(1, 1000000, 2))
}
test("asJavaIterable") {

View file

@ -18,8 +18,7 @@
package org.apache.spark.status
import java.io.File
import java.lang.{Integer => JInteger, Long => JLong}
import java.util.{Arrays, Date, Properties}
import java.util.{Date, Properties}
import scala.collection.JavaConverters._
import scala.collection.immutable.Map
@ -1171,12 +1170,12 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Stop task 2 before task 1
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(1), null))
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))
// Start task 3 and task 2 should be evicted.
listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
@ -1241,8 +1240,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Task 1 Finished
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
listener.onTaskEnd(SparkListenerTaskEnd(
stage1.stageId, stage1.attemptNumber, "taskType", Success, tasks(0), null))
// Stage 1 Completed
stage1.failureReason = Some("Failed")
@ -1256,7 +1255,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
SparkListenerTaskEnd(stage1.stageId, stage1.attemptNumber, "taskType",
TaskKilled(reason = "Killed"), tasks(1), null))
// Ensure killed task metrics are updated

View file

@ -19,6 +19,7 @@ package org.apache.spark.util.collection
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.ref.WeakReference
import org.scalatest.Matchers

View file

@ -138,7 +138,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("test NULL avro type") {
withTempPath { dir =>
val fields =
Seq(new Field("null", Schema.create(Type.NULL), "doc", null)).asJava
Seq(new Field("null", Schema.create(Type.NULL), "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
val datumWriter = new GenericDatumWriter[GenericRecord](schema)
@ -161,7 +161,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.INT), Schema.create(Type.LONG)).asJava)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@ -189,7 +189,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroSchema: Schema = {
val union =
Schema.createUnion(List(Schema.create(Type.FLOAT), Schema.create(Type.DOUBLE)).asJava)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@ -221,7 +221,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Schema.create(Type.NULL)
).asJava
)
val fields = Seq(new Field("field1", union, "doc", null)).asJava
val fields = Seq(new Field("field1", union, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
schema
@ -247,7 +247,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
test("Union of a single type") {
withTempPath { dir =>
val UnionOfOne = Schema.createUnion(List(Schema.create(Type.INT)).asJava)
val fields = Seq(new Field("field1", UnionOfOne, "doc", null)).asJava
val fields = Seq(new Field("field1", UnionOfOne, "doc", null.asInstanceOf[AnyVal])).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
@ -274,10 +274,10 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val complexUnionType = Schema.createUnion(
List(Schema.create(Type.INT), Schema.create(Type.STRING), fixedSchema, enumSchema).asJava)
val fields = Seq(
new Field("field1", complexUnionType, "doc", null),
new Field("field2", complexUnionType, "doc", null),
new Field("field3", complexUnionType, "doc", null),
new Field("field4", complexUnionType, "doc", null)
new Field("field1", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field2", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field3", complexUnionType, "doc", null.asInstanceOf[AnyVal]),
new Field("field4", complexUnionType, "doc", null.asInstanceOf[AnyVal])
).asJava
val schema = Schema.createRecord("name", "docs", "namespace", false)
schema.setFields(fields)
@ -941,7 +941,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val avroArrayType = resolveNullable(Schema.createArray(avroType), nullable)
val avroMapType = resolveNullable(Schema.createMap(avroType), nullable)
val name = "foo"
val avroField = new Field(name, avroType, "", null)
val avroField = new Field(name, avroType, "", null.asInstanceOf[AnyVal])
val recordSchema = Schema.createRecord("name", "doc", "space", true, Seq(avroField).asJava)
val avroRecordType = resolveNullable(recordSchema, nullable)

View file

@ -56,7 +56,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}
// Continuous processing tasks end asynchronously, so test that they actually end.
private val tasksEndedListener = new SparkListener() {
private class TasksEndedListener extends SparkListener {
val activeTaskIdCount = new AtomicInteger(0)
override def onTaskStart(start: SparkListenerTaskStart): Unit = {
@ -68,6 +68,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
}
}
private val tasksEndedListener = new TasksEndedListener()
override def beforeEach(): Unit = {
super.beforeEach()
spark.sparkContext.addSparkListener(tasksEndedListener)

View file

@ -228,7 +228,7 @@ object ConsumerStrategies {
new Subscribe[K, V](
new ju.ArrayList(topics.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**
@ -307,7 +307,7 @@ object ConsumerStrategies {
new SubscribePattern[K, V](
pattern,
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**
@ -391,7 +391,7 @@ object ConsumerStrategies {
new Assign[K, V](
new ju.ArrayList(topicPartitions.asJavaCollection),
new ju.HashMap[String, Object](kafkaParams.asJava),
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).asJava))
}
/**

View file

@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
import scala.beans.BeanInfo
import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.Vector
@ -30,8 +28,12 @@ import org.apache.spark.ml.linalg.Vector
* @param features List of features for this data point.
*/
@Since("2.0.0")
@BeanInfo
case class LabeledPoint(@Since("2.0.0") label: Double, @Since("2.0.0") features: Vector) {
def getLabel: Double = label
def getFeatures: Vector = features
override def toString: String = {
s"($label,$features)"
}

View file

@ -17,10 +17,6 @@
package org.apache.spark.ml.feature
import org.json4s.JsonDSL._
import org.json4s.JValue
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml._
@ -209,7 +205,7 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
if (isSet(inputCols)) {
val splitsArray = if (isSet(numBucketsArray)) {
val probArrayPerCol = $(numBucketsArray).map { numOfBuckets =>
(0.0 to 1.0 by 1.0 / numOfBuckets).toArray
(0 to numOfBuckets).map(_.toDouble / numOfBuckets).toArray
}
val probabilityArray = probArrayPerCol.flatten.sorted.distinct
@ -229,12 +225,12 @@ final class QuantileDiscretizer @Since("1.6.0") (@Since("1.6.0") override val ui
}
} else {
dataset.stat.approxQuantile($(inputCols),
(0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
(0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError))
}
bucketizer.setSplitsArray(splitsArray.map(getDistinctSplits))
} else {
val splits = dataset.stat.approxQuantile($(inputCol),
(0.0 to 1.0 by 1.0 / $(numBuckets)).toArray, $(relativeError))
(0 to $(numBuckets)).map(_.toDouble / $(numBuckets)).toArray, $(relativeError))
bucketizer.setSplits(getDistinctSplits(splits))
}
copyValues(bucketizer.setParent(this))

View file

@ -17,8 +17,6 @@
package org.apache.spark.mllib.regression
import scala.beans.BeanInfo
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
@ -32,10 +30,14 @@ import org.apache.spark.mllib.util.NumericParser
* @param features List of features for this data point.
*/
@Since("0.8.0")
@BeanInfo
case class LabeledPoint @Since("1.0.0") (
@Since("0.8.0") label: Double,
@Since("1.0.0") features: Vector) {
def getLabel: Double = label
def getFeatures: Vector = features
override def toString: String = {
s"($label,$features)"
}

View file

@ -17,8 +17,6 @@
package org.apache.spark.mllib.stat.test
import scala.beans.BeanInfo
import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.api.java.JavaDStream
@ -32,10 +30,11 @@ import org.apache.spark.util.StatCounter
* @param value numeric value of the observation.
*/
@Since("1.6.0")
@BeanInfo
case class BinarySample @Since("1.6.0") (
@Since("1.6.0") isExperiment: Boolean,
@Since("1.6.0") value: Double) {
def getIsExperiment: Boolean = isExperiment
def getValue: Double = value
override def toString: String = {
s"($isExperiment, $value)"
}

View file

@ -17,16 +17,16 @@
package org.apache.spark.ml.feature
import scala.beans.BeanInfo
import edu.emory.mathcs.jtransforms.dct.DoubleDCT_1D
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row
@BeanInfo
case class DCTTestData(vec: Vector, wantedVec: Vector)
case class DCTTestData(vec: Vector, wantedVec: Vector) {
def getVec: Vector = vec
def getWantedVec: Vector = wantedVec
}
class DCTSuite extends MLTest with DefaultReadWriteTest {

View file

@ -17,14 +17,13 @@
package org.apache.spark.ml.feature
import scala.beans.BeanInfo
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Row}
@BeanInfo
case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String])
case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) {
def getInputTokens: Array[String] = inputTokens
def getWantedNGrams: Array[String] = wantedNGrams
}
class NGramSuite extends MLTest with DefaultReadWriteTest {

View file

@ -31,7 +31,7 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
val datasetSize = 100000
val numBuckets = 5
val df = sc.parallelize(1.0 to datasetSize by 1.0).map(Tuple1.apply).toDF("input")
val df = sc.parallelize(1 to datasetSize).map(_.toDouble).map(Tuple1.apply).toDF("input")
val discretizer = new QuantileDiscretizer()
.setInputCol("input")
.setOutputCol("result")
@ -114,8 +114,8 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
val spark = this.spark
import spark.implicits._
val trainDF = sc.parallelize(1.0 to 100.0 by 1.0).map(Tuple1.apply).toDF("input")
val testDF = sc.parallelize(-10.0 to 110.0 by 1.0).map(Tuple1.apply).toDF("input")
val trainDF = sc.parallelize((1 to 100).map(_.toDouble)).map(Tuple1.apply).toDF("input")
val testDF = sc.parallelize((-10 to 110).map(_.toDouble)).map(Tuple1.apply).toDF("input")
val discretizer = new QuantileDiscretizer()
.setInputCol("input")
.setOutputCol("result")
@ -276,10 +276,10 @@ class QuantileDiscretizerSuite extends MLTest with DefaultReadWriteTest {
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
val data2 = Array.range(1, 40, 2).map(_.toDouble)
val expected2 = Array (0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0,
2.0, 2.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
2.0, 3.0, 3.0, 3.0, 3.0, 4.0, 4.0, 4.0, 4.0, 4.0)
val data3 = Array.range(1, 60, 3).map(_.toDouble)
val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 5.0,
5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
val expected3 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 5.0,
5.0, 6.0, 6.0, 7.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
val data = (0 until 20).map { idx =>
(data1(idx), data2(idx), data3(idx), expected1(idx), expected2(idx), expected3(idx))
}

View file

@ -17,14 +17,14 @@
package org.apache.spark.ml.feature
import scala.beans.BeanInfo
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Row}
@BeanInfo
case class TokenizerTestData(rawText: String, wantedTokens: Array[String])
case class TokenizerTestData(rawText: String, wantedTokens: Array[String]) {
def getRawText: String = rawText
def getWantedTokens: Array[String] = wantedTokens
}
class TokenizerSuite extends MLTest with DefaultReadWriteTest {

View file

@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
import scala.beans.{BeanInfo, BeanProperty}
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.ml.attribute._
@ -26,7 +24,7 @@ import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.DataFrame
class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
@ -339,6 +337,7 @@ class VectorIndexerSuite extends MLTest with DefaultReadWriteTest with Logging {
}
private[feature] object VectorIndexerSuite {
@BeanInfo
case class FeatureData(@BeanProperty features: Vector)
case class FeatureData(features: Vector) {
def getFeatures: Vector = features
}
}

View file

@ -601,7 +601,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging {
val df = maybeDf.get._2
val expected = estimator.fit(df)
val actuals = dfs.filter(_ != baseType).map(t => (t, estimator.fit(t._2)))
val actuals = dfs.map(t => (t, estimator.fit(t._2)))
actuals.foreach { case (_, actual) => check(expected, actual) }
actuals.foreach { case (t, actual) => check2(expected, actual, t._2, t._1.encoder) }

View file

@ -407,6 +407,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>

View file

@ -36,6 +36,11 @@ object MimaExcludes {
// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
// [SPARK-26090] Resolve most miscellaneous deprecation and build warnings for Spark 3
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.stat.test.BinarySampleBeanInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.regression.LabeledPointBeanInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.LabeledPointBeanInfo"),
// [SPARK-25959] GBTClassifier picks wrong impurity stats on loading
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.GBTClassificationModel.setImpurity"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.tree.HasVarianceImpurity.org$apache$spark$ml$tree$HasVarianceImpurity$_setter_$impurity_="),

View file

@ -55,7 +55,7 @@ private[spark] class KubernetesDriverBuilder(
providePodTemplateConfigMapStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]
=> PodTemplateConfigMapStep) =
new PodTemplateConfigMapStep(_),
provideInitialPod: () => SparkPod = SparkPod.initialPod) {
provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]): KubernetesDriverSpec = {

View file

@ -53,7 +53,7 @@ private[spark] class KubernetesExecutorBuilder(
KubernetesConf[KubernetesExecutorSpecificConf]
=> HadoopSparkUserExecutorFeatureStep) =
new HadoopSparkUserExecutorFeatureStep(_),
provideInitialPod: () => SparkPod = SparkPod.initialPod) {
provideInitialPod: () => SparkPod = () => SparkPod.initialPod()) {
def buildFromFeatures(
kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]): SparkPod = {

View file

@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.Mockito.{doReturn, verify, when}
import org.scalatest.BeforeAndAfter
@ -28,7 +28,6 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.submit.JavaMainAppResource
class ClientSuite extends SparkFunSuite with BeforeAndAfter {

View file

@ -157,7 +157,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] =
Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] {
override def matches(argument: scala.Any): Boolean = {
if (!argument.isInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]) {
if (!argument.isInstanceOf[KubernetesConf[_]]) {
false
} else {
val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]]

View file

@ -116,8 +116,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
def createContainer(host: String, resource: Resource = containerResource): Container = {
// When YARN 2.6+ is required, avoid deprecation by using version with long second arg
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)

View file

@ -199,7 +199,7 @@ class HyperLogLogPlusPlusHelper(relativeSD: Double) extends Serializable {
var shift = 0
while (idx < m && i < REGISTERS_PER_WORD) {
val Midx = (word >>> shift) & REGISTER_WORD_MASK
zInverse += 1.0 / (1 << Midx)
zInverse += 1.0 / (1L << Midx)
if (Midx == 0) {
V += 1.0d
}

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.analysis
import scala.beans.{BeanInfo, BeanProperty}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
@ -30,8 +28,9 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData}
import org.apache.spark.sql.types._
@BeanInfo
private[sql] case class GroupableData(@BeanProperty data: Int)
private[sql] case class GroupableData(data: Int) {
def getData: Int = data
}
private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
@ -50,8 +49,9 @@ private[sql] class GroupableUDT extends UserDefinedType[GroupableData] {
private[spark] override def asNullable: GroupableUDT = this
}
@BeanInfo
private[sql] case class UngroupableData(@BeanProperty data: Map[Int, Int])
private[sql] case class UngroupableData(data: Map[Int, Int]) {
def getData: Map[Int, Int] = data
}
private[sql] class UngroupableUDT extends UserDefinedType[UngroupableData] {

View file

@ -311,7 +311,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
outputMode: OutputMode,
useTempCheckpointLocation: Boolean = false,
recoverFromCheckpointLocation: Boolean = true,
trigger: Trigger = ProcessingTime(0),
trigger: Trigger = Trigger.ProcessingTime(0),
triggerClock: Clock = new SystemClock()): StreamingQuery = {
val query = createQuery(
userSpecifiedName,

View file

@ -143,11 +143,16 @@ public class JavaBeanDeserializationSuite implements Serializable {
this.intervals = intervals;
}
@Override
public int hashCode() {
return id ^ Objects.hashCode(intervals);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof ArrayRecord)) return false;
ArrayRecord other = (ArrayRecord) obj;
return (other.id == this.id) && other.intervals.equals(this.intervals);
return (other.id == this.id) && Objects.equals(other.intervals, this.intervals);
}
@Override
@ -184,6 +189,11 @@ public class JavaBeanDeserializationSuite implements Serializable {
this.intervals = intervals;
}
@Override
public int hashCode() {
return id ^ Objects.hashCode(intervals);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof MapRecord)) return false;
@ -225,6 +235,11 @@ public class JavaBeanDeserializationSuite implements Serializable {
this.endTime = endTime;
}
@Override
public int hashCode() {
return Long.hashCode(startTime) ^ Long.hashCode(endTime);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Interval)) return false;

View file

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.org.apache.spark.sql.sources.v2;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
class JavaRangeInputPartition implements InputPartition {
int start;
int end;
JavaRangeInputPartition(int start, int end) {
this.start = start;
this.end = end;
}
}

View file

@ -88,12 +88,3 @@ class JavaSimpleReaderFactory implements PartitionReaderFactory {
}
}
class JavaRangeInputPartition implements InputPartition {
int start;
int end;
JavaRangeInputPartition(int start, int end) {
this.start = start;
this.end = end;
}
}

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql
import scala.beans.{BeanInfo, BeanProperty}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Cast, ExpressionEvalHelper, GenericInternalRow, Literal}
@ -28,10 +26,10 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@BeanInfo
private[sql] case class MyLabeledPoint(
@BeanProperty label: Double,
@BeanProperty features: UDT.MyDenseVector)
private[sql] case class MyLabeledPoint(label: Double, features: UDT.MyDenseVector) {
def getLabel: Double = label
def getFeatures: UDT.MyDenseVector = features
}
// Wrapped in an object to check Scala compatibility. See SPARK-13929
object UDT {

View file

@ -46,6 +46,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
(input.tail, input.init).zipped.map {
case (x: Int, y: Int) => (x - y).toLong
case (x: Long, y: Long) => x - y
case other => fail(s"Unexpected input $other")
}
}
@ -116,7 +117,7 @@ class IntegralDeltaSuite extends SparkFunSuite {
val row = new GenericInternalRow(1)
val nullRow = new GenericInternalRow(1)
nullRow.setNullAt(0)
input.map { value =>
input.foreach { value =>
if (value == nullValue) {
builder.appendFrom(nullRow, 0)
} else {

View file

@ -19,9 +19,6 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import org.eclipse.jetty.util.ConcurrentHashSet
import org.scalatest.concurrent.{Eventually, Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
@ -48,7 +45,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
}
test("trigger timing") {
val triggerTimes = new ConcurrentHashSet[Int]
val triggerTimes = ConcurrentHashMap.newKeySet[Int]()
val clock = new StreamManualClock()
@volatile var continueExecuting = true
@volatile var clockIncrementInTrigger = 0L

View file

@ -382,10 +382,9 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
tasks.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
for (i <- 0 until numRecords / 2) {
for (_ <- 0 until numRecords / 2) {
r.next()
assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP)
.isInstanceOf[(String, Timestamp)])
assert(r.get().get(0, TextSocketReader.SCHEMA_TIMESTAMP).isInstanceOf[(_, _)])
}
case _ => throw new IllegalStateException("Unexpected task type")
}

View file

@ -150,7 +150,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
def getPeakExecutionMemory(stageId: Int): Long = {
val peakMemoryAccumulator = sparkListener.getCompletedStageInfos(stageId).accumulables
.filter(_._2.name == InternalAccumulator.PEAK_EXECUTION_MEMORY)
.filter(_._2.name == Some(InternalAccumulator.PEAK_EXECUTION_MEMORY))
assert(peakMemoryAccumulator.size == 1)
peakMemoryAccumulator.head._2.value.get.asInstanceOf[Long]

View file

@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.HiveUtils
class HiveCliSessionStateSuite extends SparkFunSuite {
def withSessionClear(f: () => Unit): Unit = {
try f finally SessionState.detachSession()
try f() finally SessionState.detachSession()
}
test("CliSessionState will be reused") {