diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala index d8c46db166..d20f3ed654 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala @@ -138,7 +138,7 @@ private[scheduler] class TaskSetExcludelist( // This executor has been excluded for this stage. Let's check if it // the whole node should be excluded. val excludedExecutorsOnNode = - execsWithFailuresOnNode.filter(excludedExecs.contains(_)) + execsWithFailuresOnNode.intersect(excludedExecs) val now = clock.getTimeMillis() // SparkListenerExecutorBlacklistedForStage is deprecated but post both events // to keep backward compatibility diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 29c605d622..f9b1300336 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -717,7 +717,7 @@ class BlockManagerMasterEndpoint( hostsToFilter: Set[String]): Seq[BlockManagerId] = { val blockManagerHosts = blockManagerIdByExecutor .filterNot(_._2.isDriver).values.map(_.host).toSet - val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_)) + val filteredBlockManagerHosts = blockManagerHosts.diff(hostsToFilter) val filteredMergersWithExecutors = filteredBlockManagerHosts.map( BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort)) // Enough mergers are available as part of active executors list diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 043f0e9889..d5a42c1ee9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -202,7 +202,7 @@ class ExecutorPodsAllocator( var _deletedExecutorIds = deletedExecutorIds if (snapshots.nonEmpty) { val existingExecs = lastSnapshot.executorPods.keySet - _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains) + _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs) } val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))