Revert "SPARK-2624 add datanucleus jars to the container in yarn-cluster"
This reverts commit a975dc3279.
parent 87437df036
commit fd8525334c

--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -139,21 +139,6 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     The maximum number of threads to use in the application master for launching executor containers.
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.datanucleus.dir</code></td>
-  <td>$SPARK_HOME/lib</td>
-  <td>
-    The location of the DataNucleus jars, in case overriding the default location is desired.
-    By default, Spark on YARN will use the DataNucleus jars installed at
-    <code>$SPARK_HOME/lib</code>, but the jars can also be in a world-readable location on HDFS.
-    This allows YARN to cache them on nodes so that they don't need to be distributed each time an
-    application runs. To point to a directory on HDFS, for example, set this configuration to
-    "hdfs:///some/path".
-
-    This is required because the datanucleus jars cannot be packaged into the
-    assembly jar due to metadata conflicts (involving <code>plugin.xml</code>).
-  </td>
-</tr>
 </table>

 # Launching Spark on YARN
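
For context, a minimal sketch of how the reverted option was meant to be used from application code; the app name is illustrative, and the HDFS path is the example from the table above:

    import org.apache.spark.SparkConf

    // Point the (reverted) option at a world-readable HDFS directory so YARN can
    // cache the DataNucleus jars instead of shipping them with every application.
    val conf = new SparkConf()
      .setAppName("hive-on-yarn-example") // illustrative name
      .set("spark.yarn.datanucleus.dir", "hdfs:///some/path")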

--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.yarn

 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
-import java.io.{File, FilenameFilter}

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, ListBuffer, Map}

@@ -224,48 +223,10 @@ private[spark] trait ClientBase extends Logging {
         }
       }
     }

     if (cachedSecondaryJarLinks.nonEmpty) {
       sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(","))
     }
-
-    /**
-     * Do the same for datanucleus jars, if they exist in spark home. Find all datanucleus-* jars,
-     * copy them to the remote fs, and add them to the class path.
-     *
-     * This is necessary because the datanucleus jars cannot be included in the assembly jar due
-     * to metadata conflicts involving plugin.xml. At the time of writing, these are the only
-     * jars that cannot be distributed with the uber jar and have to be treated differently.
-     *
-     * For more details, see SPARK-2624, and https://github.com/apache/spark/pull/3238
-     */
-    for (libsDir <- dataNucleusJarsDir(sparkConf)) {
-      val libsURI = new URI(libsDir)
-      val jarLinks = ListBuffer.empty[String]
-      if (libsURI.getScheme != LOCAL_SCHEME) {
-        val localURI = getQualifiedLocalPath(libsURI).toUri()
-        val jars = FileSystem.get(localURI, hadoopConf).listFiles(new Path(localURI.getPath), false)
-        while (jars.hasNext) {
-          val jar = jars.next()
-          val name = jar.getPath.getName
-          if (name.startsWith("datanucleus-")) {
-            // copy to remote and add to classpath
-            val src = jar.getPath
-            val destPath = copyFileToRemote(dst, src, replication)
-            distCacheMgr.addResource(fs, hadoopConf, destPath,
-              localResources, LocalResourceType.FILE, name, statCache)
-            jarLinks += name
-          }
-        }
-      } else {
-        jarLinks += libsURI.toString + Path.SEPARATOR + "*"
-      }
-
-      if (jarLinks.nonEmpty) {
-        sparkConf.set(CONF_SPARK_DATANUCLEUS_JARS, jarLinks.mkString(","))
-      }
-    }
-

     localResources
   }
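
As an aside, the prefix match in the removed loop is easy to reproduce outside Hadoop's FileSystem API; a standalone sketch using plain java.io (the directory path is illustrative, and the java.io.{File, FilenameFilter} import is the one removed above):

    import java.io.{File, FilenameFilter}

    // Minimal sketch: list datanucleus-* jars in a local lib directory, using the
    // same name prefix the removed loop checks for each file it visits.
    val libDir = new File("/opt/spark/lib") // illustrative path
    val dnJarNames: Seq[String] = Option(libDir.list(new FilenameFilter {
      override def accept(dir: File, name: String): Boolean =
        name.startsWith("datanucleus-")
    })).map(_.toSeq).getOrElse(Seq.empty)

    // As in the removed code, the names would then be joined with "," and stored
    // under spark.yarn.datanucleus.jars for the executor classpath step below.
    val propagated = dnJarNames.mkString(",")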

@@ -590,13 +551,6 @@ private[spark] object ClientBase extends Logging {
   // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

-  // Location of the datanucleus jars
-  val CONF_SPARK_DATANUCLEUS_DIR = "spark.yarn.datanucleus.dir"
-
-  // Internal config to propagate the locations of datanucleus jars found to add to the
-  // classpath of the executors. Value should be a comma-separated list of paths to each jar.
-  val CONF_SPARK_DATANUCLEUS_JARS = "spark.yarn.datanucleus.jars"
-
   // Internal config to propagate the locations of any extra jars to add to the classpath
   // of the executors
   val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

@@ -629,19 +583,6 @@
     }
   }

-  /**
-   * Find the user-provided jars directory if configured, or return SPARK_HOME/lib if not.
-   *
-   * This method first looks for $CONF_SPARK_DATANUCLEUS_DIR inside the SparkConf, then looks for
-   * Spark home inside the SparkConf and the user environment.
-   */
-  private def dataNucleusJarsDir(conf: SparkConf): Option[String] = {
-    conf.getOption(CONF_SPARK_DATANUCLEUS_DIR).orElse {
-      val sparkHome = conf.getOption("spark.home").orElse(sys.env.get("SPARK_HOME"))
-      sparkHome.map(path => path + Path.SEPARATOR + "lib")
-    }
-  }
-
   /**
    * Return the path to the given application's staging directory.
    */
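
The removed helper is a plain Option chain; a self-contained sketch of the same resolution order (the hypothetical jarsDir stands in for dataNucleusJarsDir, with the config keys inlined):

    import org.apache.spark.SparkConf

    def jarsDir(conf: SparkConf): Option[String] =
      conf.getOption("spark.yarn.datanucleus.dir").orElse {
        conf.getOption("spark.home")
          .orElse(sys.env.get("SPARK_HOME"))
          .map(home => home + "/lib") // the original uses Path.SEPARATOR
      }

    // With only spark.home set, the fallback applies:
    // jarsDir(new SparkConf(false).set("spark.home", "/opt/spark")) == Some("/opt/spark/lib")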

@@ -743,13 +684,6 @@
       addUserClasspath(args, sparkConf, env)
     }

-    // Add datanucleus jars to classpath
-    for (entries <- sparkConf.getOption(CONF_SPARK_DATANUCLEUS_JARS)) {
-      entries.split(",").filter(_.nonEmpty).foreach { entry =>
-        addFileToClasspath(entry, null, env)
-      }
-    }
-
     // Append all jar files under the working directory to the classpath.
     addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env)
   }
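
The block removed here only splits the comma-separated internal config and adds each entry to the executor classpath; a toy sketch of that shape (addEntry is a hypothetical stand-in for the real addFileToClasspath):

    import scala.collection.mutable

    val env = mutable.HashMap.empty[String, String] // stand-in for the YARN env map

    // Hypothetical helper: append an entry to CLASSPATH, mimicking what
    // addFileToClasspath ultimately does for non-local entries.
    def addEntry(entry: String): Unit = {
      val sep = java.io.File.pathSeparator
      env("CLASSPATH") = env.get("CLASSPATH").map(_ + sep + entry).getOrElse(entry)
    }

    // Same split/filter/foreach shape as the removed block.
    Some("datanucleus-core.jar,datanucleus-api-jdo.jar").foreach { entries =>
      entries.split(",").filter(_.nonEmpty).foreach(addEntry)
    }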

--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.URI

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

@@ -105,81 +104,6 @@ class ClientBaseSuite extends FunSuite with Matchers {
     cp should not contain (ClientBase.APP_JAR)
   }

-  test("DataNucleus in classpath") {
-    val dnJars = "local:/dn/core.jar,/dn/api.jar"
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(ClientBase.CONF_SPARK_JAR, SPARK)
-      .set(ClientBase.CONF_SPARK_DATANUCLEUS_JARS, dnJars)
-    val env = new MutableHashMap[String, String]()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-    ClientBase.populateClasspath(args, conf, sparkConf, env)
-
-    val cp = env("CLASSPATH").split(File.pathSeparator)
-    s"$dnJars".split(",").foreach({ entry =>
-      val uri = new URI(entry)
-      if (ClientBase.LOCAL_SCHEME.equals(uri.getScheme())) {
-        cp should contain (uri.getPath())
-      } else {
-        cp should not contain (uri.getPath())
-      }
-    })
-  }
-
-  test("DataNucleus using local:") {
-    val dnDir = "local:/datanucleus"
-    val conf = new Configuration()
-    val sparkConf = new SparkConf()
-      .set(ClientBase.CONF_SPARK_JAR, SPARK)
-      .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir)
-    val yarnConf = new YarnConfiguration()
-    val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-    val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort(), anyBoolean())
-
-    val tempDir = Utils.createTempDir()
-    try {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-      val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
-      val uri = new URI(dnDir)
-      jars should contain (uri.toString + Path.SEPARATOR + "*")
-    } finally {
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
-  test("DataNucleus using file:") {
-    val dnDir = Utils.createTempDir()
-    val tempDir = Utils.createTempDir()
-
-    try {
-      // create mock datanucleus jar
-      val tempJar = File.createTempFile("datanucleus-", null, dnDir)
-
-      val conf = new Configuration()
-      val sparkConf = new SparkConf()
-        .set(ClientBase.CONF_SPARK_JAR, SPARK)
-        .set(ClientBase.CONF_SPARK_DATANUCLEUS_DIR, dnDir.toURI.toString)
-      val yarnConf = new YarnConfiguration()
-      val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
-
-      val client = spy(new DummyClient(args, conf, sparkConf, yarnConf))
-      doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-        any(classOf[Path]), anyShort(), anyBoolean())
-
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-
-      val jars = sparkConf.get(ClientBase.CONF_SPARK_DATANUCLEUS_JARS).split(",")
-      jars should contain (tempJar.getName)
-    } finally {
-      Utils.deleteRecursively(dnDir)
-      Utils.deleteRecursively(tempDir)
-    }
-  }
-
   test("Jar path propagation through SparkConf") {
     val conf = new Configuration()
     val sparkConf = new SparkConf().set(ClientBase.CONF_SPARK_JAR, SPARK)