Revert "[SPARK-35672][CORE][YARN] Pass user classpath entries to exec…

…utors using config instead of command line"

### What changes were proposed in this pull request?
This reverts commit 866df69c62.

### Why are the changes needed?
After the change, environment variables were no longer substituted in user classpath entries. See SPARK-35672 for an example.
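For illustration, the regression matters for entries that rely on environment-variable expansion. Below is a minimal, hypothetical sketch (not Spark code; the jar path and expansion logic are invented) of why a literal `$PWD`-style entry only works if something expands it before the JVM turns it into a URL:

```scala
import java.io.File

object ClasspathEntryDemo {
  def main(args: Array[String]): Unit = {
    // A user classpath entry as it might appear before expansion.
    val raw = "$PWD/__app__.jar"
    // When the entry is part of the YARN container launch command, the shell expands
    // $PWD for free; a value read back from a config inside the executor JVM stays
    // literal, so the resulting URL points at a path that does not exist.
    val expanded = raw.replace("$PWD", new File(".").getAbsolutePath)
    println(s"literal entry : $raw")
    println(s"expanded entry: $expanded")
  }
}
```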

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #34088 from gengliangwang/revertSPARK-35672.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Gengliang Wang authored 2021-09-24 12:46:22 +09:00, committed by Hyukjin Kwon
parent 09283d3210
commit 09a8535cc4
9 changed files with 53 additions and 142 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -52,6 +52,7 @@ private[spark] class CoarseGrainedExecutorBackend(
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFileOpt: Option[String],
resourceProfile: ResourceProfile)
@@ -123,7 +124,7 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private def createClassLoader(): MutableURLClassLoader = {
val currentLoader = Utils.getContextOrSparkClassLoader
val urls = getUserClassPath.toArray
val urls = userClassPath.toArray
if (env.conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(urls, currentLoader)
} else {
@@ -148,8 +149,6 @@ private[spark] class CoarseGrainedExecutorBackend(
}
}
def getUserClassPath: Seq[URL] = Nil
def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
@@ -166,7 +165,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, getUserClassPath, isLocal = false,
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
} catch {
@@ -395,6 +394,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: mutable.ListBuffer[URL],
resourcesFileOpt: Option[String],
resourceProfileId: Int)
@@ -402,7 +402,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
env, arguments.resourcesFileOpt, resourceProfile)
}
run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
@@ -490,6 +490,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
var resourcesFileOpt: Option[String] = None
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
var argv = args.toList
@@ -520,6 +521,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case ("--resourceProfileId") :: value :: tail =>
resourceProfileId = value.toInt
argv = tail
@@ -546,7 +550,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl,
resourcesFileOpt, resourceProfileId)
userClassPath, resourcesFileOpt, resourceProfileId)
}
private def printUsageAndExit(classNameForEntry: String): Unit = {
@@ -564,6 +568,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
| --resourcesFile <fileWithJSONResourceInformation>
| --app-id <appid>
| --worker-url <workerUrl>
| --user-class-path <url>
| --resourceProfileId <id>
|""".stripMargin)
// scalastyle:on println

core/src/main/scala/org/apache/spark/executor/Executor.scala

@@ -885,8 +885,6 @@ private[spark] class Executor(
val urls = userClassPath.toArray ++ currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}
logInfo(s"Starting executor with user classpath (userClassPathFirst = $userClassPathFirst): " +
urls.mkString("'", ",", "'"))
if (userClassPathFirst) {
new ChildFirstURLClassLoader(urls, currentLoader)
} else {

core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala

@@ -18,6 +18,7 @@
package org.apache.spark.executor
import java.io.File
import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
@@ -55,7 +56,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, resourceProfile)
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val testResourceArgs: JObject = ("" -> "")
val ja = JArray(List(testResourceArgs))
@@ -76,7 +77,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
withTempDir { tmpDir =>
val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(ra))
@@ -110,7 +111,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, resourceProfile)
4, Seq.empty[URL], env, None, resourceProfile)
withTempDir { tmpDir =>
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
@@ -137,7 +138,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
// not enough gpu's on the executor
withTempDir { tmpDir =>
@@ -190,7 +191,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val env = createMockEnv(conf, serializer)
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, resourceProfile)
4, Seq.empty[URL], env, None, resourceProfile)
// executor resources < required
withTempDir { tmpDir =>
@@ -221,7 +222,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
4, Seq.empty[URL], env, None, ResourceProfile.getOrCreateDefaultProfile(conf))
val parsedResources = backend.parseOrFindResources(None)
@@ -268,7 +269,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
// we don't really use this, just need it to get at the parser function
val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", "1", "host1", "host1",
4, env, None, resourceProfile)
4, Seq.empty[URL], env, None, resourceProfile)
val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
val ja = Extraction.decompose(Seq(gpuArgs))
val f1 = createTempJsonFile(dir, "resources", ja)
@@ -293,7 +294,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
val rpcEnv = RpcEnv.create("1", "localhost", 0, conf, securityMgr)
val env = createMockEnv(conf, serializer, Some(rpcEnv))
backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
"host1", "host1", 4, env, None,
"host1", "host1", 4, Seq.empty[URL], env, None,
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -17,9 +17,9 @@
package org.apache.spark.deploy.yarn
import java.io.IOException
import java.io.{File, IOException}
import java.lang.reflect.{InvocationTargetException, Modifier}
import java.net.{URI, URLEncoder}
import java.net.{URI, URL, URLEncoder}
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{TimeoutException, TimeUnit}
@@ -85,7 +85,10 @@ private[spark] class ApplicationMaster(
private var metricsSystem: Option[MetricsSystem] = None
private val userClassLoader = {
val urls = Client.getUserClasspathUrls(sparkConf, isClusterMode)
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
new URL("file:" + new File(entry.getPath()).getAbsolutePath())
}
if (isClusterMode) {
if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) {

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

@@ -18,10 +18,10 @@
package org.apache.spark.deploy.yarn
import java.io.{FileSystem => _, _}
import java.net.{InetAddress, UnknownHostException, URI, URL}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.nio.file.Files
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -1308,7 +1308,7 @@ private[spark] class Client(
}
private[spark] object Client extends Logging {
private object Client extends Logging {
// Alias for the user jar
val APP_JAR_NAME: String = "__app__.jar"
@@ -1470,32 +1470,6 @@ private[spark] object Client extends Logging {
(mainUri ++ secondaryUris).toArray
}
/**
* Returns a list of local, absolute file URLs representing the user classpath. Note that this
* must be executed on the same host which will access the URLs, as it will resolve relative
* paths based on the current working directory.
*
* @param conf Spark configuration.
* @param useClusterPath Whether to use the 'cluster' path when resolving paths with the
* `local` scheme. This should be used when running on the cluster, but
* not when running on the gateway (i.e. for the driver in `client` mode).
* @return Array of local URLs ready to be passed to a [[java.net.URLClassLoader]].
*/
def getUserClasspathUrls(conf: SparkConf, useClusterPath: Boolean): Array[URL] = {
Client.getUserClasspath(conf).map { uri =>
val inputPath = uri.getPath
val replacedFilePath = if (Utils.isLocalUri(uri.toString) && useClusterPath) {
Client.getClusterPath(conf, inputPath)
} else {
// Any other URI schemes should have been resolved by this point
assert(uri.getScheme == null || uri.getScheme == "file" || Utils.isLocalUri(uri.toString),
"getUserClasspath should only return 'file' or 'local' URIs but found: " + uri)
inputPath
}
Paths.get(replacedFilePath).toAbsolutePath.toUri.toURL
}
}
private def getMainJarUri(mainJar: Option[String]): Option[URI] = {
mainJar.flatMap { path =>
val uri = Utils.resolveURI(path)

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
import java.nio.ByteBuffer
import java.util.Collections
@@ -189,6 +190,16 @@ private[yarn] class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
val absPath =
if (new File(uri.getPath()).isAbsolute()) {
Client.getClusterPath(sparkConf, uri.getPath())
} else {
Client.buildPath(Environment.PWD.$(), uri.getPath())
}
Seq("--user-class-path", "file:" + absPath)
}.toSeq
YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
@@ -200,6 +211,7 @@ private[yarn] class ExecutorRunnable(
"--cores", executorCores.toString,
"--app-id", appId,
"--resourceProfileId", resourceProfileId.toString) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala

@@ -21,7 +21,6 @@ import java.net.URL
import org.apache.spark.SparkEnv
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.yarn.Client
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv
@@ -39,6 +38,7 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFile: Option[String],
resourceProfile: ResourceProfile)
@@ -49,15 +49,13 @@ private[spark] class YarnCoarseGrainedExecutorBackend(
bindAddress,
hostname,
cores,
userClassPath,
env,
resourcesFile,
resourceProfile) with Logging {
private lazy val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(env.conf)
override def getUserClassPath: Seq[URL] =
Client.getUserClasspathUrls(env.conf, useClusterPath = true)
override def extractLogUrls: Map[String, String] = {
YarnContainerInfoHelper.getLogUrls(hadoopConfiguration, container = None)
.getOrElse(Map())
@@ -75,7 +73,7 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
arguments.bindAddress, arguments.hostname, arguments.cores,
arguments.bindAddress, arguments.hostname, arguments.cores, arguments.userClassPath.toSeq,
env, arguments.resourcesFileOpt, resourceProfile)
}
val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.yarn
import java.io.{File, FileInputStream, FileNotFoundException, FileOutputStream}
import java.net.URI
import java.nio.file.Paths
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
@@ -584,40 +583,6 @@ class ClientSuite extends SparkFunSuite with Matchers {
}
}
test("SPARK-35672: test Client.getUserClasspathUrls") {
val gatewayRootPath = "/local/matching/replace"
val replacementRootPath = "/replaced/path"
val conf = new SparkConf()
.set(SECONDARY_JARS, Seq(
s"local:$gatewayRootPath/foo.jar",
"local:/local/not/matching/replace/foo.jar",
"file:/absolute/file/path/foo.jar",
s"$gatewayRootPath/but-not-actually-local/foo.jar",
"/absolute/path/foo.jar",
"relative/path/foo.jar"
))
.set(GATEWAY_ROOT_PATH, gatewayRootPath)
.set(REPLACEMENT_ROOT_PATH, replacementRootPath)
def assertUserClasspathUrls(cluster: Boolean, expectedReplacementPath: String): Unit = {
val expectedUrls = Seq(
Paths.get(APP_JAR_NAME).toAbsolutePath.toUri.toString,
s"file:$expectedReplacementPath/foo.jar",
"file:/local/not/matching/replace/foo.jar",
"file:/absolute/file/path/foo.jar",
// since this path wasn't a local URI, it should never be replaced
s"file:$gatewayRootPath/but-not-actually-local/foo.jar",
"file:/absolute/path/foo.jar",
Paths.get("relative/path/foo.jar").toAbsolutePath.toUri.toString
).map(URI.create(_).toURL).toArray
assert(Client.getUserClasspathUrls(conf, cluster) === expectedUrls)
}
// assert that no replacement happens when cluster = false by expecting the replacement
// path to be the same as the original path
assertUserClasspathUrls(cluster = false, gatewayRootPath)
assertUserClasspathUrls(cluster = true, replacementRootPath)
}
private val matching = Seq(
("files URI match test1", "file:///file1", "file:///file2"),
("files URI match test2", "file:///c:file1", "file://c:file2"),

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

@@ -18,9 +18,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Paths
import java.util.{HashMap => JHashMap}
import scala.collection.mutable
@@ -152,48 +150,12 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
checkResult(finalState, result)
}
test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local'") {
testWithAddJar(clientMode = true, "local")
test("run Spark in yarn-client mode with additional jar") {
testWithAddJar(true)
}
test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local'") {
testWithAddJar(clientMode = false, "local")
}
test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'local' " +
"and gateway-replacement path") {
// Use the original jar URL, but set up the gateway/replacement configs such that if
// replacement occurs, things will break. This ensures the replacement doesn't apply to the
// driver in 'client' mode. Executors will fail in this case because they still apply the
// replacement in client mode.
testWithAddJar(clientMode = true, "local", Some(jarUrl => {
(jarUrl.getPath, Map(
GATEWAY_ROOT_PATH.key -> Paths.get(jarUrl.toURI).getParent.toString,
REPLACEMENT_ROOT_PATH.key -> "/nonexistent/path/"
))
}), expectExecutorFailure = true)
}
test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'local' " +
"and gateway-replacement path") {
// Put a prefix in front of the original jar URL which causes it to be an invalid path.
// Set up the gateway/replacement configs such that if replacement occurs, it is a valid
// path again (by removing the prefix). This ensures the replacement is applied.
val gatewayPath = "/replaceme/nonexistent/"
testWithAddJar(clientMode = false, "local", Some(jarUrl => {
(gatewayPath + jarUrl.getPath, Map(
GATEWAY_ROOT_PATH.key -> gatewayPath,
REPLACEMENT_ROOT_PATH.key -> ""
))
}))
}
test("SPARK-35672: run Spark in yarn-client mode with additional jar using URI scheme 'file'") {
testWithAddJar(clientMode = true, "file")
}
test("SPARK-35672: run Spark in yarn-cluster mode with additional jar using URI scheme 'file'") {
testWithAddJar(clientMode = false, "file")
test("run Spark in yarn-cluster mode with additional jar") {
testWithAddJar(false)
}
test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -324,23 +286,16 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
checkResult(finalState, result)
}
private def testWithAddJar(
clientMode: Boolean,
jarUriScheme: String,
jarUrlToPathAndConfs: Option[URL => (String, Map[String, String])] = None,
expectExecutorFailure: Boolean = false): Unit = {
private def testWithAddJar(clientMode: Boolean): Unit = {
val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir)
val (jarPath, extraConf) = jarUrlToPathAndConfs
.map(_.apply(originalJar))
.getOrElse((originalJar.getPath, Map[String, String]()))
val driverResult = File.createTempFile("driver", null, tempDir)
val executorResult = File.createTempFile("executor", null, tempDir)
val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
appArgs = Seq(driverResult.getAbsolutePath, executorResult.getAbsolutePath),
extraJars = Seq(s"$jarUriScheme:$jarPath"),
extraConf = extraConf)
appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
extraClassPath = Seq(originalJar.getPath()),
extraJars = Seq("local:" + originalJar.getPath()))
checkResult(finalState, driverResult, "ORIGINAL")
checkResult(finalState, executorResult, if (expectExecutorFailure) "failure" else "ORIGINAL")
checkResult(finalState, executorResult, "ORIGINAL")
}
private def testPySpark(