[SPARK-36580][CORE][K8S] Use intersect and diff API on Set instead of manual implementation

### What changes were proposed in this pull request?
The main change of this pr is replace `filter` + `contains` with `intersect` api and `filterNot` + `contains` with `diff`

**Before**

```scala
val set = Set(1, 2)
val others = Set(2, 3)
set.filter(others.contains(_))
set.filterNot(others.contains)
```

**After**
```scala
val set = Set(1, 2)
val others = Set(2, 3)
set.intersect(others)
set.diff(others)
```

### Why are the changes needed?
Code simplification, replace manual implementation with existing API

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Action

Closes #33829 from LuciferYang/SPARK-36580.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
yangjie01 2021-08-29 09:24:37 -07:00 committed by Dongjoon Hyun
parent 7d1be3710d
commit 9cefde8db3
3 changed files with 3 additions and 3 deletions

View file

@ -138,7 +138,7 @@ private[scheduler] class TaskSetExcludelist(
// This executor has been excluded for this stage. Let's check if it
// the whole node should be excluded.
val excludedExecutorsOnNode =
execsWithFailuresOnNode.filter(excludedExecs.contains(_))
execsWithFailuresOnNode.intersect(excludedExecs)
val now = clock.getTimeMillis()
// SparkListenerExecutorBlacklistedForStage is deprecated but post both events
// to keep backward compatibility

View file

@ -717,7 +717,7 @@ class BlockManagerMasterEndpoint(
hostsToFilter: Set[String]): Seq[BlockManagerId] = {
val blockManagerHosts = blockManagerIdByExecutor
.filterNot(_._2.isDriver).values.map(_.host).toSet
val filteredBlockManagerHosts = blockManagerHosts.filterNot(hostsToFilter.contains(_))
val filteredBlockManagerHosts = blockManagerHosts.diff(hostsToFilter)
val filteredMergersWithExecutors = filteredBlockManagerHosts.map(
BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, _, externalShuffleServicePort))
// Enough mergers are available as part of active executors list

View file

@ -202,7 +202,7 @@ class ExecutorPodsAllocator(
var _deletedExecutorIds = deletedExecutorIds
if (snapshots.nonEmpty) {
val existingExecs = lastSnapshot.executorPods.keySet
_deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
_deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
}
val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))