From d7b1fcf8f0a267322af0592b2cb31f1c8970fb16 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 29 Aug 2017 14:42:24 -0700 Subject: [PATCH] [SPARK-21728][CORE] Allow SparkSubmit to use Logging. This change initializes logging when SparkSubmit runs, using a configuration that should avoid printing log messages as much as possible with most configurations, and adds code to restore the Spark logging system to as close as possible to its initial state, so the Spark app being run can re-initialize logging with its own configuration. With that feature, some duplicate code in SparkSubmit can now be replaced with the existing methods in the Utils class, which could not be used before because they initialized logging. As part of that I also did some minor refactoring, moving methods that should really belong in DependencyUtils. The change also shuffles some code in SparkHadoopUtil so that SparkSubmit can create a Hadoop config like the rest of Spark code, respecting the user's Spark configuration. The behavior was verified running spark-shell, pyspark and normal applications, then verifying the logging behavior, with and without dependency downloads. Author: Marcelo Vanzin Closes #19013 from vanzin/SPARK-21728. --- .../apache/spark/deploy/DependencyUtils.scala | 112 ++++++++--- .../apache/spark/deploy/SparkHadoopUtil.scala | 79 +++++--- .../org/apache/spark/deploy/SparkSubmit.scala | 179 +++--------------- .../spark/deploy/worker/DriverWrapper.scala | 9 +- .../org/apache/spark/internal/Logging.scala | 61 ++++-- .../scala/org/apache/spark/util/Utils.scala | 10 +- .../spark/deploy/SparkSubmitSuite.scala | 60 ++++-- 7 files changed, 263 insertions(+), 247 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala index 97f3803aaf..db92a8f102 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala @@ -18,15 +18,15 @@ package org.apache.spark.deploy import java.io.File -import java.nio.file.Files import scala.collection.mutable.HashMap -import org.apache.commons.io.FileUtils import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.util.MutableURLClassLoader +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.util.{MutableURLClassLoader, Utils} private[deploy] object DependencyUtils { @@ -51,41 +51,22 @@ private[deploy] object DependencyUtils { SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions) } - def createTempDir(): File = { - val targetDir = Files.createTempDirectory("tmp").toFile - // scalastyle:off runtimeaddshutdownhook - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { - FileUtils.deleteQuietly(targetDir) - } - }) - // scalastyle:on runtimeaddshutdownhook - targetDir - } - - def resolveAndDownloadJars(jars: String, userJar: String): String = { - val targetDir = DependencyUtils.createTempDir() - val hadoopConf = new Configuration() - val sparkProperties = new HashMap[String, String]() - val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore", - "spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword", - "spark.ssl.fs.protocol", "spark.ssl.protocol") - - securityProperties.foreach { pName => - sys.props.get(pName).foreach { pValue => - sparkProperties.put(pName, pValue) 
- } - } - + def resolveAndDownloadJars( + jars: String, + userJar: String, + sparkConf: SparkConf, + hadoopConf: Configuration, + secMgr: SecurityManager): String = { + val targetDir = Utils.createTempDir() Option(jars) .map { - SparkSubmit.resolveGlobPaths(_, hadoopConf) + resolveGlobPaths(_, hadoopConf) .split(",") .filterNot(_.contains(userJar.split("/").last)) .mkString(",") } .filterNot(_ == "") - .map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf)) + .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)) .orNull } @@ -96,4 +77,73 @@ private[deploy] object DependencyUtils { } } } + + /** + * Download a list of remote files to temp local files. If the file is local, the original file + * will be returned. + * + * @param fileList A comma separated file list. + * @param targetDir A temporary directory for which downloaded files. + * @param sparkConf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param secMgr Spark security manager. + * @return A comma separated local files list. + */ + def downloadFileList( + fileList: String, + targetDir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + secMgr: SecurityManager): String = { + require(fileList != null, "fileList cannot be null.") + fileList.split(",") + .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)) + .mkString(",") + } + + /** + * Download a file from the remote to a local temporary directory. If the input path points to + * a local path, returns it with no operation. + * + * @param path A file path from where the files will be downloaded. + * @param targetDir A temporary directory for which downloaded files. + * @param sparkConf Spark configuration. + * @param hadoopConf Hadoop configuration. + * @param secMgr Spark security manager. + * @return Path to the local file. + */ + def downloadFile( + path: String, + targetDir: File, + sparkConf: SparkConf, + hadoopConf: Configuration, + secMgr: SecurityManager): String = { + require(path != null, "path cannot be null.") + val uri = Utils.resolveURI(path) + + uri.getScheme match { + case "file" | "local" => path + case _ => + val fname = new Path(uri).getName() + val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr, + hadoopConf) + localFile.toURI().toString() + } + } + + def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = { + require(paths != null, "paths cannot be null.") + paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path => + val uri = Utils.resolveURI(path) + uri.getScheme match { + case "local" | "http" | "https" | "ftp" => Array(path) + case _ => + val fs = FileSystem.get(uri, hadoopConf) + Option(fs.globStatus(new Path(uri))).map { status => + status.filter(_.isFile).map(_.getPath.toUri.toString) + }.getOrElse(Array(path)) + } + }.mkString(",") + } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6d507d8533..53775db251 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -81,29 +81,7 @@ class SparkHadoopUtil extends Logging { * configuration. */ def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = { - // Note: this null check is around more than just access to the "conf" object to maintain - // the behavior of the old implementation of this code, for backwards compatibility. 
- if (conf != null) { - // Explicitly check for S3 environment variables - val keyId = System.getenv("AWS_ACCESS_KEY_ID") - val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY") - if (keyId != null && accessKey != null) { - hadoopConf.set("fs.s3.awsAccessKeyId", keyId) - hadoopConf.set("fs.s3n.awsAccessKeyId", keyId) - hadoopConf.set("fs.s3a.access.key", keyId) - hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey) - hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey) - hadoopConf.set("fs.s3a.secret.key", accessKey) - - val sessionToken = System.getenv("AWS_SESSION_TOKEN") - if (sessionToken != null) { - hadoopConf.set("fs.s3a.session.token", sessionToken) - } - } - appendSparkHadoopConfigs(conf, hadoopConf) - val bufferSize = conf.get("spark.buffer.size", "65536") - hadoopConf.set("io.file.buffer.size", bufferSize) - } + SparkHadoopUtil.appendS3AndSparkHadoopConfigurations(conf, hadoopConf) } /** @@ -111,10 +89,7 @@ class SparkHadoopUtil extends Logging { * configuration without the spark.hadoop. prefix. */ def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { - // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" - for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { - hadoopConf.set(key.substring("spark.hadoop.".length), value) - } + SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf) } /** @@ -134,9 +109,7 @@ class SparkHadoopUtil extends Logging { * subsystems. */ def newConfiguration(conf: SparkConf): Configuration = { - val hadoopConf = new Configuration() - appendS3AndSparkHadoopConfigurations(conf, hadoopConf) - hadoopConf + SparkHadoopUtil.newConfiguration(conf) } /** @@ -479,4 +452,50 @@ object SparkHadoopUtil { hadoop } } + + /** + * Returns a Configuration object with Spark configuration applied on top. Unlike + * the instance method, this will always return a Configuration instance, and not a + * cluster manager-specific type. + */ + private[spark] def newConfiguration(conf: SparkConf): Configuration = { + val hadoopConf = new Configuration() + appendS3AndSparkHadoopConfigurations(conf, hadoopConf) + hadoopConf + } + + private def appendS3AndSparkHadoopConfigurations( + conf: SparkConf, + hadoopConf: Configuration): Unit = { + // Note: this null check is around more than just access to the "conf" object to maintain + // the behavior of the old implementation of this code, for backwards compatibility. 
+ if (conf != null) { + // Explicitly check for S3 environment variables + val keyId = System.getenv("AWS_ACCESS_KEY_ID") + val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY") + if (keyId != null && accessKey != null) { + hadoopConf.set("fs.s3.awsAccessKeyId", keyId) + hadoopConf.set("fs.s3n.awsAccessKeyId", keyId) + hadoopConf.set("fs.s3a.access.key", keyId) + hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey) + hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey) + hadoopConf.set("fs.s3a.secret.key", accessKey) + + val sessionToken = System.getenv("AWS_SESSION_TOKEN") + if (sessionToken != null) { + hadoopConf.set("fs.s3a.session.token", sessionToken) + } + } + appendSparkHadoopConfigs(conf, hadoopConf) + val bufferSize = conf.get("spark.buffer.size", "65536") + hadoopConf.set("io.file.buffer.size", bufferSize) + } + } + + private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = { + // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar" + for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) { + hadoopConf.set(key.substring("spark.hadoop.".length), value) + } + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 548149a88a..38604fe939 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -20,19 +20,16 @@ package org.apache.spark.deploy import java.io._ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException} import java.net.URL -import java.security.{KeyStore, PrivilegedExceptionAction} -import java.security.cert.X509Certificate +import java.security.PrivilegedExceptionAction import java.text.ParseException -import javax.net.ssl._ import scala.annotation.tailrec import scala.collection.mutable.{ArrayBuffer, HashMap, Map} import scala.util.Properties -import com.google.common.io.ByteStreams import org.apache.commons.lang3.StringUtils import org.apache.hadoop.conf.{Configuration => HadoopConfiguration} -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.ivy.Ivy @@ -69,7 +66,9 @@ private[deploy] object SparkSubmitAction extends Enumeration { * This program handles setting up the classpath with relevant Spark dependencies and provides * a layer over the different cluster managers and deploy modes that Spark supports. */ -object SparkSubmit extends CommandLineUtils { +object SparkSubmit extends CommandLineUtils with Logging { + + import DependencyUtils._ // Cluster managers private val YARN = 1 @@ -113,6 +112,10 @@ object SparkSubmit extends CommandLineUtils { // scalastyle:on println override def main(args: Array[String]): Unit = { + // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to + // be reset before the application starts. 
+ val uninitLog = initializeLogIfNecessary(true, silent = true) + val appArgs = new SparkSubmitArguments(args) if (appArgs.verbose) { // scalastyle:off println @@ -120,7 +123,7 @@ object SparkSubmit extends CommandLineUtils { // scalastyle:on println } appArgs.action match { - case SparkSubmitAction.SUBMIT => submit(appArgs) + case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) case SparkSubmitAction.KILL => kill(appArgs) case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) } @@ -153,7 +156,7 @@ object SparkSubmit extends CommandLineUtils { * main class. */ @tailrec - private def submit(args: SparkSubmitArguments): Unit = { + private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args) def doRunMain(): Unit = { @@ -185,11 +188,16 @@ object SparkSubmit extends CommandLineUtils { } } - // In standalone cluster mode, there are two submission gateways: - // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper - // (2) The new REST-based gateway introduced in Spark 1.3 - // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over - // to use the legacy gateway if the master endpoint turns out to be not a REST server. + // Let the main class re-initialize the logging system once it starts. + if (uninitLog) { + Logging.uninitialize() + } + + // In standalone cluster mode, there are two submission gateways: + // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper + // (2) The new REST-based gateway introduced in Spark 1.3 + // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over + // to use the legacy gateway if the master endpoint turns out to be not a REST server. if (args.isStandaloneCluster && args.useRest) { try { // scalastyle:off println @@ -202,7 +210,7 @@ object SparkSubmit extends CommandLineUtils { printWarning(s"Master endpoint ${args.master} was not a REST server. " + "Falling back to legacy submission gateway instead.") args.useRest = false - submit(args) + submit(args, false) } // In all other modes, just run the main class as prepared } else { @@ -328,8 +336,10 @@ object SparkSubmit extends CommandLineUtils { } } - val hadoopConf = conf.getOrElse(new HadoopConfiguration()) - val targetDir = DependencyUtils.createTempDir() + val sparkConf = new SparkConf(false) + args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) } + val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf)) + val targetDir = Utils.createTempDir() // Resolve glob path for different resources. 
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull @@ -342,14 +352,15 @@ object SparkSubmit extends CommandLineUtils { var localJars: String = null var localPyFiles: String = null if (deployMode == CLIENT) { + val secMgr = new SecurityManager(sparkConf) localPrimaryResource = Option(args.primaryResource).map { - downloadFile(_, targetDir, args.sparkProperties, hadoopConf) + downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull localJars = Option(args.jars).map { - downloadFileList(_, targetDir, args.sparkProperties, hadoopConf) + downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull localPyFiles = Option(args.pyFiles).map { - downloadFileList(_, targetDir, args.sparkProperties, hadoopConf) + downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull } @@ -863,136 +874,6 @@ object SparkSubmit extends CommandLineUtils { if (merged == "") null else merged } - /** - * Download a list of remote files to temp local files. If the file is local, the original file - * will be returned. - * @param fileList A comma separated file list. - * @param targetDir A temporary directory for which downloaded files - * @param sparkProperties Spark properties - * @return A comma separated local files list. - */ - private[deploy] def downloadFileList( - fileList: String, - targetDir: File, - sparkProperties: Map[String, String], - hadoopConf: HadoopConfiguration): String = { - require(fileList != null, "fileList cannot be null.") - fileList.split(",") - .map(downloadFile(_, targetDir, sparkProperties, hadoopConf)) - .mkString(",") - } - - /** - * Download a file from the remote to a local temporary directory. If the input path points to - * a local path, returns it with no operation. - * @param path A file path from where the files will be downloaded. - * @param targetDir A temporary directory for which downloaded files - * @param sparkProperties Spark properties - * @return A comma separated local files list. 
- */ - private[deploy] def downloadFile( - path: String, - targetDir: File, - sparkProperties: Map[String, String], - hadoopConf: HadoopConfiguration): String = { - require(path != null, "path cannot be null.") - val uri = Utils.resolveURI(path) - uri.getScheme match { - case "file" | "local" => path - case "http" | "https" | "ftp" => - val uc = uri.toURL.openConnection() - uc match { - case https: HttpsURLConnection => - val trustStore = sparkProperties.get("spark.ssl.fs.trustStore") - .orElse(sparkProperties.get("spark.ssl.trustStore")) - val trustStorePwd = sparkProperties.get("spark.ssl.fs.trustStorePassword") - .orElse(sparkProperties.get("spark.ssl.trustStorePassword")) - .map(_.toCharArray) - .orNull - val protocol = sparkProperties.get("spark.ssl.fs.protocol") - .orElse(sparkProperties.get("spark.ssl.protocol")) - if (protocol.isEmpty) { - printErrorAndExit("spark ssl protocol is required when enabling SSL connection.") - } - - val trustStoreManagers = trustStore.map { t => - var input: InputStream = null - try { - input = new FileInputStream(new File(t)) - val ks = KeyStore.getInstance(KeyStore.getDefaultType) - ks.load(input, trustStorePwd) - val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm) - tmf.init(ks) - tmf.getTrustManagers - } finally { - if (input != null) { - input.close() - input = null - } - } - }.getOrElse { - Array({ - new X509TrustManager { - override def getAcceptedIssuers: Array[X509Certificate] = null - override def checkClientTrusted( - x509Certificates: Array[X509Certificate], s: String) {} - override def checkServerTrusted( - x509Certificates: Array[X509Certificate], s: String) {} - }: TrustManager - }) - } - val sslContext = SSLContext.getInstance(protocol.get) - sslContext.init(null, trustStoreManagers, null) - https.setSSLSocketFactory(sslContext.getSocketFactory) - https.setHostnameVerifier(new HostnameVerifier { - override def verify(s: String, sslSession: SSLSession): Boolean = false - }) - - case _ => - } - - uc.setConnectTimeout(60 * 1000) - uc.setReadTimeout(60 * 1000) - uc.connect() - val in = uc.getInputStream - val fileName = new Path(uri).getName - val tempFile = new File(targetDir, fileName) - val out = new FileOutputStream(tempFile) - // scalastyle:off println - printStream.println(s"Downloading ${uri.toString} to ${tempFile.getAbsolutePath}.") - // scalastyle:on println - try { - ByteStreams.copy(in, out) - } finally { - in.close() - out.close() - } - tempFile.toURI.toString - case _ => - val fs = FileSystem.get(uri, hadoopConf) - val tmpFile = new File(targetDir, new Path(uri).getName) - // scalastyle:off println - printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.") - // scalastyle:on println - fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath)) - tmpFile.toURI.toString - } - } - - private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = { - require(paths != null, "paths cannot be null.") - paths.split(",").map(_.trim).filter(_.nonEmpty).flatMap { path => - val uri = Utils.resolveURI(path) - uri.getScheme match { - case "local" | "http" | "https" | "ftp" => Array(path) - case _ => - val fs = FileSystem.get(uri, hadoopConf) - Option(fs.globStatus(new Path(uri))).map { status => - status.filter(_.isFile).map(_.getPath.toUri.toString) - }.getOrElse(Array(path)) - } - }.mkString(",") - } } /** Provides utility functions to be used inside SparkSubmit. 
*/ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala index cd3e361530..c1671192e0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -22,7 +22,7 @@ import java.io.File import org.apache.commons.lang3.StringUtils import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.{DependencyUtils, SparkSubmit} +import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit} import org.apache.spark.rpc.RpcEnv import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -72,6 +72,10 @@ object DriverWrapper { } private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = { + val sparkConf = new SparkConf() + val secMgr = new SecurityManager(sparkConf) + val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf) + val Seq(packagesExclusions, packages, repositories, ivyRepoPath) = Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy") .map(sys.props.get(_).orNull) @@ -86,7 +90,8 @@ object DriverWrapper { jarsProp } } - val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar) + val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf, + secMgr) DependencyUtils.addJarsToClassPath(localJars, loader) } } diff --git a/core/src/main/scala/org/apache/spark/internal/Logging.scala b/core/src/main/scala/org/apache/spark/internal/Logging.scala index c7f2847731..cea9964ea8 100644 --- a/core/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/core/src/main/scala/org/apache/spark/internal/Logging.scala @@ -96,24 +96,27 @@ trait Logging { } protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = { + initializeLogIfNecessary(isInterpreter, silent = false) + } + + protected def initializeLogIfNecessary( + isInterpreter: Boolean, + silent: Boolean = false): Boolean = { if (!Logging.initialized) { Logging.initLock.synchronized { if (!Logging.initialized) { - initializeLogging(isInterpreter) + initializeLogging(isInterpreter, silent) + return true } } } + false } - private def initializeLogging(isInterpreter: Boolean): Unit = { + private def initializeLogging(isInterpreter: Boolean, silent: Boolean): Unit = { // Don't use a logger in here, as this is itself occurring during initialization of a logger // If Log4j 1.2 is being used, but is not initialized, load a default properties file - val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr - // This distinguishes the log4j 1.2 binding, currently - // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently - // org.apache.logging.slf4j.Log4jLoggerFactory - val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass) - if (usingLog4j12) { + if (Logging.isLog4j12()) { val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements // scalastyle:off println if (!log4j12Initialized) { @@ -121,22 +124,30 @@ trait Logging { Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match { case Some(url) => PropertyConfigurator.configure(url) - System.err.println(s"Using Spark's default log4j profile: $defaultLogProps") + if (!silent) { + System.err.println(s"Using Spark's default log4j profile: $defaultLogProps") + } case None => System.err.println(s"Spark was unable to load 
$defaultLogProps") } } + val rootLogger = LogManager.getRootLogger() + if (Logging.defaultRootLevel == null) { + Logging.defaultRootLevel = rootLogger.getLevel() + } + if (isInterpreter) { // Use the repl's main class to define the default log level when running the shell, // overriding the root logger's config if they're different. - val rootLogger = LogManager.getRootLogger() val replLogger = LogManager.getLogger(logName) val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN) if (replLevel != rootLogger.getEffectiveLevel()) { - System.err.printf("Setting default log level to \"%s\".\n", replLevel) - System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " + - "For SparkR, use setLogLevel(newLevel).") + if (!silent) { + System.err.printf("Setting default log level to \"%s\".\n", replLevel) + System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " + + "For SparkR, use setLogLevel(newLevel).") + } rootLogger.setLevel(replLevel) } } @@ -150,8 +161,10 @@ trait Logging { } } -private object Logging { +private[spark] object Logging { @volatile private var initialized = false + @volatile private var defaultRootLevel: Level = null + val initLock = new Object() try { // We use reflection here to handle the case where users remove the @@ -165,4 +178,24 @@ private object Logging { } catch { case e: ClassNotFoundException => // can't log anything yet so just fail silently } + + /** + * Marks the logging system as not initialized. This does a best effort at resetting the + * logging system to its initial state so that the next class to use logging triggers + * initialization again. + */ + def uninitialize(): Unit = initLock.synchronized { + if (isLog4j12()) { + LogManager.resetConfiguration() + } + this.initialized = false + } + + private def isLog4j12(): Boolean = { + // This distinguishes the log4j 1.2 binding, currently + // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently + // org.apache.logging.slf4j.Log4jLoggerFactory + val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr + "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass) + } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 3dce76c2c9..0da075de71 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -449,7 +449,7 @@ private[spark] object Utils extends Logging { securityMgr: SecurityManager, hadoopConf: Configuration, timestamp: Long, - useCache: Boolean) { + useCache: Boolean): File = { val fileName = decodeFileNameInURI(new URI(url)) val targetFile = new File(targetDir, fileName) val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true) @@ -498,6 +498,8 @@ private[spark] object Utils extends Logging { if (isWindows) { FileUtil.chmod(targetFile.getAbsolutePath, "u+r") } + + targetFile } /** @@ -637,13 +639,13 @@ private[spark] object Utils extends Logging { * Throws SparkException if the target file already exists and has different contents than * the requested file. 
*/ - private def doFetchFile( + def doFetchFile( url: String, targetDir: File, filename: String, conf: SparkConf, securityMgr: SecurityManager, - hadoopConf: Configuration) { + hadoopConf: Configuration): File = { val targetFile = new File(targetDir, filename) val uri = new URI(url) val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false) @@ -687,6 +689,8 @@ private[spark] object Utils extends Logging { fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite, filename = Some(filename)) } + + targetFile } /** diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 95137c868c..724096d4ab 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -29,7 +29,7 @@ import scala.io.Source import com.google.common.io.ByteStreams import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} +import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path} import org.scalatest.{BeforeAndAfterEach, Matchers} import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ @@ -793,30 +793,37 @@ class SparkSubmitSuite } test("downloadFile - invalid url") { + val sparkConf = new SparkConf(false) intercept[IOException] { - SparkSubmit.downloadFile( - "abc:/my/file", Utils.createTempDir(), mutable.Map.empty, new Configuration()) + DependencyUtils.downloadFile( + "abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration(), + new SecurityManager(sparkConf)) } } test("downloadFile - file doesn't exist") { + val sparkConf = new SparkConf(false) val hadoopConf = new Configuration() val tmpDir = Utils.createTempDir() updateConfWithFakeS3Fs(hadoopConf) intercept[FileNotFoundException] { - SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf) + DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf, + new SecurityManager(sparkConf)) } } test("downloadFile does not download local file") { + val sparkConf = new SparkConf(false) + val secMgr = new SecurityManager(sparkConf) // empty path is considered as local file. 
val tmpDir = Files.createTempDirectory("tmp").toFile - assert(SparkSubmit.downloadFile("", tmpDir, mutable.Map.empty, new Configuration()) === "") - assert(SparkSubmit.downloadFile("/local/file", tmpDir, mutable.Map.empty, - new Configuration()) === "/local/file") + assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration(), secMgr) === "") + assert(DependencyUtils.downloadFile("/local/file", tmpDir, sparkConf, new Configuration(), + secMgr) === "/local/file") } test("download one file to local") { + val sparkConf = new SparkConf(false) val jarFile = File.createTempFile("test", ".jar") jarFile.deleteOnExit() val content = "hello, world" @@ -825,13 +832,14 @@ class SparkSubmitSuite val tmpDir = Files.createTempDirectory("tmp").toFile updateConfWithFakeS3Fs(hadoopConf) val sourcePath = s"s3a://${jarFile.getAbsolutePath}" - val outputPath = - SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf) + val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf, + new SecurityManager(sparkConf)) checkDownloadedFile(sourcePath, outputPath) deleteTempOutputFile(outputPath) } test("download list of files to local") { + val sparkConf = new SparkConf(false) val jarFile = File.createTempFile("test", ".jar") jarFile.deleteOnExit() val content = "hello, world" @@ -840,8 +848,10 @@ class SparkSubmitSuite val tmpDir = Files.createTempDirectory("tmp").toFile updateConfWithFakeS3Fs(hadoopConf) val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}") - val outputPaths = SparkSubmit.downloadFileList( - sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",") + val outputPaths = DependencyUtils + .downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf, + new SecurityManager(sparkConf)) + .split(",") assert(outputPaths.length === sourcePaths.length) sourcePaths.zip(outputPaths).foreach { case (sourcePath, outputPath) => @@ -996,17 +1006,31 @@ object UserClasspathFirstTest { } class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem { - override def copyToLocalFile(src: Path, dst: Path): Unit = { + private def local(path: Path): Path = { // Ignore the scheme for testing. - super.copyToLocalFile(new Path(src.toUri.getPath), dst) + new Path(path.toUri.getPath) } + private def toRemote(status: FileStatus): FileStatus = { + val path = s"s3a://${status.getPath.toUri.getPath}" + status.setPath(new Path(path)) + status + } + + override def isFile(path: Path): Boolean = super.isFile(local(path)) + override def globStatus(pathPattern: Path): Array[FileStatus] = { val newPath = new Path(pathPattern.toUri.getPath) - super.globStatus(newPath).map { status => - val path = s"s3a://${status.getPath.toUri.getPath}" - status.setPath(new Path(path)) - status - } + super.globStatus(newPath).map(toRemote) } + + override def listStatus(path: Path): Array[FileStatus] = { + super.listStatus(local(path)).map(toRemote) + } + + override def copyToLocalFile(src: Path, dst: Path): Unit = { + super.copyToLocalFile(local(src), dst) + } + + override def open(path: Path): FSDataInputStream = super.open(local(path)) }