[SPARK-21728][CORE] Allow SparkSubmit to use Logging.

This change initializes logging when SparkSubmit runs, using a
configuration that, in most setups, avoids printing log messages
as much as possible. It also adds code to restore the Spark logging
system to a state as close as possible to its initial one, so that
the Spark app being run can re-initialize logging with its own
configuration.

With that feature, some duplicate code in SparkSubmit can now
be replaced with the existing methods in the Utils class, which
could not be used before because they initialized logging. As part
of that I also did some minor refactoring, moving methods that
really belong in DependencyUtils into that object.

The change also shuffles some code in SparkHadoopUtil so that
SparkSubmit can create a Hadoop config like the rest of Spark
code, respecting the user's Spark configuration.

The behavior was verified running spark-shell, pyspark and
normal applications, then verifying the logging behavior,
with and without dependency downloads.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19013 from vanzin/SPARK-21728.
This commit is contained in:
Marcelo Vanzin 2017-08-29 14:42:24 -07:00
parent 840ba053b9
commit d7b1fcf8f0
7 changed files with 263 additions and 247 deletions

View file

@ -18,15 +18,15 @@
package org.apache.spark.deploy
import java.io.File
import java.nio.file.Files
import scala.collection.mutable.HashMap
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.util.MutableURLClassLoader
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.util.{MutableURLClassLoader, Utils}
private[deploy] object DependencyUtils {
@ -51,41 +51,22 @@ private[deploy] object DependencyUtils {
SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings, exclusions = exclusions)
}
/**
 * Creates a new temporary directory (prefix "tmp") and registers a JVM shutdown hook that
 * calls `FileUtils.deleteQuietly` on it.
 *
 * @return The newly created directory.
 */
def createTempDir(): File = {
  val tempDirectory = Files.createTempDirectory("tmp").toFile
  // The hook captures the directory so it can be removed when the JVM exits.
  val cleanupHook = new Thread() {
    override def run(): Unit = FileUtils.deleteQuietly(tempDirectory)
  }
  // scalastyle:off runtimeaddshutdownhook
  Runtime.getRuntime.addShutdownHook(cleanupHook)
  // scalastyle:on runtimeaddshutdownhook
  tempDirectory
}
def resolveAndDownloadJars(jars: String, userJar: String): String = {
val targetDir = DependencyUtils.createTempDir()
val hadoopConf = new Configuration()
val sparkProperties = new HashMap[String, String]()
val securityProperties = List("spark.ssl.fs.trustStore", "spark.ssl.trustStore",
"spark.ssl.fs.trustStorePassword", "spark.ssl.trustStorePassword",
"spark.ssl.fs.protocol", "spark.ssl.protocol")
securityProperties.foreach { pName =>
sys.props.get(pName).foreach { pValue =>
sparkProperties.put(pName, pValue)
}
}
def resolveAndDownloadJars(
jars: String,
userJar: String,
sparkConf: SparkConf,
hadoopConf: Configuration,
secMgr: SecurityManager): String = {
val targetDir = Utils.createTempDir()
Option(jars)
.map {
SparkSubmit.resolveGlobPaths(_, hadoopConf)
resolveGlobPaths(_, hadoopConf)
.split(",")
.filterNot(_.contains(userJar.split("/").last))
.mkString(",")
}
.filterNot(_ == "")
.map(SparkSubmit.downloadFileList(_, targetDir, sparkProperties, hadoopConf))
.map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
.orNull
}
@ -96,4 +77,73 @@ private[deploy] object DependencyUtils {
}
}
}
/**
 * Downloads every entry of a comma separated file list by handing it to `downloadFile`, and
 * joins the results back into a comma separated list. Entries that `downloadFile` treats as
 * local are returned unchanged.
 *
 * @param fileList A comma separated file list; must not be null.
 * @param targetDir A temporary directory in which downloaded files are placed.
 * @param sparkConf Spark configuration.
 * @param hadoopConf Hadoop configuration.
 * @param secMgr Spark security manager.
 * @return A comma separated list of local files, in the same order as the input.
 */
def downloadFileList(
    fileList: String,
    targetDir: File,
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    secMgr: SecurityManager): String = {
  require(fileList != null, "fileList cannot be null.")
  val localPaths = for (entry <- fileList.split(",")) yield {
    downloadFile(entry, targetDir, sparkConf, hadoopConf, secMgr)
  }
  localPaths.mkString(",")
}
/**
 * Fetches a single file into a local temporary directory. Paths whose resolved URI scheme is
 * "file" or "local" are returned as given, without any download.
 *
 * @param path A file path from where the file will be downloaded; must not be null.
 * @param targetDir A temporary directory in which the downloaded file is placed.
 * @param sparkConf Spark configuration.
 * @param hadoopConf Hadoop configuration.
 * @param secMgr Spark security manager.
 * @return The original path for "file" / "local" URIs, otherwise the URI string of the file
 *         returned by `Utils.doFetchFile`.
 */
def downloadFile(
    path: String,
    targetDir: File,
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    secMgr: SecurityManager): String = {
  require(path != null, "path cannot be null.")
  val resolved = Utils.resolveURI(path)
  val scheme = resolved.getScheme
  if (scheme == "file" || scheme == "local") {
    // Already local: hand the caller back exactly what was passed in.
    path
  } else {
    // The fetched file keeps the last component of the source path as its name.
    val targetName = new Path(resolved).getName()
    val fetched = Utils.doFetchFile(resolved.toString(), targetDir, targetName, sparkConf,
      secMgr, hadoopConf)
    fetched.toURI().toString()
  }
}
/**
 * Expands glob patterns in a comma separated list of paths.
 *
 * Entries are trimmed and empty ones dropped. Entries with a "local", "http", "https" or "ftp"
 * scheme are kept as they are. Every other entry is globbed through the Hadoop FileSystem for
 * its URI and replaced by the matching regular files; if the glob call returns null the entry
 * is kept unchanged.
 *
 * @param paths A comma separated list of paths; must not be null.
 * @param hadoopConf Hadoop configuration used to obtain the FileSystem.
 * @return A comma separated list of the resolved paths.
 */
def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
  require(paths != null, "paths cannot be null.")
  val nonGlobSchemes = Set("local", "http", "https", "ftp")

  // Resolves one entry into the list of paths that replace it.
  def expand(entry: String): Array[String] = {
    val entryUri = Utils.resolveURI(entry)
    if (nonGlobSchemes.contains(entryUri.getScheme)) {
      Array(entry)
    } else {
      val matches = FileSystem.get(entryUri, hadoopConf).globStatus(new Path(entryUri))
      if (matches == null) {
        Array(entry)
      } else {
        matches.collect { case status if status.isFile => status.getPath.toUri.toString }
      }
    }
  }

  val entries = paths.split(",").map(_.trim).filter(_.nonEmpty)
  entries.flatMap(entry => expand(entry)).mkString(",")
}
}

View file

@ -81,29 +81,7 @@ class SparkHadoopUtil extends Logging {
* configuration.
*/
def appendS3AndSparkHadoopConfigurations(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Note: this null check is around more than just access to the "conf" object to maintain
// the behavior of the old implementation of this code, for backwards compatibility.
if (conf != null) {
// Explicitly check for S3 environment variables
val keyId = System.getenv("AWS_ACCESS_KEY_ID")
val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
if (keyId != null && accessKey != null) {
hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
hadoopConf.set("fs.s3a.access.key", keyId)
hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
hadoopConf.set("fs.s3a.secret.key", accessKey)
val sessionToken = System.getenv("AWS_SESSION_TOKEN")
if (sessionToken != null) {
hadoopConf.set("fs.s3a.session.token", sessionToken)
}
}
appendSparkHadoopConfigs(conf, hadoopConf)
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
}
SparkHadoopUtil.appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
}
/**
@ -111,10 +89,7 @@ class SparkHadoopUtil extends Logging {
* configuration without the spark.hadoop. prefix.
*/
def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
// Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"
for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
}
/**
@ -134,9 +109,7 @@ class SparkHadoopUtil extends Logging {
* subsystems.
*/
def newConfiguration(conf: SparkConf): Configuration = {
val hadoopConf = new Configuration()
appendS3AndSparkHadoopConfigurations(conf, hadoopConf)
hadoopConf
SparkHadoopUtil.newConfiguration(conf)
}
/**
@ -479,4 +452,50 @@ object SparkHadoopUtil {
hadoop
}
}
/**
 * Builds a new Hadoop Configuration and applies the S3 and Spark-provided Hadoop settings
 * derived from `conf` on top of it. Unlike the instance method, this always returns a plain
 * Configuration instance, and not a cluster manager-specific type.
 */
private[spark] def newConfiguration(conf: SparkConf): Configuration = {
  val freshHadoopConf = new Configuration()
  appendS3AndSparkHadoopConfigurations(conf, freshHadoopConf)
  freshHadoopConf
}
/**
 * Copies AWS credentials from the environment, "spark.hadoop.*" properties and the Spark
 * buffer size into `hadoopConf`. Nothing at all is done when `conf` is null.
 */
private def appendS3AndSparkHadoopConfigurations(
    conf: SparkConf,
    hadoopConf: Configuration): Unit = {
  // Note: the null guard deliberately covers more than just the accesses to "conf", to keep
  // the behavior of the old implementation of this code, for backwards compatibility.
  Option(conf).foreach { sparkConf =>
    // Explicitly check for S3 environment variables; both must be present to be applied.
    val awsKeyId = Option(System.getenv("AWS_ACCESS_KEY_ID"))
    val awsSecret = Option(System.getenv("AWS_SECRET_ACCESS_KEY"))
    for (id <- awsKeyId; secret <- awsSecret) {
      Seq("fs.s3.awsAccessKeyId", "fs.s3n.awsAccessKeyId", "fs.s3a.access.key")
        .foreach(name => hadoopConf.set(name, id))
      Seq("fs.s3.awsSecretAccessKey", "fs.s3n.awsSecretAccessKey", "fs.s3a.secret.key")
        .foreach(name => hadoopConf.set(name, secret))
      // The session token is optional and only applies to s3a.
      Option(System.getenv("AWS_SESSION_TOKEN"))
        .foreach(token => hadoopConf.set("fs.s3a.session.token", token))
    }
    appendSparkHadoopConfigs(sparkConf, hadoopConf)
    hadoopConf.set("io.file.buffer.size", sparkConf.get("spark.buffer.size", "65536"))
  }
}
/**
 * Copies any "spark.hadoop.foo=bar" spark properties into `hadoopConf` as "foo=bar".
 */
private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
  val hadoopPrefix = "spark.hadoop."
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith(hadoopPrefix)) {
      hadoopConf.set(key.stripPrefix(hadoopPrefix), value)
    }
  }
}
}

View file

@ -20,19 +20,16 @@ package org.apache.spark.deploy
import java.io._
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.security.{KeyStore, PrivilegedExceptionAction}
import java.security.cert.X509Certificate
import java.security.PrivilegedExceptionAction
import java.text.ParseException
import javax.net.ssl._
import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Properties
import com.google.common.io.ByteStreams
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.ivy.Ivy
@ -69,7 +66,9 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit extends CommandLineUtils {
object SparkSubmit extends CommandLineUtils with Logging {
import DependencyUtils._
// Cluster managers
private val YARN = 1
@ -113,6 +112,10 @@ object SparkSubmit extends CommandLineUtils {
// scalastyle:on println
override def main(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
@ -120,7 +123,7 @@ object SparkSubmit extends CommandLineUtils {
// scalastyle:on println
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs)
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
@ -153,7 +156,7 @@ object SparkSubmit extends CommandLineUtils {
* main class.
*/
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)
def doRunMain(): Unit = {
@ -185,11 +188,16 @@ object SparkSubmit extends CommandLineUtils {
}
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
// Let the main class re-initialize the logging system once it starts.
if (uninitLog) {
Logging.uninitialize()
}
// In standalone cluster mode, there are two submission gateways:
// (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
if (args.isStandaloneCluster && args.useRest) {
try {
// scalastyle:off println
@ -202,7 +210,7 @@ object SparkSubmit extends CommandLineUtils {
printWarning(s"Master endpoint ${args.master} was not a REST server. " +
"Falling back to legacy submission gateway instead.")
args.useRest = false
submit(args)
submit(args, false)
}
// In all other modes, just run the main class as prepared
} else {
@ -328,8 +336,10 @@ object SparkSubmit extends CommandLineUtils {
}
}
val hadoopConf = conf.getOrElse(new HadoopConfiguration())
val targetDir = DependencyUtils.createTempDir()
val sparkConf = new SparkConf(false)
args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()
// Resolve glob path for different resources.
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
@ -342,14 +352,15 @@ object SparkSubmit extends CommandLineUtils {
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
val secMgr = new SecurityManager(sparkConf)
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, args.sparkProperties, hadoopConf)
downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, args.sparkProperties, hadoopConf)
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
}.orNull
}
@ -863,136 +874,6 @@ object SparkSubmit extends CommandLineUtils {
if (merged == "") null else merged
}
/**
 * Downloads every entry of a comma separated file list through `downloadFile` and joins the
 * results back into a comma separated list. Entries that `downloadFile` treats as local are
 * returned unchanged.
 * @param fileList A comma separated file list; must not be null.
 * @param targetDir A temporary directory in which downloaded files are placed
 * @param sparkProperties Spark properties
 * @param hadoopConf Hadoop configuration
 * @return A comma separated local files list.
 */
private[deploy] def downloadFileList(
    fileList: String,
    targetDir: File,
    sparkProperties: Map[String, String],
    hadoopConf: HadoopConfiguration): String = {
  require(fileList != null, "fileList cannot be null.")
  val localPaths = for (entry <- fileList.split(",")) yield {
    downloadFile(entry, targetDir, sparkProperties, hadoopConf)
  }
  localPaths.mkString(",")
}
/**
 * Download a file from the remote to a local temporary directory. If the input path points to
 * a local path, returns it with no operation.
 *
 * For "http", "https" and "ftp" URIs the content is streamed through a URLConnection with 60
 * second connect and read timeouts. HTTPS connections are configured from the "spark.ssl.fs.*"
 * properties, falling back to the matching "spark.ssl.*" ones. Any other scheme is copied
 * through the Hadoop FileSystem obtained for the URI.
 *
 * @param path A file path from where the file will be downloaded; must not be null.
 * @param targetDir A temporary directory in which the downloaded file is placed
 * @param sparkProperties Spark properties, consulted for the SSL settings of HTTPS downloads
 * @param hadoopConf Hadoop configuration used to obtain the FileSystem for other schemes
 * @return The original path for "file" / "local" URIs, otherwise the URI string of the
 *         downloaded local file.
 */
private[deploy] def downloadFile(
    path: String,
    targetDir: File,
    sparkProperties: Map[String, String],
    hadoopConf: HadoopConfiguration): String = {
  require(path != null, "path cannot be null.")
  val uri = Utils.resolveURI(path)
  uri.getScheme match {
    case "file" | "local" => path

    case "http" | "https" | "ftp" =>
      val uc = uri.toURL.openConnection()
      uc match {
        case https: HttpsURLConnection =>
          val trustStore = sparkProperties.get("spark.ssl.fs.trustStore")
            .orElse(sparkProperties.get("spark.ssl.trustStore"))
          val trustStorePwd = sparkProperties.get("spark.ssl.fs.trustStorePassword")
            .orElse(sparkProperties.get("spark.ssl.trustStorePassword"))
            .map(_.toCharArray)
            .orNull
          val protocol = sparkProperties.get("spark.ssl.fs.protocol")
            .orElse(sparkProperties.get("spark.ssl.protocol"))
          if (protocol.isEmpty) {
            printErrorAndExit("spark ssl protocol is required when enabling SSL connection.")
          }

          val trustStoreManagers = trustStore.map { t =>
            // Load the configured trust store; the stream is closed whether or not loading
            // succeeds.
            val input: InputStream = new FileInputStream(new File(t))
            try {
              val ks = KeyStore.getInstance(KeyStore.getDefaultType)
              ks.load(input, trustStorePwd)
              val tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm)
              tmf.init(ks)
              tmf.getTrustManagers
            } finally {
              input.close()
            }
          }.getOrElse {
            // NOTE(review): with no trust store configured, this trust manager performs no
            // checks at all, i.e. any server certificate is accepted. Kept as-is to preserve
            // behavior; confirm this is intended.
            Array({
              new X509TrustManager {
                override def getAcceptedIssuers: Array[X509Certificate] = null
                override def checkClientTrusted(
                    x509Certificates: Array[X509Certificate], s: String): Unit = {}
                override def checkServerTrusted(
                    x509Certificates: Array[X509Certificate], s: String): Unit = {}
              }: TrustManager
            })
          }
          val sslContext = SSLContext.getInstance(protocol.get)
          sslContext.init(null, trustStoreManagers, null)
          https.setSSLSocketFactory(sslContext.getSocketFactory)
          https.setHostnameVerifier(new HostnameVerifier {
            override def verify(s: String, sslSession: SSLSession): Boolean = false
          })
        case _ =>
      }

      uc.setConnectTimeout(60 * 1000)
      uc.setReadTimeout(60 * 1000)
      uc.connect()
      val in = uc.getInputStream
      // Each stream gets its own try/finally so that a failure while opening, writing or
      // closing the output file can no longer leave the input stream open (and vice versa).
      try {
        val fileName = new Path(uri).getName
        val tempFile = new File(targetDir, fileName)
        val out = new FileOutputStream(tempFile)
        try {
          // scalastyle:off println
          printStream.println(s"Downloading ${uri.toString} to ${tempFile.getAbsolutePath}.")
          // scalastyle:on println
          ByteStreams.copy(in, out)
        } finally {
          out.close()
        }
        tempFile.toURI.toString
      } finally {
        in.close()
      }

    case _ =>
      val fs = FileSystem.get(uri, hadoopConf)
      val tmpFile = new File(targetDir, new Path(uri).getName)
      // scalastyle:off println
      printStream.println(s"Downloading ${uri.toString} to ${tmpFile.getAbsolutePath}.")
      // scalastyle:on println
      fs.copyToLocalFile(new Path(uri), new Path(tmpFile.getAbsolutePath))
      tmpFile.toURI.toString
  }
}
/**
 * Expands glob patterns in a comma separated list of paths.
 *
 * Entries are trimmed and empty ones dropped. Entries with a "local", "http", "https" or "ftp"
 * scheme are kept as they are. Every other entry is globbed through the Hadoop FileSystem for
 * its URI and replaced by the matching regular files; if the glob call returns null the entry
 * is kept unchanged.
 */
private[deploy] def resolveGlobPaths(paths: String, hadoopConf: HadoopConfiguration): String = {
  require(paths != null, "paths cannot be null.")
  val passThroughSchemes = Set("local", "http", "https", "ftp")
  val cleaned = paths.split(",").map(_.trim).filter(_.nonEmpty)
  val resolved = cleaned.flatMap { entry =>
    val entryUri = Utils.resolveURI(entry)
    if (passThroughSchemes.contains(entryUri.getScheme)) {
      Array(entry)
    } else {
      val matches = FileSystem.get(entryUri, hadoopConf).globStatus(new Path(entryUri))
      if (matches == null) {
        Array(entry)
      } else {
        matches.collect { case status if status.isFile => status.getPath.toUri.toString }
      }
    }
  }
  resolved.mkString(",")
}
}
/** Provides utility functions to be used inside SparkSubmit. */

View file

@ -22,7 +22,7 @@ import java.io.File
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.{DependencyUtils, SparkSubmit}
import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@ -72,6 +72,10 @@ object DriverWrapper {
}
private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
val sparkConf = new SparkConf()
val secMgr = new SecurityManager(sparkConf)
val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
val Seq(packagesExclusions, packages, repositories, ivyRepoPath) =
Seq("spark.jars.excludes", "spark.jars.packages", "spark.jars.repositories", "spark.jars.ivy")
.map(sys.props.get(_).orNull)
@ -86,7 +90,8 @@ object DriverWrapper {
jarsProp
}
}
val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar)
val localJars = DependencyUtils.resolveAndDownloadJars(jars, userJar, sparkConf, hadoopConf,
secMgr)
DependencyUtils.addJarsToClassPath(localJars, loader)
}
}

View file

@ -96,24 +96,27 @@ trait Logging {
}
protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
initializeLogIfNecessary(isInterpreter, silent = false)
}
protected def initializeLogIfNecessary(
isInterpreter: Boolean,
silent: Boolean = false): Boolean = {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
initializeLogging(isInterpreter)
initializeLogging(isInterpreter, silent)
return true
}
}
}
false
}
private def initializeLogging(isInterpreter: Boolean): Unit = {
private def initializeLogging(isInterpreter: Boolean, silent: Boolean): Unit = {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
// This distinguishes the log4j 1.2 binding, currently
// org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
// org.apache.logging.slf4j.Log4jLoggerFactory
val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
if (usingLog4j12) {
if (Logging.isLog4j12()) {
val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
// scalastyle:off println
if (!log4j12Initialized) {
@ -121,22 +124,30 @@ trait Logging {
Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
case Some(url) =>
PropertyConfigurator.configure(url)
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
if (!silent) {
System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
}
case None =>
System.err.println(s"Spark was unable to load $defaultLogProps")
}
}
val rootLogger = LogManager.getRootLogger()
if (Logging.defaultRootLevel == null) {
Logging.defaultRootLevel = rootLogger.getLevel()
}
if (isInterpreter) {
// Use the repl's main class to define the default log level when running the shell,
// overriding the root logger's config if they're different.
val rootLogger = LogManager.getRootLogger()
val replLogger = LogManager.getLogger(logName)
val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
if (replLevel != rootLogger.getEffectiveLevel()) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
"For SparkR, use setLogLevel(newLevel).")
if (!silent) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
System.err.println("To adjust logging level use sc.setLogLevel(newLevel). " +
"For SparkR, use setLogLevel(newLevel).")
}
rootLogger.setLevel(replLevel)
}
}
@ -150,8 +161,10 @@ trait Logging {
}
}
private object Logging {
private[spark] object Logging {
@volatile private var initialized = false
@volatile private var defaultRootLevel: Level = null
val initLock = new Object()
try {
// We use reflection here to handle the case where users remove the
@ -165,4 +178,24 @@ private object Logging {
} catch {
case e: ClassNotFoundException => // can't log anything yet so just fail silently
}
/**
 * Flags the logging system as not initialized, so that the next class to use logging
 * triggers initialization again. This is a best-effort reset towards the initial state: when
 * the log4j 1.2 binding is in use, its configuration is reset as well. Runs while holding
 * `initLock`.
 */
def uninitialize(): Unit = {
  initLock.synchronized {
    val log4j12InUse = isLog4j12()
    if (log4j12InUse) {
      LogManager.resetConfiguration()
    }
    initialized = false
  }
}
/**
 * Tells whether slf4j is bound to log4j 1.2, by comparing the logger factory class name
 * reported by the static binder.
 */
private def isLog4j12(): Boolean = {
  // The log4j 1.2 binding is currently org.slf4j.impl.Log4jLoggerFactory, whereas the
  // log4j 2.0 binding is currently org.apache.logging.slf4j.Log4jLoggerFactory.
  val log4j12Factory = "org.slf4j.impl.Log4jLoggerFactory"
  log4j12Factory.equals(StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr)
}
}

View file

@ -449,7 +449,7 @@ private[spark] object Utils extends Logging {
securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean) {
useCache: Boolean): File = {
val fileName = decodeFileNameInURI(new URI(url))
val targetFile = new File(targetDir, fileName)
val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
@ -498,6 +498,8 @@ private[spark] object Utils extends Logging {
if (isWindows) {
FileUtil.chmod(targetFile.getAbsolutePath, "u+r")
}
targetFile
}
/**
@ -637,13 +639,13 @@ private[spark] object Utils extends Logging {
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
private def doFetchFile(
def doFetchFile(
url: String,
targetDir: File,
filename: String,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
hadoopConf: Configuration): File = {
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
@ -687,6 +689,8 @@ private[spark] object Utils extends Logging {
fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
filename = Some(filename))
}
targetFile
}
/**

View file

@ -29,7 +29,7 @@ import scala.io.Source
import com.google.common.io.ByteStreams
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
@ -793,30 +793,37 @@ class SparkSubmitSuite
}
test("downloadFile - invalid url") {
val sparkConf = new SparkConf(false)
intercept[IOException] {
SparkSubmit.downloadFile(
"abc:/my/file", Utils.createTempDir(), mutable.Map.empty, new Configuration())
DependencyUtils.downloadFile(
"abc:/my/file", Utils.createTempDir(), sparkConf, new Configuration(),
new SecurityManager(sparkConf))
}
}
test("downloadFile - file doesn't exist") {
val sparkConf = new SparkConf(false)
val hadoopConf = new Configuration()
val tmpDir = Utils.createTempDir()
updateConfWithFakeS3Fs(hadoopConf)
intercept[FileNotFoundException] {
SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf)
DependencyUtils.downloadFile("s3a:/no/such/file", tmpDir, sparkConf, hadoopConf,
new SecurityManager(sparkConf))
}
}
test("downloadFile does not download local file") {
val sparkConf = new SparkConf(false)
val secMgr = new SecurityManager(sparkConf)
// empty path is considered as local file.
val tmpDir = Files.createTempDirectory("tmp").toFile
assert(SparkSubmit.downloadFile("", tmpDir, mutable.Map.empty, new Configuration()) === "")
assert(SparkSubmit.downloadFile("/local/file", tmpDir, mutable.Map.empty,
new Configuration()) === "/local/file")
assert(DependencyUtils.downloadFile("", tmpDir, sparkConf, new Configuration(), secMgr) === "")
assert(DependencyUtils.downloadFile("/local/file", tmpDir, sparkConf, new Configuration(),
secMgr) === "/local/file")
}
test("download one file to local") {
val sparkConf = new SparkConf(false)
val jarFile = File.createTempFile("test", ".jar")
jarFile.deleteOnExit()
val content = "hello, world"
@ -825,13 +832,14 @@ class SparkSubmitSuite
val tmpDir = Files.createTempDirectory("tmp").toFile
updateConfWithFakeS3Fs(hadoopConf)
val sourcePath = s"s3a://${jarFile.getAbsolutePath}"
val outputPath =
SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf)
val outputPath = DependencyUtils.downloadFile(sourcePath, tmpDir, sparkConf, hadoopConf,
new SecurityManager(sparkConf))
checkDownloadedFile(sourcePath, outputPath)
deleteTempOutputFile(outputPath)
}
test("download list of files to local") {
val sparkConf = new SparkConf(false)
val jarFile = File.createTempFile("test", ".jar")
jarFile.deleteOnExit()
val content = "hello, world"
@ -840,8 +848,10 @@ class SparkSubmitSuite
val tmpDir = Files.createTempDirectory("tmp").toFile
updateConfWithFakeS3Fs(hadoopConf)
val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}")
val outputPaths = SparkSubmit.downloadFileList(
sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",")
val outputPaths = DependencyUtils
.downloadFileList(sourcePaths.mkString(","), tmpDir, sparkConf, hadoopConf,
new SecurityManager(sparkConf))
.split(",")
assert(outputPaths.length === sourcePaths.length)
sourcePaths.zip(outputPaths).foreach { case (sourcePath, outputPath) =>
@ -996,17 +1006,31 @@ object UserClasspathFirstTest {
}
class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem {
override def copyToLocalFile(src: Path, dst: Path): Unit = {
private def local(path: Path): Path = {
// Ignore the scheme for testing.
super.copyToLocalFile(new Path(src.toUri.getPath), dst)
new Path(path.toUri.getPath)
}
// Rewrites the status in place so that its path carries the fake "s3a://" prefix, then
// returns that same (mutated) status object.
private def toRemote(status: FileStatus): FileStatus = {
  val remotePath = new Path(s"s3a://${status.getPath.toUri.getPath}")
  status.setPath(remotePath)
  status
}
// Answers the query for the path produced by local(path).
override def isFile(path: Path): Boolean = {
  val localPath = local(path)
  super.isFile(localPath)
}
override def globStatus(pathPattern: Path): Array[FileStatus] = {
val newPath = new Path(pathPattern.toUri.getPath)
super.globStatus(newPath).map { status =>
val path = s"s3a://${status.getPath.toUri.getPath}"
status.setPath(new Path(path))
status
}
super.globStatus(newPath).map(toRemote)
}
// Lists the path produced by local(path), then passes every entry through toRemote.
override def listStatus(path: Path): Array[FileStatus] = {
  val localStatuses = super.listStatus(local(path))
  for (status <- localStatuses) yield toRemote(status)
}
// Copies from the path produced by local(src); the destination is used as given.
override def copyToLocalFile(src: Path, dst: Path): Unit = {
  val localSource = local(src)
  super.copyToLocalFile(localSource, dst)
}
// Opens the path produced by local(path).
override def open(path: Path): FSDataInputStream = {
  val localPath = local(path)
  super.open(localPath)
}
}