From 9cefde8db373a3433b7e3ce328e4a2ce83b1aca2 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Sun, 29 Aug 2021 09:24:37 -0700
Subject: [PATCH] [SPARK-36580][CORE][K8S] Use `intersect` and `diff` API on `Set` instead of manual implementation

### What changes were proposed in this pull request?

The main change of this PR is to replace `filter` + `contains` with the `intersect` API, and `filterNot` + `contains` with the `diff` API.

**Before**

```scala
val set = Set(1, 2)
val others = Set(2, 3)

set.filter(others.contains(_))
set.filterNot(others.contains)
```

**After**

```scala
val set = Set(1, 2)
val others = Set(2, 3)

set.intersect(others)
set.diff(others)
```

### Why are the changes needed?

Code simplification: replace the manual implementations with the existing APIs (a brief equivalence check is sketched after the patch).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass the Jenkins or GitHub Actions CI.

Closes #33829 from LuciferYang/SPARK-36580.

Authored-by: yangjie01
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/scheduler/TaskSetExcludeList.scala  | 2 +-
 .../org/apache/spark/storage/BlockManagerMasterEndpoint.scala  | 2 +-
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala    | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
index d8c46db166..d20f3ed654 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetExcludeList.scala
@@ -138,7 +138,7 @@ private[scheduler] class TaskSetExcludelist(
         // This executor has been excluded for this stage. Let's check if it
         // the whole node should be excluded.
         val excludedExecutorsOnNode =
-          execsWithFailuresOnNode.filter(excludedExecs.contains(_))
+          execsWithFailuresOnNode.intersect(excludedExecs)
         val now = clock.getTimeMillis()
         // SparkListenerExecutorBlacklistedForStage is deprecated but post both events
         // to keep backward compatibility
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 29c605d622..f9b1300336 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -717,7 +717,7 @@ class BlockManagerMasterEndpoint(
       hostsToFilter: Set[String]): Seq[BlockManagerId] = {
     val blockManagerHosts = blockManagerIdByExecutor
       .filterNot(_._2.isDriver).values.map(_.host).toSet
-    val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
+    val filteredBlockManagerHosts = blockManagerHosts.diff(hostsToFilter)
     val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
       BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
     // Enough mergers are available as part of active executors list
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 043f0e9889..d5a42c1ee9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -202,7 +202,7 @@ class ExecutorPodsAllocator(
     var _deletedExecutorIds = deletedExecutorIds
     if (snapshots.nonEmpty) {
       val existingExecs = lastSnapshot.executorPods.keySet
-      _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
+      _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
     }

     val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
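
Not part of the patch itself, but a minimal, self-contained Scala sketch of the equivalence the commit message relies on, reusing the illustrative `set`/`others` values from the PR description; the object name `SetApiEquivalenceCheck` is made up for the example:

```scala
object SetApiEquivalenceCheck {
  def main(args: Array[String]): Unit = {
    val set = Set(1, 2)
    val others = Set(2, 3)

    // filter + contains keeps only elements that are also in `others`,
    // which is exactly what intersect computes.
    assert(set.filter(others.contains(_)) == set.intersect(others)) // Set(2)

    // filterNot + contains drops elements that are in `others`,
    // which is exactly what diff computes.
    assert(set.filterNot(others.contains) == set.diff(others)) // Set(1)

    println("intersect and diff match the manual filter-based versions")
  }
}
```

Since both forms produce the same result on `scala.collection.Set`, the switch to `intersect`/`diff` only changes how the intent is expressed, not the behavior of the touched code paths.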