[SPARK-29292][YARN][K8S][MESOS] Fix Scala 2.13 compilation for remaining modules

### What changes were proposed in this pull request?

See the related PRs, such as https://github.com/apache/spark/pull/28971, for the pattern of changes. This completes the Scala 2.13 compilation fixes for all remaining modules except `repl`, which is a separate task.
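
Most of the individual edits follow the same mechanical pattern: in Scala 2.13, `scala.Seq` is an alias for `scala.collection.immutable.Seq`, so the mutable buffers produced by `.asScala` (and by builders like `ArrayBuffer`) no longer satisfy a `Seq`-typed parameter or result and need an explicit `.toSeq`. A minimal illustrative sketch of that pattern (hypothetical names, not Spark code):

```scala
import scala.collection.JavaConverters._

object Scala213SeqSketch {
  // On 2.12 this compiles without .toSeq because Buffer is a collection.Seq;
  // on 2.13 Seq means immutable.Seq, so the Buffer from .asScala must be converted.
  def toScalaSeq(javaList: java.util.List[String]): Seq[String] =
    javaList.asScala.toSeq
}
```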

### Why are the changes needed?

Eventually, we need to support a Scala 2.13 build, perhaps in Spark 3.1.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests. (Scala 2.13 itself was not run; the goal is to get these modules compiling under 2.13 without breaking the 2.12 build.)

Closes #29147 from srowen/SPARK-29292.4.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Sean Owen authored on 2020-07-18 15:08:00 -07:00; committed by Dongjoon Hyun
parent f9f9309bec
commit ee624821a9
17 changed files with 36 additions and 34 deletions

@@ -97,7 +97,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
   }
 
   def getShards(): Seq[Shard] = {
-    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala
+    kinesisClient.describeStream(_streamName).getStreamDescription.getShards.asScala.toSeq
   }
 
   def splitShard(shardId: String): Unit = {
@@ -137,7 +137,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
    * Expose a Python friendly API.
    */
   def pushData(testData: java.util.List[Int]): Unit = {
-    pushData(testData.asScala, aggregate = false)
+    pushData(testData.asScala.toSeq, aggregate = false)
   }
 
   def deleteStream(): Unit = {
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
      sentSeqNumbers += ((num, seqNumber))
    }
 
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
  }
}
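
The `mapValues(_.toSeq).toMap` changes above reflect another 2.13 difference: `mapValues` now returns a lazy `MapView` rather than an eager `Map`, and the per-shard buffers are mutable, so both the values and the map itself need to be materialized. A hedged sketch with a hypothetical accumulator (the real code uses the Kinesis test utilities' internal maps):

```scala
import scala.collection.mutable

object MapValuesSketch {
  // Hypothetical stand-in for the shardId -> sequence-number buffers above.
  private val seqNumbers = mutable.HashMap.empty[String, mutable.ArrayBuffer[String]]

  def record(shardId: String, seqNumber: String): Unit = {
    seqNumbers.getOrElseUpdate(shardId, mutable.ArrayBuffer.empty) += seqNumber
  }

  // .toSeq freezes each buffer; .toMap materializes the 2.13 MapView eagerly.
  def snapshot(): Map[String, Seq[String]] =
    seqNumbers.mapValues(_.toSeq).toMap
}
```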

@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG
      Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService)
    }
    producer.flushSync()
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
  }
}

@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean)
    require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards")
 
    shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-    shardIdToData = shardIdToDataAndSeqNumbers.mapValues { _.map { _._1 }}
-    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues { _.map { _._2 }}
+    shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
+    shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
    shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
      val seqNumRange = SequenceNumberRange(
        testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size)

@@ -62,7 +62,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
          .withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
          .list()
          .getItems
-          .asScala)
+          .asScala.toSeq)
      }
    }

@@ -131,7 +131,7 @@ private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
      try {
        val snapshots = new ArrayList[ExecutorPodsSnapshot]()
        snapshotsBuffer.drainTo(snapshots)
-        onNewSnapshots(snapshots.asScala)
+        onNewSnapshots(snapshots.asScala.toSeq)
      } catch {
        case NonFatal(e) => logWarning("Exception when notifying snapshot subscriber.", e)
      } finally {

@@ -35,7 +35,7 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
  override def stop(): Unit = {}
 
  override def notifySubscribers(): Unit = {
-    subscribers.foreach(_(snapshotsBuffer))
+    subscribers.foreach(_(snapshotsBuffer.toSeq))
    snapshotsBuffer.clear()
  }

@@ -47,6 +47,6 @@ object ProcessUtils extends Logging {
    assert(proc.exitValue == 0,
      s"Failed to execute ${fullCommand.mkString(" ")}" +
        s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
-    outputLines
+    outputLines.toSeq
  }
}

@@ -383,13 +383,13 @@ private[spark] class MesosClusterScheduler(
    taskId.split(s"${RETRY_SEP}").head
  }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+  private def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B) = {
    m.updated(k, f(m.getOrElse(k, default)))
  }
 
  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
    // TODO(mgummelt): Don't do this here. This should be passed as a --conf
-    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+    val commandEnv = adjust(desc.command.environment.toMap, "SPARK_SUBMIT_OPTS", "")(
      v => s"$v -D${config.DRIVER_FRAMEWORK_ID.key}=${getDriverFrameworkID(desc)}"
    )
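
The `adjust` hunk above narrows the parameter from `collection.Map` to the immutable `Map`, and the caller converts with `.toMap`; this appears to be because `updated` is no longer available on the generic `collection.Map` in 2.13. A rough sketch of the same idea (the caller signature and system property are illustrative):

```scala
object AdjustSketch {
  // Same shape as the adjust helper above: an immutable Map has updated(...)
  // on both 2.12 and 2.13.
  def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B): Map[A, B] =
    m.updated(k, f(m.getOrElse(k, default)))

  // A caller holding only a collection.Map converts to an immutable Map first.
  def withSubmitOpts(env: scala.collection.Map[String, String]): Map[String, String] =
    adjust(env.toMap, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dexample.flag=1")
}
```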
@@ -686,14 +686,14 @@ private[spark] class MesosClusterScheduler(
    }
 
    scheduleTasks(
-      copyBuffer(driversToRetry),
+      copyBuffer(driversToRetry).toSeq,
      removeFromPendingRetryDrivers,
      currentOffers,
      tasks)
 
    // Then we walk through the queued drivers and try to schedule them.
    scheduleTasks(
-      copyBuffer(queuedDrivers),
+      copyBuffer(queuedDrivers).toSeq,
      removeFromQueuedDrivers,
      currentOffers,
      tasks)

@@ -491,8 +491,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
    val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
 
    // offerID -> resources
-    val remainingResources = mutable.Map(offers.map(offer =>
-      (offer.getId.getValue, offer.getResourcesList)): _*)
+    val remainingResources = mutable.Map[String, JList[Resource]]()
+    remainingResources ++= offers.map(offer => (offer.getId.getValue, offer.getResourcesList))
 
    var launchTasks = true
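
The `remainingResources` change avoids splicing the mapped offers into the `mutable.Map(...)` varargs constructor. In 2.13, a `xs: _*` splice requires an immutable `Seq`, and the mapped offer list (a buffer originating from `.asScala`) likely is not one, so the map is built empty and filled with `++=` instead. A simplified illustration (the case class stands in for the Mesos protobuf types):

```scala
import scala.collection.mutable

object RemainingResourcesSketch {
  // Hypothetical stand-in for the Mesos Offer/Resource protobufs.
  final case class Offer(id: String, resources: List[String])

  def index(offers: mutable.Buffer[Offer]): mutable.Map[String, List[String]] = {
    // mutable.Map(offers.map(...): _*) does not compile on 2.13 here, because the
    // varargs splice needs an immutable Seq and offers.map yields another Buffer.
    val remaining = mutable.Map[String, List[String]]()
    remaining ++= offers.map(o => (o.id, o.resources))
    remaining
  }
}
```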

@@ -379,7 +379,7 @@ trait MesosSchedulerUtils extends Logging {
          } else {
            v.split(',').toSet
          }
-        )
+        ).toMap
    } catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)

@@ -146,7 +146,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
    val resources = taskInfo.getResourcesList
    assert(scheduler.getResource(resources, "cpus") == 1.5)
    assert(scheduler.getResource(resources, "mem") == 1200)
-    val resourcesSeq: Seq[Resource] = resources.asScala
+    val resourcesSeq: Seq[Resource] = resources.asScala.toSeq
    val cpus = resourcesSeq.filter(_.getName == "cpus").toList
    assert(cpus.size == 2)
    assert(cpus.exists(_.getRole() == "role2"))

@@ -267,7 +267,8 @@ class MesosFineGrainedSchedulerBackendSuite
      properties = new Properties(),
      resources = immutable.Map.empty[String, ResourceInformation],
      ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
 
    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
@@ -379,7 +380,8 @@ class MesosFineGrainedSchedulerBackendSuite
      properties = new Properties(),
      resources = immutable.Map.empty[String, ResourceInformation],
      ByteBuffer.wrap(new Array[Byte](0)))
-    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.resourceOffers(
+      expectedWorkerOffers.toIndexedSeq)).thenReturn(Seq(Seq(taskDesc)))
    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
 
    val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])

@@ -553,7 +553,7 @@ private[spark] class Client(
        }
 
        // Propagate the local URIs to the containers using the configuration.
-        sparkConf.set(SPARK_JARS, localJars)
+        sparkConf.set(SPARK_JARS, localJars.toSeq)
 
      case None =>
        // No configuration, so fall back to uploading local jar files.
@@ -628,7 +628,7 @@ private[spark] class Client(
      }
    }
    if (cachedSecondaryJarLinks.nonEmpty) {
-      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks)
+      sparkConf.set(SECONDARY_JARS, cachedSecondaryJarLinks.toSeq)
    }
 
    if (isClusterMode && args.primaryPyFile != null) {

@@ -91,11 +91,11 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
   * Writes down information about cached files needed in executors to the given configuration.
   */
  def updateConfiguration(conf: SparkConf): Unit = {
-    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString))
-    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size))
-    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime))
-    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()))
-    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()))
+    conf.set(CACHED_FILES, distCacheEntries.map(_.uri.toString).toSeq)
+    conf.set(CACHED_FILES_SIZES, distCacheEntries.map(_.size).toSeq)
+    conf.set(CACHED_FILES_TIMESTAMPS, distCacheEntries.map(_.modTime).toSeq)
+    conf.set(CACHED_FILES_VISIBILITIES, distCacheEntries.map(_.visibility.name()).toSeq)
+    conf.set(CACHED_FILES_TYPES, distCacheEntries.map(_.resType.name()).toSeq)
  }
 
  /**

@@ -296,7 +296,7 @@ private[yarn] class YarnAllocator(
      val profResource = rpIdToYarnResource.get(id)
      val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
        .asScala.flatMap(_.asScala)
-      allContainerRequests(id) = result
+      allContainerRequests(id) = result.toSeq
    }
    allContainerRequests.toMap
  }
@@ -426,13 +426,13 @@ private[yarn] class YarnAllocator(
        getNumExecutorsStarting,
        allocateResponse.getAvailableResources))
 
-      handleAllocatedContainers(allocatedContainers.asScala)
+      handleAllocatedContainers(allocatedContainers.asScala.toSeq)
    }
 
    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    if (completedContainers.size > 0) {
      logDebug("Completed %d containers".format(completedContainers.size))
-      processCompletedContainers(completedContainers.asScala)
+      processCompletedContainers(completedContainers.asScala.toSeq)
      logDebug("Finished processing %d completed containers. Current running executor count: %d."
        .format(completedContainers.size, getNumExecutorsRunning))
    }
@@ -960,7 +960,7 @@ private[yarn] class YarnAllocator(
      }
    }
 
-    (localityMatched, localityUnMatched, localityFree)
+    (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
  }
}

@@ -73,8 +73,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath, env,
-        arguments.resourcesFileOpt, resourceProfile)
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
+        env, arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))

@@ -37,7 +37,7 @@ private[hive] trait SparkOperation extends Operation with Logging {
  protected var statementId = getHandle().getHandleIdentifier().getPublicId().toString()
 
-  protected def cleanup(): Unit = Unit // noop by default
+  protected def cleanup(): Unit = () // noop by default
 
  abstract override def run(): Unit = {
    withLocalProperties {
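
The last hunk swaps the expression `Unit` for the unit value `()`. Writing `Unit` on the right-hand side refers to the `Unit` companion object and only becomes `()` through value discarding, which evidently does not get past the stricter 2.13 build, so the literal is the portable spelling on both 2.12 and 2.13:

```scala
trait CleanupSketch {
  // () is the unit value; `Unit` alone names the companion object.
  protected def cleanup(): Unit = () // noop by default
}
```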