[SPARK-32975][K8S] Add config for driver readiness timeout before executors start

### What changes were proposed in this pull request?
Add a new config that controls the timeout of waiting for driver pod's readiness before allocating executor pods. This wait only happens once on application start.

### Why are the changes needed?
The driver's headless service can be resolved by DNS only after the driver pod is ready. If an executor tries to connect to the headless service before the driver pod is ready, it will hit UnknownHostException and get into an error state but will not be restarted. **This case usually happens when the driver pod has sidecar containers that haven't finished their creation when executors start.** So basically there is a race condition. This issue can be mitigated by tweaking this config.

### Does this PR introduce _any_ user-facing change?
A new config `spark.kubernetes.allocation.driver.readinessTimeout` is added.

### How was this patch tested?
Existing tests.

Closes #32752 from cchriswu/SPARK-32975-fix.

Lead-authored-by: Chris Wu <wucaowei19@gmail.com>
Co-authored-by: Chris Wu <wcaowei@vmware.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Chris Wu 2021-06-04 06:59:49 -07:00 committed by Dongjoon Hyun
parent dc3317fdf9
commit 497c80a1ad
3 changed files with 23 additions and 0 deletions

View file

@ -300,6 +300,17 @@ private[spark] object Config extends Logging {
.checkValue(value => value > 0, "Allocation batch delay must be a positive time value.")
.createWithDefaultString("1s")
// Upper bound, in seconds, on the one-time wait for driver pod readiness that precedes
// executor pod creation. Must be strictly positive; defaults to one second.
val KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT =
  ConfigBuilder("spark.kubernetes.allocation.driver.readinessTimeout")
    .doc(
      "Time to wait for driver pod to get ready before creating executor pods. This wait " +
        "only happens on application start. If timeout happens, executor pods will still be " +
        "created.")
    .version("3.1.3")
    .timeConf(TimeUnit.SECONDS)
    .checkValue(
      _ > 0,
      "Allocation driver readiness timeout must be a positive " +
        "time value.")
    .createWithDefaultString("1s")
val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
.doc("Time to wait before a newly created executor POD request, which does not reached " +

View file

@ -18,6 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.collection.JavaConverters._
@ -61,6 +62,8 @@ private[spark] class ExecutorPodsAllocator(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
// Timeout, in seconds, for the wait on driver pod readiness performed in start() before
// executor allocation is set up.
private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
private val executorIdleTimeout = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT) * 1000
private val namespace = conf.get(KUBERNETES_NAMESPACE)
@ -99,6 +102,14 @@ private[spark] class ExecutorPodsAllocator(
@volatile private var deletedExecutorIds = Set.empty[Long]
def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
// Wait until the driver pod is ready before starting executors, as the headless service won't
// be resolvable by DNS until the driver pod is ready. The wait is skipped when no driver pod
// name is configured, since there is then no pod to wait for; calling `.get` on the empty
// Option would throw NoSuchElementException and surface as a misleading failure report.
kubernetesDriverPodName.foreach { driverPodName =>
  // Best effort: a non-fatal failure of the wait (including a timeout) must not prevent
  // executor allocation, so it is handed to tryLogNonFatalError rather than propagated.
  Utils.tryLogNonFatalError {
    kubernetesClient
      .pods()
      .withName(driverPodName)
      .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
  }
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
}

View file

@ -104,6 +104,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
when(driverPodOperations.get).thenReturn(driverPod)
when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()