[SPARK-24736][K8S] Let spark-submit handle dependency resolution.

Before this change, the k8s backend contained code to resolve
dependencies and make them available to the Spark application. It turns
out that none of that code is necessary, since spark-submit already
handles all of that for applications started in client mode - such as
the k8s driver, which runs inside a Spark-created pod.

For that reason there is no need for the k8s backend to deal with
PYTHONPATH for pyspark or, more generally, to modify the URIs provided
by the user at all; spark-submit takes care of that.

For testing, I created a pyspark script that depends on another module
that is shipped with --py-files. Then I used:

- --py-files http://.../dep.py http://.../test.py
- --py-files http://.../dep.zip http://.../test.py
- --py-files local:/.../dep.py local:/.../test.py
- --py-files local:/.../dep.zip local:/.../test.py
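
For reference, a full invocation for the first variant might look like
the sketch below; the master URL, container image, and file locations
are placeholders, not the values from the actual test run:

  # hypothetical k8s cluster-mode submission; all <...> values are placeholders
  spark-submit \
    --master k8s://https://<api-server>:6443 \
    --deploy-mode cluster \
    --conf spark.kubernetes.container.image=<spark-py-image> \
    --py-files http://<host>/dep.py \
    http://<host>/test.py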

Without this change, all of the above commands fail. With the change,
the driver is able to see the dependencies in all of the above cases,
but executors do not see the dependencies in the last two. That is a
bug in shared Spark code that deals with local: dependencies in pyspark
(SPARK-26934).

I also tested a Scala app using the main jar from an http server.
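
A corresponding sketch for the Scala case, again with placeholder
values rather than the actual test setup:

  # hypothetical submission of a Scala app whose main jar is served over http
  spark-submit \
    --master k8s://https://<api-server>:6443 \
    --deploy-mode cluster \
    --class <com.example.Main> \
    --conf spark.kubernetes.container.image=<spark-image> \
    http://<host>/app.jar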

Closes #23793 from vanzin/SPARK-24736.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
13 changed files with 38 additions and 186 deletions


@ -520,6 +520,7 @@ private[spark] class SparkSubmit extends Logging {
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = KEYTAB.key),
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
@ -694,9 +695,6 @@ private[spark] class SparkSubmit extends Logging {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
if (args.pyFiles != null) {
childArgs ++= Array("--other-py-files", args.pyFiles)
}
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
@ -744,7 +742,7 @@ private[spark] class SparkSubmit extends Logging {
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
val formattedPyFiles = if (deployMode != CLUSTER) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Ignoring formatting python path in yarn and mesos cluster mode, these two modes


@ -69,7 +69,6 @@ private[spark] object Constants {
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
// BINDINGS
val ENV_PYSPARK_FILES = "PYSPARK_FILES"
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
// Pod spec templates


@ -73,8 +73,7 @@ private[spark] class KubernetesDriverConf(
val appId: String,
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String],
val pyFiles: Seq[String])
val appArgs: Array[String])
extends KubernetesConf(sparkConf) {
override val resourceNamePrefix: String = {
@ -175,14 +174,11 @@ private[spark] object KubernetesConf {
appId: String,
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesDriverConf = {
appArgs: Array[String]): KubernetesDriverConf = {
// Parse executor volumes in order to verify configuration before the driver pod is created.
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
pyFiles)
new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs)
}
def createExecutorConf(


@ -66,26 +66,6 @@ private[spark] object KubernetesUtils extends Logging {
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}
/**
* For the given collection of file URIs, resolves them as follows:
* - File URIs with scheme local:// resolve to just the path of the URI.
* - Otherwise, the URIs are returned as-is.
*/
def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = {
fileUris.map { uri =>
resolveFileUri(uri)
}
}
def resolveFileUri(uri: String): String = {
val fileUri = Utils.resolveURI(uri)
val fileScheme = Option(fileUri.getScheme).getOrElse("file")
fileScheme match {
case "local" => fileUri.getPath
case _ => uri
}
}
def loadPodFromTemplate(
kubernetesClient: KubernetesClient,
templateFile: File,


@ -153,15 +153,6 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.resourceNamePrefix,
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
Seq(JARS, FILES).foreach { key =>
val value = conf.get(key)
val resolved = KubernetesUtils.resolveFileUrisAndPath(value)
if (resolved.nonEmpty) {
additionalProps.put(key.key, resolved.mkString(","))
}
}
additionalProps.toMap
}
}


@ -37,8 +37,8 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
override def configurePod(pod: SparkPod): SparkPod = {
conf.mainAppResource match {
case JavaMainAppResource(_) =>
configureForJava(pod)
case JavaMainAppResource(res) =>
configureForJava(pod, res.getOrElse(SparkLauncher.NO_RESOURCE))
case PythonMainAppResource(res) =>
configureForPython(pod, res)
@ -49,45 +49,33 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
override def getAdditionalPodSystemProperties(): Map[String, String] = {
conf.mainAppResource match {
case JavaMainAppResource(res) =>
res.map(additionalJavaProperties).getOrElse(Map.empty)
val appType = conf.mainAppResource match {
case JavaMainAppResource(_) =>
APP_RESOURCE_TYPE_JAVA
case PythonMainAppResource(res) =>
additionalPythonProperties(res)
case PythonMainAppResource(_) =>
APP_RESOURCE_TYPE_PYTHON
case RMainAppResource(res) =>
additionalRProperties(res)
case RMainAppResource(_) =>
APP_RESOURCE_TYPE_R
}
Map(APP_RESOURCE_TYPE.key -> appType)
}
private def configureForJava(pod: SparkPod): SparkPod = {
// The user application jar is merged into the spark.jars list and managed through that
// property, so use a "blank" resource for the Java driver.
val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build()
private def configureForJava(pod: SparkPod, res: String): SparkPod = {
val driverContainer = baseDriverContainer(pod, res).build()
SparkPod(pod.pod, driverContainer)
}
private def configureForPython(pod: SparkPod, res: String): SparkPod = {
val maybePythonFiles = if (conf.pyFiles.nonEmpty) {
// Delineation by ":" is to append the PySpark Files to the PYTHONPATH
// of the respective PySpark pod
val resolved = KubernetesUtils.resolveFileUrisAndPath(conf.pyFiles)
Some(new EnvVarBuilder()
.withName(ENV_PYSPARK_FILES)
.withValue(resolved.mkString(":"))
.build())
} else {
None
}
val pythonEnvs =
Seq(new EnvVarBuilder()
.withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
.withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
.build()) ++
maybePythonFiles
.build())
val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res))
val pythonContainer = baseDriverContainer(pod, res)
.addAllToEnv(pythonEnvs.asJava)
.build()
@ -95,7 +83,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
}
private def configureForR(pod: SparkPod, res: String): SparkPod = {
val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build()
val rContainer = baseDriverContainer(pod, res).build()
SparkPod(pod.pod, rContainer)
}
@ -107,27 +95,4 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
.addToArgs(resource)
.addToArgs(conf.appArgs: _*)
}
private def additionalJavaProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList(JARS, Seq(resource))
}
private def additionalPythonProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_PYTHON) ++
mergeFileList(FILES, Seq(resource) ++ conf.pyFiles)
}
private def additionalRProperties(resource: String): Map[String, String] = {
resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList(FILES, Seq(resource))
}
private def mergeFileList(key: ConfigEntry[Seq[String]], filesToAdd: Seq[String])
: Map[String, String] = {
val existing = conf.get(key)
Map(key.key -> (existing ++ filesToAdd).distinct.mkString(","))
}
private def resourceType(resType: String): Map[String, String] = {
Map(APP_RESOURCE_TYPE.key -> resType)
}
}


@ -39,13 +39,11 @@ import org.apache.spark.util.Utils
* @param mainAppResource the main application resource if any
* @param mainClass the main class of the application to run
* @param driverArgs arguments to the driver
* @param maybePyFiles additional Python files via --py-files
*/
private[spark] case class ClientArguments(
mainAppResource: MainAppResource,
mainClass: String,
driverArgs: Array[String],
maybePyFiles: Option[String])
driverArgs: Array[String])
private[spark] object ClientArguments {
@ -53,7 +51,6 @@ private[spark] object ClientArguments {
var mainAppResource: MainAppResource = JavaMainAppResource(None)
var mainClass: Option[String] = None
val driverArgs = mutable.ArrayBuffer.empty[String]
var maybePyFiles : Option[String] = None
args.sliding(2, 2).toList.foreach {
case Array("--primary-java-resource", primaryJavaResource: String) =>
@ -62,8 +59,6 @@ private[spark] object ClientArguments {
mainAppResource = PythonMainAppResource(primaryPythonResource)
case Array("--primary-r-file", primaryRFile: String) =>
mainAppResource = RMainAppResource(primaryRFile)
case Array("--other-py-files", pyFiles: String) =>
maybePyFiles = Some(pyFiles)
case Array("--main-class", clazz: String) =>
mainClass = Some(clazz)
case Array("--arg", arg: String) =>
@ -78,8 +73,7 @@ private[spark] object ClientArguments {
ClientArguments(
mainAppResource,
mainClass.get,
driverArgs.toArray,
maybePyFiles)
driverArgs.toArray)
}
}
@ -214,8 +208,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs,
clientArguments.maybePyFiles)
clientArguments.driverArgs)
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))


@ -69,8 +69,7 @@ class KubernetesConfSuite extends SparkFunSuite {
KubernetesTestConf.APP_ID,
JavaMainAppResource(None),
KubernetesTestConf.MAIN_CLASS,
APP_ARGS,
None)
APP_ARGS)
assert(conf.labels === Map(
SPARK_APP_ID_LABEL -> KubernetesTestConf.APP_ID,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++


@ -44,7 +44,6 @@ object KubernetesTestConf {
mainAppResource: MainAppResource = JavaMainAppResource(None),
mainClass: String = MAIN_CLASS,
appArgs: Array[String] = Array.empty,
pyFiles: Seq[String] = Nil,
resourceNamePrefix: Option[String] = None,
labels: Map[String, String] = Map.empty,
environment: Map[String, String] = Map.empty,
@ -64,7 +63,7 @@ object KubernetesTestConf {
setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs)
setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, pyFiles)
new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs)
}
// scalastyle:on argcount


@ -137,29 +137,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
assert(configuredPythonPod.container.getImage === "spark-driver-py:latest")
}
test("Additional system properties resolve jars and set cluster-mode confs.") {
val allJars = Seq("local:///opt/spark/jar1.jar", "hdfs:///opt/spark/jar2.jar")
val allFiles = Seq("https://localhost:9000/file1.txt", "local:///opt/spark/file2.txt")
val sparkConf = new SparkConf()
.set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
.setJars(allJars)
.set(FILES, allFiles)
.set(CONTAINER_IMAGE, "spark-driver:latest")
val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
val step = new BasicDriverFeatureStep(kubernetesConf)
val additionalProperties = step.getAdditionalPodSystemProperties()
val expectedSparkConf = Map(
KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
"spark.app.id" -> KubernetesTestConf.APP_ID,
KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> kubernetesConf.resourceNamePrefix,
"spark.kubernetes.submitInDriver" -> "true",
JARS.key -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar",
FILES.key -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt",
MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString)
assert(additionalProperties === expectedSparkConf)
}
// Memory overhead tests. Tuples are:
// test name, main resource, overhead factor, expected factor
Seq(


@ -23,12 +23,13 @@ import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
class DriverCommandFeatureStepSuite extends SparkFunSuite {
test("java resource") {
val mainResource = "local:///main.jar"
val mainResource = "local:/main.jar"
val spec = applyFeatureStep(
JavaMainAppResource(Some(mainResource)),
appArgs = Array("5", "7"))
@ -36,72 +37,33 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
"driver",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
"spark-internal", "5", "7"))
val jars = Utils.stringToSeq(spec.systemProperties("spark.jars"))
assert(jars.toSet === Set(mainResource))
mainResource, "5", "7"))
}
test("python resource with no extra files") {
val mainResource = "local:///main.py"
test("python resource") {
val mainResource = "local:/main.py"
val sparkConf = new SparkConf(false)
.set(PYSPARK_MAJOR_PYTHON_VERSION, "3")
val spec = applyFeatureStep(
PythonMainAppResource(mainResource),
conf = sparkConf)
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
"/main.py"))
val envs = spec.pod.container.getEnv.asScala
.map { env => (env.getName, env.getValue) }
.toMap
assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3")
val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
assert(files.toSet === Set(mainResource))
}
test("python resource with extra files") {
val expectedMainResource = "/main.py"
val expectedPySparkFiles = "/example2.py:/example3.py"
val filesInConf = Set("local:///example.py")
val mainResource = s"local://$expectedMainResource"
val pyFiles = Seq("local:///example2.py", "local:///example3.py")
val sparkConf = new SparkConf(false)
.set("spark.files", filesInConf.mkString(","))
.set(PYSPARK_MAJOR_PYTHON_VERSION, "2")
val spec = applyFeatureStep(
PythonMainAppResource(mainResource),
conf = sparkConf,
appArgs = Array("5", "7", "9"),
pyFiles = pyFiles)
appArgs = Array("5", "7", "9"))
assert(spec.pod.container.getArgs.asScala === List(
"driver",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
"/main.py", "5", "7", "9"))
mainResource, "5", "7", "9"))
val envs = spec.pod.container.getEnv.asScala
.map { env => (env.getName, env.getValue) }
.toMap
val expected = Map(
ENV_PYSPARK_FILES -> expectedPySparkFiles,
ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2")
assert(envs === expected)
val files = Utils.stringToSeq(spec.systemProperties("spark.files"))
assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource))
}
test("R resource") {
val expectedMainResource = "/main.R"
val mainResource = s"local://$expectedMainResource"
val mainResource = "local:/main.R"
val spec = applyFeatureStep(
RMainAppResource(mainResource),
@ -111,19 +73,17 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
"driver",
"--properties-file", SPARK_CONF_PATH,
"--class", KubernetesTestConf.MAIN_CLASS,
"/main.R", "5", "7", "9"))
mainResource, "5", "7", "9"))
}
private def applyFeatureStep(
resource: MainAppResource,
conf: SparkConf = new SparkConf(false),
appArgs: Array[String] = Array(),
pyFiles: Seq[String] = Nil): KubernetesDriverSpec = {
appArgs: Array[String] = Array()): KubernetesDriverSpec = {
val kubernetesConf = KubernetesTestConf.createDriverConf(
sparkConf = conf,
mainAppResource = resource,
appArgs = appArgs,
pyFiles = pyFiles)
appArgs = appArgs)
val step = new DriverCommandFeatureStep(kubernetesConf)
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()


@ -39,7 +39,6 @@ RUN apk add --no-cache python && \
COPY python/pyspark ${SPARK_HOME}/python/pyspark
COPY python/lib ${SPARK_HOME}/python/lib
ENV PYTHONPATH ${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-*.zip
WORKDIR /opt/spark/work-dir
ENTRYPOINT [ "/opt/entrypoint.sh" ]


@ -44,10 +44,6 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
fi
if [ -n "$PYSPARK_FILES" ]; then
PYTHONPATH="$PYTHONPATH:$PYSPARK_FILES"
fi
if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "2" ]; then
pyv="$(python -V 2>&1)"
export PYTHON_VERSION="${pyv:7}"