From 09a8535cc407091c44e3eb9960292658a9c426c9 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Fri, 24 Sep 2021 12:46:22 +0900
Subject: [PATCH] =?UTF-8?q?Revert=20"[SPARK-35672][CORE][YARN]=20Pass=20us?=
 =?UTF-8?q?er=20classpath=20entries=20to=20exec=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…utors using config instead of command line"

### What changes were proposed in this pull request?

This reverts commit 866df69c6290b2f8e2726f1325969d23c938c0f2.

### Why are the changes needed?

After the change, environment variables were not substituted in user classpath entries. Please find an example in SPARK-35672.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #34088 from gengliangwang/revertSPARK-35672.

Authored-by: Gengliang Wang
Signed-off-by: Hyukjin Kwon
---
 .../CoarseGrainedExecutorBackend.scala        | 17 +++--
 .../org/apache/spark/executor/Executor.scala  |  2 -
 .../CoarseGrainedExecutorBackendSuite.scala   | 17 ++---
 .../spark/deploy/yarn/ApplicationMaster.scala |  9 ++-
 .../org/apache/spark/deploy/yarn/Client.scala | 32 +---------
 .../spark/deploy/yarn/ExecutorRunnable.scala  | 12 ++++
 .../YarnCoarseGrainedExecutorBackend.scala    |  8 +--
 .../spark/deploy/yarn/ClientSuite.scala       | 35 -----------
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 63 +++----------
 9 files changed, 53 insertions(+), 142 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 0f8e6d1390..e3be6eb201 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -52,6 +52,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
+    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -123,7 +124,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = getUserClassPath.toArray
+    val urls = userClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -148,8 +149,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }
 
-  def getUserClassPath: Seq[URL] = Nil
-
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -166,7 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       try {
-        executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
           resources = _resources)
         driver.get.send(LaunchedExecutor(executorId))
       } catch {
@@ -395,6 +394,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
+      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)
 
@@ -402,7 +402,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -490,6 +490,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
+    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
 
     var argv = args.toList
@@ -520,6 +521,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           workerUrl = Some(value)
           argv = tail
+        case ("--user-class-path") :: value :: tail =>
+          userClassPath += new URL(value)
+          argv = tail
         case ("--resourceProfileId") :: value :: tail =>
           resourceProfileId = value.toInt
           argv = tail
@@ -546,7 +550,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      resourcesFileOpt, resourceProfileId)
+      userClassPath, resourcesFileOpt, resourceProfileId)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -564,6 +568,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       | --resourcesFile
       | --app-id
       | --worker-url
+      | --user-class-path
       | --resourceProfileId
       |""".stripMargin)
     // scalastyle:on println
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5fb1d7fe21..3f1023e349 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -885,8 +885,6 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
-    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
-      urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 24182e4471..4909a586d3 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.executor
 
 import java.io.File
+import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -55,7 +56,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -76,7 +77,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -110,7 +111,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
 
     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -137,7 +138,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -190,7 +191,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
 
     // executor resources < required
     withTempDir { tmpDir =>
@@ -221,7 +222,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
 
     val parsedResources = backend.parseOrFindResources(None)
 
@@ -268,7 +269,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -293,7 +294,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
       val env = createMockEnv(conf, serializer, Some(rpcEnv))
       backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-        "host1", "host1", 4, env, None,
+        "host1", "host1", 4, Seq.empty[URL], env, None,
         resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
 
       assert(backend.taskResources.isEmpty)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9973db8e83..ebe1286235 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,9 +17,9 @@ package org.apache.spark.deploy.yarn
 
-import java.io.IOException
+import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URLEncoder}
+import java.net.{URI, URL, URLEncoder}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}
 
@@ -85,7 +85,10 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None
 
   private val userClassLoader = {
-    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }
 
     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index caccfac5ef..364bc3b4b0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -18,10 +18,10 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI, URL}
+import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -1308,7 +1308,7 @@ private[spark] class Client(
 
 }
 
-private[spark] object Client extends Logging {
+private object Client extends Logging {
 
   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1470,32 +1470,6 @@ private[spark] object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }
 
-  /**
-   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
-   * must be executed on the same host which will access the URLs, as it will resolve relative
-   * paths based on the current working directory.
-   *
-   * @param conf Spark configuration.
-   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
-   *                       `local` scheme. This should be used when running on the cluster, but
-   *                       not when running on the gateway (i.e. for the driver in `client` mode).
-   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
-   */
-  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
-    Client.getUserClasspath(conf).map { uri =>
-      val inputPath = uri.getPath
-      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
-        Client.getClusterPath(conf, inputPath)
-      } else {
-        // Any other URI schemes should have been resolved by this point
-        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
-          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
-        inputPath
-      }
-      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
-    }
-  }
-
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index dbf4a0a805..717ce57b90 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -17,6 +17,7 @@ package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections
 
@@ -189,6 +190,16 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" +
       ApplicationConstants.LOG_DIR_EXPANSION_VAR)
 
+    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+      val absPath =
+        if (new File(uri.getPath()).isAbsolute()) {
+          Client.getClusterPath(sparkConf, uri.getPath())
+        } else {
+          Client.buildPath(Environment.PWD.$(), uri.getPath())
+        }
+      Seq("--user-class-path", "file:" + absPath)
+    }.toSeq
+
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -200,6 +211,7 @@ private[yarn] class ExecutorRunnable(
         "--cores", executorCores.toString,
         "--app-id", appId,
         "--resourceProfileId", resourceProfileId.toString) ++
+      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 3dd51f174b..ce46ffa06f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -21,7 +21,6 @@ import java.net.URL
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
@@ -39,6 +38,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
+    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,15 +49,13 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
+    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {
 
   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
 
-  override def getUserClassPath: Seq[URL] =
-    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
-
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -75,7 +73,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 1650ea2e44..ea3acec3bb 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
 
 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
-import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
@@ -584,40 +583,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
-  test("SPARK-35672: test Client.getUserClasspathUrls") {
-    val gatewayRootPath = "/local/matching/replace"
-    val replacementRootPath = "/replaced/path"
-    val conf = new SparkConf()
-      .set(SECONDARY_JARS, Seq(
-        s"local:$gatewayRootPath/foo.jar",
-        "local:/local/not/matching/replace/foo.jar",
-        "file:/absolute/file/path/foo.jar",
-        s"$gatewayRootPath/but-not-actually-local/foo.jar",
-        "/absolute/path/foo.jar",
-        "relative/path/foo.jar"
-      ))
-      .set(GATEWAY_ROOT_PATH, gatewayRootPath)
-      .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
-
-    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
-      val expectedUrls = Seq(
-        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
-        s"file:$expectedReplacementPath/foo.jar",
-        "file:/local/not/matching/replace/foo.jar",
-        "file:/absolute/file/path/foo.jar",
-        // since this path wasn't a local URI, it should never be replaced
-        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
-        "file:/absolute/path/foo.jar",
-        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
-      ).map(URI.create(_).toURL).toArray
-      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
-    }
-
-    // assert that no replacement happens when cluster = false by expecting the replacement
-    // path to be the same as the original path
-    assertUserClasspathUrls(cluster = false, gatewayRootPath)
-    assertUserClasspathUrls(cluster = true, replacementRootPath)
-  }
-
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6bd9c4d6de..26ff3bf297 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,9 +18,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.URL
 import java.nio.charset.StandardCharsets
-import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}
 
 import scala.collection.mutable
@@ -152,48 +150,12 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
-    testWithAddJar(clientMode = true, "local")
+  test("run Spark in yarn-client mode with additional jar") {
+    testWithAddJar(true)
   }
 
-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
-    testWithAddJar(clientMode = false, "local")
-  }
-
-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
-    "and gateway-replacement path") {
-    // Use the original jar URL, but set up the gateway/replacement configs such that if
-    // replacement occurs, things will break. This ensures the replacement doesn't apply to the
-    // driver in 'client' mode. Executors will fail in this case because they still apply the
-    // replacement in client mode.
-    testWithAddJar(clientMode = true, "local", Some(jarUrl => {
-      (jarUrl.getPath, Map(
-        GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
-        REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
-      ))
-    }), expectExecutorFailure = true)
-  }
-
-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
-    "and gateway-replacement path") {
-    // Put a prefix in front of the original jar URL which causes it to be an invalid path.
-    // Set up the gateway/replacement configs such that if replacement occurs, it is a valid
-    // path again (by removing the prefix). This ensures the replacement is applied.
-    val gatewayPath = "/replaceme/nonexistent/"
-    testWithAddJar(clientMode = false, "local", Some(jarUrl => {
-      (gatewayPath + jarUrl.getPath, Map(
-        GATEWAY_ROOT_PATH.key -> gatewayPath,
-        REPLACEMENT_ROOT_PATH.key -> ""
-      ))
-    }))
-  }
-
-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
-    testWithAddJar(clientMode = true, "file")
-  }
-
-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
-    testWithAddJar(clientMode = false, "file")
+  test("run Spark in yarn-cluster mode with additional jar") {
+    testWithAddJar(false)
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -324,23 +286,16 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
-  private def testWithAddJar(
-      clientMode: Boolean,
-      jarUriScheme: String,
-      jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = None,
-      expectExecutorFailure: Boolean = false): Unit = {
+  private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
-    val (jarPath, extraConf) = jarUrlToPathAndConfs
-      .map(_.apply(originalJar))
-      .getOrElse((originalJar.getPath, Map[String, String]()))
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
-      extraJars = Seq(s"$jarUriScheme:$jarPath"),
-      extraConf = extraConf)
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + originalJar.getPath()))
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
+    checkResult(finalState, executorResult, "ORIGINAL")
   }
 
   private def testPySpark(