[SPARK-27536][CORE][ML][SQL][STREAMING] Remove most use of scala.language.existentials

## What changes were proposed in this pull request?

I want to get rid of as much use of `scala.language.existentials` as possible for 3.0. Existential types are a complicated language feature, and using them generates compiler warnings unless `scala.language.existentials` is imported. The feature may even be on its way out of Scala: https://contributors.scala-lang.org/t/proposal-to-remove-existential-types-from-the-language/2785

For Spark, it comes up mostly where the code plays fast and loose with generic types, not in the advanced situations usually cited when this feature is explained. For example, it appears where a function returns something like `(String, Class[_])`. Scala doesn't like matching this against any other instance of `(String, Class[_])`, because doing so requires inferring the existence of some type that satisfies both. That seems obvious when the generic type is a wildcard, but it isn't technically something Scala lets you get away with.
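
For illustration, here is a minimal, self-contained sketch of the pattern (a hypothetical `lookup` method, not code from this PR). Destructuring the returned pair is what makes the compiler infer an existential type:

```scala
object ExistentialPairSketch extends App {
  // A method whose result carries a wildcard-typed Class.
  def lookup(name: String): (String, Class[_]) = (name, Class.forName(name))

  // Destructuring the pair makes Scala name the unknown type, which
  // historically produced a feature warning unless
  // scala.language.existentials was imported.
  val (n, cls) = lookup("java.lang.String")
  println(s"$n -> ${cls.getName}")
}
```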

This is a large PR, and it only gets rid of _most_ instances of `scala.language.existentials`. The change should be all compile-time and shouldn't affect APIs or logic.

Many of the changes simply touch up sloppiness about generic types, making the known correct value explicit in the code.

Some fixes involve being more explicit about the existence of generic types in methods. For instance, `def foo(arg: Class[_])` seems innocent enough but should really be declared `def foo[T](arg: Class[T])` to let Scala select and fix a single type when evaluating calls to `foo`.
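
As a hedged sketch (hypothetical `instantiate` helpers, not Spark code), the difference looks like this:

```scala
object ClassParamSketch extends App {
  // With a wildcard parameter, the result type is anonymous and callers must cast.
  def instantiateWildcard(arg: Class[_]): Any = arg.getConstructor().newInstance()

  // With a named type parameter, Scala fixes a single T per call site and the
  // result is already typed.
  def instantiate[T](arg: Class[T]): T = arg.getConstructor().newInstance()

  val date: java.util.Date = instantiate(classOf[java.util.Date]) // no cast needed
  println(date)
}
```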

Somewhat surprisingly, the same issue comes up where code destructures a tuple of things that involve a generic type, but the code is fine if the two parts of the tuple are evaluated separately.
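
A minimal sketch of that tuple case (a hypothetical `classes` method; the real instances are the ShuffleMapTask and ClosureCleaner changes further down in the diff):

```scala
object TupleEvalSketch extends App {
  def classes(): (Class[_], Class[_]) = (classOf[String], classOf[Long])

  // val (a, b) = classes()  // destructuring can infer an existential type
  val pair = classes()       // binding the tuple as a whole...
  val a = pair._1            // ...and reading each part separately is fine
  val b = pair._2
  println(s"${a.getName}, ${b.getName}")
}
```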

One key change was altering `Utils.classForName(...): Class[_]` to the more correct `Utils.classForName[T](...): Class[T]`. This caused a number of small but positive changes to callers that otherwise had to cast the result.
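
A simplified sketch of the idea (the real `Utils.classForName` also takes `initialize` and `noSparkClassLoader` parameters and chooses among Spark's class loaders, as the diff shows):

```scala
object ClassForNameSketch extends App {
  // Parameterized lookup: the caller names the expected type once.
  def classForName[C](className: String): Class[C] =
    Class.forName(className).asInstanceOf[Class[C]]

  // Previously callers got a Class[_] back and cast the instance afterwards;
  // now the constructed instance is already typed.
  val sb: java.lang.StringBuilder =
    classForName[java.lang.StringBuilder]("java.lang.StringBuilder")
      .getConstructor().newInstance()
  println(sb.append("ok"))
}
```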

In several tests, `Dataset[_]` was used where `DataFrame` seems to be the clear intent.
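
`DataFrame` is just an alias for `Dataset[Row]`, so this is purely a declaration change; a sketch of the test-fixture pattern (hypothetical fixture class, not a real suite):

```scala
import org.apache.spark.sql.{DataFrame, Dataset}

abstract class ExampleSuiteFixture {
  @transient var before: Dataset[_] = _ // wildcard hides the row type
  @transient var after: DataFrame = _   // same data, stated as Dataset[Row]
}
```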

Finally, in a few cases in MLlib, the return type `this.type` was used in classes that have no subclasses. It isn't needed there and makes it harder for Scala to reason about the return type, so these are changed to use the concrete class as the return type.
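
A hypothetical sketch of that pattern (a made-up `Counter`, mirroring the `MultiClassSummarizer`, `NormalEquation`, and `FPTree` changes in the diff):

```scala
object ThisTypeSketch extends App {
  final class Counter {
    private var n = 0L
    // Was: def add(delta: Long = 1L): this.type. When the class is never
    // subclassed, the concrete return type is equivalent and simpler.
    def add(delta: Long = 1L): Counter = { n += delta; this }
    def value: Long = n
  }

  val c = new Counter
  c.add().add(2L)  // method chaining still works
  println(c.value) // 3
}
```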

After this change, only a few classes still import `scala.language.existentials` (because fixing them would require extensive rewrites), and there are no build warnings.

## How was this patch tested?

Existing tests.

Closes #24431 from srowen/SPARK-27536.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2019-04-29 11:02:01 -05:00
parent fbc7942683
commit 8a17d26784
71 changed files with 180 additions and 241 deletions

View file

@ -33,18 +33,17 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* A trait for use with reading custom classes in PySpark. Implement this trait and add custom
* transformation code by overriding the convert method.
*/
trait Converter[T, + U] extends Serializable {
trait Converter[-T, +U] extends Serializable {
def convert(obj: T): U
}
private[python] object Converter extends Logging {
def getInstance(converterClass: Option[String],
defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
def getInstance[T, U](converterClass: Option[String],
defaultConverter: Converter[_ >: T, _ <: U]): Converter[T, U] = {
converterClass.map { cc =>
Try {
val c = Utils.classForName(cc).getConstructor().
newInstance().asInstanceOf[Converter[Any, Any]]
val c = Utils.classForName[Converter[T, U]](cc).getConstructor().newInstance()
logInfo(s"Loaded converter: $cc")
c
} match {
@ -177,8 +176,8 @@ private[python] object PythonHadoopUtil {
* [[org.apache.hadoop.io.Writable]], into an RDD of base types, or vice versa.
*/
def convertRDD[K, V](rdd: RDD[(K, V)],
keyConverter: Converter[Any, Any],
valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
keyConverter: Converter[K, Any],
valueConverter: Converter[V, Any]): RDD[(Any, Any)] = {
rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
}

View file

@ -24,10 +24,6 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.language.existentials
import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
@ -228,8 +224,8 @@ private[spark] object PythonRDD extends Logging {
batchSize: Int): JavaRDD[Array[Byte]] = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val kc = Utils.classForName[K](keyClass)
val vc = Utils.classForName[V](valueClass)
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@ -296,9 +292,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration): RDD[(K, V)] = {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName[K](keyClass)
val vc = Utils.classForName[V](valueClass)
val fc = Utils.classForName[F](inputFormatClass)
if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
@ -365,9 +361,9 @@ private[spark] object PythonRDD extends Logging {
keyClass: String,
valueClass: String,
conf: Configuration) = {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
val kc = Utils.classForName[K](keyClass)
val vc = Utils.classForName[V](valueClass)
val fc = Utils.classForName[F](inputFormatClass)
if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
@ -425,29 +421,33 @@ private[spark] object PythonRDD extends Logging {
PythonHadoopUtil.mergeConfs(baseConf, conf)
}
private def inferKeyValueTypes[K, V](rdd: RDD[(K, V)], keyConverterClass: String = null,
valueConverterClass: String = null): (Class[_], Class[_]) = {
private def inferKeyValueTypes[K, V, KK, VV](rdd: RDD[(K, V)], keyConverterClass: String = null,
valueConverterClass: String = null): (Class[_ <: KK], Class[_ <: VV]) = {
// Peek at an element to figure out key/value types. Since Writables are not serializable,
// we cannot call first() on the converted RDD. Instead, we call first() on the original RDD
// and then convert locally.
val (key, value) = rdd.first()
val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val (kc, vc) = getKeyValueConverters[K, V, KK, VV](
keyConverterClass, valueConverterClass, new JavaToWritableConverter)
(kc.convert(key).getClass, vc.convert(value).getClass)
}
private def getKeyValueTypes(keyClass: String, valueClass: String):
Option[(Class[_], Class[_])] = {
private def getKeyValueTypes[K, V](keyClass: String, valueClass: String):
Option[(Class[K], Class[V])] = {
for {
k <- Option(keyClass)
v <- Option(valueClass)
} yield (Utils.classForName(k), Utils.classForName(v))
}
private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
defaultConverter: Converter[Any, Any]): (Converter[Any, Any], Converter[Any, Any]) = {
val keyConverter = Converter.getInstance(Option(keyConverterClass), defaultConverter)
val valueConverter = Converter.getInstance(Option(valueConverterClass), defaultConverter)
private def getKeyValueConverters[K, V, KK, VV](
keyConverterClass: String,
valueConverterClass: String,
defaultConverter: Converter[_, _]): (Converter[K, KK], Converter[V, VV]) = {
val keyConverter = Converter.getInstance(Option(keyConverterClass),
defaultConverter.asInstanceOf[Converter[K, KK]])
val valueConverter = Converter.getInstance(Option(valueConverterClass),
defaultConverter.asInstanceOf[Converter[V, VV]])
(keyConverter, valueConverter)
}
@ -459,7 +459,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
defaultConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
val (kc, vc) = getKeyValueConverters(keyConverterClass, valueConverterClass,
val (kc, vc) = getKeyValueConverters[K, V, Any, Any](keyConverterClass, valueConverterClass,
defaultConverter)
PythonHadoopUtil.convertRDD(rdd, kc, vc)
}
@ -470,7 +470,7 @@ private[spark] object PythonRDD extends Logging {
* [[org.apache.hadoop.io.Writable]] types already, since Writables are not Java
* `Serializable` and we can't peek at them. The `path` can be on any Hadoop file system.
*/
def saveAsSequenceFile[K, V, C <: CompressionCodec](
def saveAsSequenceFile[C <: CompressionCodec](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@ -489,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
* `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
* this RDD.
*/
def saveAsHadoopFile[K, V, F <: OutputFormat[_, _], C <: CompressionCodec](
def saveAsHadoopFile[F <: OutputFormat[_, _], C <: CompressionCodec](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@ -507,7 +507,7 @@ private[spark] object PythonRDD extends Logging {
val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new JavaToWritableConverter)
val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
val fc = Utils.classForName[F](outputFormatClass)
converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec)
}
@ -520,7 +520,7 @@ private[spark] object PythonRDD extends Logging {
* `confAsMap` is merged with the default Hadoop conf associated with the SparkContext of
* this RDD.
*/
def saveAsNewAPIHadoopFile[K, V, F <: NewOutputFormat[_, _]](
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
@ -548,7 +548,7 @@ private[spark] object PythonRDD extends Logging {
* (mapred vs. mapreduce). Keys/values are converted for output using either user specified
* converters or, by default, [[org.apache.spark.api.python.JavaToWritableConverter]].
*/
def saveAsHadoopDataset[K, V](
def saveAsHadoopDataset(
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
confAsMap: java.util.HashMap[String, String],

View file

@ -20,8 +20,6 @@ package org.apache.spark.api.r
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.concurrent.TimeUnit
import scala.language.existentials
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
import io.netty.channel.ChannelHandler.Sharable
import io.netty.handler.timeout.ReadTimeoutException

View file

@ -34,7 +34,6 @@ import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.UI._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{ShutdownHookManager, SystemClock, Utils}
/**
@ -274,10 +273,9 @@ object HistoryServer extends Logging {
val providerName = conf.get(History.PROVIDER)
.getOrElse(classOf[FsHistoryProvider].getName())
val provider = Utils.classForName(providerName)
val provider = Utils.classForName[ApplicationHistoryProvider](providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]
val port = conf.get(History.HISTORY_SERVER_UI_PORT)

View file

@ -149,7 +149,7 @@ object FileCommitProtocol extends Logging {
logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
s" dynamic=$dynamicPartitionOverwrite")
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
val clazz = Utils.classForName[FileCommitProtocol](className)
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).

View file

@ -77,8 +77,9 @@ private[spark] object CompressionCodec {
val codecClass =
shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
val codec = try {
val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
val ctor =
Utils.classForName[CompressionCodec](codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf))
} catch {
case _: ClassNotFoundException | _: IllegalArgumentException => None
}

View file

@ -179,8 +179,8 @@ private[spark] class MetricsSystem private (
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).getConstructor().newInstance()
registerSource(source.asInstanceOf[Source])
val source = Utils.classForName[Source](classPath).getConstructor().newInstance()
registerSource(source)
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
@ -195,13 +195,18 @@ private[spark] class MetricsSystem private (
val classPath = kv._2.getProperty("class")
if (null != classPath) {
try {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
if (kv._1 == "servlet") {
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
val servlet = Utils.classForName[MetricsServlet](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
metricsServlet = Some(servlet)
} else {
sinks += sink.asInstanceOf[Sink]
val sink = Utils.classForName[Sink](classPath)
.getConstructor(
classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
.newInstance(kv._2, registry, securityMgr)
sinks += sink
}
} catch {
case e: Exception =>

View file

@ -20,7 +20,6 @@ package org.apache.spark.network.netty
import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark.internal.Logging
@ -66,12 +65,7 @@ class NettyBlockRpcServer(
case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
val (level: StorageLevel, classTag: ClassTag[_]) = {
serializer
.newInstance()
.deserialize(ByteBuffer.wrap(uploadBlock.metadata))
.asInstanceOf[(StorageLevel, ClassTag[_])]
}
val (level, classTag) = deserializeMetadata(uploadBlock.metadata)
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
logDebug(s"Receiving replicated block $blockId with level ${level} " +
@ -87,12 +81,7 @@ class NettyBlockRpcServer(
responseContext: RpcResponseCallback): StreamCallbackWithID = {
val message =
BlockTransferMessage.Decoder.fromByteBuffer(messageHeader).asInstanceOf[UploadBlockStream]
val (level: StorageLevel, classTag: ClassTag[_]) = {
serializer
.newInstance()
.deserialize(ByteBuffer.wrap(message.metadata))
.asInstanceOf[(StorageLevel, ClassTag[_])]
}
val (level, classTag) = deserializeMetadata(message.metadata)
val blockId = BlockId(message.blockId)
logDebug(s"Receiving replicated block $blockId with level ${level} as stream " +
s"from ${client.getSocketAddress}")
@ -101,5 +90,12 @@ class NettyBlockRpcServer(
blockManager.putBlockDataAsStream(blockId, level, classTag)
}
private def deserializeMetadata[T](metadata: Array[Byte]): (StorageLevel, ClassTag[T]) = {
serializer
.newInstance()
.deserialize(ByteBuffer.wrap(metadata))
.asInstanceOf[(StorageLevel, ClassTag[T])]
}
override def getStreamManager(): StreamManager = streamManager
}

View file

@ -20,7 +20,6 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark._

View file

@ -21,7 +21,6 @@ import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark._

View file

@ -26,7 +26,6 @@ import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.concurrent.duration._
import scala.language.existentials
import scala.util.control.NonFatal
import org.apache.commons.lang3.SerializationUtils
@ -382,7 +381,8 @@ private[spark] class DAGScheduler(
* locations that are still available from the previous shuffle to avoid unnecessarily
* regenerating data.
*/
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
def createShuffleMapStage[K, V, C](
shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)

View file

@ -19,8 +19,6 @@ package org.apache.spark.scheduler
import java.util.Properties
import scala.language.existentials
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{AccumulatorV2, CallSite}

View file

@ -21,13 +21,10 @@ import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Properties
import scala.language.existentials
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriter
/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
@ -85,13 +82,15 @@ private[spark] class ShuffleMapTask(
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
val rdd = rddAndDep._1
val dep = rddAndDep._2
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
}

View file

@ -21,7 +21,6 @@ import java.net.{InetAddress, ServerSocket, Socket}
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.language.existentials
import scala.util.Try
import org.apache.spark.SparkEnv

View file

@ -164,8 +164,8 @@ class KryoSerializer(conf: SparkConf)
}
// Allow the user to register their own classes by setting spark.kryo.registrator.
userRegistrators
.map(Utils.classForName(_, noSparkClassLoader = true).getConstructor().
newInstance().asInstanceOf[KryoRegistrator])
.map(Utils.classForName[KryoRegistrator](_, noSparkClassLoader = true).
getConstructor().newInstance())
.foreach { reg => reg.registerClasses(kryo) }
} catch {
case e: Exception =>

View file

@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.lang.invoke.SerializedLambda
import scala.collection.mutable.{Map, Set, Stack}
import scala.language.existentials
import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor, Type}
import org.apache.xbean.asm7.Opcodes._
@ -309,7 +308,9 @@ private[spark] object ClosureCleaner extends Logging {
var outerPairs: List[(Class[_], AnyRef)] = outerClasses.zip(outerObjects).reverse
var parent: AnyRef = null
if (outerPairs.nonEmpty) {
val (outermostClass, outermostObject) = outerPairs.head
val outermostClass = outerPairs.head._1
val outermostObject = outerPairs.head._2
if (isClosure(outermostClass)) {
logDebug(s" + outermost object is a closure, so we clone it: ${outermostClass}")
} else if (outermostClass.getName.startsWith("$line")) {

View file

@ -197,14 +197,15 @@ private[spark] object Utils extends Logging {
* Preferred alternative to Class.forName(className), as well as
* Class.forName(className, initialize, loader) with current thread's ContextClassLoader.
*/
def classForName(
def classForName[C](
className: String,
initialize: Boolean = true,
noSparkClassLoader: Boolean = false): Class[_] = {
noSparkClassLoader: Boolean = false): Class[C] = {
if (!noSparkClassLoader) {
Class.forName(className, initialize, getContextOrSparkClassLoader)
Class.forName(className, initialize, getContextOrSparkClassLoader).asInstanceOf[Class[C]]
} else {
Class.forName(className, initialize, Thread.currentThread().getContextClassLoader)
Class.forName(className, initialize, Thread.currentThread().getContextClassLoader).
asInstanceOf[Class[C]]
}
// scalastyle:on classforname
}
@ -2680,10 +2681,11 @@ private[spark] object Utils extends Logging {
* other state) and decide they do not need to be added. A log message is printed in that case.
* Other exceptions are bubbled up.
*/
def loadExtensions[T](extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
def loadExtensions[T <: AnyRef](
extClass: Class[T], classes: Seq[String], conf: SparkConf): Seq[T] = {
classes.flatMap { name =>
try {
val klass = classForName(name)
val klass = classForName[T](name)
require(extClass.isAssignableFrom(klass),
s"$name is not a subclass of ${extClass.getName()}.")

View file

@ -21,7 +21,6 @@ import java.lang.ref.WeakReference
import java.util.concurrent.TimeUnit
import scala.collection.mutable.HashSet
import scala.language.existentials
import scala.util.Random
import org.scalatest.BeforeAndAfter
@ -70,10 +69,11 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
protected def newShuffleRDD() = newPairRDD().reduceByKey(_ + _)
protected def newBroadcast() = sc.broadcast(1 to 100)
protected def newRDDWithShuffleDependencies(): (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
protected def newRDDWithShuffleDependencies():
(RDD[(Int, Int)], Seq[ShuffleDependency[Int, Int, Int]]) = {
def getAllDependencies(rdd: RDD[(Int, Int)]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
getAllDependencies(dep.rdd.asInstanceOf[RDD[(Int, Int)]])
}
}
val rdd = newShuffleRDD()
@ -81,7 +81,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
.filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
.map(_.asInstanceOf[ShuffleDependency[_, _, _]])
.map(_.asInstanceOf[ShuffleDependency[Int, Int, Int]])
(rdd, shuffleDeps)
}

View file

@ -35,7 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD, RDD}
import org.apache.spark.rdd.{HadoopRDD, NewHadoopRDD}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@ -206,8 +206,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
Utils.withContextClassLoader(loader) {
sc = new SparkContext("local", "test")
val objs = sc.makeRDD(1 to 3).map { x =>
Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
val objs = sc.makeRDD(1 to 3).map { _ =>
Utils.classForName[AnyRef](className, noSparkClassLoader = true).
getConstructor().newInstance()
}
val outputDir = new File(tempDir, "output").getAbsolutePath
objs.saveAsObjectFile(outputDir)

View file

@ -23,7 +23,6 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future
import scala.concurrent.duration.{Duration, SECONDS}
import scala.language.existentials
import scala.reflect.ClassTag
import org.scalactic.TripleEquals
@ -52,7 +51,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa
var taskScheduler: TestTaskScheduler = null
var scheduler: DAGScheduler = null
var backend: T = _
// Even though the tests aren't doing much, occassionally we see flakiness from pauses over
// Even though the tests aren't doing much, occasionally we see flakiness from pauses over
// a second (probably from GC?) so we leave a long timeout in here
val duration = Duration(10, SECONDS)
@ -297,11 +296,11 @@ private[spark] abstract class MockBackend(
* Test backends should call this to get a task that has been assigned to them by the scheduler.
* Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
*/
def beginTask(): (TaskDescription, Task[_]) = {
def beginTask[T](): (TaskDescription, Task[T]) = {
synchronized {
val toRun = assignedTasksWaitingToRun.remove(assignedTasksWaitingToRun.size - 1)
runningTasks += toRun._1.taskId
toRun
toRun.asInstanceOf[(TaskDescription, Task[T])]
}
}
@ -589,7 +588,7 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
backend.taskSuccess(taskDescription, 4321 + partition)
}
}
withBackend(runBackend _) {
withBackend(() => runBackend()) {
val jobFuture = submit(d, (0 until 30).toArray)
awaitJobTermination(jobFuture, duration)
}

View file

@ -134,7 +134,8 @@ class MutableURLClassLoaderSuite extends SparkFunSuite with Matchers {
try {
sc.makeRDD(1 to 5, 2).mapPartitions { x =>
Utils.classForName(className, noSparkClassLoader = true).getConstructor().newInstance()
Utils.classForName[AnyRef](className, noSparkClassLoader = true).
getConstructor().newInstance()
Seq().iterator
}.count()
}

View file

@ -17,7 +17,6 @@
package org.apache.spark.kafka010
import scala.language.existentials
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration

View file

@ -24,7 +24,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions}
@ -47,7 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging {
override def getKind: Text = TOKEN_KIND
}
private[kafka010] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = {
private[kafka010] def obtainToken(sparkConf: SparkConf):
(Token[KafkaDelegationTokenIdentifier], Long) = {
checkProxyUser()
val adminClient = AdminClient.create(createAdminClientProperties(sparkConf))

View file

@ -20,7 +20,6 @@ package org.apache.spark.graphx.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.mutable.HashSet
import scala.language.existentials
import org.apache.xbean.asm7.{ClassReader, ClassVisitor, MethodVisitor}
import org.apache.xbean.asm7.Opcodes._
@ -48,7 +47,7 @@ private[graphx] object BytecodeUtils {
return true
}
}
return false
false
}
}
@ -59,7 +58,8 @@ private[graphx] object BytecodeUtils {
var stack = List[(Class[_], String)]((cls, method))
while (stack.nonEmpty) {
val (c, m) = stack.head
val c = stack.head._1
val m = stack.head._2
stack = stack.tail
seen.add((c, m))
val finder = new MethodInvocationFinder(c.getName, m)
@ -72,7 +72,7 @@ private[graphx] object BytecodeUtils {
}
}
}
return false
false
}
/**

View file

@ -1290,7 +1290,7 @@ private[ml] class MultiClassSummarizer extends Serializable {
* @param weight The weight of this instances.
* @return This MultilabelSummarizer
*/
def add(label: Double, weight: Double = 1.0): this.type = {
def add(label: Double, weight: Double = 1.0): MultiClassSummarizer = {
require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
if (weight == 0.0) return this

View file

@ -17,8 +17,6 @@
package org.apache.spark.ml.feature
import scala.language.existentials
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException

View file

@ -20,7 +20,6 @@ package org.apache.spark.ml.feature
import java.util.NoSuchElementException
import scala.collection.mutable
import scala.language.existentials
import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
@ -131,14 +130,14 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String)
throw new SparkException(s"VectorAssembler does not support the $otherType type")
}
}
val featureAttributes = featureAttributesMap.flatten[Attribute].toArray
val lengths = featureAttributesMap.map(a => a.length).toArray
val featureAttributes = featureAttributesMap.flatten[Attribute]
val lengths = featureAttributesMap.map(a => a.length)
val metadata = new AttributeGroup($(outputCol), featureAttributes).toMetadata()
val (filteredDataset, keepInvalid) = $(handleInvalid) match {
case VectorAssembler.SKIP_INVALID => (dataset.na.drop($(inputCols)), false)
case VectorAssembler.KEEP_INVALID => (dataset, true)
case VectorAssembler.ERROR_INVALID => (dataset, false)
val filteredDataset = $(handleInvalid) match {
case VectorAssembler.SKIP_INVALID => dataset.na.drop($(inputCols))
case VectorAssembler.KEEP_INVALID | VectorAssembler.ERROR_INVALID => dataset
}
val keepInvalid = $(handleInvalid) == VectorAssembler.KEEP_INVALID
// Data transformation.
val assembleFunc = udf { r: Row =>
VectorAssembler.assemble(lengths, keepInvalid)(r.toSeq: _*)

View file

@ -17,7 +17,6 @@
package org.apache.spark.ml.image
import scala.language.existentials
import scala.util.Random
import org.apache.commons.io.FilenameUtils
@ -103,7 +102,7 @@ private object SamplePathFilter {
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val old = Option(hadoopConf.getClass(flagName, null))
val old = hadoopConf.getClass(flagName, null)
hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
hadoopConf.setLong(SamplePathFilter.seedParam, seed)
hadoopConf.setClass(flagName, classOf[SamplePathFilter], classOf[PathFilter])
@ -111,8 +110,8 @@ private object SamplePathFilter {
hadoopConf.unset(SamplePathFilter.ratioParam)
hadoopConf.unset(SamplePathFilter.seedParam)
old match {
case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
case None => hadoopConf.unset(flagName)
case null => hadoopConf.unset(flagName)
case v => hadoopConf.setClass(flagName, v, classOf[PathFilter])
}
}
} else {

View file

@ -847,7 +847,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}
/** Adds an observation. */
def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
def add(a: Array[Float], b: Double, c: Double = 1.0): NormalEquation = {
require(c >= 0.0)
require(a.length == k)
copyToDouble(a)
@ -859,7 +859,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
}
/** Merges another normal equation object. */
def merge(other: NormalEquation): this.type = {
def merge(other: NormalEquation): NormalEquation = {
require(other.k == k)
blas.daxpy(ata.length, 1.0, other.ata, 1, ata, 1)
blas.daxpy(atb.length, 1.0, other.atb, 1, atb, 1)

View file

@ -23,7 +23,6 @@ import java.nio.charset.StandardCharsets
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import net.razorvine.pickle._

View file

@ -33,7 +33,7 @@ private[fpm] class FPTree[T] extends Serializable {
private val summaries: mutable.Map[T, Summary[T]] = mutable.Map.empty
/** Adds a transaction with count. */
def add(t: Iterable[T], count: Long = 1L): this.type = {
def add(t: Iterable[T], count: Long = 1L): FPTree[T] = {
require(count > 0)
var curr = root
curr.count += count
@ -53,7 +53,7 @@ private[fpm] class FPTree[T] extends Serializable {
}
/** Merges another FP-Tree. */
def merge(other: FPTree[T]): this.type = {
def merge(other: FPTree[T]): FPTree[T] = {
other.transactions.foreach { case (t, c) =>
add(t, c)
}

View file

@ -18,7 +18,6 @@
package org.apache.spark.ml.classification
import scala.collection.JavaConverters._
import scala.language.existentials
import scala.util.Random
import scala.util.control.Breaks._
@ -31,7 +30,7 @@ import org.apache.spark.ml.optim.aggregator.LogisticAggregator
import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, lit, rand}
import org.apache.spark.sql.types.LongType
@ -40,11 +39,11 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
private val seed = 42
@transient var smallBinaryDataset: Dataset[_] = _
@transient var smallMultinomialDataset: Dataset[_] = _
@transient var binaryDataset: Dataset[_] = _
@transient var multinomialDataset: Dataset[_] = _
@transient var multinomialDatasetWithZeroVar: Dataset[_] = _
@transient var smallBinaryDataset: DataFrame = _
@transient var smallMultinomialDataset: DataFrame = _
@transient var binaryDataset: DataFrame = _
@transient var multinomialDataset: DataFrame = _
@transient var multinomialDatasetWithZeroVar: DataFrame = _
private val eps: Double = 1e-5
override def beforeAll(): Unit = {

View file

@ -17,15 +17,13 @@
package org.apache.spark.ml.clustering
import scala.language.existentials
import org.apache.spark.SparkException
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.clustering.DistanceMeasure
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
@ -33,9 +31,8 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
import testImplicits._
final val k = 5
@transient var dataset: Dataset[_] = _
@transient var sparseDataset: Dataset[_] = _
@transient var dataset: DataFrame = _
@transient var sparseDataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
@ -191,7 +188,7 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
}
test("BisectingKMeans with Array input") {
def trainAndComputeCost(dataset: Dataset[_]): Double = {
def trainAndComputeCost(dataset: DataFrame): Double = {
val model = new BisectingKMeans().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
model.computeCost(dataset)
}

View file

@ -17,15 +17,13 @@
package org.apache.spark.ml.clustering
import scala.language.existentials
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseMatrix, Matrices, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.{DataFrame, Dataset, Row}
class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
@ -35,11 +33,11 @@ class GaussianMixtureSuite extends MLTest with DefaultReadWriteTest {
final val k = 5
private val seed = 538009335
@transient var dataset: Dataset[_] = _
@transient var denseDataset: Dataset[_] = _
@transient var sparseDataset: Dataset[_] = _
@transient var decompositionDataset: Dataset[_] = _
@transient var rDataset: Dataset[_] = _
@transient var dataset: DataFrame = _
@transient var denseDataset: DataFrame = _
@transient var sparseDataset: DataFrame = _
@transient var decompositionDataset: DataFrame = _
@transient var rDataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()

View file

@ -17,7 +17,6 @@
package org.apache.spark.ml.clustering
import scala.language.existentials
import scala.util.Random
import org.dmg.pmml.PMML
@ -40,7 +39,7 @@ class KMeansSuite extends MLTest with DefaultReadWriteTest with PMMLReadWriteTes
import testImplicits._
final val k = 5
@transient var dataset: Dataset[_] = _
@transient var dataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()

View file

@ -17,8 +17,6 @@
package org.apache.spark.ml.clustering
import scala.language.existentials
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.linalg.{Vector, Vectors}
@ -64,7 +62,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
val k: Int = 5
val vocabSize: Int = 30
@transient var dataset: Dataset[_] = _
@transient var dataset: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()
@ -329,8 +327,7 @@ class LDASuite extends MLTest with DefaultReadWriteTest {
(model.logLikelihood(dataset), model.logPerplexity(dataset))
}
val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
val (_, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
val (llD, lpD) = trainAndLogLikelihoodAndPerplexity(newDatasetD)
val (llF, lpF) = trainAndLogLikelihoodAndPerplexity(newDatasetF)
// TODO: need to compare the results once we fix the seed issue for LDA (SPARK-22210)

View file

@ -24,7 +24,7 @@ import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.DataFrame
class ClusteringEvaluatorSuite
@ -32,10 +32,10 @@ class ClusteringEvaluatorSuite
import testImplicits._
@transient var irisDataset: Dataset[_] = _
@transient var newIrisDataset: Dataset[_] = _
@transient var newIrisDatasetD: Dataset[_] = _
@transient var newIrisDatasetF: Dataset[_] = _
@transient var irisDataset: DataFrame = _
@transient var newIrisDataset: DataFrame = _
@transient var newIrisDatasetD: DataFrame = _
@transient var newIrisDatasetF: DataFrame = _
override def beforeAll(): Unit = {
super.beforeAll()

View file

@ -23,7 +23,6 @@ import java.util.Random
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, WrappedArray}
import scala.language.existentials
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.commons.io.FileUtils

View file

@ -24,7 +24,6 @@ import org.apache.spark.ml.feature.{Instance, LabeledPoint}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasWeightCol
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.ml.tree.impl.TreeTests
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
@ -255,8 +254,8 @@ object MLTestingUtils extends SparkFunSuite {
* one having double array "features" column with float precision, and one having float array
* "features" column.
*/
def generateArrayFeatureDataset(dataset: Dataset[_],
featuresColName: String = "features"): (Dataset[_], Dataset[_], Dataset[_]) = {
def generateArrayFeatureDataset(dataset: DataFrame,
featuresColName: String = "features"): (DataFrame, DataFrame, DataFrame) = {
val toFloatVectorUDF = udf { (features: Vector) =>
Vectors.dense(features.toArray.map(_.toFloat.toDouble))}
val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}

View file

@ -16,8 +16,6 @@
*/
package org.apache.spark.mllib.fpm
import scala.language.existentials
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.util.Utils
@ -301,7 +299,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model3.save(sc, path)
val newModel = FPGrowthModel.load(sc, path)
val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
(itemset.items.toSet, itemset.freq)
}
@ -335,7 +333,7 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model3.save(sc, path)
val newModel = FPGrowthModel.load(sc, path)
val newModel = FPGrowthModel.load(sc, path).asInstanceOf[FPGrowthModel[String]]
val newFreqItemsets = newModel.freqItemsets.collect().map { itemset =>
(itemset.items.toSet, itemset.freq)
}

View file

@ -17,8 +17,6 @@
package org.apache.spark.mllib.fpm
import scala.language.existentials
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext

View file

@ -16,8 +16,6 @@
*/
package org.apache.spark.mllib.fpm
import scala.language.existentials
import org.apache.spark.SparkFunSuite
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.util.Utils
@ -420,7 +418,9 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
val path = tempDir.toURI.toString
try {
model.save(sc, path)
val newModel = PrefixSpanModel.load(sc, path)
// Save/loading this model results in item type "Object" even if in this case
// the objects are Integers -- not Int as in the original saved model.
val newModel = PrefixSpanModel.load(sc, path).asInstanceOf[PrefixSpanModel[AnyRef]]
val originalSet = model.freqSequences.collect().map { x =>
(x.sequence.map(_.toSet).toSeq, x.freq)
}.toSet

View file

@ -21,7 +21,6 @@ import java.util.Locale
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

View file

@ -23,7 +23,6 @@ import java.util.{Map => JavaMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
import scala.util.control.NonFatal
import com.google.common.cache.{CacheBuilder, CacheLoader}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import java.lang.{Boolean => JBool}
import scala.collection.mutable.ArrayBuffer
import scala.language.{existentials, implicitConversions}
import scala.language.implicitConversions
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{BooleanType, DataType}

View file

@ -21,7 +21,6 @@ import java.lang.reflect.{Method, Modifier}
import scala.collection.JavaConverters._
import scala.collection.mutable.Builder
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.Try
@ -30,7 +29,6 @@ import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.ScalaReflection.universe.TermName
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedException}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.catalyst.plans.logical
import scala.language.existentials
import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{Encoder, Row}
@ -29,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
object CatalystSerde {
def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
@ -245,12 +242,12 @@ case class TypedFilter(
}
def typedCondition(input: Expression): Expression = {
val (funcClass, methodName) = func match {
case m: FilterFunction[_] => classOf[FilterFunction[_]] -> "call"
val funcMethod = func match {
case _: FilterFunction[_] => (classOf[FilterFunction[_]], "call")
case _ => FunctionUtils.getFunctionOneName(BooleanType, input.dataType)
}
val funcObj = Literal.create(func, ObjectType(funcClass))
Invoke(funcObj, methodName, BooleanType, input :: Nil)
val funcObj = Literal.create(func, ObjectType(funcMethod._1))
Invoke(funcObj, funcMethod._2, BooleanType, input :: Nil)
}
}
@ -266,9 +263,9 @@ object FunctionUtils {
}
}
def getFunctionOneName(outputDT: DataType, inputDT: DataType): (Class[_], String) = {
// load "scala.Function1" using Java API to avoid requirements of type parameters
Utils.classForName("scala.Function1") -> {
def getFunctionOneName(outputDT: DataType, inputDT: DataType):
(Class[scala.Function1[_, _]], String) = {
classOf[scala.Function1[_, _]] -> {
// if a pair of an argument and return types is one of specific types
// whose specialized method (apply$mc..$sp) is generated by scalac,
// Catalyst generated a direct method call to the specialized method.

View file

@ -48,11 +48,10 @@ object ArrayBasedMapData {
* @param valueConverter This function is applied over all the values of the input map to
* obtain the output map's values
*/
def apply(
javaMap: JavaMap[_, _],
def apply[K, V](
javaMap: JavaMap[K, V],
keyConverter: (Any) => Any,
valueConverter: (Any) => Any): ArrayBasedMapData = {
import scala.language.existentials
val keys: Array[Any] = new Array[Any](javaMap.size())
val values: Array[Any] = new Array[Any](javaMap.size())

View file

@ -180,7 +180,7 @@ object DataType {
("pyClass", _),
("sqlType", _),
("type", JString("udt"))) =>
Utils.classForName(udtClass).getConstructor().newInstance().asInstanceOf[UserDefinedType[_]]
Utils.classForName[UserDefinedType[_]](udtClass).getConstructor().newInstance()
// Python UDT
case JSortedObject(

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.types
import scala.language.existentials
import org.apache.spark.annotation.Evolving
@Evolving

View file

@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.language.existentials
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
import scala.util.Random
@ -316,7 +315,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("SPARK-23587: MapObjects should support interpreted execution") {
def testMapObjects(collection: Any, collectionCls: Class[_], inputType: DataType): Unit = {
def testMapObjects[T](collection: Any, collectionCls: Class[T], inputType: DataType): Unit = {
val function = (lambda: Expression) => Add(lambda, Literal(1))
val elementType = IntegerType
val expected = Seq(2, 3, 4)

View file

@ -635,7 +635,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
private[sql] def registerJava(name: String, className: String, returnDataType: DataType): Unit = {
try {
val clazz = Utils.classForName(className)
val clazz = Utils.classForName[AnyRef](className)
val udfInterfaces = clazz.getGenericInterfaces
.filter(_.isInstanceOf[ParameterizedType])
.map(_.asInstanceOf[ParameterizedType])
@ -699,7 +699,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends
*/
private[sql] def registerJavaUDAF(name: String, className: String): Unit = {
try {
val clazz = Utils.classForName(className)
val clazz = Utils.classForName[AnyRef](className)
if (!classOf[UserDefinedAggregateFunction].isAssignableFrom(clazz)) {
throw new AnalysisException(s"class $className doesn't implement interface UserDefinedAggregateFunction")
}

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.aggregate
import scala.language.existentials
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import java.util.{Locale, ServiceConfigurationError, ServiceLoader}
import scala.collection.JavaConverters._
import scala.language.{existentials, implicitConversions}
import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SparkPlan

View file

@ -21,7 +21,6 @@ import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import scala.language.existentials
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
@ -50,8 +49,9 @@ class JoinSuite extends QueryTest with SharedSQLContext {
assert(planned.size === 1)
}
def assertJoin(pair: (String, Class[_])): Any = {
val (sqlString, c) = pair
def assertJoin(pair: (String, Class[_ <: BinaryExecNode])): Any = {
val sqlString = pair._1
val c = pair._2
val df = sql(sqlString)
val physical = df.queryExecution.sparkPlan
val operators = physical.collect {

View file

@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import scala.language.existentials
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
@ -186,12 +184,11 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS
assert(typeName === PrimitiveTypeName.INT96)
val oneBlockMeta = oneFooter.getBlocks().get(0)
val oneBlockColumnMeta = oneBlockMeta.getColumns().get(0)
val columnStats = oneBlockColumnMeta.getStatistics
// This is the important assert. Column stats are written, but they are ignored
// when the data is read back as mentioned above, b/c int96 is unsigned. This
// assert makes sure this holds even if we change parquet versions (if eg. there
// were ever statistics even on unsigned columns).
assert(!columnStats.hasNonNullValue)
assert(!oneBlockColumnMeta.getStatistics.hasNonNullValue)
}
// These queries should return the entire dataset with the conversion applied,

View file

@ -19,8 +19,6 @@ package org.apache.spark.sql.sources
import java.util.Locale
import scala.language.existentials
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.PredicateHelper

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.sources
import scala.language.existentials
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.internal.SQLConf

View file

@ -94,7 +94,7 @@ class HadoopTableReader(
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
hiveTable,
Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]],
Utils.classForName[Deserializer](tableDesc.getSerdeClassName),
filterOpt = None)
/**

View file

@ -922,11 +922,10 @@ private[hive] object HiveClientImpl {
}
private def toInputFormat(name: String) =
Utils.classForName(name).asInstanceOf[Class[_ <: org.apache.hadoop.mapred.InputFormat[_, _]]]
Utils.classForName[org.apache.hadoop.mapred.InputFormat[_, _]](name)
private def toOutputFormat(name: String) =
Utils.classForName(name)
.asInstanceOf[Class[_ <: org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]]]
Utils.classForName[org.apache.hadoop.hive.ql.io.HiveOutputFormat[_, _]](name)
/**
* Converts the native table metadata representation format CatalogTable to Hive's Table.

View file

@ -17,8 +17,6 @@
package org.apache.spark.sql.hive.execution
import scala.language.existentials
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.plan.TableDesc
@ -30,7 +28,6 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl

View file

@ -407,8 +407,8 @@ case class HiveScriptIOSchema (
columnTypes: Seq[DataType],
serdeProps: Seq[(String, String)]): AbstractSerDe = {
val serde = Utils.classForName(serdeClassName).getConstructor().
newInstance().asInstanceOf[AbstractSerDe]
val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
newInstance()
val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
@ -428,8 +428,8 @@ case class HiveScriptIOSchema (
inputStream: InputStream,
conf: Configuration): Option[RecordReader] = {
recordReaderClass.map { klass =>
val instance = Utils.classForName(klass).getConstructor().
newInstance().asInstanceOf[RecordReader]
val instance = Utils.classForName[RecordReader](klass).getConstructor().
newInstance()
val props = new Properties()
// Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
// See https://github.com/scala/bug/issues/10418
@ -441,8 +441,8 @@ case class HiveScriptIOSchema (
def recordWriter(outputStream: OutputStream, conf: Configuration): Option[RecordWriter] = {
recordWriterClass.map { klass =>
val instance = Utils.classForName(klass).getConstructor().
newInstance().asInstanceOf[RecordWriter]
val instance = Utils.classForName[RecordWriter](klass).getConstructor().
newInstance()
instance.initialize(outputStream, conf)
instance
}

View file

@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.execution
import java.io.File
import java.net.URI
import scala.language.existentials
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.apache.parquet.hadoop.ParquetFileReader

View file

@ -17,15 +17,12 @@
package org.apache.spark.sql.hive.execution
import scala.language.existentials
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.StaticSQLConf._

View file

@ -46,7 +46,7 @@ class WindowedDStream[T: ClassTag](
def windowDuration: Duration = _windowDuration
override def dependencies: List[DStream[_]] = List(parent)
override def dependencies: List[DStream[T]] = List(parent)
override def slideDuration: Duration = _slideDuration

View file

@ -20,7 +20,6 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials
/** Trait representing a received block */
private[streaming] sealed trait ReceivedBlock

View file

@ -19,7 +19,6 @@ package org.apache.spark.streaming.receiver
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.existentials
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

View file

@ -51,11 +51,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.util.SystemClock")
try {
Utils.classForName(clockClass).getConstructor().newInstance().asInstanceOf[Clock]
Utils.classForName[Clock](clockClass).getConstructor().newInstance()
} catch {
case e: ClassNotFoundException if clockClass.startsWith("org.apache.spark.streaming") =>
val newClockClass = clockClass.replace("org.apache.spark.streaming", "org.apache.spark")
Utils.classForName(newClockClass).getConstructor().newInstance().asInstanceOf[Clock]
Utils.classForName[Clock](newClockClass).getConstructor().newInstance()
}
}

View file

@ -21,7 +21,6 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.HashMap
import scala.concurrent.ExecutionContext
import scala.language.existentials
import scala.util.{Failure, Success}
import org.apache.spark._

View file

@ -132,8 +132,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
}
val wal = classNameOption.map { className =>
try {
instantiateClass(
Utils.classForName(className).asInstanceOf[Class[_ <: WriteAheadLog]], sparkConf)
instantiateClass(Utils.classForName[WriteAheadLog](className), sparkConf)
} catch {
case NonFatal(e) =>
throw new SparkException(s"Could not create a write ahead log of class $className", e)

View file

@ -20,7 +20,6 @@ package org.apache.spark.streaming
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import org.scalatest.concurrent.Eventually.eventually
@ -656,12 +655,12 @@ class BasicOperationsSuite extends TestSuiteBase {
runCleanupTest(
conf,
operation _,
operation,
numExpectedOutput = cleanupTestInput.size / 2,
rememberDuration = Seconds(3)) { operatedStream =>
eventually(eventuallyTimeout) {
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[_]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[_]]
val windowedStream2 = operatedStream.asInstanceOf[WindowedDStream[Int]]
val windowedStream1 = windowedStream2.dependencies.head.asInstanceOf[WindowedDStream[Int]]
val mappedStream = windowedStream1.dependencies.head
// Checkpoint remember durations