[SPARK-36334][K8S] Add a new conf to allow K8s API server-side caching for pod listing

### What changes were proposed in this pull request?

This PR aims to add a new config to allow K8s API server-side caching for pod listing.

### Why are the changes needed?

Apache Spark currently requests the most recent data which should be consistent. New configuration looses the restriction to reduce the server-side overhead by allowing K8S API server side caching.

https://kubernetes.io/docs/reference/using-api/api-concepts/#the-resourceversion-parameter

- `resourceVersion`: unset
> Most Recent: Return data at the most recent resource version. The returned data must be consistent (i.e. served from etcd via a quorum read).

- `resourceVersion`: "0"
> Any: Return data at any resource version. The newest available resource version is preferred, but strong consistency is not required; data at any resource version may be served.

### Does this PR introduce _any_ user-facing change?

Yes, this is a new feature to reduce the K8s API server side overhead.

### How was this patch tested?

Pass the CIs.

Closes #33563 from dongjoon-hyun/SPARK-36334.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Dongjoon Hyun 2021-07-29 01:01:48 -07:00
parent db18866742
commit 86c42275ba
3 changed files with 42 additions and 5 deletions

View file

@ -395,6 +395,14 @@ private[spark] object Config extends Logging {
" positive time value.")
.createWithDefaultString("30s")
val KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION =
ConfigBuilder("spark.kubernetes.executor.enablePollingWithResourceVersion")
.doc("If true, `resourceVersion` is set with `0` during invoking pod listing APIs " +
"in order to allow API Server-side caching. This should be used carefully.")
.version("3.3.0")
.booleanConf
.createWithDefault(false)
val KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL =
ConfigBuilder("spark.kubernetes.executor.eventProcessingInterval")
.doc("Interval between successive inspection of executor events sent from the" +

View file

@ -18,6 +18,8 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
import com.google.common.primitives.UnsignedLong
import io.fabric8.kubernetes.api.model.ListOptionsBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
@ -53,16 +55,27 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
}
private class PollRunnable(applicationId: String) extends Runnable {
private var resourceVersion: UnsignedLong = _
override def run(): Unit = Utils.tryLogNonFatalError {
logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
snapshotsStore.replaceSnapshot(kubernetesClient
val pods = kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
.list()
.getItems
.asScala.toSeq)
if (conf.get(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION)) {
val list = pods.list(new ListOptionsBuilder().withResourceVersion("0").build())
val newResourceVersion = UnsignedLong.valueOf(list.getMetadata.getResourceVersion())
// Replace only when we receive a monotonically increased resourceVersion
// because some K8s API servers may return old(smaller) cached versions in case of HA setup.
if (resourceVersion == null || newResourceVersion.compareTo(resourceVersion) > 0) {
resourceVersion = newResourceVersion
snapshotsStore.replaceSnapshot(list.getItems.asScala.toSeq)
}
} else {
snapshotsStore.replaceSnapshot(pods.list().getItems.asScala.toSeq)
}
}
}

View file

@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
import io.fabric8.kubernetes.api.model.PodListBuilder
import io.fabric8.kubernetes.api.model.{ListOptionsBuilder, PodListBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{Mock, MockitoAnnotations}
@ -88,4 +88,20 @@ class ExecutorPodsPollingSnapshotSourceSuite extends SparkFunSuite with BeforeAn
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
verify(eventQueue).replaceSnapshot(Seq(exec1, exec2))
}
test("SPARK-36334: Support pod listing with resource version") {
Seq(true, false).foreach { value =>
val source = new ExecutorPodsPollingSnapshotSource(
sparkConf.set(KUBERNETES_EXECUTOR_API_POLLING_WITH_RESOURCE_VERSION, value),
kubernetesClient,
eventQueue,
pollingExecutor)
pollingExecutor.tick(pollingInterval, TimeUnit.MILLISECONDS)
if (value) {
verify(activeExecutorPods).list(new ListOptionsBuilder().withResourceVersion("0").build())
} else {
verify(activeExecutorPods).list()
}
}
}
}