[SPARK-29292][SPARK-30010][CORE] Let core compile for Scala 2.13

### What changes were proposed in this pull request?

This PR partly resolves SPARK-29292 and fully resolves SPARK-30010, which should allow Spark to compile against Scala 2.13 from Spark Core up through GraphX (not SQL, Streaming, etc.).

Note that the goal here is only to get Spark to compile on 2.13, not yet to determine whether it works on 2.13; compiling is a prerequisite for assessing test outcomes. Of course, the change must not break 2.12.

The changes are mainly the addition of .toSeq and .toMap calls where mutable collections or maps are returned as Seq or Map, which are immutable by default in Scala 2.13. The theory is that these calls are no-ops for Scala 2.12 (the collections return themselves) and are required for 2.13.
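
The following minimal Scala sketch is not part of the patch (the object and method names are illustrative); it only shows why these conversions should be no-ops on 2.12 but are required on 2.13:

```scala
import scala.collection.mutable.ArrayBuffer

object CollectionConversionSketch {
  // A method declared to return Seq but built from a mutable buffer.
  def collectIds(n: Int): Seq[Int] = {
    val buf = new ArrayBuffer[Int]
    for (i <- 0 until n) buf += i
    // Scala 2.12: scala.Seq is scala.collection.Seq, so returning `buf` directly
    // compiles, and buf.toSeq simply returns the buffer itself (a no-op).
    // Scala 2.13: scala.Seq is scala.collection.immutable.Seq, so the explicit
    // .toSeq is required and copies the elements into an immutable Seq.
    buf.toSeq
  }

  // The same idea for maps: in 2.12, filterKeys already returns a Map; in 2.13
  // it returns a lazy MapView, so .toMap is added to materialize an immutable Map.
  def sparkEnvVars(env: Map[String, String]): Map[String, String] =
    env.filterKeys(_.startsWith("SPARK_")).toMap
}
```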

There are a few non-trivial changes, highlighted below. In particular, to get Core to compile, we need to resolve SPARK-30010, which removes a deprecated SparkConf method.

### Why are the changes needed?

Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.

### Does this PR introduce _any_ user-facing change?

Yes, the deprecated SparkConf.setAll(Traversable) overload is removed, as it is no longer legal in Scala 2.13.
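
As a sketch of the resulting API (an illustrative example, not code from the patch; SetAllExample is an invented name), callers now go through the setAll(Iterable) overload introduced in 3.0; the removed signature is shown in the comment:

```scala
import org.apache.spark.SparkConf

object SetAllExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
    // Still available: the setAll(Iterable) overload that replaced it in 3.0.
    conf.setAll(Seq("spark.app.name" -> "demo", "spark.master" -> "local[2]"))
    // Removed by this PR (deprecated since 3.0.0):
    //   def setAll(settings: Traversable[(String, String)]): SparkConf
    // In Scala 2.13, Traversable is only a deprecated alias for Iterable, so
    // keeping both overloads would define the same method twice and fail to compile.
    println(conf.get("spark.app.name"))
  }
}
```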

### How was this patch tested?

Existing tests. (2.13 itself was not _tested_; this change is about getting the code to compile without breaking 2.12.)

Closes #28971 from srowen/SPARK-29292.1.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sean Owen authored on 2020-07-11 14:34:02 -07:00, committed by Dongjoon Hyun
parent ceaa3924cb
commit 3ad4863673
58 changed files with 119 additions and 122 deletions


@ -550,7 +550,7 @@ private[spark] class ExecutorAllocationManager(
} else {
// We don't want to change our target number of executors, because we already did that
// when the task backlog decreased.
client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
countFailures = false, force = false)
}
@ -563,9 +563,9 @@ private[spark] class ExecutorAllocationManager(
// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
executorMonitor.executorsKilled(executorsRemoved)
executorMonitor.executorsKilled(executorsRemoved.toSeq)
logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
executorsRemoved
executorsRemoved.toSeq
} else {
logWarning(s"Unable to reach the cluster manager to kill executor/s " +
s"${executorIdsToBeRemoved.mkString(",")} or no executor eligible to kill!")


@ -972,6 +972,6 @@ private[spark] object MapOutputTracker extends Logging {
}
}
splitsByAddress.iterator
splitsByAddress.mapValues(_.toSeq).iterator
}
}


@ -173,15 +173,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
this
}
/**
* Set multiple parameters together
*/
@deprecated("Use setAll(Iterable) instead", "3.0.0")
def setAll(settings: Traversable[(String, String)]): SparkConf = {
settings.foreach { case (k, v) => set(k, v) }
this
}
/** Set a parameter if it isn't already configured */
def setIfMissing(key: String, value: String): SparkConf = {
if (settings.putIfAbsent(key, value) == null) {


@ -111,7 +111,7 @@ private[spark] class TaskContextImpl(
if (failed) return
failed = true
failure = error
invokeListeners(onFailureCallbacks, "TaskFailureListener", Option(error)) {
invokeListeners(onFailureCallbacks.toSeq, "TaskFailureListener", Option(error)) {
_.onTaskFailure(this, error)
}
}
@ -120,7 +120,7 @@ private[spark] class TaskContextImpl(
private[spark] override def markTaskCompleted(error: Option[Throwable]): Unit = synchronized {
if (completed) return
completed = true
invokeListeners(onCompleteCallbacks, "TaskCompletionListener", error) {
invokeListeners(onCompleteCallbacks.toSeq, "TaskCompletionListener", error) {
_.onTaskCompletion(this)
}
}
@ -142,7 +142,7 @@ private[spark] class TaskContextImpl(
}
}
if (errorMsgs.nonEmpty) {
throw new TaskCompletionListenerException(errorMsgs, error)
throw new TaskCompletionListenerException(errorMsgs.toSeq, error)
}
}


@ -256,7 +256,7 @@ object JavaRDD {
} catch {
case eof: EOFException => // No-op
}
JavaRDD.fromRDD(sc.parallelize(objs, parallelism))
JavaRDD.fromRDD(sc.parallelize(objs.toSeq, parallelism))
} finally {
din.close()
}


@ -265,14 +265,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String]): JavaRDD[String] = {
rdd.pipe(command.asScala)
rdd.pipe(command.asScala.toSeq)
}
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala)
rdd.pipe(command.asScala.toSeq, env.asScala)
}
/**
@ -282,7 +282,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
env: JMap[String, String],
separateWorkingDir: Boolean,
bufferSize: Int): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize)
}
/**
@ -293,7 +293,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
separateWorkingDir: Boolean,
bufferSize: Int,
encoding: String): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
rdd.pipe(command.asScala.toSeq, env.asScala, null, null, separateWorkingDir, bufferSize,
encoding)
}
/**


@ -133,7 +133,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
sc.parallelize(list.asScala, numSlices)
sc.parallelize(list.asScala.toSeq, numSlices)
}
/** Get an RDD that has no partitions or elements. */
@ -152,7 +152,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
: JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[V] = fakeClassTag
JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices))
JavaPairRDD.fromRDD(sc.parallelize(list.asScala.toSeq, numSlices))
}
/** Distribute a local Scala collection to form an RDD. */
@ -161,7 +161,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {
/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD =
JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices))
JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()).toSeq, numSlices))
/** Distribute a local Scala collection to form an RDD. */
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =


@ -163,7 +163,7 @@ private[spark] object PythonRDD extends Logging {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala)
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala.toSeq)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}")


@ -54,7 +54,7 @@ private[spark] object PythonUtils {
* Convert list of T into seq of T (for calling API with varargs)
*/
def toSeq[T](vs: JList[T]): Seq[T] = {
vs.asScala
vs.asScala.toSeq
}
/**


@ -205,7 +205,7 @@ private object FaultToleranceTest extends App with Logging {
private def addWorkers(num: Int): Unit = {
logInfo(s">>>>> ADD WORKERS $num <<<<<")
val masterUrls = getMasterUrls(masters)
val masterUrls = getMasterUrls(masters.toSeq)
(1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
}
@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
// Counter-hack: Because of a hack in SparkEnv#create() that changes this
// property, we need to reset it.
System.setProperty(config.DRIVER_PORT.key, "0")
sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
sc = new SparkContext(getMasterUrls(masters.toSeq), "fault-tolerance", containerSparkHome)
}
private def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
@ -279,7 +279,7 @@ private object FaultToleranceTest extends App with Logging {
var liveWorkerIPs: Seq[String] = List()
def stateValid(): Boolean = {
(workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
workers.map(_.ip).forall(liveWorkerIPs.contains) &&
numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
}


@ -69,7 +69,7 @@ object PythonRunner {
pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
val pythonPath = PythonUtils.mergePythonPaths(pathElements.toSeq: _*)
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)


@ -820,7 +820,7 @@ private[spark] class SparkSubmit extends Logging {
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toSeq)
(childArgs, childClasspath, sparkConf, childMainClass)
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}
private def renameResourcesToLocalFS(resources: String, localResources: String): String = {


@ -52,7 +52,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer
override def read[T: ClassTag](prefix: String): Seq[T] = {
zk.getChildren.forPath(workingDir).asScala
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T]).toSeq
}
override def close(): Unit = {


@ -422,7 +422,7 @@ private[spark] object RestSubmissionClient {
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filterKeys { k =>
(k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
}
}.toMap
}
private[spark] def supportsRestClient(master: String): Boolean = {


@ -61,7 +61,7 @@ object CommandUtils extends Logging {
// SPARK-698: do not call the run.cmd script, as process.destroy()
// fails to kill a process tree on Windows
val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand()
cmd.asScala ++ Seq(command.mainClass) ++ command.arguments
(cmd.asScala ++ Seq(command.mainClass) ++ command.arguments).toSeq
}
/**


@ -201,7 +201,7 @@ private[deploy] class DriverRunner(
CommandUtils.redirectStream(process.getInputStream, stdout)
val stderr = new File(baseDir, "stderr")
val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala)
val redactedCommand = Utils.redactCommandLineArgs(conf, builder.command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
val header = "Launch Command: %s\n%s\n\n".format(redactedCommand, "=" * 40)
Files.append(header, stderr, StandardCharsets.UTF_8)
@ -262,6 +262,6 @@ private[deploy] trait ProcessBuilderLike {
private[deploy] object ProcessBuilderLike {
def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
override def start(): Process = processBuilder.start()
override def command: Seq[String] = processBuilder.command().asScala
override def command: Seq[String] = processBuilder.command().asScala.toSeq
}
}


@ -158,7 +158,7 @@ private[deploy] class ExecutorRunner(
val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala)
val redactedCommand = Utils.redactCommandLineArgs(conf, command.asScala.toSeq)
.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $redactedCommand")


@ -140,13 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
.map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
}
def extractAttributes: Map[String, String] = {
val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
.map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap
}
override def receive: PartialFunction[Any, Unit] = {
@ -304,8 +304,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
arguments.resourcesFileOpt, resourceProfile)
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
System.exit(0)


@ -606,7 +606,8 @@ private[spark] class Executor(
// Here and below, put task metric peaks in a WrappedArray to expose them as a Seq
// without requiring a copy.
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(t.reason, accUpdates, accums, metricPeaks))
val serializedTK = ser.serialize(
TaskKilled(t.reason, accUpdates, accums, metricPeaks.toSeq))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
case _: InterruptedException | NonFatal(_) if
@ -616,7 +617,8 @@ private[spark] class Executor(
val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTimeNs)
val metricPeaks = WrappedArray.make(metricsPoller.getTaskMetricPeaks(taskId))
val serializedTK = ser.serialize(TaskKilled(killReason, accUpdates, accums, metricPeaks))
val serializedTK = ser.serialize(
TaskKilled(killReason, accUpdates, accums, metricPeaks.toSeq))
execBackend.statusUpdate(taskId, TaskState.KILLED, serializedTK)
case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
@ -661,13 +663,13 @@ private[spark] class Executor(
val serializedTaskEndReason = {
try {
val ef = new ExceptionFailure(t, accUpdates).withAccums(accums)
.withMetricPeaks(metricPeaks)
.withMetricPeaks(metricPeaks.toSeq)
ser.serialize(ef)
} catch {
case _: NotSerializableException =>
// t is not serializable so just send the stacktrace
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums)
.withMetricPeaks(metricPeaks)
.withMetricPeaks(metricPeaks.toSeq)
ser.serialize(ef)
}
}


@ -123,7 +123,7 @@ class TaskMetrics private[spark] () extends Serializable {
def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = {
// This is called on driver. All accumulator updates have a fixed value. So it's safe to use
// `asScala` which accesses the internal values using `java.util.Iterator`.
_updatedBlockStatuses.value.asScala
_updatedBlockStatuses.value.asScala.toSeq
}
// Setters and increment-ers
@ -199,7 +199,7 @@ class TaskMetrics private[spark] () extends Serializable {
*/
private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
if (tempShuffleReadMetrics.nonEmpty) {
shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics)
shuffleReadMetrics.setMergeValues(tempShuffleReadMetrics.toSeq)
}
}


@ -156,7 +156,7 @@ private[spark] class MetricsSystem private (
}
def getSourcesByName(sourceName: String): Seq[Source] =
sources.filter(_.sourceName == sourceName)
sources.filter(_.sourceName == sourceName).toSeq
def registerSource(source: Source): Unit = {
sources += source


@ -934,7 +934,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
for (pair <- it if pair._1 == key) {
buf += pair._2
}
buf
buf.toSeq
} : Seq[V]
val res = self.context.runJob(self, process, Array(index))
res(0)


@ -133,12 +133,11 @@ private object ParallelCollectionRDD {
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
} else {
new Range.Inclusive(r.start + start * r.step, r.start + (end - 1) * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
case nr: NumericRange[T] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
@ -147,7 +146,7 @@ private object ParallelCollectionRDD {
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
slices.toSeq
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>


@ -238,7 +238,7 @@ private object PipedRDD {
while(tok.hasMoreElements) {
buf += tok.nextToken()
}
buf
buf.toSeq
}
val STDIN_WRITER_THREAD_PREFIX = "stdin writer for"


@ -98,7 +98,7 @@ class UnionRDD[T: ClassTag](
deps += new RangeDependency(rdd, 0, pos, rdd.partitions.length)
pos += rdd.partitions.length
}
deps
deps.toSeq
}
override def compute(s: Partition, context: TaskContext): Iterator[T] = {


@ -319,12 +319,13 @@ object ResourceProfile extends Logging {
private[spark] def getCustomTaskResources(
rp: ResourceProfile): Map[String, TaskResourceRequest] = {
rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS))
rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
}
private[spark] def getCustomExecutorResources(
rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
rp.executorResources.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k))
rp.executorResources.
filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
}
/*


@ -232,7 +232,7 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
// For testing only.
private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = {
queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }
queues.asScala.flatMap { queue => queue.findListenersByClass[T]() }.toSeq
}
// For testing only.


@ -69,7 +69,7 @@ object SplitInfo {
for (host <- mapredSplit.getLocations) {
retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit)
}
retval
retval.toSeq
}
def toSplitInfo(inputFormatClazz: Class[_], path: String,
@ -79,6 +79,6 @@ object SplitInfo {
for (host <- mapreduceSplit.getLocations) {
retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit)
}
retval
retval.toSeq
}
}


@ -47,19 +47,19 @@ class StatsReportListener extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
implicit val sc = stageCompleted
this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics)
showMillisDistribution("task runtime:", (info, _) => info.duration, taskInfoMetrics.toSeq)
// Shuffle write
showBytesDistribution("shuffle bytes written:",
(_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics)
(_, metric) => metric.shuffleWriteMetrics.bytesWritten, taskInfoMetrics.toSeq)
// Fetch & I/O
showMillisDistribution("fetch wait time:",
(_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics)
(_, metric) => metric.shuffleReadMetrics.fetchWaitTime, taskInfoMetrics.toSeq)
showBytesDistribution("remote bytes read:",
(_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics)
(_, metric) => metric.shuffleReadMetrics.remoteBytesRead, taskInfoMetrics.toSeq)
showBytesDistribution("task result size:",
(_, metric) => metric.resultSize, taskInfoMetrics)
(_, metric) => metric.resultSize, taskInfoMetrics.toSeq)
// Runtime breakdown
val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>


@ -71,7 +71,7 @@ private[spark] class DirectTaskResult[T](
for (i <- 0 until numUpdates) {
_accumUpdates += in.readObject.asInstanceOf[AccumulatorV2[_, _]]
}
accumUpdates = _accumUpdates
accumUpdates = _accumUpdates.toSeq
}
val numMetrics = in.readInt


@ -137,7 +137,7 @@ private[spark] class TaskSchedulerImpl(
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size)
executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
}
// The set of executors we have on each host; this is used to compute hostsAlive, which
@ -719,7 +719,7 @@ private[spark] class TaskSchedulerImpl(
if (tasks.nonEmpty) {
hasLaunchedTask = true
}
return tasks
return tasks.map(_.toSeq)
}
private def createUnschedulableTaskSetAbortTimer(


@ -367,7 +367,7 @@ private[storage] class BlockInfoManager extends Logging {
notifyAll()
blocksWithReleasedLocks
blocksWithReleasedLocks.toSeq
}
/** Returns the number of locks held by the given task. Used only for testing. */


@ -98,7 +98,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
}
}.filter(_ != null).flatMap { dir =>
val files = dir.listFiles()
if (files != null) files else Seq.empty
if (files != null) files.toSeq else Seq.empty
}
}


@ -368,25 +368,25 @@ final class ShuffleBlockFetcherIterator(
collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
val iterator = blockInfos.iterator
var curRequestSize = 0L
var curBlocks = new ArrayBuffer[FetchBlockInfo]
var curBlocks = Seq.empty[FetchBlockInfo]
while (iterator.hasNext) {
val (blockId, size, mapIndex) = iterator.next()
assertPositiveBlockSize(blockId, size)
curBlocks += FetchBlockInfo(blockId, size, mapIndex)
curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
curRequestSize += size
// For batch fetch, the actual block in flight should count for merged block.
val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
curBlocks = createFetchRequests(curBlocks, address, isLast = false,
collectedRemoteRequests).to[ArrayBuffer]
collectedRemoteRequests)
curRequestSize = curBlocks.map(_.size).sum
}
}
// Add in the final request
if (curBlocks.nonEmpty) {
curBlocks = createFetchRequests(curBlocks, address, isLast = true,
collectedRemoteRequests).to[ArrayBuffer]
collectedRemoteRequests)
curRequestSize = curBlocks.map(_.size).sum
}
}
@ -928,7 +928,7 @@ object ShuffleBlockFetcherIterator {
} else {
blocks
}
result
result.toSeq
}
/**


@ -443,7 +443,7 @@ private[spark] object UIUtils extends Logging {
</th>
case None => <th width={colWidthAttr} class={getClass(x._2)}>{getHeaderContent(x._1)}</th>
}
}
}.toSeq
}
<table class={listingTableClass} id={id.map(Text.apply)}>
<thead>{headerRow}</thead>


@ -58,11 +58,11 @@ private[spark] abstract class WebUI(
private val className = Utils.getFormattedClassName(this)
def getBasePath: String = basePath
def getTabs: Seq[WebUITab] = tabs
def getHandlers: Seq[ServletContextHandler] = handlers
def getTabs: Seq[WebUITab] = tabs.toSeq
def getHandlers: Seq[ServletContextHandler] = handlers.toSeq
def getDelegatingHandlers: Seq[DelegatingServletContextHandler] = {
handlers.map(new DelegatingServletContextHandler(_))
handlers.map(new DelegatingServletContextHandler(_)).toSeq
}
/** Attaches a tab to this UI, along with all of its attached pages. */


@ -259,11 +259,11 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
}
val activeJobsTable =
jobsTable(request, "active", "activeJob", activeJobs, killEnabled = parent.killEnabled)
jobsTable(request, "active", "activeJob", activeJobs.toSeq, killEnabled = parent.killEnabled)
val completedJobsTable =
jobsTable(request, "completed", "completedJob", completedJobs, killEnabled = false)
jobsTable(request, "completed", "completedJob", completedJobs.toSeq, killEnabled = false)
val failedJobsTable =
jobsTable(request, "failed", "failedJob", failedJobs, killEnabled = false)
jobsTable(request, "failed", "failedJob", failedJobs.toSeq, killEnabled = false)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@ -330,7 +330,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
</div>
var content = summary
content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
content ++= makeTimeline((activeJobs ++ completedJobs ++ failedJobs).toSeq,
store.executorList(false), startTime)
if (shouldShowActiveJobs) {


@ -288,20 +288,20 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
}
val activeStagesTable =
new StageTableBase(store, request, activeStages, "active", "activeStage", parent.basePath,
basePath, parent.isFairScheduler,
new StageTableBase(store, request, activeStages.toSeq, "active", "activeStage",
parent.basePath, basePath, parent.isFairScheduler,
killEnabled = parent.killEnabled, isFailedStage = false)
val pendingOrSkippedStagesTable =
new StageTableBase(store, request, pendingOrSkippedStages, pendingOrSkippedTableId,
new StageTableBase(store, request, pendingOrSkippedStages.toSeq, pendingOrSkippedTableId,
"pendingStage", parent.basePath, basePath, parent.isFairScheduler,
killEnabled = false, isFailedStage = false)
val completedStagesTable =
new StageTableBase(store, request, completedStages, "completed", "completedStage",
new StageTableBase(store, request, completedStages.toSeq, "completed", "completedStage",
parent.basePath, basePath, parent.isFairScheduler,
killEnabled = false, isFailedStage = false)
val failedStagesTable =
new StageTableBase(store, request, failedStages, "failed", "failedStage", parent.basePath,
basePath, parent.isFairScheduler,
new StageTableBase(store, request, failedStages.toSeq, "failed", "failedStage",
parent.basePath, basePath, parent.isFairScheduler,
killEnabled = false, isFailedStage = true)
val shouldShowActiveStages = activeStages.nonEmpty
@ -391,7 +391,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
var content = summary
val appStartTime = store.applicationInfo().attempts.head.startTime.getTime()
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
content ++= makeTimeline((activeStages ++ completedStages ++ failedStages).toSeq,
store.executorList(false), appStartTime)
val operationGraphContent = store.asOption(store.operationGraphForJob(jobId)) match {


@ -81,11 +81,11 @@ private[spark] class RDDOperationCluster(
/** Return all the nodes which are cached. */
def getCachedNodes: Seq[RDDOperationNode] = {
_childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes)
(_childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes)).toSeq
}
def getBarrierClusters: Seq[RDDOperationCluster] = {
_childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters)
(_childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters)).toSeq
}
def canEqual(other: Any): Boolean = other.isInstanceOf[RDDOperationCluster]
@ -210,7 +210,7 @@ private[spark] object RDDOperationGraph extends Logging {
}
}
RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
RDDOperationGraph(internalEdges.toSeq, outgoingEdges.toSeq, incomingEdges.toSeq, rootCluster)
}
/**


@ -1214,7 +1214,8 @@ private[spark] object JsonProtocol {
case Some(id) => id.extract[Int]
case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
}
new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources, resourceProfileId)
new ExecutorInfo(executorHost, totalCores, logUrls, attributes.toMap, resources.toMap,
resourceProfileId)
}
def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {


@ -1742,7 +1742,7 @@ private[spark] object Utils extends Logging {
if (inWord || inDoubleQuote || inSingleQuote) {
endWord()
}
buf
buf.toSeq
}
/* Calculates 'x' modulo 'mod', takes to consideration sign of x,


@ -659,7 +659,7 @@ private[spark] class ExternalSorter[K, V, C](
}
} else {
// Merge spilled and in-memory data
merge(spills, destructiveIterator(
merge(spills.toSeq, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}


@ -157,7 +157,7 @@ private class SaveInfoListener extends SparkListener {
def getCompletedStageInfos: Seq[StageInfo] = completedStageInfos.toArray.toSeq
def getCompletedTaskInfos: Seq[TaskInfo] = completedTaskInfos.values.flatten.toSeq
def getCompletedTaskInfos(stageId: StageId, stageAttemptId: StageAttemptId): Seq[TaskInfo] =
completedTaskInfos.getOrElse((stageId, stageAttemptId), Seq.empty[TaskInfo])
completedTaskInfos.getOrElse((stageId, stageAttemptId), Seq.empty[TaskInfo]).toSeq
/**
* If `jobCompletionCallback` is set, block until the next call has finished.


@ -291,14 +291,14 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = broadcastBuffer.map(_.id)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq)
broadcastBuffer.clear()
rddBuffer.clear()
runGC()
@ -331,14 +331,14 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
val shuffleIds = 0 until sc.newShuffleId
val broadcastIds = broadcastBuffer.map(_.id)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq)
runGC()
intercept[Exception] {
preGCTester.assertCleanup()(timeout(1.second))
}
// Test that GC triggers the cleanup of all variables after the dereferencing them
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds)
val postGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds.toSeq)
broadcastBuffer.clear()
rddBuffer.clear()
runGC()


@ -261,7 +261,7 @@ class HeartbeatReceiverSuite
// We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
// so exclude it from the map. See SPARK-10800.
heartbeatReceiver.invokePrivate(_executorLastSeen()).
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER)
filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
}
}
@ -287,6 +287,8 @@ private class FakeSchedulerBackend(
resourceProfileManager: ResourceProfileManager)
extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
def this() = this(null, null, null, null)
protected override def doRequestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
clusterManagerEndpoint.ask[Boolean](


@ -220,7 +220,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
super.registerAccumulatorForCleanup(a)
}
def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toArray
def accumsRegisteredForCleanup: Seq[Long] = accumsRegistered.toSeq
}
}


@ -182,7 +182,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
.map(p => (p._1, p._2.map(_.toArray)))
.map(p => (p._1, p._2.map(_.toSeq)))
.collectAsMap()
assert(results(1)(0).length === 3)


@ -317,7 +317,7 @@ private[deploy] object IvyTestUtils {
val rFiles = createRFiles(root, className, artifact.groupId)
allFiles.append(rFiles: _*)
}
val jarFile = packJar(jarPath, artifact, allFiles, useIvyLayout, withR)
val jarFile = packJar(jarPath, artifact, allFiles.toSeq, useIvyLayout, withR)
assert(jarFile.exists(), "Problem creating Jar file")
val descriptor = createDescriptor(tempPath, artifact, dependencies, useIvyLayout)
assert(descriptor.exists(), "Problem creating Pom file")


@ -140,7 +140,7 @@ class ParallelCollectionSplitSuite extends SparkFunSuite with Checkers {
assert(slices(i).isInstanceOf[Range])
val range = slices(i).asInstanceOf[Range]
assert(range.start === i * (N / 40), "slice " + i + " start")
assert(range.end === (i + 1) * (N / 40), "slice " + i + " end")
assert(range.last === (i + 1) * (N / 40) - 1, "slice " + i + " end")
assert(range.step === 1, "slice " + i + " step")
}
}


@ -656,7 +656,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
}
test("top with predefined ordering") {
val nums = Array.range(1, 100000)
val nums = Seq.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK = ints.top(5)
assert(topK.size === 5)
@ -1098,7 +1098,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
override def getPartitions: Array[Partition] = Array(new Partition {
override def index: Int = 0
})
override def getDependencies: Seq[Dependency[_]] = mutableDependencies
override def getDependencies: Seq[Dependency[_]] = mutableDependencies.toSeq
def addDependency(dep: Dependency[_]): Unit = {
mutableDependencies += dep
}


@ -325,7 +325,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
8000L, 5000L, 7000L, 4000L, 6000L, 3000L, 10L, 90L, 2L, 20L)
def max(a: Array[Long], b: Array[Long]): Array[Long] =
(a, b).zipped.map(Math.max)
(a, b).zipped.map(Math.max).toArray
// calculated metric peaks per stage per executor
// metrics sent during stage 0 for each executor


@ -26,7 +26,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
test("Track Executor Resource information") {
// Init Executor Resource.
val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3"))
assert(info.assignedAddrs.isEmpty)
@ -43,7 +43,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
test("Don't allow acquire address that is not available") {
// Init Executor Resource.
val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
// Acquire some addresses.
info.acquire(Seq("0", "1"))
assert(!info.availableAddrs.contains("1"))
@ -56,7 +56,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
test("Don't allow acquire address that doesn't exist") {
// Init Executor Resource.
val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
assert(!info.availableAddrs.contains("4"))
// Acquire an address that doesn't exist
val e = intercept[SparkException] {
@ -67,7 +67,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
test("Don't allow release address that is not assigned") {
// Init Executor Resource.
val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
// Acquire addresses
info.acquire(Array("0", "1"))
assert(!info.assignedAddrs.contains("2"))
@ -80,7 +80,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
test("Don't allow release address that doesn't exist") {
// Init Executor Resource.
val info = new ExecutorResourceInfo(GPU, ArrayBuffer("0", "1", "2", "3"), 1)
val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
assert(!info.assignedAddrs.contains("4"))
// Release an address that doesn't exist
val e = intercept[SparkException] {
@ -93,7 +93,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
val addresses = ArrayBuffer("0", "1", "2", "3")
slotSeq.foreach { slots =>
val info = new ExecutorResourceInfo(GPU, addresses, slots)
val info = new ExecutorResourceInfo(GPU, addresses.toSeq, slots)
for (_ <- 0 until slots) {
addresses.foreach(addr => info.acquire(Seq(addr)))
}


@ -621,7 +621,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
override def onStageCompleted(stage: SparkListenerStageCompleted): Unit = {
stageInfos(stage.stageInfo) = taskInfoMetrics
stageInfos(stage.stageInfo) = taskInfoMetrics.toSeq
taskInfoMetrics = mutable.Buffer.empty
}
}


@ -103,7 +103,7 @@ private class MyTaskResultGetter(env: SparkEnv, scheduler: TaskSchedulerImpl)
// DirectTaskResults that we receive from the executors
private val _taskResults = new ArrayBuffer[DirectTaskResult[_]]
def taskResults: Seq[DirectTaskResult[_]] = _taskResults
def taskResults: Seq[DirectTaskResult[_]] = _taskResults.toSeq
override def enqueueSuccessfulTask(tsm: TaskSetManager, tid: Long, data: ByteBuffer): Unit = {
// work on a copy since the super class still needs to use the buffer


@ -1670,7 +1670,7 @@ class TaskSetManagerSuite
for (i <- 0 to 99) {
locations += Seq(TaskLocation("host" + i))
}
val taskSet = FakeTask.createTaskSet(100, locations: _*)
val taskSet = FakeTask.createTaskSet(100, locations.toSeq: _*)
val clock = new ManualClock
// make sure we only do one rack resolution call, for the entire batch of hosts, as this
// can be expensive. The FakeTaskScheduler calls rack resolution more than the real one


@ -1047,7 +1047,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
)
val transfer = createMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)))
val transfer = createMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)).toMap)
val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])](
(remoteBmId, blocks.keys.map(blockId => (blockId, 1L, 0)).toSeq))


@ -3159,7 +3159,7 @@
<profile>
<id>scala-2.13</id>
<properties>
<scala.version>2.13.1</scala.version>
<scala.version>2.13.3</scala.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>


@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkD
*/
class StreamProgress(
val baseMap: immutable.Map[SparkDataStream, OffsetV2] =
new immutable.HashMap[SparkDataStream, OffsetV2])
new immutable.HashMap[SparkDataStream, OffsetV2])
extends scala.collection.immutable.Map[SparkDataStream, OffsetV2] {
// Note: this class supports Scala 2.13. A parallel source tree has a 2.12 implementation.