[SPARK-27141][YARN] Use ConfigEntry for hardcoded configs for Yarn

## What changes were proposed in this pull request?
There are some hardcoded config keys in the code; it is better to reference them through the existing `ConfigEntry` constants.
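Instead of parsing a raw string key at each call site, callers go through the typed `ConfigEntry` constants (core ones in `org.apache.spark.internal.config`, YARN ones in `org.apache.spark.deploy.yarn.config`). A minimal sketch of the pattern, assuming code that lives inside the `org.apache.spark` namespace where the `private[spark]` entries are visible; the helper object below is illustrative and not part of this change:

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DRIVER_HOST_ADDRESS, DRIVER_PORT}

// Illustrative helper, not part of this commit.
object ConfigEntryPattern {
  def driverAddress(sparkConf: SparkConf): (String, Int) = {
    // Before: sparkConf.get("spark.driver.port").toInt (the caller parses the raw string).
    // After: the entry carries the type and the default, so the getter already returns an Int.
    val host: String = sparkConf.get(DRIVER_HOST_ADDRESS)
    val port: Int = sparkConf.get(DRIVER_PORT)
    (host, port)
  }
}
```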

## How was this patch tested?
Existing tests

Closes #24103 from wangjiaochun/yarnHardCode.

Authored-by: 10087686 <wang.jiaochun@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Commit 8204dc1e54 (parent 174531c183), committed 2019-03-22 05:29:29 -05:00 by Sean Owen.
6 changed files with 25 additions and 22 deletions.

`ApplicationMaster.scala`

@@ -296,8 +296,8 @@ private[spark] class ApplicationMaster(
       Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()
     val driverRef = clientRpcEnv.setupEndpointRef(
-      RpcAddress(sparkConf.get("spark.driver.host"),
-        sparkConf.get("spark.driver.port").toInt),
+      RpcAddress(sparkConf.get(DRIVER_HOST_ADDRESS),
+        sparkConf.get(DRIVER_PORT)),
       YarnSchedulerBackend.ENDPOINT_NAME)
     // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
     registerAM(Utils.localHostName, -1, sparkConf,
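Each such constant is declared once with `ConfigBuilder`, which keeps the key string, documentation, and default in one place. A rough sketch of that declaration style; the names, docs, and defaults below are illustrative assumptions, not the actual `DRIVER_HOST_ADDRESS` / `DRIVER_PORT` definitions:

```scala
package org.apache.spark.internal.config

// Illustrative declarations only; names, docs, and defaults are assumptions,
// not the upstream DRIVER_HOST_ADDRESS / DRIVER_PORT definitions.
private[spark] object ExampleEntries {
  val EXAMPLE_DRIVER_HOST = ConfigBuilder("spark.example.driver.host")
    .doc("Hostname or IP address on which the driver listens.")
    .stringConf
    .createWithDefault("localhost")

  val EXAMPLE_DRIVER_PORT = ConfigBuilder("spark.example.driver.port")
    .doc("Port used by the driver's RPC endpoints.")
    .intConf
    .createWithDefault(0)
}
```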

`ApplicationMasterSuite.scala`

@@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.yarn.config._
 class ApplicationMasterSuite extends SparkFunSuite {
@@ -28,7 +29,7 @@ class ApplicationMasterSuite extends SparkFunSuite {
     val port = 18080
     val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.historyServer.address",
+    sparkConf.set(HISTORY_SERVER_ADDRESS,
       "http://${hadoopconf-yarn.resourcemanager.hostname}:${spark.history.ui.port}")
     val yarnConf = new YarnConfiguration()
     yarnConf.set("yarn.resourcemanager.hostname", host)

`BaseYarnClusterSuite.scala`

@@ -149,7 +149,7 @@ abstract class BaseYarnClusterSuite
     launcher.setSparkHome(sys.props("spark.test.home"))
       .setMaster("yarn")
       .setDeployMode(deployMode)
-      .setConf("spark.executor.instances", "1")
+      .setConf(EXECUTOR_INSTANCES.key, "1")
       .setPropertiesFile(propsFile)
       .addAppArgs(appArgs.toArray: _*)
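`SparkLauncher.setConf` only accepts string key/value pairs, so call sites like this one reference the entry's `.key` instead of repeating the literal. A sketch under that assumption, with an illustrative object name:

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.internal.config.EXECUTOR_INSTANCES
import org.apache.spark.launcher.SparkLauncher

// Illustrative only; mirrors how the suite builds its launcher.
object LauncherKeyExample {
  def build(sparkHome: String, deployMode: String): SparkLauncher = {
    new SparkLauncher()
      .setSparkHome(sparkHome)
      .setMaster("yarn")
      .setDeployMode(deployMode)
      // setConf takes strings, so pass the entry's key rather than "spark.executor.instances".
      .setConf(EXECUTOR_INSTANCES.key, "1")
  }
}
```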

`YarnAllocatorSuite.scala`

@@ -33,6 +33,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.ManualClock
@@ -48,8 +49,8 @@ class MockResolver extends SparkRackResolver {
 class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
   val conf = new YarnConfiguration()
   val sparkConf = new SparkConf()
-  sparkConf.set("spark.driver.host", "localhost")
-  sparkConf.set("spark.driver.port", "4040")
+  sparkConf.set(DRIVER_HOST_ADDRESS, "localhost")
+  sparkConf.set(DRIVER_PORT, 4040)
   sparkConf.set(SPARK_JARS, Seq("notarealjar.jar"))
   sparkConf.set("spark.yarn.launchContainers", "false")
@@ -95,9 +96,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       "--class", "SomeClass")
     val sparkConfClone = sparkConf.clone()
     sparkConfClone
-      .set("spark.executor.instances", maxExecutors.toString)
-      .set("spark.executor.cores", "5")
-      .set("spark.executor.memory", "2048")
+      .set(EXECUTOR_INSTANCES, maxExecutors)
+      .set(EXECUTOR_CORES, 5)
+      .set(EXECUTOR_MEMORY, 2048L)
     for ((name, value) <- additionalConfigs) {
       sparkConfClone.set(name, value)
@@ -394,7 +395,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
   }
   test("window based failure executor counting") {
-    sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
+    sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
     val handler = createAllocator(4)
     handler.updateResourceRequests()
@@ -444,8 +445,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       maxExecutors,
       rmClientSpy,
       Map(
-        "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
-        "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+        YARN_EXECUTOR_LAUNCH_BLACKLIST_ENABLED.key -> "true",
+        MAX_FAILED_EXEC_PER_NODE.key -> "0"))
     handler.updateResourceRequests()
     val hosts = (0 until maxExecutors).map(i => s"host$i")
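The numeric literals above follow the units declared on the entries: `EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS` is time-valued in milliseconds, so the old `"100s"` becomes `100 * 1000L`, and `EXECUTOR_MEMORY` is byte-valued in MiB, so the old `"2048"` becomes `2048L`. A sketch of the same assignments, assuming those declared units; the object name is illustrative:

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS
import org.apache.spark.internal.config.EXECUTOR_MEMORY

// Illustrative only; assumes the entries' declared units (ms and MiB).
object TypedUnitsExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    // Old: sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
    sparkConf.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100 * 1000L)
    // Old: sparkConf.set("spark.executor.memory", "2048")  // interpreted as MiB
    sparkConf.set(EXECUTOR_MEMORY, 2048L)
    println(sparkConf.get(EXECUTOR_MEMORY))
  }
}
```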

`YarnClusterSuite.scala`

@@ -98,10 +98,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
   test("run Spark in yarn-client mode with different configurations, ensuring redaction") {
     testBasicYarnApp(true,
       Map(
-        "spark.driver.memory" -> "512m",
-        "spark.executor.cores" -> "1",
-        "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2",
+        DRIVER_MEMORY.key -> "512m",
+        EXECUTOR_CORES.key -> "1",
+        EXECUTOR_MEMORY.key -> "512m",
+        EXECUTOR_INSTANCES.key -> "2",
         // Sending some sensitive information, which we'll make sure gets redacted
         "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
         "spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD
@@ -111,11 +111,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
   test("run Spark in yarn-cluster mode with different configurations, ensuring redaction") {
     testBasicYarnApp(false,
       Map(
-        "spark.driver.memory" -> "512m",
-        "spark.driver.cores" -> "1",
-        "spark.executor.cores" -> "1",
-        "spark.executor.memory" -> "512m",
-        "spark.executor.instances" -> "2",
+        DRIVER_MEMORY.key -> "512m",
+        DRIVER_CORES.key -> "1",
+        EXECUTOR_CORES.key -> "1",
+        EXECUTOR_MEMORY.key -> "512m",
+        EXECUTOR_INSTANCES.key -> "2",
         // Sending some sensitive information, which we'll make sure gets redacted
         "spark.executorEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD,
         "spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD" -> YarnClusterDriver.SECRET_PASSWORD

`YarnShuffleServiceSuite.scala`

@@ -36,6 +36,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.SecurityManager
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.util.Utils
@@ -52,7 +53,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle")
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
-    yarnConfig.setInt("spark.shuffle.service.port", 0)
+    yarnConfig.setInt(SHUFFLE_SERVICE_PORT.key, 0)
     yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
     val localDir = Utils.createTempDir()
     yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
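The test configures the shuffle service through the NodeManager's Hadoop configuration, which is string-keyed, so it sets the Spark entry's `.key` on the `YarnConfiguration` instead of the hardcoded literal. A minimal sketch, with an illustrative object name:

```scala
package org.apache.spark.network.yarn

import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.internal.config.SHUFFLE_SERVICE_PORT

// Illustrative only; 0 asks for an ephemeral port, as in the suite.
object ShuffleServicePortExample {
  def main(args: Array[String]): Unit = {
    val yarnConfig = new YarnConfiguration()
    yarnConfig.setInt(SHUFFLE_SERVICE_PORT.key, 0)  // key is "spark.shuffle.service.port"
    println(yarnConfig.getInt(SHUFFLE_SERVICE_PORT.key, -1))
  }
}
```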