[SPARK-35692][K8S] Use AtomicInteger for executor id generating

### What changes were proposed in this pull request?

AtomicInteger is enough for executor ids, in this PR, we use it to replace AtomicLong like other cluster managers, e.g. yarn, standalone

### Why are the changes needed?

See the discussion here https://github.com/apache/spark/pull/32610#discussion_r648007320

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

pass CI with existing tests

Closes #32837 from yaooqinn/SPARK-35692.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Kent Yao 2021-06-10 13:42:07 -07:00 committed by Dongjoon Hyun
parent b4b78ce265
commit bc1edba8f6
2 changed files with 9 additions and 4 deletions

View file

@ -17,9 +17,8 @@
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
import scala.collection.mutable
@ -46,7 +45,7 @@ private[spark] class ExecutorPodsAllocator(
snapshotsStore: ExecutorPodsSnapshotsStore,
clock: Clock) extends Logging {
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
// ResourceProfile id -> total expected executors per profile, currently we don't remove
// any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749

View file

@ -17,6 +17,7 @@
package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConverters._
@ -29,6 +30,7 @@ import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.BeforeAndAfter
import org.scalatest.PrivateMethodTester._
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
@ -127,11 +129,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Request executors in batches. Allow another batch to be requested if" +
" all pending executors start running.") {
val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
for (execId <- 1 until podAllocationSize) {
snapshotsStore.updatePod(runningExecutor(execId))
}
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))