[MINOR][YARN] Make memLimitExceededLogMessage more clean
## What changes were proposed in this pull request? Current `memLimitExceededLogMessage`: <img src="https://user-images.githubusercontent.com/5399861/48467789-ec8e1000-e824-11e8-91fc-280d342e1bf3.png" width="360"> It's not very clear, because the suggestion mentions a virtual memory config even when it is the physical memory limit that was exceeded. This PR makes the message clearer and replaces the deprecated config: ```spark.yarn.executor.memoryOverhead```. ## How was this patch tested? Manual tests. Closes #23030 from wangyum/EXECUTOR_MEMORY_OVERHEAD. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
parent
a09d5ba886
commit
a00aaf649c
|
@ -20,7 +20,6 @@ package org.apache.spark.deploy.yarn
|
|||
import java.util.Collections
|
||||
import java.util.concurrent._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.regex.Pattern
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
|
@ -598,13 +597,21 @@ private[yarn] class YarnAllocator(
|
|||
(false, s"Container ${containerId}${onHostStr} was preempted.")
|
||||
// Should probably still count memory exceeded exit codes towards task failures
|
||||
case VMEM_EXCEEDED_EXIT_CODE =>
|
||||
(true, memLimitExceededLogMessage(
|
||||
completedContainer.getDiagnostics,
|
||||
VMEM_EXCEEDED_PATTERN))
|
||||
val vmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX virtual memory used".r
|
||||
val diag = vmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
|
||||
.map(_.concat(".")).getOrElse("")
|
||||
val message = "Container killed by YARN for exceeding virtual memory limits. " +
|
||||
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key} or boosting " +
|
||||
s"${YarnConfiguration.NM_VMEM_PMEM_RATIO} or disabling " +
|
||||
s"${YarnConfiguration.NM_VMEM_CHECK_ENABLED} because of YARN-4714."
|
||||
(true, message)
|
||||
case PMEM_EXCEEDED_EXIT_CODE =>
|
||||
(true, memLimitExceededLogMessage(
|
||||
completedContainer.getDiagnostics,
|
||||
PMEM_EXCEEDED_PATTERN))
|
||||
val pmemExceededPattern = raw"$MEM_REGEX of $MEM_REGEX physical memory used".r
|
||||
val diag = pmemExceededPattern.findFirstIn(completedContainer.getDiagnostics)
|
||||
.map(_.concat(".")).getOrElse("")
|
||||
val message = "Container killed by YARN for exceeding physical memory limits. " +
|
||||
s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
|
||||
(true, message)
|
||||
case _ =>
|
||||
// all the failures which are not covered above, like:
|
||||
// disk failure, killed by app master or resource manager, ...
|
||||
|
@ -735,18 +742,6 @@ private[yarn] class YarnAllocator(
|
|||
|
||||
private object YarnAllocator {
|
||||
val MEM_REGEX = "[0-9.]+ [KMG]B"
|
||||
val PMEM_EXCEEDED_PATTERN =
|
||||
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
|
||||
val VMEM_EXCEEDED_PATTERN =
|
||||
Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
|
||||
val VMEM_EXCEEDED_EXIT_CODE = -103
|
||||
val PMEM_EXCEEDED_EXIT_CODE = -104
|
||||
|
||||
def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
|
||||
val matcher = pattern.matcher(diagnostics)
|
||||
val diag = if (matcher.find()) " " + matcher.group() + "." else ""
|
||||
s"Container killed by YARN for exceeding memory limits. $diag " +
|
||||
"Consider boosting spark.yarn.executor.memoryOverhead or " +
|
||||
"disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714."
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.mockito.Mockito._
|
|||
import org.scalatest.{BeforeAndAfterEach, Matchers}
|
||||
|
||||
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
|
||||
import org.apache.spark.deploy.yarn.YarnAllocator._
|
||||
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
|
||||
import org.apache.spark.deploy.yarn.config._
|
||||
import org.apache.spark.rpc.RpcEndpointRef
|
||||
|
@ -376,17 +375,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
|
|||
verify(mockAmClient).updateBlacklist(Seq[String]().asJava, Seq("hostA", "hostB").asJava)
|
||||
}
|
||||
|
||||
test("memory exceeded diagnostic regexes") {
|
||||
val diagnostics =
|
||||
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
|
||||
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
|
||||
"5.8 GB of 4.2 GB virtual memory used. Killing container."
|
||||
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
|
||||
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
|
||||
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
|
||||
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
|
||||
}
|
||||
|
||||
test("window based failure executor counting") {
|
||||
sparkConf.set("spark.yarn.executor.failuresValidityInterval", "100s")
|
||||
val handler = createAllocator(4)
|
||||
|
|
Loading…
Reference in a new issue