[SPARK-33530][CORE] Support --archives and spark.archives option natively

### What changes were proposed in this pull request?

TL;DR:
- This PR completes support for archives in Spark itself instead of YARN only
  - It makes the `--archives` option work in other cluster modes too and adds a `spark.archives` configuration.
- After this PR, PySpark users can leverage Conda to ship Python packages together, as below:
    ```bash
    conda create -y -n pyspark_env -c conda-forge pyarrow==2.0.0 pandas==1.1.4 conda-pack==0.5.0
    conda activate pyspark_env
    conda pack -f -o pyspark_env.tar.gz
    PYSPARK_DRIVER_PYTHON=python PYSPARK_PYTHON=./environment/bin/python pyspark --archives pyspark_env.tar.gz#environment
    ```
- Issues a warning that the undocumented and hidden behavior of partial archive handling in `spark.files` / `SparkContext.addFile` will be deprecated; users can use `spark.archives` and `SparkContext.addArchive` instead (see the sketch after this list).
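
For reference, a minimal Scala sketch of the new API surface, assuming hypothetical archive files `deps.tar.gz` and `extra.zip` exist locally; it is illustrative only and not part of the change itself:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: ship an archive via the new spark.archives configuration
// (equivalent to passing --archives deps.tar.gz#deps to spark-submit).
// The '#deps' fragment chooses the directory name it is unpacked into.
val conf = new SparkConf()
  .setAppName("archives-example")
  .setMaster("local[2]")
  .set("spark.archives", "deps.tar.gz#deps")
val sc = new SparkContext(conf)

// Archives can also be added at runtime via the new experimental API.
sc.addArchive("extra.zip#extra")
sc.listArchives().foreach(println) // paths of archives added so far
```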

This PR proposes to add a Spark-native `--archives` option to Spark submit and a `spark.archives` configuration. Currently, both are supported only in YARN mode:

```bash
./bin/spark-submit --help
```

```
Options:
...
 Spark on YARN only:
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
  --archives ARCHIVES         Comma separated list of archives to be extracted into the
                              working directory of each executor.
```

This `archives` feature is often useful when you have to ship a directory and unpack it into executors. One example is native libraries used via JNI. Another is shipping Python packages together as a Conda environment.

Especially for Conda, PySpark currently does not have a nice way to ship packages that works in general; please see also https://hyukjin-spark.readthedocs.io/en/stable/user_guide/python_packaging.html#using-zipped-virtual-environment (a demo of the new PySpark documentation for 3.1.0).

The neatest way is arguably to ship a zipped Conda environment, but that depends on this archive feature. Note that we can already use `spark.files` by relying on its undocumented behaviour of untarring `tar.gz` files, but I don't think we should document such a workaround and encourage people to rely on it further.

Also, note that this PR does not yet aim for feature parity with `spark.files.overwrite`, `spark.files.useFetchCache`, etc. I documented this as an experimental feature as well.
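
As a hedged illustration (not part of the change itself), the sketch below mirrors the new `SparkContextSuite` test and shows how tasks locate an unpacked archive through `SparkFiles.get`; `deps.zip` is a hypothetical archive name:

```scala
import java.io.File
import org.apache.spark.{SparkContext, SparkFiles}

// Sketch: after adding an archive with a '#deps' fragment, every task can
// resolve the unpacked directory under SparkFiles' root by that fragment name.
def checkArchive(sc: SparkContext): Unit = {
  sc.addArchive("deps.zip#deps") // deps.zip is a hypothetical local archive
  sc.parallelize(Seq(1), numSlices = 1).foreach { _ =>
    val unpacked = new File(SparkFiles.get("deps"))
    require(unpacked.isDirectory, s"Archive was not unpacked at ${unpacked.getAbsolutePath}")
  }
}
```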

### Why are the changes needed?

To complete feature parity, and to provide better support for shipping Python libraries together with a Conda environment.

### Does this PR introduce _any_ user-facing change?

Yes, this makes `--archives` work in Spark itself instead of YARN only, and adds a new configuration, `spark.archives`.

### How was this patch tested?

I added unit tests. I also tested manually in standalone cluster, local-cluster, and local modes.

Closes #30486 from HyukjinKwon/native-archive.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
HyukjinKwon 2020-12-01 13:43:02 +09:00
parent 2af2da5a4b
commit 1a042cc414
21 changed files with 347 additions and 53 deletions

View file

@ -22,6 +22,7 @@ import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import javax.ws.rs.core.UriBuilder
import scala.collection.JavaConverters._
import scala.collection.Map
@ -39,7 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
@ -221,6 +222,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _archives: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
@ -246,6 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def archives: Seq[String] = _archives
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
def appName: String = _conf.get("spark.app.name")
@ -278,6 +281,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedArchives = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
// Keeps track of all persisted RDDs
@ -422,6 +426,7 @@ class SparkContext(config: SparkConf) extends Logging {
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
_archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten
_eventLogDir =
if (isEventLogEnabled) {
@ -506,6 +511,13 @@ class SparkContext(config: SparkConf) extends Logging {
}
}
if (archives != null) {
archives.foreach(file => addFile(file, false, true, isArchive = true))
if (addedArchives.nonEmpty) {
_conf.set("spark.app.initial.archive.urls", addedArchives.keys.toSeq.mkString(","))
}
}
_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
@ -1521,6 +1533,36 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def listFiles(): Seq[String] = addedFiles.keySet.toSeq
/**
* :: Experimental ::
* Add an archive to be downloaded and unpacked with this Spark job on every node.
*
* If an archive is added during execution, it will not be available until the next TaskSet
* starts.
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(paths-to-files)` to find its download/unpacked location.
* The given path should be one of .zip, .tar, .tar.gz, .tgz and .jar.
*
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*
* @since 3.1.0
*/
@Experimental
def addArchive(path: String): Unit = {
addFile(path, false, false, isArchive = true)
}
/**
* :: Experimental ::
* Returns a list of archive paths that are added to resources.
*
* @since 3.1.0
*/
@Experimental
def listArchives(): Seq[String] = addedArchives.keySet.toSeq
/**
* Add a file to be downloaded with this Spark job on every node.
*
@ -1538,8 +1580,14 @@ class SparkContext(config: SparkConf) extends Logging {
addFile(path, recursive, false)
}
private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
private def addFile(
path: String, recursive: Boolean, addedOnSubmit: Boolean, isArchive: Boolean = false
): Unit = {
val uri = if (!isArchive) {
new Path(path).toUri
} else {
Utils.resolveURI(path)
}
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
case "local" =>
@ -1551,7 +1599,7 @@ class SparkContext(config: SparkConf) extends Logging {
val hadoopPath = new Path(schemeCorrectedURI)
val scheme = schemeCorrectedURI.getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
if (!Array("http", "https", "ftp").contains(scheme) && !isArchive) {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
@ -1569,21 +1617,39 @@ class SparkContext(config: SparkConf) extends Logging {
val key = if (!isLocal && scheme == "file") {
env.rpcEnv.fileServer.addFile(new File(uri.getPath))
} else if (uri.getScheme == null) {
schemeCorrectedURI.toString
} else if (isArchive) {
uri.toString
} else {
if (uri.getScheme == null) {
schemeCorrectedURI.toString
} else {
path
}
path
}
val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
if (!isArchive && addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
} else if (
isArchive &&
addedArchives.putIfAbsent(
UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString,
timestamp).isEmpty) {
logInfo(s"Added archive $path at $key with timestamp $timestamp")
val uriToDownload = UriBuilder.fromUri(new URI(key)).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (uri.getFragment != null) uri.getFragment else source.getName)
logInfo(
s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
"is not supported in the current version.")
@ -2495,8 +2561,9 @@ class SparkContext(config: SparkConf) extends Logging {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val addedArchivePaths = addedArchives.keys.toSeq
val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration,
schedulingMode, addedJarPaths, addedFilePaths)
schedulingMode, addedJarPaths, addedFilePaths, addedArchivePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}

View file

@ -454,7 +454,8 @@ object SparkEnv extends Logging {
hadoopConf: Configuration,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
addedFiles: Seq[String],
addedArchives: Seq[String]): Map[String, Seq[(String, String)]] = {
import Properties._
val jvmInformation = Seq(
@ -484,7 +485,7 @@ object SparkEnv extends Logging {
.split(File.pathSeparator)
.filterNot(_.isEmpty)
.map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val addedJarsAndFiles = (addedJars ++ addedFiles ++ addedArchives).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
// Add Hadoop properties, it will not ignore configs including in Spark. Some spark

View file

@ -607,6 +607,8 @@ private[spark] class SparkSubmit extends Logging {
confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = FILES.key),
OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = ARCHIVES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = JARS.key),
@ -796,6 +798,7 @@ private[spark] class SparkSubmit extends Logging {
val pathConfigs = Seq(
JARS.key,
FILES.key,
ARCHIVES.key,
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")

View file

@ -183,6 +183,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get(config.JARS.key)).orNull
files = Option(files).orElse(sparkProperties.get(config.FILES.key)).orNull
archives = Option(archives).orElse(sparkProperties.get(config.ARCHIVES.key)).orNull
pyFiles = Option(pyFiles).orElse(sparkProperties.get(config.SUBMIT_PYTHON_FILES.key)).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
@ -512,6 +513,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor. File paths of these files
| in executors can be accessed via SparkFiles.get(fileName).
| --archives ARCHIVES Comma-separated list of archives to be extracted into the
| working directory of each executor.
|
| --conf, -c PROP=VALUE Arbitrary Spark configuration property.
| --properties-file FILE Path to a file from which to load extra properties. If not
@ -562,8 +565,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark on YARN only:
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
""".stripMargin
)

View file

@ -26,6 +26,7 @@ import java.util.{Locale, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
import javax.ws.rs.core.UriBuilder
import scala.collection.JavaConverters._
import scala.collection.immutable
@ -78,6 +79,7 @@ private[spark] class Executor(
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
private val currentArchives: HashMap[String, Long] = new HashMap[String, Long]()
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
@ -232,16 +234,17 @@ private[spark] class Executor(
private val appStartTime = conf.getLong("spark.app.startTime", 0)
// To allow users to distribute plugins and their required files
// specified by --jars and --files on application submission, those jars/files should be
// downloaded and added to the class loader via updateDependencies.
// This should be done before plugin initialization below
// specified by --jars, --files and --archives on application submission, those
// jars/files/archives should be downloaded and added to the class loader via
// updateDependencies. This should be done before plugin initialization below
// because executors search plugins from the class loader and initialize them.
private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
Map(urls.split(",").map(url => (url, appStartTime)): _*)
}.getOrElse(Map.empty)
}
updateDependencies(initialUserFiles, initialUserJars)
private val Seq(initialUserJars, initialUserFiles, initialUserArchives) =
Seq("jar", "file", "archive").map { key =>
conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
Map(urls.split(",").map(url => (url, appStartTime)): _*)
}.getOrElse(Map.empty)
}
updateDependencies(initialUserFiles, initialUserJars, initialUserArchives)
// Plugins need to load using a class loader that includes the executor's user classpath.
// Plugins also needs to be initialized after the heartbeater started
@ -449,7 +452,8 @@ private[spark] class Executor(
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
updateDependencies(
taskDescription.addedFiles, taskDescription.addedJars, taskDescription.addedArchives)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
@ -909,24 +913,42 @@ private[spark] class Executor(
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]): Unit = {
private def updateDependencies(
newFiles: Map[String, Long],
newJars: Map[String, Long],
newArchives: Map[String, Long]): Unit = {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
logInfo(s"Fetching $name with timestamp $timestamp")
val sourceURI = new URI(name)
val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
logInfo(
s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
currentArchives(name) = timestamp
}
for ((name, timestamp) <- newJars) {
val localName = new URI(name).getPath.split("/").last
val currentTimeStamp = currentJars.get(name)
.orElse(currentJars.get(localName))
.getOrElse(-1L)
if (currentTimeStamp < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
@ -934,7 +956,7 @@ private[spark] class Executor(
// Add it to our class loader
val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
if (!urlClassLoader.getURLs().contains(url)) {
logInfo("Adding " + url + " to class loader")
logInfo(s"Adding $url to class loader")
urlClassLoader.addURL(url)
}
}

View file

@ -1813,6 +1813,16 @@ package object config {
.toSequence
.createWithDefault(Nil)
private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
.version("3.1.0")
.doc("Comma-separated list of archives to be extracted into the working directory of each " +
"executor. .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory " +
"name to unpack via adding '#' after the file name to unpack, for example, " +
"'file.zip#directory'. This configuration is experimental.")
.stringConf
.toSequence
.createWithDefault(Nil)
private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
.version("1.5.0")
.stringConf

View file

@ -55,6 +55,7 @@ private[spark] class TaskDescription(
val partitionId: Int,
val addedFiles: Map[String, Long],
val addedJars: Map[String, Long],
val addedArchives: Map[String, Long],
val properties: Properties,
val resources: immutable.Map[String, ResourceInformation],
val serializedTask: ByteBuffer) {
@ -99,6 +100,9 @@ private[spark] object TaskDescription {
// Write jars.
serializeStringLongMap(taskDescription.addedJars, dataOut)
// Write archives.
serializeStringLongMap(taskDescription.addedArchives, dataOut)
// Write properties.
dataOut.writeInt(taskDescription.properties.size())
taskDescription.properties.asScala.foreach { case (key, value) =>
@ -167,6 +171,9 @@ private[spark] object TaskDescription {
// Read jars.
val taskJars = deserializeStringLongMap(dataIn)
// Read archives.
val taskArchives = deserializeStringLongMap(dataIn)
// Read properties.
val properties = new Properties()
val numProperties = dataIn.readInt()
@ -185,6 +192,6 @@ private[spark] object TaskDescription {
val serializedTask = byteBuffer.slice()
new TaskDescription(taskId, attemptNumber, executorId, name, index, partitionId, taskFiles,
taskJars, properties, resources, serializedTask)
taskJars, taskArchives, properties, resources, serializedTask)
}
}

View file

@ -63,6 +63,7 @@ private[spark] class TaskSetManager(
// SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet
private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*)
private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
private val addedArchives = HashMap[String, Long](sched.sc.addedArchives.toSeq: _*)
val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
@ -493,6 +494,7 @@ private[spark] class TaskSetManager(
task.partitionId,
addedFiles,
addedJars,
addedArchives,
task.localProperties,
taskResourceAssignments,
serializedTask)

View file

@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.{RunJar, StringUtils}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.eclipse.jetty.util.MultiException
import org.slf4j.Logger
@ -486,6 +487,10 @@ private[spark] object Utils extends Logging {
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*
* If `shouldUntar` is true, it untars the given url if it is a tar.gz or tgz into `targetDir`.
* This is a legacy behavior, and users should better use `spark.archives` configuration or
* `SparkContext.addArchive`
*/
def fetchFile(
url: String,
@ -494,7 +499,8 @@ private[spark] object Utils extends Logging {
securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean): File = {
useCache: Boolean,
shouldUntar: Boolean = true): File = {
val fileName = decodeFileNameInURI(new URI(url))
val targetFile = new File(targetDir, fileName)
val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
@ -535,13 +541,23 @@ private[spark] object Utils extends Logging {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}
// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
if (shouldUntar) {
// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logWarning(
"Untarring behavior will be deprecated at spark.files and " +
"SparkContext.addFile. Consider using spark.archives or SparkContext.addArchive " +
"instead.")
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logWarning(
"Untarring behavior will be deprecated at spark.files and " +
"SparkContext.addFile. Consider using spark.archives or SparkContext.addArchive " +
"instead.")
logInfo("Untarring " + fileName)
executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
}
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@ -555,6 +571,26 @@ private[spark] object Utils extends Logging {
targetFile
}
/**
* Unpacks an archive file into the specified directory. It expects .jar, .zip, .tar.gz, .tgz
* and .tar files. This behaves same as Hadoop's archive in distributed cache. This method is
* basically copied from `org.apache.hadoop.yarn.util.FSDownload.unpack`.
*/
def unpack(source: File, dest: File): Unit = {
val lowerSrc = StringUtils.toLowerCase(source.getName)
if (lowerSrc.endsWith(".jar")) {
RunJar.unJar(source, dest, RunJar.MATCH_ANY)
} else if (lowerSrc.endsWith(".zip")) {
FileUtil.unZip(source, dest)
} else if (
lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) {
FileUtil.unTar(source, dest)
} else {
logWarning(s"Cannot unpack $source, just copying it to $dest.")
copyRecursive(source, dest)
}
}
/** Records the duration of running `body`. */
def timeTakenMs[T](body: => T): (T, Long) = {
val startTime = System.nanoTime()

View file

@ -160,6 +160,85 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
test("SPARK-33530: basic case for addArchive and listArchives") {
withTempDir { dir =>
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
val file2 = File.createTempFile("someprefix2", "somesuffix2", dir)
val file3 = File.createTempFile("someprefix3", "somesuffix3", dir)
val file4 = File.createTempFile("someprefix4", "somesuffix4", dir)
val jarFile = new File(dir, "test!@$jar.jar")
val zipFile = new File(dir, "test-zip.zip")
val relativePath1 =
s"${zipFile.getParent}/../${zipFile.getParentFile.getName}/${zipFile.getName}"
val relativePath2 =
s"${jarFile.getParent}/../${jarFile.getParentFile.getName}/${jarFile.getName}#zoo"
try {
Files.write("somewords1", file1, StandardCharsets.UTF_8)
Files.write("somewords22", file2, StandardCharsets.UTF_8)
Files.write("somewords333", file3, StandardCharsets.UTF_8)
Files.write("somewords4444", file4, StandardCharsets.UTF_8)
val length1 = file1.length()
val length2 = file2.length()
val length3 = file1.length()
val length4 = file2.length()
createJar(Seq(file1, file2), jarFile)
createJar(Seq(file3, file4), zipFile)
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
sc.addArchive(jarFile.getAbsolutePath)
sc.addArchive(relativePath1)
sc.addArchive(s"${jarFile.getAbsolutePath}#foo")
sc.addArchive(s"${zipFile.getAbsolutePath}#bar")
sc.addArchive(relativePath2)
sc.parallelize(Array(1), 1).map { x =>
val gotten1 = new File(SparkFiles.get(jarFile.getName))
val gotten2 = new File(SparkFiles.get(zipFile.getName))
val gotten3 = new File(SparkFiles.get("foo"))
val gotten4 = new File(SparkFiles.get("bar"))
val gotten5 = new File(SparkFiles.get("zoo"))
Seq(gotten1, gotten2, gotten3, gotten4, gotten5).foreach { gotten =>
if (!gotten.exists()) {
throw new SparkException(s"The archive doesn't exist: ${gotten.getAbsolutePath}")
}
if (!gotten.isDirectory) {
throw new SparkException(s"The archive was not unpacked: ${gotten.getAbsolutePath}")
}
}
// Jars
Seq(gotten1, gotten3, gotten5).foreach { gotten =>
val actualLength1 = new File(gotten, file1.getName).length()
val actualLength2 = new File(gotten, file2.getName).length()
if (actualLength1 != length1 || actualLength2 != length2) {
s"Unpacked files have different lengths $actualLength1 and $actualLength2. at " +
s"${gotten.getAbsolutePath}. They should be $length1 and $length2."
}
}
// Zip
Seq(gotten2, gotten4).foreach { gotten =>
val actualLength3 = new File(gotten, file1.getName).length()
val actualLength4 = new File(gotten, file2.getName).length()
if (actualLength3 != length3 || actualLength4 != length4) {
s"Unpacked files have different lengths $actualLength3 and $actualLength4. at " +
s"${gotten.getAbsolutePath}. They should be $length3 and $length4."
}
}
x
}.count()
assert(sc.listArchives().count(_.endsWith("test!@$jar.jar")) == 1)
assert(sc.listArchives().count(_.contains("test-zip.zip")) == 2)
} finally {
sc.stop()
}
}
}
test("add and list jar files") {
val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar")
try {

View file

@ -335,6 +335,43 @@ class SparkSubmitSuite
sys.props("SPARK_SUBMIT") should be ("true")
}
test("SPARK-33530: handles standalone mode with archives") {
val clArgs = Seq(
"--master", "spark://localhost:1234",
"--executor-memory", "5g",
"--executor-cores", "5",
"--class", "org.SomeClass",
"--jars", "one.jar,two.jar,three.jar",
"--driver-memory", "4g",
"--files", "file1.txt,file2.txt",
"--archives", "archive1.zip,archive2.jar",
"--num-executors", "6",
"--name", "beauty",
"--conf", "spark.ui.enabled=false",
"thejar.jar",
"arg1", "arg2")
val appArgs = new SparkSubmitArguments(clArgs)
val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs)
val childArgsStr = childArgs.mkString(" ")
childArgsStr should include ("arg1 arg2")
mainClass should be ("org.SomeClass")
classpath(0) should endWith ("thejar.jar")
classpath(1) should endWith ("one.jar")
classpath(2) should endWith ("two.jar")
classpath(3) should endWith ("three.jar")
conf.get("spark.executor.memory") should be ("5g")
conf.get("spark.driver.memory") should be ("4g")
conf.get("spark.executor.cores") should be ("5")
conf.get("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar")
conf.get("spark.files") should include regex (".*file1.txt,.*file2.txt")
conf.get("spark.archives") should include regex (".*archive1.zip,.*archive2.jar")
conf.get("spark.app.name") should be ("beauty")
conf.get(UI_ENABLED) should be (false)
sys.props("SPARK_SUBMIT") should be ("true")
}
test("handles standalone cluster mode") {
testStandaloneCluster(useRest = true)
}

View file

@ -98,6 +98,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
// optional fields
conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
conf.set(FILES.key, "fireball.png")
conf.set(ARCHIVES.key, "fireballs.zip")
conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
conf.set(DRIVER_CORES, 180)
conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
@ -246,6 +247,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
| },
| "mainClass" : "org.apache.spark.examples.SparkPie",
| "sparkProperties" : {
| "spark.archives" : "fireballs.zip",
| "spark.driver.extraLibraryPath" : "pickle.jar",
| "spark.jars" : "mayonnaise.jar,ketchup.jar",
| "spark.driver.supervise" : "false",
@ -272,6 +274,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
| },
| "mainClass" : "org.apache.spark.examples.SparkPie",
| "sparkProperties" : {
| "spark.archives" : "fireballs.zip",
| "spark.driver.extraLibraryPath" : "pickle.jar",
| "spark.jars" : "mayonnaise.jar,ketchup.jar",
| "spark.driver.supervise" : "false",

View file

@ -302,7 +302,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
// We don't really verify the data, just pass it around.
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
1, mutable.Map.empty, mutable.Map.empty, new Properties,
1, mutable.Map.empty, mutable.Map.empty, mutable.Map.empty, new Properties,
Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
val serializedTaskDescription = TaskDescription.encode(taskDescription)
backend.executor = mock[Executor]

View file

@ -519,6 +519,7 @@ class ExecutorSuite extends SparkFunSuite
partitionId = 0,
addedFiles = Map[String, Long](),
addedJars = Map[String, Long](),
addedArchives = Map[String, Long](),
properties = new Properties,
resources = immutable.Map[String, ResourceInformation](),
serializedTask)

View file

@ -244,7 +244,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
"t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
"t1", 0, 1, mutable.Map.empty[String, Long],
mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
new Properties(), taskResources, bytebuffer)))
val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

View file

@ -91,7 +91,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
.set(key, secretPassword)
val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
val envDetails = SparkEnv.environmentDetails(conf, hadoopconf, "FIFO", Seq.empty, Seq.empty)
val envDetails = SparkEnv.environmentDetails(
conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")

View file

@ -33,6 +33,10 @@ class TaskDescriptionSuite extends SparkFunSuite {
originalFiles.put("fileUrl1", 1824)
originalFiles.put("fileUrl2", 2)
val originalArchives = new HashMap[String, Long]()
originalArchives.put("archiveUrl1", 1824)
originalArchives.put("archiveUrl2", 2)
val originalJars = new HashMap[String, Long]()
originalJars.put("jar1", 3)
@ -70,6 +74,7 @@ class TaskDescriptionSuite extends SparkFunSuite {
partitionId = 1,
originalFiles,
originalJars,
originalArchives,
originalProperties,
originalResources,
taskBuffer
@ -87,6 +92,7 @@ class TaskDescriptionSuite extends SparkFunSuite {
assert(decodedTaskDescription.partitionId === originalTaskDescription.partitionId)
assert(decodedTaskDescription.addedFiles.equals(originalFiles))
assert(decodedTaskDescription.addedJars.equals(originalJars))
assert(decodedTaskDescription.addedArchives.equals(originalArchives))
assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
assert(equalResources(decodedTaskDescription.resources, originalTaskDescription.resources))
assert(decodedTaskDescription.serializedTask.equals(taskBuffer))

View file

@ -784,6 +784,17 @@ Apart from these, the following properties are also available, and may be useful
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.archives</code></td>
<td></td>
<td>
Comma-separated list of archives to be extracted into the working directory of each executor.
.jar, .tar.gz, .tgz and .zip are supported. You can specify the directory name to unpack via
adding <code>#</code> after the file name to unpack, for example, <code>file.zip#directory</code>.
This configuration is experimental.
</td>
<td>3.1.0</td>
</tr>
<tr>
<td><code>spark.pyspark.driver.python</code></td>
<td></td>

View file

@ -41,6 +41,7 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.shuffle.api.ShuffleMapOutputWriter.commitAllPartitions"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.shuffle.api.ShuffleMapOutputWriter.commitAllPartitions"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.environmentDetails"),
// mllib module
ProblemFilters.exclude[NewMixinForwarderProblem]("org.apache.spark.ml.classification.LogisticRegressionTrainingSummary.totalIterations"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionTrainingSummary.$init$"),

View file

@ -77,8 +77,7 @@ Using Zipped Virtual Environment
--------------------------------
The idea of zipped environments is to zip your whole `virtual environment <https://docs.python.org/3/tutorial/venv.html>`_,
ship it to the cluster, unzip it remotely and target the Python interpreter from inside this zipped environment. Note that this
is currently supported *only for YARN*.
ship it to the cluster, unzip it remotely and target the Python interpreter from inside this zipped environment.
Zip Virtual Environment
~~~~~~~~~~~~~~~~~~~~~~~
@ -92,16 +91,15 @@ Example with `conda-pack`:
.. code-block:: bash
conda create -y -n conda_env -c conda-forge \
pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 conda-pack==0.4.0
conda activate conda_env
conda pack -f -o conda_env.tar.gz
conda create -y -n pyspark_env -c conda-forge pyarrow==2.0.0 pandas==1.1.4 conda-pack==0.5.0
conda activate pyspark_env
conda pack -f -o pyspark_env.tar.gz
Upload to Spark Executors
~~~~~~~~~~~~~~~~~~~~~~~~~
Unzipping will be done by Spark when using target ``--archives`` option in spark-submit
or setting ``spark.yarn.dist.archives`` configuration.
or setting ``spark.archives`` configuration.
Example with ``spark-submit``:
@ -109,8 +107,7 @@ Example with ``spark-submit``:
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --master=yarn --deploy-mode client \
--archives conda_env.tar.gz#environment app.py
spark-submit --master=... --archives pyspark_env.tar.gz#environment app.py
Example using ``SparkSession.builder``:
@ -121,11 +118,17 @@ Example using ``SparkSession.builder``:
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
builder = SparkSession.builder.master("yarn").config(
"spark.yarn.dist.archives", "conda_env.tar.gz#environment")
spark = builder.getOrCreate()
spark = SparkSession.builder.master("...").config("spark.archives", "pyspark_env.tar.gz#environment").getOrCreate()
main(spark)
Example with ``pyspark`` shell:
.. code-block:: bash
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --master=... --archives pyspark_env.tar.gz#environment
Using PEX
---------

View file

@ -264,6 +264,7 @@ class MesosFineGrainedSchedulerBackendSuite
partitionId = 0,
addedFiles = mutable.Map.empty[String, Long],
addedJars = mutable.Map.empty[String, Long],
addedArchives = mutable.Map.empty[String, Long],
properties = new Properties(),
resources = immutable.Map.empty[String, ResourceInformation],
ByteBuffer.wrap(new Array[Byte](0)))
@ -377,6 +378,7 @@ class MesosFineGrainedSchedulerBackendSuite
partitionId = 0,
addedFiles = mutable.Map.empty[String, Long],
addedJars = mutable.Map.empty[String, Long],
addedArchives = mutable.Map.empty[String, Long],
properties = new Properties(),
resources = immutable.Map.empty[String, ResourceInformation],
ByteBuffer.wrap(new Array[Byte](0)))