diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b953592fa0..86f1d745d9 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -22,6 +22,7 @@ import java.net.URI import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID} import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference} +import javax.ws.rs.core.UriBuilder import scala.collection.JavaConverters._ import scala.collection.Map @@ -39,7 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource} @@ -221,6 +222,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _listenerBusStarted: Boolean = false private var _jars: Seq[String] = _ private var _files: Seq[String] = _ + private var _archives: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ private var _statusStore: AppStatusStore = _ private var _heartbeater: Heartbeater = _ @@ -246,6 +248,7 @@ class SparkContext(config: SparkConf) extends Logging { def jars: Seq[String] = _jars def files: Seq[String] = _files + def archives: Seq[String] = _archives def master: String = _conf.get("spark.master") def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE) def appName: String = _conf.get("spark.app.name") @@ -278,6 +281,7 @@ class SparkContext(config: SparkConf) extends Logging { // Used to store a URL for each static file/jar together with the file's local timestamp private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala + private[spark] val addedArchives = new ConcurrentHashMap[String, Long]().asScala private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala // Keeps track of all persisted RDDs @@ -422,6 +426,7 @@ class SparkContext(config: SparkConf) extends Logging { _jars = Utils.getUserJars(_conf) _files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten + _archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten _eventLogDir = if (isEventLogEnabled) { @@ -506,6 +511,13 @@ class SparkContext(config: SparkConf) extends Logging { } } + if (archives != null) { + archives.foreach(file => addFile(file, false, true, isArchive = true)) + if (addedArchives.nonEmpty) { + _conf.set("spark.app.initial.archive.urls", addedArchives.keys.toSeq.mkString(",")) + } + } + _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key) .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")) @@ -1521,6 +1533,36 @@ class SparkContext(config: SparkConf) extends Logging { */ def listFiles(): Seq[String] = addedFiles.keySet.toSeq + /** + * :: Experimental :: + * Add an archive to be downloaded and unpacked with this Spark job on every node. + * + * If an archive is added during execution, it will not be available until the next TaskSet + * starts. 
+ * + * @param path can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + * use `SparkFiles.get(paths-to-files)` to find its download/unpacked location. + * The given path should be one of .zip, .tar, .tar.gz, .tgz and .jar. + * + * @note A path can be added only once. Subsequent additions of the same path are ignored. + * + * @since 3.1.0 + */ + @Experimental + def addArchive(path: String): Unit = { + addFile(path, false, false, isArchive = true) + } + + /** + * :: Experimental :: + * Returns a list of archive paths that are added to resources. + * + * @since 3.1.0 + */ + @Experimental + def listArchives(): Seq[String] = addedArchives.keySet.toSeq + /** * Add a file to be downloaded with this Spark job on every node. * @@ -1538,8 +1580,14 @@ class SparkContext(config: SparkConf) extends Logging { addFile(path, recursive, false) } - private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = { - val uri = new Path(path).toUri + private def addFile( + path: String, recursive: Boolean, addedOnSubmit: Boolean, isArchive: Boolean = false + ): Unit = { + val uri = if (!isArchive) { + new Path(path).toUri + } else { + Utils.resolveURI(path) + } val schemeCorrectedURI = uri.getScheme match { case null => new File(path).getCanonicalFile.toURI case "local" => @@ -1551,7 +1599,7 @@ class SparkContext(config: SparkConf) extends Logging { val hadoopPath = new Path(schemeCorrectedURI) val scheme = schemeCorrectedURI.getScheme - if (!Array("http", "https", "ftp").contains(scheme)) { + if (!Array("http", "https", "ftp").contains(scheme) && !isArchive) { val fs = hadoopPath.getFileSystem(hadoopConfiguration) val isDir = fs.getFileStatus(hadoopPath).isDirectory if (!isLocal && scheme == "file" && isDir) { @@ -1569,21 +1617,39 @@ class SparkContext(config: SparkConf) extends Logging { val key = if (!isLocal && scheme == "file") { env.rpcEnv.fileServer.addFile(new File(uri.getPath)) + } else if (uri.getScheme == null) { + schemeCorrectedURI.toString + } else if (isArchive) { + uri.toString } else { - if (uri.getScheme == null) { - schemeCorrectedURI.toString - } else { - path - } + path } + val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis - if (addedFiles.putIfAbsent(key, timestamp).isEmpty) { + if (!isArchive && addedFiles.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added file $path at $key with timestamp $timestamp") // Fetch the file locally so that closures which are run on the driver can still use the // SparkFiles API to access files. 
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConfiguration, timestamp, useCache = false) postEnvironmentUpdate() + } else if ( + isArchive && + addedArchives.putIfAbsent( + UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString, + timestamp).isEmpty) { + logInfo(s"Added archive $path at $key with timestamp $timestamp") + val uriToDownload = UriBuilder.fromUri(new URI(key)).fragment(null).build() + val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf, + env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false) + val dest = new File( + SparkFiles.getRootDirectory(), + if (uri.getFragment != null) uri.getFragment else source.getName) + logInfo( + s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + Utils.deleteRecursively(dest) + Utils.unpack(source, dest) + postEnvironmentUpdate() } else { logWarning(s"The path $path has been added already. Overwriting of added paths " + "is not supported in the current version.") @@ -2495,8 +2561,9 @@ class SparkContext(config: SparkConf) extends Logging { val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq + val addedArchivePaths = addedArchives.keys.toSeq val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration, - schedulingMode, addedJarPaths, addedFilePaths) + schedulingMode, addedJarPaths, addedFilePaths, addedArchivePaths) val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d543359f4d..9fc60ac399 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -454,7 +454,8 @@ object SparkEnv extends Logging { hadoopConf: Configuration, schedulingMode: String, addedJars: Seq[String], - addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { + addedFiles: Seq[String], + addedArchives: Seq[String]): Map[String, Seq[(String, String)]] = { import Properties._ val jvmInformation = Seq( @@ -484,7 +485,7 @@ object SparkEnv extends Logging { .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath")) - val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) + val addedJarsAndFiles = (addedJars ++ addedFiles ++ addedArchives).map((_, "Added By User")) val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted // Add Hadoop properties, it will not ignore configs including in Spark. 
Some spark diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4aa393c514..a344bce7a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -607,6 +607,8 @@ private[spark] class SparkSubmit extends Logging { confKey = CORES_MAX.key), OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = FILES.key), + OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, + confKey = ARCHIVES.key), OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key), OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = JARS.key), @@ -796,6 +798,7 @@ private[spark] class SparkSubmit extends Logging { val pathConfigs = Seq( JARS.key, FILES.key, + ARCHIVES.key, "spark.yarn.dist.files", "spark.yarn.dist.archives", "spark.yarn.dist.jars") diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 3090a3b10a..9da1a73bba 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -183,6 +183,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull jars = Option(jars).orElse(sparkProperties.get(config.JARS.key)).orNull files = Option(files).orElse(sparkProperties.get(config.FILES.key)).orNull + archives = Option(archives).orElse(sparkProperties.get(config.ARCHIVES.key)).orNull pyFiles = Option(pyFiles).orElse(sparkProperties.get(config.SUBMIT_PYTHON_FILES.key)).orNull ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull ivySettingsPath = sparkProperties.get("spark.jars.ivySettings") @@ -512,6 +513,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | --files FILES Comma-separated list of files to be placed in the working | directory of each executor. File paths of these files | in executors can be accessed via SparkFiles.get(fileName). + | --archives ARCHIVES Comma-separated list of archives to be extracted into the + | working directory of each executor. | | --conf, -c PROP=VALUE Arbitrary Spark configuration property. | --properties-file FILE Path to a file from which to load extra properties. If not @@ -562,8 +565,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | | Spark on YARN only: | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). - | --archives ARCHIVES Comma separated list of archives to be extracted into the - | working directory of each executor. 
""".stripMargin ) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c81ac778a3..e7f1b8f3cf 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -26,6 +26,7 @@ import java.util.{Locale, Properties} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy +import javax.ws.rs.core.UriBuilder import scala.collection.JavaConverters._ import scala.collection.immutable @@ -78,6 +79,7 @@ private[spark] class Executor( // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() + private val currentArchives: HashMap[String, Long] = new HashMap[String, Long]() private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) @@ -232,16 +234,17 @@ private[spark] class Executor( private val appStartTime = conf.getLong("spark.app.startTime", 0) // To allow users to distribute plugins and their required files - // specified by --jars and --files on application submission, those jars/files should be - // downloaded and added to the class loader via updateDependencies. - // This should be done before plugin initialization below + // specified by --jars, --files and --archives on application submission, those + // jars/files/archives should be downloaded and added to the class loader via + // updateDependencies. This should be done before plugin initialization below // because executors search plugins from the class loader and initialize them. - private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key => - conf.getOption(s"spark.app.initial.$key.urls").map { urls => - Map(urls.split(",").map(url => (url, appStartTime)): _*) - }.getOrElse(Map.empty) - } - updateDependencies(initialUserFiles, initialUserJars) + private val Seq(initialUserJars, initialUserFiles, initialUserArchives) = + Seq("jar", "file", "archive").map { key => + conf.getOption(s"spark.app.initial.$key.urls").map { urls => + Map(urls.split(",").map(url => (url, appStartTime)): _*) + }.getOrElse(Map.empty) + } + updateDependencies(initialUserFiles, initialUserJars, initialUserArchives) // Plugins need to load using a class loader that includes the executor's user classpath. // Plugins also needs to be initialized after the heartbeater started @@ -449,7 +452,8 @@ private[spark] class Executor( // requires access to properties contained within (e.g. for access control). Executor.taskDeserializationProps.set(taskDescription.properties) - updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) + updateDependencies( + taskDescription.addedFiles, taskDescription.addedJars, taskDescription.addedArchives) task = ser.deserialize[Task[Any]]( taskDescription.serializedTask, Thread.currentThread.getContextClassLoader) task.localProperties = taskDescription.properties @@ -909,24 +913,42 @@ private[spark] class Executor( * Download any missing dependencies if we receive a new set of files and JARs from the * SparkContext. Also adds any new JARs we fetched to the class loader. 
*/ - private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]): Unit = { + private def updateDependencies( + newFiles: Map[String, Long], + newJars: Map[String, Long], + newArchives: Map[String, Long]): Unit = { lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf) synchronized { // Fetch missing dependencies for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { - logInfo("Fetching " + name + " with timestamp " + timestamp) + logInfo(s"Fetching $name with timestamp $timestamp") // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) currentFiles(name) = timestamp } + for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) { + logInfo(s"Fetching $name with timestamp $timestamp") + val sourceURI = new URI(name) + val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build() + val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf, + env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false) + val dest = new File( + SparkFiles.getRootDirectory(), + if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName) + logInfo( + s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") + Utils.deleteRecursively(dest) + Utils.unpack(source, dest) + currentArchives(name) = timestamp + } for ((name, timestamp) <- newJars) { val localName = new URI(name).getPath.split("/").last val currentTimeStamp = currentJars.get(name) .orElse(currentJars.get(localName)) .getOrElse(-1L) if (currentTimeStamp < timestamp) { - logInfo("Fetching " + name + " with timestamp " + timestamp) + logInfo(s"Fetching $name with timestamp $timestamp") // Fetch file with useCache mode, close cache for local mode. Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, hadoopConf, timestamp, useCache = !isLocal) @@ -934,7 +956,7 @@ private[spark] class Executor( // Add it to our class loader val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL if (!urlClassLoader.getURLs().contains(url)) { - logInfo("Adding " + url + " to class loader") + logInfo(s"Adding $url to class loader") urlClassLoader.addURL(url) } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 093a0ecf58..6639f20a06 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1813,6 +1813,16 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val ARCHIVES = ConfigBuilder("spark.archives") + .version("3.1.0") + .doc("Comma-separated list of archives to be extracted into the working directory of each " + + "executor. .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory " + + "name to unpack via adding '#' after the file name to unpack, for example, " + + "'file.zip#directory'. 
This configuration is experimental.") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode") .version("1.5.0") .stringConf diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 863bf27088..12b911d061 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -55,6 +55,7 @@ private[spark] class TaskDescription( val partitionId: Int, val addedFiles: Map[String, Long], val addedJars: Map[String, Long], + val addedArchives: Map[String, Long], val properties: Properties, val resources: immutable.Map[String, ResourceInformation], val serializedTask: ByteBuffer) { @@ -99,6 +100,9 @@ private[spark] object TaskDescription { // Write jars. serializeStringLongMap(taskDescription.addedJars, dataOut) + // Write archives. + serializeStringLongMap(taskDescription.addedArchives, dataOut) + // Write properties. dataOut.writeInt(taskDescription.properties.size()) taskDescription.properties.asScala.foreach { case (key, value) => @@ -167,6 +171,9 @@ private[spark] object TaskDescription { // Read jars. val taskJars = deserializeStringLongMap(dataIn) + // Read archives. + val taskArchives = deserializeStringLongMap(dataIn) + // Read properties. val properties = new Properties() val numProperties = dataIn.readInt() @@ -185,6 +192,6 @@ private[spark] object TaskDescription { val serializedTask = byteBuffer.slice() new TaskDescription(taskId, attemptNumber, executorId, name, index, partitionId, taskFiles, - taskJars, properties, resources, serializedTask) + taskJars, taskArchives, properties, resources, serializedTask) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 914fccc1a6..ad0791fa42 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -63,6 +63,7 @@ private[spark] class TaskSetManager( // SPARK-21563 make a copy of the jars/files so they are consistent across the TaskSet private val addedJars = HashMap[String, Long](sched.sc.addedJars.toSeq: _*) private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*) + private val addedArchives = HashMap[String, Long](sched.sc.addedArchives.toSeq: _*) val maxResultSize = conf.get(config.MAX_RESULT_SIZE) @@ -493,6 +494,7 @@ private[spark] class TaskSetManager( task.partitionId, addedFiles, addedJars, + addedArchives, task.localProperties, taskResourceAssignments, serializedTask) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index accf3d7c0d..ae4df146b0 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -53,6 +53,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.util.{RunJar, StringUtils} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.eclipse.jetty.util.MultiException import org.slf4j.Logger @@ -486,6 +487,10 @@ private[spark] object Utils extends Logging { * * 
Throws SparkException if the target file already exists and has different contents than * the requested file. + * + * If `shouldUntar` is true, it untars the given url if it is a tar.gz or tgz into `targetDir`. + * This is a legacy behavior, and users should better use `spark.archives` configuration or + * `SparkContext.addArchive` */ def fetchFile( url: String, @@ -494,7 +499,8 @@ private[spark] object Utils extends Logging { securityMgr: SecurityManager, hadoopConf: Configuration, timestamp: Long, - useCache: Boolean): File = { + useCache: Boolean, + shouldUntar: Boolean = true): File = { val fileName = decodeFileNameInURI(new URI(url)) val targetFile = new File(targetDir, fileName) val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true) @@ -535,13 +541,23 @@ private[spark] object Utils extends Logging { doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf) } - // Decompress the file if it's a .tar or .tar.gz - if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) { - logInfo("Untarring " + fileName) - executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir) - } else if (fileName.endsWith(".tar")) { - logInfo("Untarring " + fileName) - executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir) + if (shouldUntar) { + // Decompress the file if it's a .tar or .tar.gz + if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) { + logWarning( + "Untarring behavior will be deprecated at spark.files and " + + "SparkContext.addFile. Consider using spark.archives or SparkContext.addArchive " + + "instead.") + logInfo("Untarring " + fileName) + executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir) + } else if (fileName.endsWith(".tar")) { + logWarning( + "Untarring behavior will be deprecated at spark.files and " + + "SparkContext.addFile. Consider using spark.archives or SparkContext.addArchive " + + "instead.") + logInfo("Untarring " + fileName) + executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir) + } } // Make the file executable - That's necessary for scripts FileUtil.chmod(targetFile.getAbsolutePath, "a+x") @@ -555,6 +571,26 @@ private[spark] object Utils extends Logging { targetFile } + /** + * Unpacks an archive file into the specified directory. It expects .jar, .zip, .tar.gz, .tgz + * and .tar files. This behaves same as Hadoop's archive in distributed cache. This method is + * basically copied from `org.apache.hadoop.yarn.util.FSDownload.unpack`. + */ + def unpack(source: File, dest: File): Unit = { + val lowerSrc = StringUtils.toLowerCase(source.getName) + if (lowerSrc.endsWith(".jar")) { + RunJar.unJar(source, dest, RunJar.MATCH_ANY) + } else if (lowerSrc.endsWith(".zip")) { + FileUtil.unZip(source, dest) + } else if ( + lowerSrc.endsWith(".tar.gz") || lowerSrc.endsWith(".tgz") || lowerSrc.endsWith(".tar")) { + FileUtil.unTar(source, dest) + } else { + logWarning(s"Cannot unpack $source, just copying it to $dest.") + copyRecursive(source, dest) + } + } + /** Records the duration of running `body`. 
*/ def timeTakenMs[T](body: => T): (T, Long) = { val startTime = System.nanoTime() diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index ebdf2f59a2..55bfa70f21 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -160,6 +160,85 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu } } + test("SPARK-33530: basic case for addArchive and listArchives") { + withTempDir { dir => + val file1 = File.createTempFile("someprefix1", "somesuffix1", dir) + val file2 = File.createTempFile("someprefix2", "somesuffix2", dir) + val file3 = File.createTempFile("someprefix3", "somesuffix3", dir) + val file4 = File.createTempFile("someprefix4", "somesuffix4", dir) + + val jarFile = new File(dir, "test!@$jar.jar") + val zipFile = new File(dir, "test-zip.zip") + val relativePath1 = + s"${zipFile.getParent}/../${zipFile.getParentFile.getName}/${zipFile.getName}" + val relativePath2 = + s"${jarFile.getParent}/../${jarFile.getParentFile.getName}/${jarFile.getName}#zoo" + + try { + Files.write("somewords1", file1, StandardCharsets.UTF_8) + Files.write("somewords22", file2, StandardCharsets.UTF_8) + Files.write("somewords333", file3, StandardCharsets.UTF_8) + Files.write("somewords4444", file4, StandardCharsets.UTF_8) + val length1 = file1.length() + val length2 = file2.length() + val length3 = file1.length() + val length4 = file2.length() + + createJar(Seq(file1, file2), jarFile) + createJar(Seq(file3, file4), zipFile) + + sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) + sc.addArchive(jarFile.getAbsolutePath) + sc.addArchive(relativePath1) + sc.addArchive(s"${jarFile.getAbsolutePath}#foo") + sc.addArchive(s"${zipFile.getAbsolutePath}#bar") + sc.addArchive(relativePath2) + + sc.parallelize(Array(1), 1).map { x => + val gotten1 = new File(SparkFiles.get(jarFile.getName)) + val gotten2 = new File(SparkFiles.get(zipFile.getName)) + val gotten3 = new File(SparkFiles.get("foo")) + val gotten4 = new File(SparkFiles.get("bar")) + val gotten5 = new File(SparkFiles.get("zoo")) + + Seq(gotten1, gotten2, gotten3, gotten4, gotten5).foreach { gotten => + if (!gotten.exists()) { + throw new SparkException(s"The archive doesn't exist: ${gotten.getAbsolutePath}") + } + if (!gotten.isDirectory) { + throw new SparkException(s"The archive was not unpacked: ${gotten.getAbsolutePath}") + } + } + + // Jars + Seq(gotten1, gotten3, gotten5).foreach { gotten => + val actualLength1 = new File(gotten, file1.getName).length() + val actualLength2 = new File(gotten, file2.getName).length() + if (actualLength1 != length1 || actualLength2 != length2) { + s"Unpacked files have different lengths $actualLength1 and $actualLength2. at " + + s"${gotten.getAbsolutePath}. They should be $length1 and $length2." + } + } + + // Zip + Seq(gotten2, gotten4).foreach { gotten => + val actualLength3 = new File(gotten, file1.getName).length() + val actualLength4 = new File(gotten, file2.getName).length() + if (actualLength3 != length3 || actualLength4 != length4) { + s"Unpacked files have different lengths $actualLength3 and $actualLength4. at " + + s"${gotten.getAbsolutePath}. They should be $length3 and $length4." 
+ } + } + x + }.count() + assert(sc.listArchives().count(_.endsWith("test!@$jar.jar")) == 1) + assert(sc.listArchives().count(_.contains("test-zip.zip")) == 2) + } finally { + sc.stop() + } + } + } + test("add and list jar files") { val jarPath = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar") try { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index b5b3751439..dcd35f3f6b 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -335,6 +335,43 @@ class SparkSubmitSuite sys.props("SPARK_SUBMIT") should be ("true") } + test("SPARK-33530: handles standalone mode with archives") { + val clArgs = Seq( + "--master", "spark://localhost:1234", + "--executor-memory", "5g", + "--executor-cores", "5", + "--class", "org.SomeClass", + "--jars", "one.jar,two.jar,three.jar", + "--driver-memory", "4g", + "--files", "file1.txt,file2.txt", + "--archives", "archive1.zip,archive2.jar", + "--num-executors", "6", + "--name", "beauty", + "--conf", "spark.ui.enabled=false", + "thejar.jar", + "arg1", "arg2") + val appArgs = new SparkSubmitArguments(clArgs) + val (childArgs, classpath, conf, mainClass) = submit.prepareSubmitEnvironment(appArgs) + val childArgsStr = childArgs.mkString(" ") + childArgsStr should include ("arg1 arg2") + mainClass should be ("org.SomeClass") + + classpath(0) should endWith ("thejar.jar") + classpath(1) should endWith ("one.jar") + classpath(2) should endWith ("two.jar") + classpath(3) should endWith ("three.jar") + + conf.get("spark.executor.memory") should be ("5g") + conf.get("spark.driver.memory") should be ("4g") + conf.get("spark.executor.cores") should be ("5") + conf.get("spark.jars") should include regex (".*one.jar,.*two.jar,.*three.jar") + conf.get("spark.files") should include regex (".*file1.txt,.*file2.txt") + conf.get("spark.archives") should include regex (".*archive1.zip,.*archive2.jar") + conf.get("spark.app.name") should be ("beauty") + conf.get(UI_ENABLED) should be (false) + sys.props("SPARK_SUBMIT") should be ("true") + } + test("handles standalone cluster mode") { testStandaloneCluster(useRest = true) } diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala index d08052faa0..9fdbf485e1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala @@ -98,6 +98,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { // optional fields conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar")) conf.set(FILES.key, "fireball.png") + conf.set(ARCHIVES.key, "fireballs.zip") conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m") conf.set(DRIVER_CORES, 180) conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red") @@ -246,6 +247,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { | }, | "mainClass" : "org.apache.spark.examples.SparkPie", | "sparkProperties" : { + | "spark.archives" : "fireballs.zip", | "spark.driver.extraLibraryPath" : "pickle.jar", | "spark.jars" : "mayonnaise.jar,ketchup.jar", | "spark.driver.supervise" : "false", @@ -272,6 +274,7 @@ class SubmitRestProtocolSuite extends SparkFunSuite { | }, | "mainClass" : "org.apache.spark.examples.SparkPie", | "sparkProperties" : { 
+ | "spark.archives" : "fireballs.zip", | "spark.driver.extraLibraryPath" : "pickle.jar", | "spark.jars" : "mayonnaise.jar,ketchup.jar", | "spark.driver.supervise" : "false", diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala index 319dcfeece..810dcf0e61 100644 --- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala @@ -302,7 +302,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite // We don't really verify the data, just pass it around. val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19, - 1, mutable.Map.empty, mutable.Map.empty, new Properties, + 1, mutable.Map.empty, mutable.Map.empty, mutable.Map.empty, new Properties, Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data) val serializedTaskDescription = TaskDescription.encode(taskDescription) backend.executor = mock[Executor] diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 5b868604ec..7cf7a81a76 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -519,6 +519,7 @@ class ExecutorSuite extends SparkFunSuite partitionId = 0, addedFiles = Map[String, Long](), addedJars = Map[String, Long](), + addedArchives = Map[String, Long](), properties = new Properties, resources = immutable.Map[String, ResourceInformation](), serializedTask) diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 65d51e57ee..7a74dd877a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -244,7 +244,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0"))) var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1", - "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long], + "t1", 0, 1, mutable.Map.empty[String, Long], + mutable.Map.empty[String, Long], mutable.Map.empty[String, Long], new Properties(), taskResources, bytebuffer))) val ts = backend.getTaskSchedulerImpl() when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 915035e9eb..c4a8bcbb26 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -91,7 +91,8 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit .set(key, secretPassword) val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf()) val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf) - val envDetails = SparkEnv.environmentDetails(conf, hadoopconf, "FIFO", 
Seq.empty, Seq.empty) + val envDetails = SparkEnv.environmentDetails( + conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty) val event = SparkListenerEnvironmentUpdate(envDetails) val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap assert(redactedProps(key) == "*********(redacted)") diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 5839532f11..98b5bada27 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -33,6 +33,10 @@ class TaskDescriptionSuite extends SparkFunSuite { originalFiles.put("fileUrl1", 1824) originalFiles.put("fileUrl2", 2) + val originalArchives = new HashMap[String, Long]() + originalArchives.put("archiveUrl1", 1824) + originalArchives.put("archiveUrl2", 2) + val originalJars = new HashMap[String, Long]() originalJars.put("jar1", 3) @@ -70,6 +74,7 @@ class TaskDescriptionSuite extends SparkFunSuite { partitionId = 1, originalFiles, originalJars, + originalArchives, originalProperties, originalResources, taskBuffer @@ -87,6 +92,7 @@ class TaskDescriptionSuite extends SparkFunSuite { assert(decodedTaskDescription.partitionId === originalTaskDescription.partitionId) assert(decodedTaskDescription.addedFiles.equals(originalFiles)) assert(decodedTaskDescription.addedJars.equals(originalJars)) + assert(decodedTaskDescription.addedArchives.equals(originalArchives)) assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties)) assert(equalResources(decodedTaskDescription.resources, originalTaskDescription.resources)) assert(decodedTaskDescription.serializedTask.equals(taskBuffer)) diff --git a/docs/configuration.md b/docs/configuration.md index 76494b04c9..d4d8e47645 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -784,6 +784,17 @@ Apart from these, the following properties are also available, and may be useful 2.3.0 + + spark.archives + + + Comma-separated list of archives to be extracted into the working directory of each executor. + .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory name to unpack via + adding # after the file name to unpack, for example, file.zip#directory. + This configuration is experimental. 
+ + 3.1.0 + spark.pyspark.driver.python diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 5a66bfca27..9405927eb1 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,6 +41,7 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter.commitAllPartitions"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.shuffle.api.ShuffleMapOutputWriter.commitAllPartitions"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.shuffle.api.ShuffleMapOutputWriter.commitAllPartitions"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkEnv.environmentDetails"), // mllib module ProblemFilters.exclude[NewMixinForwarderProblem]("org.apache.spark.ml.classification.LogisticRegressionTrainingSummary.totalIterations"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionTrainingSummary.$init$"), diff --git a/python/docs/source/user_guide/python_packaging.rst b/python/docs/source/user_guide/python_packaging.rst index ef4d05a8ee..0aff6dc1d1 100644 --- a/python/docs/source/user_guide/python_packaging.rst +++ b/python/docs/source/user_guide/python_packaging.rst @@ -77,8 +77,7 @@ Using Zipped Virtual Environment -------------------------------- The idea of zipped environments is to zip your whole `virtual environment `_, -ship it to the cluster, unzip it remotely and target the Python interpreter from inside this zipped environment. Note that this -is currently supported *only for YARN*. +ship it to the cluster, unzip it remotely and target the Python interpreter from inside this zipped environment. Zip Virtual Environment ~~~~~~~~~~~~~~~~~~~~~~~ @@ -92,16 +91,15 @@ Example with `conda-pack`: .. code-block:: bash - conda create -y -n conda_env -c conda-forge \ - pyspark==3.0.1 pyarrow==0.15.1 pandas==0.25.3 conda-pack==0.4.0 - conda activate conda_env - conda pack -f -o conda_env.tar.gz + conda create -y -n pyspark_env -c conda-forge pyarrow==2.0.0 pandas==1.1.4 conda-pack==0.5.0 + conda activate pyspark_env + conda pack -f -o pyspark_env.tar.gz Upload to Spark Executors ~~~~~~~~~~~~~~~~~~~~~~~~~ Unzipping will be done by Spark when using target ``--archives`` option in spark-submit -or setting ``spark.yarn.dist.archives`` configuration. +or setting ``spark.archives`` configuration. Example with ``spark-submit``: @@ -109,8 +107,7 @@ Example with ``spark-submit``: export PYSPARK_DRIVER_PYTHON=python export PYSPARK_PYTHON=./environment/bin/python - spark-submit --master=yarn --deploy-mode client \ - --archives conda_env.tar.gz#environment app.py + spark-submit --master=... --archives pyspark_env.tar.gz#environment app.py Example using ``SparkSession.builder``: @@ -121,11 +118,17 @@ Example using ``SparkSession.builder``: from app import main os.environ['PYSPARK_PYTHON'] = "./environment/bin/python" - builder = SparkSession.builder.master("yarn").config( - "spark.yarn.dist.archives", "conda_env.tar.gz#environment") - spark = builder.getOrCreate() + spark = SparkSession.builder.master("...").config("spark.archives", "pyspark_env.tar.gz#environment").getOrCreate() main(spark) +Example with ``pyspark`` shell: + +.. code-block:: bash + + export PYSPARK_DRIVER_PYTHON=python + export PYSPARK_PYTHON=./environment/bin/python + pyspark --master=... 
--archives pyspark_env.tar.gz#environment + Using PEX --------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 6a6514569c..10030a20f0 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -264,6 +264,7 @@ class MesosFineGrainedSchedulerBackendSuite partitionId = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], + addedArchives = mutable.Map.empty[String, Long], properties = new Properties(), resources = immutable.Map.empty[String, ResourceInformation], ByteBuffer.wrap(new Array[Byte](0))) @@ -377,6 +378,7 @@ class MesosFineGrainedSchedulerBackendSuite partitionId = 0, addedFiles = mutable.Map.empty[String, Long], addedJars = mutable.Map.empty[String, Long], + addedArchives = mutable.Map.empty[String, Long], properties = new Properties(), resources = immutable.Map.empty[String, ResourceInformation], ByteBuffer.wrap(new Array[Byte](0)))
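
Example (a minimal, illustrative sketch; the archive path and directory names below are hypothetical): the new driver-side API registers an archive, the optional `#` fragment names the directory it is unpacked into under `SparkFiles.getRootDirectory()`, and tasks locate that directory through `SparkFiles.get`, mirroring the SPARK-33530 test above.

import java.io.File
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddArchiveExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("add-archive-example").setMaster("local"))

    // Register an archive; the "#data" fragment names the directory it is unpacked into
    // under SparkFiles.getRootDirectory() on the driver and on every executor.
    sc.addArchive("/tmp/resources.zip#data")  // hypothetical .zip path

    // The registered archive is reported by listArchives().
    assert(sc.listArchives().exists(_.contains("resources.zip")))

    // Inside tasks, resolve the unpacked directory via SparkFiles.get with the fragment name.
    val fileCounts = sc.parallelize(Seq(1), 1).map { _ =>
      val unpackedDir = new File(SparkFiles.get("data"))
      unpackedDir.listFiles().length
    }.collect()

    println(s"files unpacked from the archive: ${fileCounts.head}")
    sc.stop()
  }
}

As with `addFile`, an archive added while jobs are running only becomes visible to the next TaskSet, and adding the same path again is ignored with a warning.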
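
The same behavior can be driven purely through configuration via the `spark.archives` entry added above, which is the counterpart of `--archives` on spark-submit. A sketch under the assumption of hypothetical archive names and a local master:

import org.apache.spark.{SparkConf, SparkContext}

object ArchivesConfExample {
  def main(args: Array[String]): Unit = {
    // Equivalent to `spark-submit --archives env.tar.gz#environment,deps.zip`: both archives
    // are fetched and unpacked into the working directory of each executor.
    val conf = new SparkConf()
      .setAppName("archives-conf-example")
      .setMaster("local[2]")  // in a real deployment the master comes from spark-submit
      .set("spark.archives", "env.tar.gz#environment,deps.zip")  // hypothetical archive names

    val sc = new SparkContext(conf)
    // ... run jobs that rely on ./environment and the contents of deps.zip ...
    sc.stop()
  }
}

Archives listed this way are recorded in spark.app.initial.archive.urls and unpacked by executors in updateDependencies before plugin initialization, so plugins and their resources shipped as archives are usable at executor startup.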