[SPARK-20605][CORE][YARN][MESOS] Deprecate not used AM and executor port configuration

## What changes were proposed in this pull request?

After SPARK-10997, client mode Netty RpcEnv doesn't require to start server, so port configurations are not used any more, here propose to remove these two configurations: "spark.executor.port" and "spark.am.port".

## How was this patch tested?

Existing UTs.

Author: jerryshao <sshao@hortonworks.com>

Closes #17866 from jerryshao/SPARK-20605.
This commit is contained in:
jerryshao 2017-05-08 14:27:56 -07:00 committed by Marcelo Vanzin
parent aeb2ecc0cd
commit 829cd7b8b7
10 changed files with 22 additions and 57 deletions

View file

@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
"are no longer accepted. To specify the equivalent now, one may use '64k'."),
DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
"Please use the new blacklisting options, spark.blacklist.*")
"Please use the new blacklisting options, spark.blacklist.*"),
DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)

View file

@ -177,7 +177,7 @@ object SparkEnv extends Logging {
SparkContext.DRIVER_IDENTIFIER,
bindAddress,
advertiseAddress,
port,
Option(port),
isLocal,
numCores,
ioEncryptionKey,
@ -194,7 +194,6 @@ object SparkEnv extends Logging {
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
@ -203,7 +202,7 @@ object SparkEnv extends Logging {
executorId,
hostname,
hostname,
port,
None,
isLocal,
numCores,
ioEncryptionKey
@ -220,7 +219,7 @@ object SparkEnv extends Logging {
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
port: Option[Int],
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
@ -243,17 +242,12 @@ object SparkEnv extends Logging {
}
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
securityManager, clientMode = !isDriver)
// Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}
// Create an instance of the class with the given name, possibly initializing it with our conf

View file

@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val fetcher = RpcEnv.create(
"driverPropsFetcher",
hostname,
port,
-1,
executorConf,
new SecurityManager(executorConf),
clientMode = true)
@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

View file

@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
In this mode spark executors will honor port allocation if such is
provided from the user. Specifically if the user defines
`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
`spark.blockManager.port` in Spark configuration,
the mesos scheduler will check the available offers for a valid port
range containing the port numbers. If no such range is available it will
not launch any task. If no restriction is imposed on port numbers by the

View file

@ -239,13 +239,6 @@ To use a custom metrics.properties for the application master and executors, upd
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the YARN Application Master in client mode.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.port</code></td>
<td>(random)</td>
<td>
Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
</td>
</tr>
<tr>
<td><code>spark.yarn.queue</code></td>
<td><code>default</code></td>

View file

@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
val conf = new SparkConf(loadDefaults = true).setAll(properties)
val port = conf.getInt("spark.executor.port", 0)
val env = SparkEnv.createExecutorEnv(
conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
executor = new Executor(
executorId,

View file

@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
}
}
val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
val managedPortNames = List(BLOCK_MANAGER_PORT.key)
/**
* The values of the non-zero ports to be used by the executor process.

View file

@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3000" )
conf.set(BLOCK_MANAGER_PORT, 4000)
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(3000, 4000), List(portResource))
resourcesToBeUsed.length shouldBe 2
.partitionPortResources(List(4000), List(portResource))
resourcesToBeUsed.length shouldBe 1
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
portsToUse.length shouldBe 2
arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
portsToUse.length shouldBe 1
arePortsEqual(portsToUse, Array(4000L)) shouldBe true
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
val expectedUSed = Array((4000L, 4000L))
arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
}
test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
val conf = new SparkConf()
conf.set("spark.executor.port", "3100" )
val portResource = createTestPortResource((3000, 5000), Some("my_role"))
val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(3100), List(portResource))
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
portsToUse.length shouldBe 1
portsToUse.contains(3100) shouldBe true
}
test("Port reservation is done correctly with all random ports") {
val conf = new SparkConf()
val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
test("Port reservation is done correctly with user specified ports only - multiple ranges") {
val conf = new SparkConf()
conf.set("spark.executor.port", "2100" )
conf.set("spark.blockManager.port", "4000")
val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
createTestPortResource((2000, 2500), Some("other_role")))
val (resourcesLeft, resourcesToBeUsed) = utils
.partitionPortResources(List(2100, 4000), portResourceList)
.partitionPortResources(List(4000), portResourceList)
val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
portsToUse.length shouldBe 2
portsToUse.length shouldBe 1
val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
val expectedUsed = Array((4000L, 4000L))
arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
}

View file

@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
}
private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
val port = sparkConf.get(AM_PORT)
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
clientMode = true)
val driverRef = waitForSparkDriver()
addAmIpFilter()

View file

@ -40,11 +40,6 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createOptional
private[spark] val AM_PORT =
ConfigBuilder("spark.yarn.am.port")
.intConf
.createWithDefault(0)
private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +