From bc1edba8f6f5e5a5de0d3005ad685054d86e478f Mon Sep 17 00:00:00 2001 From: Kent Yao Date: Thu, 10 Jun 2021 13:42:07 -0700 Subject: [PATCH] [SPARK-35692][K8S] Use AtomicInteger for executor id generating ### What changes were proposed in this pull request? AtomicInteger is sufficient for executor IDs, so this PR uses it in place of AtomicLong, consistent with other cluster managers such as YARN and standalone. ### Why are the changes needed? See the discussion at https://github.com/apache/spark/pull/32610#discussion_r648007320 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passes CI with existing tests. Closes #32837 from yaooqinn/SPARK-35692. Authored-by: Kent Yao Signed-off-by: Dongjoon Hyun --- .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 7 +++---- .../scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala | 6 ++++++ 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index c101567a1d..ad489edef6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -17,9 +17,8 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -46,7 +45,7 @@ private[spark] class ExecutorPodsAllocator( snapshotsStore: ExecutorPodsSnapshotsStore, clock: Clock) extends Logging { - 
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + private val EXECUTOR_ID_COUNTER = new AtomicInteger(0) // ResourceProfile id -> total expected executors per profile, currently we don't remove // any resource profiles - https://issues.apache.org/jira/browse/SPARK-30749 diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 9a04d53b3d..207094ea75 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ @@ -29,6 +30,7 @@ import org.mockito.Mockito.{never, times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester._ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec} @@ -127,11 +129,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { test("Request executors in batches. 
Allow another batch to be requested if" + " all pending executors start running.") { + val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))() + assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) } + assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))