[SPARK-25021][K8S] Add spark.executor.pyspark.memory limit for K8S

## What changes were proposed in this pull request?

Adds support for the `spark.executor.pyspark.memory` limit on Kubernetes. The driver binding steps now record an internal `spark.kubernetes.resource.type` property ("java", "python", or "r"), and when the type is "python" the configured PySpark memory is added to the executor pod's memory request, mirroring the existing YARN behavior.
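As a quick illustration (not part of this patch; the master URL, image name, and memory sizes below are placeholders), a PySpark application can opt into the limit like this:

```python
from pyspark.sql import SparkSession

# Placeholder values: substitute a real API server address and image.
spark = (SparkSession.builder
         .appName("pyspark-memory-demo")
         .config("spark.master", "k8s://https://<api-server>:6443")
         .config("spark.kubernetes.container.image", "<repo>/spark-py:<tag>")
         # JVM heap for each executor.
         .config("spark.executor.memory", "1g")
         # Memory reserved for Python workers; with this change it is also
         # added to the executor pod's memory request on Kubernetes.
         .config("spark.executor.pyspark.memory", "512m")
         .getOrCreate())
```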

## How was this patch tested?

Unit and integration tests.

Closes #22298 from ifilonenko/SPARK-25021.

Authored-by: Ilan Filonenko <if56@cornell.edu>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>
Ilan Filonenko 2018-09-08 22:18:06 -07:00 committed by Holden Karau
parent 78981efc2c
commit 1cfda44825
19 changed files with 166 additions and 16 deletions

View file

@@ -192,6 +192,7 @@ fi
if [ -d "$SPARK_HOME"/resource-managers/kubernetes/core/target/ ]; then
mkdir -p "$DISTDIR/kubernetes/"
cp -a "$SPARK_HOME"/resource-managers/kubernetes/docker/src/main/dockerfiles "$DISTDIR/kubernetes/"
cp -a "$SPARK_HOME"/resource-managers/kubernetes/integration-tests/tests "$DISTDIR/kubernetes/"
fi
# Copy examples and dependencies

View file

@@ -188,7 +188,7 @@ of the most common options to set are:
unless otherwise specified. If set, PySpark memory for an executor will be
limited to this amount. If not set, Spark will not limit Python's memory use
and it is up to the application to avoid exceeding the overhead memory space
shared with other non-JVM processes. When PySpark is run in YARN, this memory
shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory
is added to executor resource requests.
</td>
</tr>

View file

@@ -225,6 +225,13 @@ private[spark] object Config extends Logging {
"Ensure that major Python version is either Python2 or Python3")
.createWithDefault("2")
val APP_RESOURCE_TYPE =
ConfigBuilder("spark.kubernetes.resource.type")
.doc("This sets the resource type internally")
.internal()
.stringConf
.createOptional
val KUBERNETES_LOCAL_DIRS_TMPFS =
ConfigBuilder("spark.kubernetes.local.dirs.tmpfs")
.doc("If set to true then emptyDir volumes created to back SPARK_LOCAL_DIRS will have " +

View file

@@ -24,7 +24,7 @@ import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD}
import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, PYSPARK_EXECUTOR_MEMORY}
import org.apache.spark.rpc.RpcEndpointAddress
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
@@ -58,6 +58,16 @@ private[spark] class BasicExecutorFeatureStep(
(kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
MEMORY_OVERHEAD_MIN_MIB))
private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
private val executorMemoryTotal = kubernetesConf.sparkConf
.getOption(APP_RESOURCE_TYPE.key).map{ res =>
val additionalPySparkMemory = res match {
case "python" =>
kubernetesConf.sparkConf
.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
case _ => 0
}
executorMemoryWithOverhead + additionalPySparkMemory
}.getOrElse(executorMemoryWithOverhead)
private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
private val executorCoresRequest =
@@ -76,7 +86,7 @@ private[spark] class BasicExecutorFeatureStep(
// executorId
val hostname = name.substring(Math.max(0, name.length - 63))
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${executorMemoryWithOverhead}Mi")
.withAmount(s"${executorMemoryTotal}Mi")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCoresRequest)
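
To make the arithmetic concrete, here is a rough, non-authoritative Python rendering of how the step now sizes the executor memory request (assuming the default 0.1 overhead factor and the 384 MiB overhead floor; the function name is illustrative, not a Spark API):

```python
OVERHEAD_FACTOR = 0.1    # default spark.kubernetes.memoryOverheadFactor for JVM apps
OVERHEAD_MIN_MIB = 384   # minimum memory overhead constant

def executor_memory_request_mib(executor_memory_mib, pyspark_memory_mib, resource_type):
    """Approximate the pod memory request computed by BasicExecutorFeatureStep."""
    overhead = max(int(OVERHEAD_FACTOR * executor_memory_mib), OVERHEAD_MIN_MIB)
    with_overhead = executor_memory_mib + overhead
    # PySpark memory is only added when the app resource type is "python".
    extra = pyspark_memory_mib if resource_type == "python" else 0
    return with_overhead + extra

# Mirrors the unit test further down: 1024 + 384 + 42 = 1450
assert executor_memory_request_mib(1024, 42, "python") == 1450
# A Java app with the same settings ignores the PySpark memory: 1024 + 384 = 1408
assert executor_memory_request_mib(1024, 42, "java") == 1408
```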

View file

@@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.features.bindings
import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.launcher.SparkLauncher
@@ -38,7 +39,8 @@ private[spark] class JavaDriverFeatureStep(
.build()
SparkPod(pod.pod, withDriverArgs)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Map(APP_RESOURCE_TYPE.key -> "java")
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}

View file

@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
@@ -68,7 +69,8 @@ private[spark] class PythonDriverFeatureStep(
SparkPod(pod.pod, withPythonPrimaryContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Map(APP_RESOURCE_TYPE.key -> "python")
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}

View file

@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata}
import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
@@ -54,7 +55,8 @@ private[spark] class RDriverFeatureStep(
SparkPod(pod.pod, withRPrimaryContainer)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
override def getAdditionalPodSystemProperties(): Map[String, String] =
Map(APP_RESOURCE_TYPE.key -> "r")
override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}

View file

@@ -57,7 +57,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
MAIN_CLASS,
APP_ARGS)
test("Check the pod respects all configurations from the user.") {
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")

View file

@@ -75,6 +75,7 @@ class BasicExecutorFeatureStepSuite
.set("spark.driver.host", DRIVER_HOSTNAME)
.set("spark.driver.port", DRIVER_PORT.toString)
.set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(","))
.set("spark.kubernetes.resource.type", "java")
}
test("basic executor pod has reasonable defaults") {
@@ -161,6 +162,29 @@ class BasicExecutorFeatureStepSuite
checkOwnerReferences(executor.pod, DRIVER_POD_UID)
}
test("test executor pyspark memory") {
val conf = baseConf.clone()
conf.set("spark.kubernetes.resource.type", "python")
conf.set(org.apache.spark.internal.config.PYSPARK_EXECUTOR_MEMORY, 42L)
val step = new BasicExecutorFeatureStep(
KubernetesConf(
conf,
KubernetesExecutorSpecificConf("1", Some(DRIVER_POD)),
RESOURCE_NAME_PREFIX,
APP_ID,
LABELS,
ANNOTATIONS,
Map.empty,
Map.empty,
Map.empty,
Nil,
Seq.empty[String]))
val executor = step.configurePod(SparkPod.initialPod())
// This is checking that basic executor + executorMemory = 1408 + 42 = 1450
assert(executor.container.getResources.getRequests.get("memory").getAmount === "1450Mi")
}
// There is always exactly one controller reference, and it points to the driver pod.
private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = {
assert(executor.getMetadata.getOwnerReferences.size() === 1)

View file

@@ -56,6 +56,5 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite {
"--properties-file", SPARK_CONF_PATH,
"--class", "test-class",
"spark-internal", "5 7"))
}
}

View file

@@ -43,6 +43,7 @@ COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY ${img_path}/spark/entrypoint.sh /opt/
COPY examples /opt/spark/examples
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
ENV SPARK_HOME /opt/spark

View file

@@ -19,10 +19,10 @@ ARG base_img
FROM $base_img
WORKDIR /
RUN mkdir ${SPARK_HOME}/R
COPY R ${SPARK_HOME}/R
RUN apk add --no-cache R R-dev
COPY R ${SPARK_HOME}/R
ENV R_HOME /usr/lib/R
WORKDIR /opt/spark/work-dir

View file

@@ -19,7 +19,6 @@ ARG base_img
FROM $base_img
WORKDIR /
RUN mkdir ${SPARK_HOME}/python
COPY python/lib ${SPARK_HOME}/python/lib
# TODO: Investigate running both pip and pip3 via virtualenvs
RUN apk add --no-cache python && \
apk add --no-cache python3 && \
@@ -33,6 +32,7 @@ RUN apk add --no-cache python && \
# Removed the .cache to save space
rm -r /root/.cache
COPY python/lib ${SPARK_HOME}/python/lib
ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
WORKDIR /opt/spark/work-dir

View file

@@ -50,6 +50,17 @@ private[spark] class KubernetesSuite extends SparkFunSuite
protected var containerLocalSparkDistroExamplesJar: String = _
protected var appLocator: String = _
// Default memory limit is 1024M + 384M (minimum overhead constant)
private val baseMemory = s"${1024 + 384}Mi"
protected val memOverheadConstant = 0.8
private val standardNonJVMMemory = s"${(1024 + 0.4*1024).toInt}Mi"
protected val additionalMemory = 200
// 209715200 is 200Mi
protected val additionalMemoryInBytes = 209715200
private val extraDriverTotalMemory = s"${(1024 + memOverheadConstant*1024).toInt}Mi"
private val extraExecTotalMemory =
s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"
override def beforeAll(): Unit = {
// The scalatest-maven-plugin gives system properties that are referenced but not set null
// values. We need to remove the null-value properties before initializing the test backend.
@@ -233,6 +244,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === image)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== baseMemory)
}
@@ -240,28 +253,48 @@ private[spark] class KubernetesSuite extends SparkFunSuite
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
protected def doBasicDriverRPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getName === driverPodName)
assert(driverPod.getSpec.getContainers.get(0).getImage === rImage)
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
protected def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== baseMemory)
}
protected def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
protected def doBasicExecutorRPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === rImage)
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== standardNonJVMMemory)
}
protected def doDriverMemoryCheck(driverPod: Pod): Unit = {
assert(driverPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== extraDriverTotalMemory)
}
protected def doExecutorMemoryCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getResources.getRequests.get("memory").getAmount
=== extraExecTotalMemory)
}
protected def checkCustomSettings(pod: Pod): Unit = {
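
For reference, a small sketch recomputing the expected request strings used by these checks; the integer truncation mirrors the Scala `.toInt` calls, and 0.4 is assumed to be the default overhead factor for non-JVM (Python/R) apps:

```python
def mib_string(total_mib):
    # Truncate like Scala's Double.toInt before building the Kubernetes quantity.
    return "{}Mi".format(int(total_mib))

base = 1024                  # default driver/executor memory in MiB
mem_overhead_constant = 0.8  # memOverheadConstant in the suite
additional_memory = 200      # additionalMemory (spark.executor.pyspark.memory) in MiB

print(mib_string(base + 384))                           # baseMemory             -> 1408Mi
print(mib_string(base + 0.4 * base))                    # standardNonJVMMemory   -> 1433Mi
print(mib_string(base + mem_overhead_constant * base))  # extraDriverTotalMemory -> 1843Mi
print(mib_string(base + mem_overhead_constant * base
                 + additional_memory))                  # extraExecTotalMemory   -> 2043Mi
```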

View file

@@ -23,9 +23,11 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
import PythonTestsSuite._
import KubernetesSuite.k8sTestTag
private val pySparkDockerImage =
s"${getTestImageRepo}/spark-py:${getTestImageTag}"
test("Run PySpark on simple pi.py example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_PI,
mainClass = "",
@@ -39,7 +41,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.pyspark.pythonVersion", "2")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
@@ -57,7 +59,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_FILES,
@@ -72,12 +74,32 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}
test("Run PySpark with memory customization", k8sTestTag) {
sparkAppConf
.set("spark.kubernetes.container.image", pySparkDockerImage)
.set("spark.kubernetes.pyspark.pythonVersion", "3")
.set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
.set("spark.executor.pyspark.memory", s"${additionalMemory}m")
runSparkApplicationAndVerifyCompletion(
appResource = PYSPARK_MEMORY_CHECK,
mainClass = "",
expectedLogOnCompletion = Seq(
"PySpark Worker Memory Check is: True"),
appArgs = Array(s"$additionalMemoryInBytes"),
driverPodChecker = doDriverMemoryCheck,
executorPodChecker = doExecutorMemoryCheck,
appLocator = appLocator,
isJVM = false,
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
}
}
private[spark] object PythonTestsSuite {
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
val TEST_LOCAL_PYSPARK: String = "local:///opt/spark/tests/"
val PYSPARK_FILES: String = TEST_LOCAL_PYSPARK + "pyfiles.py"
val PYSPARK_CONTAINER_TESTS: String = TEST_LOCAL_PYSPARK + "py_container_checks.py"
val PYSPARK_MEMORY_CHECK: String = TEST_LOCAL_PYSPARK + "worker_memory_check.py"
}
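
The new memory-customization test drives the whole path end to end: with `spark.kubernetes.memoryOverheadFactor=0.8` and `spark.executor.pyspark.memory=200m`, the driver and executor pods are expected to request 1843Mi and 2043Mi respectively (see the suite constants earlier), and the worker script receives the same 200 MiB expressed in bytes to compare against its own rlimit. A quick sanity check of that argument:

```python
additional_memory_mib = 200                 # set via spark.executor.pyspark.memory=200m
print(additional_memory_mib * 1024 * 1024)  # 209715200, the appArgs value above
```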

View file

@@ -53,6 +53,7 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
.delete()
}
// TODO: [SPARK-25291] This test is flaky with regards to memory of executors
test("Run SparkPi with env and mount secrets.", k8sTestTag) {
createTestSecret()
sparkAppConf

View file

@@ -0,0 +1,47 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
import resource
import sys
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: worker_memory_check [Memory_in_Mi]
"""
spark = SparkSession \
.builder \
.appName("PyMemoryTest") \
.getOrCreate()
sc = spark.sparkContext
if len(sys.argv) < 2:
print("Usage: worker_memory_check [Memory_in_Mi]", file=sys.stderr)
sys.exit(-1)
def f(x):
rLimit = resource.getrlimit(resource.RLIMIT_AS)
print("RLimit is " + str(rLimit))
return rLimit
resourceValue = sc.parallelize([1]).map(f).collect()[0][0]
print("Resource Value is " + str(resourceValue))
truthCheck = (resourceValue == int(sys.argv[1]))
print("PySpark Worker Memory Check is: " + str(truthCheck))
spark.stop()
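
For anyone exercising this outside of Spark, the limit the script reads is an ordinary POSIX address-space rlimit and can be reproduced with the standard library alone; a minimal standalone sketch (Linux; the 200 MiB figure simply matches the integration test's argument):

```python
import resource

limit_bytes = 200 * 1024 * 1024  # 200Mi, the value passed to worker_memory_check.py

# Cap this process's address space, the same limit class a PySpark worker is
# expected to run under when spark.executor.pyspark.memory is set.
soft, hard = resource.getrlimit(resource.RLIMIT_AS)
resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, hard))

# getrlimit returns a (soft, hard) tuple, which is why the script indexes [0].
print(resource.getrlimit(resource.RLIMIT_AS)[0] == limit_bytes)  # True
```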