[SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode
## What changes were proposed in this pull request? Added a new configuration 'spark.mesos.driver.memoryOverhead' for providing the driver memory overhead in mesos cluster mode. ## How was this patch tested? Verified it manually, Resource Scheduler allocates (drivermemory+ driver memoryOverhead) for driver in mesos cluster mode. Closes #17726 from devaraj-kavali/SPARK-17928. Authored-by: Devaraj K <devaraj@apache.org> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
8a54492149
commit
1b75f3bcff
|
@ -169,7 +169,7 @@ of the most common options to set are:
|
|||
The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
|
||||
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
|
||||
other native overheads, etc. This tends to grow with the container size (typically 6-10%).
|
||||
This option is currently supported on YARN and Kubernetes.
|
||||
This option is currently supported on YARN, Mesos and Kubernetes.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
|
@ -68,6 +68,10 @@ private[mesos] class MesosSubmitRequestServlet(
|
|||
private def newDriverId(submitDate: Date): String =
|
||||
f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
|
||||
|
||||
// These defaults copied from YARN
|
||||
private val MEMORY_OVERHEAD_FACTOR = 0.10
|
||||
private val MEMORY_OVERHEAD_MIN = 384
|
||||
|
||||
/**
|
||||
* Build a driver description from the fields specified in the submit request.
|
||||
*
|
||||
|
@ -98,6 +102,7 @@ private[mesos] class MesosSubmitRequestServlet(
|
|||
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
|
||||
val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
|
||||
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
|
||||
val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
|
||||
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
|
||||
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
|
||||
|
||||
|
@ -112,13 +117,15 @@ private[mesos] class MesosSubmitRequestServlet(
|
|||
mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
|
||||
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
|
||||
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
|
||||
val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
|
||||
math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
|
||||
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
|
||||
val submitDate = new Date()
|
||||
val submissionId = newDriverId(submitDate)
|
||||
|
||||
new MesosDriverDescription(
|
||||
name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
|
||||
command, request.sparkProperties, submissionId, submitDate)
|
||||
name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores,
|
||||
actualSuperviseDriver, command, request.sparkProperties, submissionId, submitDate)
|
||||
}
|
||||
|
||||
protected override def handleSubmit(
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.deploy.rest.mesos
|
||||
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
|
||||
import org.scalatest.mockito.MockitoSugar
|
||||
|
||||
import org.apache.spark.{SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.deploy.TestPrematureExit
|
||||
import org.apache.spark.deploy.mesos.MesosDriverDescription
|
||||
import org.apache.spark.deploy.rest.{CreateSubmissionRequest, CreateSubmissionResponse, SubmitRestProtocolMessage, SubmitRestProtocolResponse}
|
||||
import org.apache.spark.internal.config
|
||||
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterPersistenceEngineFactory, MesosClusterScheduler}
|
||||
|
||||
class MesosRestServerSuite extends SparkFunSuite
|
||||
with TestPrematureExit with MockitoSugar {
|
||||
|
||||
test("test default driver overhead memory") {
|
||||
testOverheadMemory(new SparkConf(), "2000M", 2384)
|
||||
}
|
||||
|
||||
test("test driver overhead memory with overhead factor") {
|
||||
testOverheadMemory(new SparkConf(), "5000M", 5500)
|
||||
}
|
||||
|
||||
test("test configured driver overhead memory") {
|
||||
val conf = new SparkConf()
|
||||
conf.set(config.DRIVER_MEMORY_OVERHEAD.key, "1000")
|
||||
testOverheadMemory(conf, "2000M", 3000)
|
||||
}
|
||||
|
||||
def testOverheadMemory(conf: SparkConf, driverMemory: String, expectedResult: Int) {
|
||||
conf.set("spark.master", "testmaster")
|
||||
conf.set("spark.app.name", "testapp")
|
||||
conf.set(config.DRIVER_MEMORY.key, driverMemory)
|
||||
var actualMem = 0
|
||||
class TestMesosClusterScheduler extends MesosClusterScheduler(
|
||||
mock[MesosClusterPersistenceEngineFactory], conf) {
|
||||
override def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
|
||||
actualMem = desc.mem
|
||||
mock[CreateSubmissionResponse]
|
||||
}
|
||||
}
|
||||
|
||||
class TestServlet extends MesosSubmitRequestServlet(new TestMesosClusterScheduler, conf) {
|
||||
override def handleSubmit(
|
||||
requestMessageJson: String,
|
||||
requestMessage: SubmitRestProtocolMessage,
|
||||
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
|
||||
super.handleSubmit(requestMessageJson, requestMessage, responseServlet)
|
||||
}
|
||||
|
||||
override def findUnknownFields(
|
||||
requestJson: String,
|
||||
requestMessage: SubmitRestProtocolMessage): Array[String] = {
|
||||
Array()
|
||||
}
|
||||
}
|
||||
val servlet = new TestServlet()
|
||||
val request = new CreateSubmissionRequest()
|
||||
request.appResource = "testresource"
|
||||
request.mainClass = "mainClass"
|
||||
request.appArgs = Array("appArgs")
|
||||
request.environmentVariables = Map("envVar" -> "envVal")
|
||||
request.sparkProperties = conf.getAll.toMap
|
||||
servlet.handleSubmit("null", request, null)
|
||||
assert(actualMem == expectedResult)
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue