[SPARK-2098] All Spark processes should support spark-defaults.conf, config file

This is an alternative implementation of #1256.
cc andrewor14 vanzin

Author: GuoQiang Li <witgo@qq.com>

Closes #2379 from witgo/SPARK-2098-new and squashes the following commits:

4ef1cbd [GuoQiang Li] review commit
49ef70e [GuoQiang Li] Refactor getDefaultPropertiesFile
c45d20c [GuoQiang Li] All Spark processes should support spark-defaults.conf, config file
GuoQiang Li 2014-10-14 22:16:38 -07:00 committed by Andrew Or
parent 18ab6bd709
commit 293a0b5dbb
8 changed files with 124 additions and 50 deletions

View file

@@ -17,14 +17,11 @@
package org.apache.spark.deploy
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
import java.util.jar.JarFile
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.SparkException
import org.apache.spark.util.Utils
/**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
-val file = new File(filename)
-SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-if (k.startsWith("spark")) {
+Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
-if (propertiesFile == null) {
-val sep = File.separator
-val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
-val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
-confDir.foreach { sparkConfDir =>
-val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-val file = new File(defaultPath)
-if (file.exists()) {
-propertiesFile = file.getAbsolutePath
-}
-}
-}
+propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.exitFn()
}
}
-object SparkSubmitArguments {
-/** Load properties present in the given file. */
-def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-require(file.exists(), s"Properties file $file does not exist")
-require(file.isFile(), s"Properties file $file is not a normal file")
-val inputStream = new FileInputStream(file)
-try {
-val properties = new Properties()
-properties.load(inputStream)
-properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-} catch {
-case e: IOException =>
-val message = s"Failed when loading Spark properties file $file"
-throw new SparkException(message, e)
-} finally {
-inputStream.close()
-}
-}
-}
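With the companion object's getPropertiesFromFile removed in favour of Utils, spark-submit's defaults-file lookup collapses into a single call. A minimal sketch of the resulting resolution order, with a hypothetical wrapper name (resolvePropertiesFile is not part of this patch): an explicit --properties-file wins; otherwise getDefaultPropertiesFile uses SPARK_CONF_DIR if set, else SPARK_HOME/conf, and returns spark-defaults.conf from there only if it exists.

import org.apache.spark.util.Utils

// Hypothetical wrapper illustrating the lookup order used by mergeSparkProperties
def resolvePropertiesFile(explicit: String, env: Map[String, String] = sys.env): String =
  Option(explicit).getOrElse(Utils.getDefaultPropertiesFile(env))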

View file

@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
// Parse the properties file for the equivalent spark.driver.* configs
-val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+val properties = Utils.getPropertiesFromFile(propertiesFile)
val confDriverMemory = properties.get("spark.driver.memory")
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
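Because Utils.getPropertiesFromFile now returns a Map[String, String] rather than a Seq of pairs, driver settings can be read with plain Map lookups. A short sketch under that assumption; the fallback values below are illustrative, not the bootstrapper's actual defaults:

import org.apache.spark.util.Utils

val props: Map[String, String] = Utils.getPropertiesFromFile("conf/spark-defaults.conf")
// Illustrative precedence: properties file, then environment, then a made-up default
val driverMemory: String = props.get("spark.driver.memory")
  .orElse(sys.env.get("SPARK_DRIVER_MEMORY"))
  .getOrElse("512m")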

View file

@@ -18,12 +18,14 @@
package org.apache.spark.deploy.history
import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
/**
* Command-line parser for the master.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
+private var propertiesFile: String = null
parse(args.toList)
@@ -32,11 +34,16 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
+System.setProperty("spark.history.fs.logDirectory", value)
parse(tail)
case ("--help" | "-h") :: tail =>
printUsageAndExit(0)
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)
case Nil =>
case _ =>
@@ -44,10 +51,17 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
}
}
+// This mutates the SparkConf, so all accesses to it must be made after this line
+Utils.loadDefaultSparkProperties(conf, propertiesFile)
private def printUsageAndExit(exitCode: Int) {
System.err.println(
"""
-|Usage: HistoryServer
+|Usage: HistoryServer [options]
|
+|Options:
+| --properties-file FILE Path to a custom Spark properties file.
+| Default is conf/spark-defaults.conf.
+|
|Configuration options can be set by setting the corresponding JVM system property.
|History Server options are always available; additional options depend on the provider.
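The history server now shares the same tail-recursive, List-based option parsing used by the other daemons, with --properties-file handled as one more case. A standalone sketch of that pattern (the object and variable names here are hypothetical, not the patch's code):

object ArgsSketch {
  var propertiesFile: String = null

  def parse(args: List[String]): Unit = args match {
    case "--properties-file" :: value :: tail =>
      propertiesFile = value
      parse(tail)
    case Nil => // done
    case unknown :: _ =>
      sys.error(s"Unrecognized option: $unknown")
  }
}

ArgsSketch.parse(List("--properties-file", "/tmp/my.conf"))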

View file

@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
+var propertiesFile: String = null
// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
+parse(args.toList)
+// This mutates the SparkConf, so all accesses to it must be made after this line
+propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
if (conf.contains("spark.master.ui.port")) {
webUiPort = conf.get("spark.master.ui.port").toInt
}
-parse(args.toList)
def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)
case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)
case ("--help") :: tail =>
printUsageAndExit(0)
case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)")
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
}
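The ordering in MasterArguments matters: parse(args.toList) must run first so a --properties-file argument is known, Utils.loadDefaultSparkProperties then mutates the SparkConf, and only after that can values such as spark.master.ui.port be read from the conf. A condensed sketch of that sequence (not verbatim patch code):

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
var webUiPort = sys.env.get("SPARK_MASTER_WEBUI_PORT").map(_.toInt).getOrElse(8080)
var propertiesFile: String = null   // may be set while parsing --properties-file

// parse(args.toList) would run here and may assign propertiesFile
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)   // mutates conf
if (conf.contains("spark.master.ui.port")) {
  webUiPort = conf.get("spark.master.ui.port").toInt
}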

View file

@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
+var propertiesFile: String = null
// Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}
parse(args.toList)
+// This mutates the SparkConf, so all accesses to it must be made after this line
+propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+if (conf.contains("spark.worker.ui.port")) {
+webUiPort = conf.get("spark.worker.ui.port").toInt
+}
checkWorkerMemory()
def parse(args: List[String]): Unit = args match {
@@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)
case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)
case ("--help") :: tail =>
printUsageAndExit(0)
case value :: tail =>
@@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
" --webui-port PORT Port for web UI (default: 8081)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}

View file

@@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging {
}
}
+/**
+* Load default Spark properties from the given file. If no file is provided,
+* use the common defaults file. This mutates state in the given SparkConf and
+* in this JVM's system properties if the config specified in the file is not
+* already set. Return the path of the properties file used.
+*/
+def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
+val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
+Option(path).foreach { confFile =>
+getPropertiesFromFile(confFile).filter { case (k, v) =>
+k.startsWith("spark.")
+}.foreach { case (k, v) =>
+conf.setIfMissing(k, v)
+sys.props.getOrElseUpdate(k, v)
+}
+}
+path
+}
+/** Load properties present in the given file. */
+def getPropertiesFromFile(filename: String): Map[String, String] = {
+val file = new File(filename)
+require(file.exists(), s"Properties file $file does not exist")
+require(file.isFile(), s"Properties file $file is not a normal file")
+val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+try {
+val properties = new Properties()
+properties.load(inReader)
+properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+} catch {
+case e: IOException =>
+throw new SparkException(s"Failed when loading Spark properties from $filename", e)
+} finally {
+inReader.close()
+}
+}
+/** Return the path of the default Spark properties file. */
+def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
+env.get("SPARK_CONF_DIR")
+.orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
+.map { t => new File(s"$t${File.separator}spark-defaults.conf")}
+.filter(_.isFile)
+.map(_.getAbsolutePath)
+.orNull
+}
/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
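Taken together, the three new helpers give every Spark process the same loading path. A hedged usage sketch (the explicit file path is illustrative): passing null for the path makes loadDefaultSparkProperties fall back to getDefaultPropertiesFile, i.e. spark-defaults.conf under SPARK_CONF_DIR or, if that is unset, SPARK_HOME/conf; only keys starting with "spark." are copied into the conf and system properties.

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
// Load an explicit file into the conf and system properties; returns the path it used
val usedFile: String = Utils.loadDefaultSparkProperties(conf, "/etc/spark/spark-defaults.conf")
// The file can also be read directly as a trimmed Map[String, String]
val props: Map[String, String] = Utils.getPropertiesFromFile(usedFile)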

View file

@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
class UtilsSuite extends FunSuite {
test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
assert(!tempFile2.exists())
}
test("loading properties from file") {
val outFile = File.createTempFile("test-load-spark-properties", "test")
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
"spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}
.foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
val sparkConf = new SparkConf
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
outFile.delete()
}
}
}

View file

@@ -77,6 +77,13 @@ follows:
one implementation, provided by Spark, which looks for application logs stored in the
file system.</td>
</tr>
+<tr>
+<td>spark.history.fs.logDirectory</td>
+<td>(none)</td>
+<td>
+Directory that contains application event logs to be loaded by the history server
+</td>
+</tr>
<tr>
<td>spark.history.fs.updateInterval</td>
<td>10</td>