diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1d4972edfc..de57807639 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
@@ -468,21 +468,21 @@ private[spark] class DAGScheduler(
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getMissingAncestorShuffleDependencies(
-      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
-    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
+      rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
+    val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
         getShuffleDependencies(toVisit).foreach { shuffleDep =>
           if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
-            ancestors.push(shuffleDep)
-            waitingForVisit.push(shuffleDep.rdd)
+            ancestors.prepend(shuffleDep)
+            waitingForVisit.prepend(shuffleDep.rdd)
           } // Otherwise, the dependency and its ancestors have already been registered.
         }
       }
     }
@@ -506,17 +506,17 @@ private[spark] class DAGScheduler(
       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
         toVisit.dependencies.foreach {
           case shuffleDep: ShuffleDependency[_, _, _] =>
             parents += shuffleDep
           case dependency =>
-            waitingForVisit.push(dependency.rdd)
+            waitingForVisit.prepend(dependency.rdd)
         }
       }
     }
@@ -529,10 +529,10 @@ private[spark] class DAGScheduler(
    */
   private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         if (!predicate(toVisit)) {
           return false
         }
@@ -542,7 +542,7 @@ private[spark] class DAGScheduler(
          case _: ShuffleDependency[_, _, _] =>
            // Not within the same stage with current rdd, do nothing.
          case dependency =>
-           waitingForVisit.push(dependency.rdd)
+           waitingForVisit.prepend(dependency.rdd)
        }
      }
    }
@@ -554,7 +554,8 @@ private[spark] class DAGScheduler(
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visited(rdd)) {
         visited += rdd
@@ -568,15 +569,14 @@ private[spark] class DAGScheduler(
                  missing += mapStage
                }
              case narrowDep: NarrowDependency[_] =>
-               waitingForVisit.push(narrowDep.rdd)
+               waitingForVisit.prepend(narrowDep.rdd)
           }
         }
       }
     }
   }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     missing.toList
   }
@@ -2000,7 +2000,8 @@ private[spark] class DAGScheduler(
     val visitedRdds = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visitedRdds(rdd)) {
         visitedRdds += rdd
@@ -2009,17 +2010,16 @@ private[spark] class DAGScheduler(
           case shufDep: ShuffleDependency[_, _, _] =>
             val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
             if (!mapStage.isAvailable) {
-              waitingForVisit.push(mapStage.rdd)
+              waitingForVisit.prepend(mapStage.rdd)
             } // Otherwise there's no need to follow the dependency back
           case narrowDep: NarrowDependency[_] =>
-            waitingForVisit.push(narrowDep.rdd)
+            waitingForVisit.prepend(narrowDep.rdd)
         }
       }
     }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     visitedRdds.contains(target.rdd)
   }
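Every DAGScheduler hunk above applies the same mechanical substitution: the deprecated scala.collection.mutable.ArrayStack becomes a ListBuffer that is only ever touched at its head, so prepend stands in for push, remove(0) for pop, and the graph walk stays depth-first. A minimal standalone sketch of that equivalence, not part of the patch (plain integers instead of RDDs; the object name is illustrative only):

import scala.collection.mutable.ListBuffer

object ListBufferAsStack {
  def main(args: Array[String]): Unit = {
    // The head of the ListBuffer plays the role of the top of the old ArrayStack.
    val waitingForVisit = new ListBuffer[Int]
    waitingForVisit += 1                      // seed the "stack", as the patch does with stage.rdd
    waitingForVisit.prepend(2)                // push
    waitingForVisit.prepend(3)                // push
    val order = Seq(
      waitingForVisit.remove(0),              // pop -> 3 (last pushed, first out)
      waitingForVisit.remove(0),              // pop -> 2
      waitingForVisit.remove(0))              // pop -> 1
    println(order)                            // List(3, 2, 1)
  }
}

Both prepend and remove(0) are constant-time operations on ListBuffer, so the LIFO traversal order and the cost profile of the old stack are preserved.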
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index c2e3830d95..fef514e0c4 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -333,7 +333,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sample(Iterator.from(0), 0.1)))
     d should be < D
@@ -557,7 +557,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sampleWR(Iterator.from(0), 0.1)))
     d should be < D
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index 507d21dd12..7381f59771 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -51,11 +52,14 @@
     }
     def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
       : Map[VertexId, Long] = {
-      (count1.keySet ++ count2.keySet).map { i =>
+      // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+      val map = mutable.Map[VertexId, Long]()
+      (count1.keySet ++ count2.keySet).foreach { i =>
        val count1Val = count1.getOrElse(i, 0L)
        val count2Val = count2.getOrElse(i, 0L)
-        i -> (count1Val + count2Val)
-      }(collection.breakOut)
+        map.put(i, count1Val + count2Val)
+      }
+      map
     }
     def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
       if (message.isEmpty) attr else message.maxBy(_._2)._1
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 9edfe589ce..a410473afd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -34,9 +35,12 @@ object ShortestPaths extends Serializable {
   private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
 
   private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
-    (spmap1.keySet ++ spmap2.keySet).map {
-      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
-    }(collection.breakOut)
+    // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+    val map = mutable.Map[VertexId, Int]()
+    (spmap1.keySet ++ spmap2.keySet).foreach { k =>
+      map.put(k, math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)))
+    }
+    map
   }
 
   /**
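The two graphx changes above (and the StructType change at the end of this patch) all replace collection.breakOut, which no longer exists in Scala 2.13, with an explicitly built mutable map returned under the read-only scala.collection.Map type. A standalone sketch of that pattern, with plain Long keys standing in for vertex ids (names here are illustrative only):

import scala.collection.{mutable, Map}

object MergeCountsSketch {
  // Merge two count maps the way mergeMessage/addMaps above do: build the result
  // in a mutable.Map and hand it back as a read-only Map, avoiding the
  // intermediate Set[(K, V)] that .map(...).toMap would create.
  def merge(a: Map[Long, Long], b: Map[Long, Long]): Map[Long, Long] = {
    val out = mutable.Map[Long, Long]()
    (a.keySet ++ b.keySet).foreach { k =>
      out.put(k, a.getOrElse(k, 0L) + b.getOrElse(k, 0L))
    }
    out  // mutable.Map is a subtype of scala.collection.Map, so no copy is needed
  }

  def main(args: Array[String]): Unit = {
    println(merge(Map(1L -> 2L), Map(1L -> 3L, 2L -> 1L)))  // keys: 1 -> 5, 2 -> 1
  }
}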
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 1e4c6c74bd..d8f1c49771 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.graphx.util.GraphGenerators
 object GridPageRank {
   def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
-    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.ArrayBuffer.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
     def sub2ind(r: Int, c: Int): Int = r * nCols + c
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index b041dd42d6..7921823374 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -194,14 +194,16 @@ private[spark] object RandomForest extends Logging with Serializable {
       training the same tree in the next iteration. This focus allows us to send fewer trees to
       workers on each iteration; see topNodesForGroup below.
      */
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
 
     val rng = new Random()
     rng.setSeed(seed)
 
     // Allocate and queue root nodes.
     val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1))
-    Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex))))
+    for (treeIndex <- 0 until numTrees) {
+      nodeStack.prepend((treeIndex, topNodes(treeIndex)))
+    }
 
     timer.stop("init")
@@ -398,7 +400,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       nodesForGroup: Map[Int, Array[LearningNode]],
       treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
       splits: Array[Array[Split]],
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       timer: TimeTracker = new TimeTracker,
       nodeIdCache: Option[NodeIdCache] = None): Unit = {
@@ -639,10 +641,10 @@ private[spark] object RandomForest extends Logging with Serializable {
           // enqueue left child and right child if they are not leaves
           if (!leftChildIsLeaf) {
-            nodeStack.push((treeIndex, node.leftChild.get))
+            nodeStack.prepend((treeIndex, node.leftChild.get))
           }
           if (!rightChildIsLeaf) {
-            nodeStack.push((treeIndex, node.rightChild.get))
+            nodeStack.prepend((treeIndex, node.rightChild.get))
           }
 
           logDebug("leftChildIndex = " + node.leftChild.get.id +
@@ -1042,8 +1044,8 @@ private[spark] object RandomForest extends Logging with Serializable {
       var partNumSamples = 0.0
       var unweightedNumSamples = 0.0
       featureSamples.foreach { case (sampleWeight, feature) =>
-        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight;
-        partNumSamples += sampleWeight;
+        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight
+        partNumSamples += sampleWeight
         unweightedNumSamples += 1.0
       }
@@ -1131,7 +1133,7 @@ private[spark] object RandomForest extends Logging with Serializable {
    *          The feature indices are None if not subsampling features.
    */
   private[tree] def selectNodesToSplit(
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       maxMemoryUsage: Long,
       metadata: DecisionTreeMetadata,
       rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
@@ -1146,7 +1148,7 @@ private[spark] object RandomForest extends Logging with Serializable {
     // so we allow one iteration if memUsage == 0.
     var groupDone = false
     while (nodeStack.nonEmpty && !groupDone) {
-      val (treeIndex, node) = nodeStack.top
+      val (treeIndex, node) = nodeStack.head
       // Choose subset of features for node (if subsampling).
       val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
         Some(SamplingUtils.reservoirSampleAndCount(Range(0,
@@ -1157,7 +1159,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       // Check if enough memory remains to add this node to the group.
       val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
       if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
-        nodeStack.pop()
+        nodeStack.remove(0)
         mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) +=
           node
         mutableTreeToNodeToIndexInfo
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
index b89cc6053b..a63ab913f2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
@@ -40,8 +40,6 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import RandomForestSuite.mapToVec
 
-  private val seed = 42
-
   /////////////////////////////////////////////////////////////////////////////
   // Tests for split calculation
   /////////////////////////////////////////////////////////////////////////////
@@ -350,7 +348,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
     ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -392,7 +390,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
     ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -505,11 +503,11 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
         val failString = s"Failed on test with:" +
           s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
           s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
-        val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+        val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
         val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
         Range(0, numTrees).foreach { treeIndex =>
           topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
-          nodeStack.push((treeIndex, topNodes(treeIndex)))
+          nodeStack.prepend((treeIndex, topNodes(treeIndex)))
         }
         val rng = new scala.util.Random(seed = seed)
         val (nodesForGroup: Map[Int, Array[LearningNode]],
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index fe49162c66..e478f14906 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -57,8 +57,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
   test("number of features more than 65535") {
     val data1 = sc.parallelize(Array(
-      Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
-      Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
+      Vectors.dense(Array.fill(100000)(2.0)),
+      Vectors.dense(Array.fill(100000)(0.0))
     ), 2)
 
     val pca = new PCA(2).fit(data1)
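The RandomSamplerSuite and PCASuite changes above drop the 2.12-only `.to[ArrayBuffer]` / `.to[scala.Vector]` conversion (Scala 2.13 changes `to` to take a companion-object argument instead of a type parameter) in favour of constructions that compile on both versions. A small sketch with toy sizes, not taken from the suites themselves:

import scala.collection.mutable.ArrayBuffer

object CrossBuildConversions {
  def main(args: Array[String]): Unit = {
    // Was: Iterator.from(0).take(5).to[ArrayBuffer]
    val buf = ArrayBuffer(Iterator.from(0).take(5).toSeq: _*)
    println(buf)                        // ArrayBuffer(0, 1, 2, 3, 4)

    // Was: (1 to 5).map(_ => 2.0).to[scala.Vector].toArray
    val constants = Array.fill(5)(2.0)
    println(constants.mkString(", "))   // 2.0, 2.0, 2.0, 2.0, 2.0
  }
}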
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 40059454da..50a7ef7d17 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -125,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
       appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
+      commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804ce1..72ff9361d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -41,7 +41,7 @@ class EquivalentExpressions {
   }
 
   // For each expression, the set of equivalent expressions.
-  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.MutableList[Expression]]
+  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]]
 
   /**
    * Adds each expression to this data structure, grouping them with existing equivalent
@@ -56,7 +56,7 @@ class EquivalentExpressions {
       f.get += expr
       true
     } else {
-      equivalenceMap.put(e, mutable.MutableList(expr))
+      equivalenceMap.put(e, mutable.ArrayBuffer(expr))
       false
     }
   } else {
@@ -102,7 +102,7 @@ class EquivalentExpressions {
   * an empty collection if there are none.
   */
  def getEquivalentExprs(e: Expression): Seq[Expression] = {
-    equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
+    equivalenceMap.getOrElse(Expr(e), Seq.empty)
  }
 
  /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index fe324a4894..edf8d2c1b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.types
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.{mutable, Map}
 import scala.util.Try
 import scala.util.control.NonFatal
 
@@ -516,7 +516,7 @@ object StructType extends AbstractDataType {
           leftContainsNull || rightContainsNull)
 
       case (StructType(leftFields), StructType(rightFields)) =>
-        val newFields = ArrayBuffer.empty[StructField]
+        val newFields = mutable.ArrayBuffer.empty[StructField]
 
         val rightMapped = fieldsMap(rightFields)
         leftFields.foreach {
@@ -575,7 +575,10 @@ object StructType extends AbstractDataType {
   }
 
   private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
-    import scala.collection.breakOut
-    fields.map(s => (s.name, s))(breakOut)
+    // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+    val map = mutable.Map[String, StructField]()
+    map.sizeHint(fields.length)
+    fields.foreach(s => map.put(s.name, s))
+    map
   }
 }
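One detail shared by the StructType, LabelPropagation, and ShortestPaths changes: the rewritten helpers declare their result as scala.collection.Map (hence the added `import scala.collection.{mutable, Map}`) and simply return the locally built mutable.Map, which is a subtype, so no defensive copy is made; the StructType version also calls sizeHint to pre-size the map, much as breakOut's builder effectively did. A hedged standalone sketch of the fieldsMap shape, using a stand-in Field case class rather than Spark's StructField and omitting the sizeHint call:

import scala.collection.{mutable, Map}

object FieldsMapSketch {
  final case class Field(name: String, dataType: String)

  // Same shape as the patched StructType.fieldsMap: index fields by name without
  // collection.breakOut, returning the mutable map under the read-only Map type.
  def fieldsMap(fields: Array[Field]): Map[String, Field] = {
    val map = mutable.Map[String, Field]()
    fields.foreach(f => map.put(f.name, f))
    map
  }

  def main(args: Array[String]): Unit = {
    val byName = fieldsMap(Array(Field("a", "int"), Field("b", "string")))
    println(byName("a"))   // Field(a,int)
  }
}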