diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9c355d7c3e..8b62d2405e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -190,13 +190,15 @@ class DAGScheduler(
   }
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
-    cacheLocs.getOrElseUpdate(rdd.id, {
+    // Note: this doesn't use `getOrElseUpdate()` because this method is called O(num tasks) times
+    if (!cacheLocs.contains(rdd.id)) {
       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
-      blockIds.map { id =>
+      cacheLocs(rdd.id) = blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
       }
-    })
+    }
+    cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {
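
For readers of the patch: the motivation in the added comment is that `mutable.HashMap.getOrElseUpdate(key, op)` takes `op` by name, so every call allocates a thunk for the default argument even on a cache hit, and `getCacheLocs` sits on a path invoked O(num tasks) times. Below is a minimal, self-contained sketch of the contains-then-update pattern the patch adopts; `CacheLocsPattern`, `computeLocs`, and the simplified `Seq[String]` location type are illustrative stand-ins for this note only, not Spark APIs or part of the patch.

import scala.collection.mutable

object CacheLocsPattern {
  private val cacheLocs = new mutable.HashMap[Int, Seq[String]]

  // Hypothetical stand-in for the block-manager lookup DAGScheduler performs.
  private def computeLocs(rddId: Int): Seq[String] = Seq(s"host-for-$rddId")

  def getCacheLocs(rddId: Int): Seq[String] = cacheLocs.synchronized {
    // Hot path: keep the cache-hit case allocation-free.
    // `cacheLocs.getOrElseUpdate(rddId, computeLocs(rddId))` would allocate a
    // by-name thunk for the default on every call, including cache hits.
    if (!cacheLocs.contains(rddId)) {
      cacheLocs(rddId) = computeLocs(rddId)
    }
    cacheLocs(rddId)
  }
}

On a cache hit the rewritten method does two hash lookups (`contains` plus `apply`) instead of `getOrElseUpdate`'s one, which is presumably the trade-off accepted here: a second lookup on a small map is cheaper than a per-call closure allocation on a path this hot.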