[SPARK-27682][CORE][GRAPHX][MLLIB] Replace use of collections and methods that will be removed in Scala 2.13 with work-alikes

## What changes were proposed in this pull request?

This replaces use of collection classes like `MutableList` and `ArrayStack` with work-alikes that are available in 2.12, as they will be removed in 2.13. It also removes use of `.to[Collection]`, as its use was superfluous anyway. Removing `collection.breakOut` will have to wait until 2.13.
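
For illustration only (this snippet is not part of the patch; names like `WorkAlikeSketch`, `stack`, `buf`, and `fields` are made up), here is a minimal Scala 2.12 sketch of the substitutions applied in the diff below:

```scala
import scala.collection.mutable

object WorkAlikeSketch {
  def main(args: Array[String]): Unit = {
    // ArrayStack -> ListBuffer used as a stack: prepend = push, remove(0) = pop, head = top.
    val stack = new mutable.ListBuffer[Int]
    stack.prepend(1)
    stack.prepend(2)
    assert(stack.head == 2)      // peek at the most recently pushed element
    assert(stack.remove(0) == 2) // pop it off the front

    // MutableList -> ArrayBuffer: both grow with += and iterate in insertion order.
    val buf = mutable.ArrayBuffer.empty[Int]
    buf += 3
    buf += 4

    // collection.breakOut -> build the target mutable.Map explicitly, avoiding the
    // intermediate collection that breakOut used to elide.
    val fields = Seq("a" -> 1, "b" -> 2)
    val map = mutable.Map[String, Int]()
    fields.foreach { case (k, v) => map.put(k, v) }

    println((stack, buf, map))
  }
}
```

The `prepend`/`remove(0)` pairing preserves the LIFO order the `ArrayStack`-based code relied on, which is why the diff swaps `push`/`pop` for those calls.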

## How was this patch tested?

Existing tests

Closes #24586 from srowen/SPARK-27682.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2019-05-15 09:29:12 -05:00
parent fd9acf23b0
commit bfb3ffe9b3
11 changed files with 71 additions and 60 deletions


@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.collection.Map
import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
import scala.concurrent.duration._
import scala.util.control.NonFatal
@@ -468,21 +468,21 @@ private[spark] class DAGScheduler(
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getMissingAncestorShuffleDependencies(
rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
getShuffleDependencies(toVisit).foreach { shuffleDep =>
if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
ancestors.push(shuffleDep)
waitingForVisit.push(shuffleDep.rdd)
ancestors.prepend(shuffleDep)
waitingForVisit.prepend(shuffleDep.rdd)
} // Otherwise, the dependency and its ancestors have already been registered.
}
}
@@ -506,17 +506,17 @@ private[spark] class DAGScheduler(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
waitingForVisit.prepend(dependency.rdd)
}
}
}
@@ -529,10 +529,10 @@
*/
private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += rdd
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
val toVisit = waitingForVisit.remove(0)
if (!visited(toVisit)) {
if (!predicate(toVisit)) {
return false
@@ -542,7 +542,7 @@ private[spark] class DAGScheduler(
case _: ShuffleDependency[_, _, _] =>
// Not within the same stage with current rdd, do nothing.
case dependency =>
waitingForVisit.push(dependency.rdd)
waitingForVisit.prepend(dependency.rdd)
}
}
}
@@ -554,7 +554,8 @@ private[spark] class DAGScheduler(
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
@@ -568,15 +569,14 @@ private[spark] class DAGScheduler(
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
visit(waitingForVisit.remove(0))
}
missing.toList
}
@@ -2000,7 +2000,8 @@ private[spark] class DAGScheduler(
val visitedRdds = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]) {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
@@ -2009,17 +2010,16 @@ private[spark] class DAGScheduler(
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
waitingForVisit.push(mapStage.rdd)
waitingForVisit.prepend(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
visit(waitingForVisit.remove(0))
}
visitedRdds.contains(target.rdd)
}


@@ -333,7 +333,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
// ArrayBuffer iterator (indexable type)
d = medianKSD(
gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
gaps(sample(Iterator.from(0), 0.1)))
d should be < D
@@ -557,7 +557,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
// ArrayBuffer iterator (indexable type)
d = medianKSD(
gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
gaps(sampleWR(Iterator.from(0), 0.1)))
d should be < D


@@ -17,6 +17,7 @@
package org.apache.spark.graphx.lib
import scala.collection.{mutable, Map}
import scala.reflect.ClassTag
import org.apache.spark.graphx._
@@ -51,11 +52,14 @@ object LabelPropagation {
}
def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
: Map[VertexId, Long] = {
(count1.keySet ++ count2.keySet).map { i =>
// Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
val map = mutable.Map[VertexId, Long]()
(count1.keySet ++ count2.keySet).foreach { i =>
val count1Val = count1.getOrElse(i, 0L)
val count2Val = count2.getOrElse(i, 0L)
i -> (count1Val + count2Val)
}(collection.breakOut)
map.put(i, count1Val + count2Val)
}
map
}
def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
if (message.isEmpty) attr else message.maxBy(_._2)._1


@@ -17,6 +17,7 @@
package org.apache.spark.graphx.lib
import scala.collection.{mutable, Map}
import scala.reflect.ClassTag
import org.apache.spark.graphx._
@@ -34,9 +35,12 @@ object ShortestPaths extends Serializable {
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
(spmap1.keySet ++ spmap2.keySet).map {
k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
}(collection.breakOut)
// Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
val map = mutable.Map[VertexId, Int]()
(spmap1.keySet ++ spmap2.keySet).foreach { k =>
map.put(k, math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)))
}
map
}
/**


@@ -24,7 +24,7 @@ import org.apache.spark.graphx.util.GraphGenerators
object GridPageRank {
def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
val inNbrs = Array.fill(nRows * nCols)(collection.mutable.ArrayBuffer.empty[Int])
val outDegree = Array.fill(nRows * nCols)(0)
// Convert row column address into vertex ids (row major order)
def sub2ind(r: Int, c: Int): Int = r * nCols + c


@@ -194,14 +194,16 @@ private[spark] object RandomForest extends Logging with Serializable {
training the same tree in the next iteration. This focus allows us to send fewer trees to
workers on each iteration; see topNodesForGroup below.
*/
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
val rng = new Random()
rng.setSeed(seed)
// Allocate and queue root nodes.
val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1))
Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex))))
for (treeIndex <- 0 until numTrees) {
nodeStack.prepend((treeIndex, topNodes(treeIndex)))
}
timer.stop("init")
@@ -398,7 +400,7 @@ private[spark] object RandomForest extends Logging with Serializable {
nodesForGroup: Map[Int, Array[LearningNode]],
treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
splits: Array[Array[Split]],
nodeStack: mutable.ArrayStack[(Int, LearningNode)],
nodeStack: mutable.ListBuffer[(Int, LearningNode)],
timer: TimeTracker = new TimeTracker,
nodeIdCache: Option[NodeIdCache] = None): Unit = {
@@ -639,10 +641,10 @@ private[spark] object RandomForest extends Logging with Serializable {
// enqueue left child and right child if they are not leaves
if (!leftChildIsLeaf) {
nodeStack.push((treeIndex, node.leftChild.get))
nodeStack.prepend((treeIndex, node.leftChild.get))
}
if (!rightChildIsLeaf) {
nodeStack.push((treeIndex, node.rightChild.get))
nodeStack.prepend((treeIndex, node.rightChild.get))
}
logDebug("leftChildIndex = " + node.leftChild.get.id +
@@ -1042,8 +1044,8 @@ private[spark] object RandomForest extends Logging with Serializable {
var partNumSamples = 0.0
var unweightedNumSamples = 0.0
featureSamples.foreach { case (sampleWeight, feature) =>
partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight;
partNumSamples += sampleWeight;
partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight
partNumSamples += sampleWeight
unweightedNumSamples += 1.0
}
@@ -1131,7 +1133,7 @@ private[spark] object RandomForest extends Logging with Serializable {
* The feature indices are None if not subsampling features.
*/
private[tree] def selectNodesToSplit(
nodeStack: mutable.ArrayStack[(Int, LearningNode)],
nodeStack: mutable.ListBuffer[(Int, LearningNode)],
maxMemoryUsage: Long,
metadata: DecisionTreeMetadata,
rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
@@ -1146,7 +1148,7 @@ private[spark] object RandomForest extends Logging with Serializable {
// so we allow one iteration if memUsage == 0.
var groupDone = false
while (nodeStack.nonEmpty && !groupDone) {
val (treeIndex, node) = nodeStack.top
val (treeIndex, node) = nodeStack.head
// Choose subset of features for node (if subsampling).
val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
Some(SamplingUtils.reservoirSampleAndCount(Range(0,
@@ -1157,7 +1159,7 @@ private[spark] object RandomForest extends Logging with Serializable {
// Check if enough memory remains to add this node to the group.
val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
nodeStack.pop()
nodeStack.remove(0)
mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) +=
node
mutableTreeToNodeToIndexInfo


@@ -40,8 +40,6 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
import RandomForestSuite.mapToVec
private val seed = 42
/////////////////////////////////////////////////////////////////////////////
// Tests for split calculation
/////////////////////////////////////////////////////////////////////////////
@@ -350,7 +348,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map(0 -> Map(
topNode.id -> new RandomForest.NodeIndexInfo(0, None)
))
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -392,7 +390,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val treeToNodeToIndexInfo = Map(0 -> Map(
topNode.id -> new RandomForest.NodeIndexInfo(0, None)
))
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
@@ -505,11 +503,11 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
val failString = s"Failed on test with:" +
s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," +
s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
Range(0, numTrees).foreach { treeIndex =>
topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
nodeStack.push((treeIndex, topNodes(treeIndex)))
nodeStack.prepend((treeIndex, topNodes(treeIndex)))
}
val rng = new scala.util.Random(seed = seed)
val (nodesForGroup: Map[Int, Array[LearningNode]],


@@ -57,8 +57,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
test("number of features more than 65535") {
val data1 = sc.parallelize(Array(
Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
Vectors.dense(Array.fill(100000)(2.0)),
Vectors.dense(Array.fill(100000)(0.0))
), 2)
val pca = new PCA(2).fit(data1)


@@ -125,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
appConf.toStringArray :+ appArguments.mainAppResource
if (appArguments.appArgs.nonEmpty) {
commandLine ++= appArguments.appArgs.to[ArrayBuffer]
commandLine ++= appArguments.appArgs
}
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)


@@ -41,7 +41,7 @@ class EquivalentExpressions {
}
// For each expression, the set of equivalent expressions.
private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.MutableList[Expression]]
private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]]
/**
* Adds each expression to this data structure, grouping them with existing equivalent
@@ -56,7 +56,7 @@ class EquivalentExpressions {
f.get += expr
true
} else {
equivalenceMap.put(e, mutable.MutableList(expr))
equivalenceMap.put(e, mutable.ArrayBuffer(expr))
false
}
} else {
@@ -102,7 +102,7 @@ class EquivalentExpressions {
* an empty collection if there are none.
*/
def getEquivalentExprs(e: Expression): Seq[Expression] = {
equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
equivalenceMap.getOrElse(Expr(e), Seq.empty)
}
/**


@@ -17,7 +17,7 @@
package org.apache.spark.sql.types
import scala.collection.mutable.ArrayBuffer
import scala.collection.{mutable, Map}
import scala.util.Try
import scala.util.control.NonFatal
@@ -516,7 +516,7 @@ object StructType extends AbstractDataType {
leftContainsNull || rightContainsNull)
case (StructType(leftFields), StructType(rightFields)) =>
val newFields = ArrayBuffer.empty[StructField]
val newFields = mutable.ArrayBuffer.empty[StructField]
val rightMapped = fieldsMap(rightFields)
leftFields.foreach {
@@ -575,7 +575,10 @@ }
}
private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
import scala.collection.breakOut
fields.map(s => (s.name, s))(breakOut)
// Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
val map = mutable.Map[String, StructField]()
map.sizeHint(fields.length)
fields.foreach(s => map.put(s.name, s))
map
}
}