Revert "[SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line"
### What changes were proposed in this pull request?
This reverts commit 866df69c62.
### Why are the changes needed?
After that change, environment variables were no longer substituted in user classpath entries. See SPARK-35672 for an example.
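
A minimal sketch of the failure mode (the jar name is hypothetical): when the executor rebuilds classpath URLs from config, `java.nio.file.Paths` performs no environment-variable substitution, whereas a `--user-class-path` argument in the container launch command is expanded before the JVM starts.

```scala
import java.nio.file.Paths

// Hypothetical entry that relies on launch-time expansion of $PWD.
val url = Paths.get("$PWD/my-udf.jar").toAbsolutePath.toUri.toURL
println(url) // file:/<current-dir>/$PWD/my-udf.jar -- "$PWD" survives literally, so the jar is never found
```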
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #34082 from peter-toth/SPARK-35672-revert.
Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Parent: ef7441bb42
Commit: c2c4a48c78
**CoarseGrainedExecutorBackend.scala**

```diff
@@ -52,6 +52,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
+    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFileOpt: Option[String],
     resourceProfile: ResourceProfile)
@@ -123,7 +124,7 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private def createClassLoader(): MutableURLClassLoader = {
     val currentLoader = Utils.getContextOrSparkClassLoader
-    val urls = getUserClassPath.toArray
+    val urls = userClassPath.toArray
     if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
@@ -148,8 +149,6 @@ private[spark] class CoarseGrainedExecutorBackend(
     }
   }

-  def getUserClassPath: Seq[URL] = Nil
-
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
     sys.env.filterKeys(_.startsWith(prefix))
@@ -166,7 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case RegisteredExecutor =>
         logInfo("Successfully registered with driver")
         try {
-          executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
+          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
             resources = _resources)
           driver.get.send(LaunchedExecutor(executorId))
         } catch {
@@ -399,6 +398,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       cores: Int,
       appId: String,
       workerUrl: Option[String],
+      userClassPath: mutable.ListBuffer[URL],
       resourcesFileOpt: Option[String],
       resourceProfileId: Int)

@@ -406,7 +406,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -494,6 +494,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var resourcesFileOpt: Option[String] = None
    var appId: String = null
     var workerUrl: Option[String] = None
+    val userClassPath = new mutable.ListBuffer[URL]()
     var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID

     var argv = args.toList
@@ -524,6 +525,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         // Worker url is used in spark standalone mode to enforce fate-sharing with worker
         workerUrl = Some(value)
         argv = tail
+      case ("--user-class-path") :: value :: tail =>
+        userClassPath += new URL(value)
+        argv = tail
       case ("--resourceProfileId") :: value :: tail =>
         resourceProfileId = value.toInt
         argv = tail
@@ -550,7 +554,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }

     Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
-      resourcesFileOpt, resourceProfileId)
+      userClassPath, resourcesFileOpt, resourceProfileId)
   }

   private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -568,6 +572,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       |   --resourcesFile <fileWithJSONResourceInformation>
       |   --app-id <appid>
       |   --worker-url <workerUrl>
+      |   --user-class-path <url>
       |   --resourceProfileId <id>
       |""".stripMargin)
     // scalastyle:on println
```
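The hunks above restore the `--user-class-path` flag end to end: a constructor parameter, an `Arguments` field, a parser case, and a usage line. A self-contained sketch of the parser's accumulate-repeated-flag pattern (not the actual Spark parser, which handles many more flags):

```scala
import java.net.URL
import scala.collection.mutable

// Each --user-class-path occurrence appends one URL, mirroring the restored parser.
def parseUserClassPath(args: List[String]): Seq[URL] = {
  val userClassPath = new mutable.ListBuffer[URL]()
  var argv = args
  while (argv.nonEmpty) {
    argv = argv match {
      case "--user-class-path" :: value :: tail =>
        userClassPath += new URL(value)
        tail
      case _ :: tail => tail // flags this sketch doesn't model
      case Nil => Nil
    }
  }
  userClassPath.toSeq
}

// parseUserClassPath(List("--user-class-path", "file:/opt/jars/a.jar",
//                         "--user-class-path", "file:/opt/jars/b.jar"))
// => Seq(file:/opt/jars/a.jar, file:/opt/jars/b.jar)
```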
**Executor.scala**

```diff
@@ -886,8 +886,6 @@ private[spark] class Executor(
     val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
       new File(uri.split("/").last).toURI.toURL
     }
-    logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
-      urls.mkString("'", ",", "'"))
     if (userClassPathFirst) {
       new ChildFirstURLClassLoader(urls, currentLoader)
     } else {
```
**CoarseGrainedExecutorBackendSuite.scala**

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.executor

 import java.io.File
+import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties

@@ -55,7 +56,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
@@ -76,7 +77,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
     withTempDir { tmpDir =>
       val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val ja = Extraction.decompose(Seq(ra))
@@ -110,7 +111,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)

     withTempDir { tmpDir =>
       val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -137,7 +138,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))

     // not enough gpu's on the executor
     withTempDir { tmpDir =>
@@ -190,7 +191,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)

     // executor resources < required
     withTempDir { tmpDir =>
@@ -221,7 +222,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
+      4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))

     val parsedResources = backend.parseOrFindResources(None)

@@ -268,7 +269,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite

     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
-      4, env, None, resourceProfile)
+      4, Seq.empty[URL], env, None, resourceProfile)
     val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(dir, "resources", ja)
@@ -293,7 +294,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
     val env = createMockEnv(conf, serializer, Some(rpcEnv))
     backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
-      "host1", "host1", 4, env, None,
+      "host1", "host1", 4, Seq.empty[URL], env, None,
       resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
     assert(backend.taskResources.isEmpty)
```
**KubernetesExecutorBackend.scala**

```diff
@@ -50,7 +50,7 @@ private[spark] object KubernetesExecutorBackend extends Logging {
     val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
-        arguments.bindAddress, arguments.hostname, arguments.cores,
+        arguments.bindAddress, arguments.hostname, arguments.cores, Seq.empty,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
```
**ApplicationMaster.scala**

```diff
@@ -17,9 +17,9 @@

 package org.apache.spark.deploy.yarn

-import java.io.IOException
+import java.io.{File, IOException}
 import java.lang.reflect.{InvocationTargetException, Modifier}
-import java.net.{URI, URLEncoder}
+import java.net.{URI, URL, URLEncoder}
 import java.security.PrivilegedExceptionAction
 import java.util.concurrent.{TimeoutException, TimeUnit}

@@ -85,7 +85,10 @@ private[spark] class ApplicationMaster(
   private var metricsSystem: Option[MetricsSystem] = None

   private val userClassLoader = {
-    val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
+    val classpath = Client.getUserClasspath(sparkConf)
+    val urls = classpath.map { entry =>
+      new URL("file:" + new File(entry.getPath()).getAbsolutePath())
+    }

     if (isClusterMode) {
       if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {
```
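With the revert, the ApplicationMaster again builds `file:` URLs itself instead of calling the removed `Client.getUserClasspathUrls`. A minimal sketch of that behavior: relative entries resolve against the container's current working directory, which is where YARN localizes application resources (`__app__.jar` is the app-jar alias from this diff).

```scala
import java.io.File
import java.net.URL

// The localized user jar, as a path relative to the container CWD.
val entry = "__app__.jar"
// getAbsolutePath resolves against the CWD at runtime, matching the restored AM code.
val url = new URL("file:" + new File(entry).getAbsolutePath)
```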
**Client.scala**

```diff
@@ -18,10 +18,10 @@
 package org.apache.spark.deploy.yarn

 import java.io.{FileSystem => _, _}
-import java.net.{InetAddress, UnknownHostException, URI, URL}
+import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
+import java.nio.file.Files
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}

@@ -1308,7 +1308,7 @@ private[spark] class Client(

 }

-private[spark] object Client extends Logging {
+private object Client extends Logging {

   // Alias for the user jar
   val APP_JAR_NAME: String = "__app__.jar"
@@ -1470,32 +1470,6 @@ private[spark] object Client extends Logging {
     (mainUri ++ secondaryUris).toArray
   }

-  /**
-   * Returns a list of local, absolute file URLs representing the user classpath. Note that this
-   * must be executed on the same host which will access the URLs, as it will resolve relative
-   * paths based on the current working directory.
-   *
-   * @param conf Spark configuration.
-   * @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
-   *                       `local` scheme. This should be used when running on the cluster, but
-   *                       not when running on the gateway (i.e. for the driver in `client` mode).
-   * @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
-   */
-  def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
-    Client.getUserClasspath(conf).map { uri =>
-      val inputPath = uri.getPath
-      val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
-        Client.getClusterPath(conf, inputPath)
-      } else {
-        // Any other URI schemes should have been resolved by this point
-        assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
-          "getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
-        inputPath
-      }
-      Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
-    }
-  }
-
   private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
     mainJar.flatMap { path =>
       val uri = Utils.resolveURI(path)
```
**ExecutorRunnable.scala**

```diff
@@ -17,6 +17,7 @@

 package org.apache.spark.deploy.yarn

+import java.io.File
 import java.nio.ByteBuffer
 import java.util.Collections

@@ -189,6 +190,16 @@ private[yarn] class ExecutorRunnable(
     // For log4j configuration to reference
     javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

+    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
+      val absPath =
+        if (new File(uri.getPath()).isAbsolute()) {
+          Client.getClusterPath(sparkConf, uri.getPath())
+        } else {
+          Client.buildPath(Environment.PWD.$(), uri.getPath())
+        }
+      Seq("--user-class-path", "file:" + absPath)
+    }.toSeq
+
     YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++
       Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -200,6 +211,7 @@ private[yarn] class ExecutorRunnable(
       "--cores", executorCores.toString,
       "--app-id", appId,
       "--resourceProfileId", resourceProfileId.toString) ++
+      userClassPath ++
       Seq(
         s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
         s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
```
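This is the piece that restores environment-variable substitution: the entries travel in the container launch command, and `Environment.PWD.$()` renders as `{{PWD}}`, which the YARN NodeManager expands (along with other variables) before the executor JVM starts. A self-contained sketch with simplified stand-ins for `Client.getClusterPath` and `Client.buildPath` (the real helpers also apply the gateway/cluster path replacement configured via `spark.yarn.config.*`):

```scala
import java.io.File
import java.net.URI

// Turn user classpath entries into launch-command arguments, mirroring the
// restored ExecutorRunnable logic.
def toUserClassPathArgs(entries: Seq[URI]): Seq[String] =
  entries.flatMap { uri =>
    val path = uri.getPath
    val absPath =
      if (new File(path).isAbsolute) path // stand-in for Client.getClusterPath
      else s"{{PWD}}/$path"               // stand-in for Client.buildPath(Environment.PWD.$(), path)
    Seq("--user-class-path", "file:" + absPath)
  }

// toUserClassPathArgs(Seq(new URI("my-udf.jar")))
//   => Seq("--user-class-path", "file:{{PWD}}/my-udf.jar")
```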
**YarnCoarseGrainedExecutorBackend.scala**

```diff
@@ -21,7 +21,6 @@ import java.net.URL

 import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.yarn.Client
 import org.apache.spark.internal.Logging
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv

@@ -39,6 +38,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress: String,
     hostname: String,
     cores: Int,
+    userClassPath: Seq[URL],
     env: SparkEnv,
     resourcesFile: Option[String],
     resourceProfile: ResourceProfile)
@@ -49,15 +49,13 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
     bindAddress,
     hostname,
     cores,
+    userClassPath,
     env,
     resourcesFile,
     resourceProfile) with Logging {

   private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)

-  override def getUserClassPath: Seq[URL] =
-    Client.getUserClasspathUrls(env.conf, useClusterPath = true)
-
   override def extractLogUrls: Map[String, String] = {
     YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
       .getOrElse(Map())
@@ -75,7 +73,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.bindAddress, arguments.hostname, arguments.cores,
+        arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
         env, arguments.resourcesFileOpt, resourceProfile)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
```
**ClientSuite.scala**

```diff
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn

 import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
 import java.net.URI
-import java.nio.file.Paths
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap

@@ -584,40 +583,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }

-  test("SPARK-35672: test Client.getUserClasspathUrls") {
-    val gatewayRootPath = "/local/matching/replace"
-    val replacementRootPath = "/replaced/path"
-    val conf = new SparkConf()
-      .set(SECONDARY_JARS, Seq(
-        s"local:$gatewayRootPath/foo.jar",
-        "local:/local/not/matching/replace/foo.jar",
-        "file:/absolute/file/path/foo.jar",
-        s"$gatewayRootPath/but-not-actually-local/foo.jar",
-        "/absolute/path/foo.jar",
-        "relative/path/foo.jar"
-      ))
-      .set(GATEWAY_ROOT_PATH, gatewayRootPath)
-      .set(REPLACEMENT_ROOT_PATH, replacementRootPath)
-
-    def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
-      val expectedUrls = Seq(
-        Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
-        s"file:$expectedReplacementPath/foo.jar",
-        "file:/local/not/matching/replace/foo.jar",
-        "file:/absolute/file/path/foo.jar",
-        // since this path wasn't a local URI, it should never be replaced
-        s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
-        "file:/absolute/path/foo.jar",
-        Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
-      ).map(URI.create(_).toURL).toArray
-      assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
-    }
-    // assert that no replacement happens when cluster = false by expecting the replacement
-    // path to be the same as the original path
-    assertUserClasspathUrls(cluster = false, gatewayRootPath)
-    assertUserClasspathUrls(cluster = true, replacementRootPath)
-  }
-
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),
```
**YarnClusterSuite.scala**

```diff
@@ -18,9 +18,7 @@
 package org.apache.spark.deploy.yarn

 import java.io.File
-import java.net.URL
 import java.nio.charset.StandardCharsets
-import java.nio.file.Paths
 import java.util.{HashMap => JHashMap}

 import scala.collection.mutable

@@ -152,48 +150,12 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }

-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
-    testWithAddJar(clientMode = true, "local")
+  test("run Spark in yarn-client mode with additional jar") {
+    testWithAddJar(true)
   }

-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
-    testWithAddJar(clientMode = false, "local")
-  }
-
-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
-    "and gateway-replacement path") {
-    // Use the original jar URL, but set up the gateway/replacement configs such that if
-    // replacement occurs, things will break. This ensures the replacement doesn't apply to the
-    // driver in 'client' mode. Executors will fail in this case because they still apply the
-    // replacement in client mode.
-    testWithAddJar(clientMode = true, "local", Some(jarUrl => {
-      (jarUrl.getPath, Map(
-        GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
-        REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
-      ))
-    }), expectExecutorFailure = true)
-  }
-
-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
-    "and gateway-replacement path") {
-    // Put a prefix in front of the original jar URL which causes it to be an invalid path.
-    // Set up the gateway/replacement configs such that if replacement occurs, it is a valid
-    // path again (by removing the prefix). This ensures the replacement is applied.
-    val gatewayPath = "/replaceme/nonexistent/"
-    testWithAddJar(clientMode = false, "local", Some(jarUrl => {
-      (gatewayPath + jarUrl.getPath, Map(
-        GATEWAY_ROOT_PATH.key -> gatewayPath,
-        REPLACEMENT_ROOT_PATH.key -> ""
-      ))
-    }))
-  }
-
-  test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
-    testWithAddJar(clientMode = true, "file")
-  }
-
-  test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
-    testWithAddJar(clientMode = false, "file")
+  test("run Spark in yarn-cluster mode with additional jar") {
+    testWithAddJar(false)
   }

   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -324,23 +286,16 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }

-  private def testWithAddJar(
-      clientMode: Boolean,
-      jarUriScheme: String,
-      jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = None,
-      expectExecutorFailure: Boolean = false): Unit = {
+  private def testWithAddJar(clientMode: Boolean): Unit = {
     val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
-    val (jarPath, extraConf) = jarUrlToPathAndConfs
-      .map(_.apply(originalJar))
-      .getOrElse((originalJar.getPath, Map[String, String]()))
     val driverResult = File.createTempFile("driver", null, tempDir)
     val executorResult = File.createTempFile("executor", null, tempDir)
     val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
-      appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
-      extraJars = Seq(s"$jarUriScheme:$jarPath"),
-      extraConf = extraConf)
+      appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
+      extraClassPath = Seq(originalJar.getPath()),
+      extraJars = Seq("local:" + originalJar.getPath()))
     checkResult(finalState, driverResult, "ORIGINAL")
-    checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
+    checkResult(finalState, executorResult, "ORIGINAL")
   }

   private def testPySpark(
```