[SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark

## What changes were proposed in this pull request?

Many applications take Spark as a computing engine and run on it. This PR adds a configuration property `spark.log.callerContext` that can be used by Spark's upstream applications (e.g. Oozie) to set up their caller contexts into Spark. In the end, Spark will combine its own caller context with the caller contexts of its upstream applications, and write them into Yarn RM log and HDFS audit log.

The audit log has a config to truncate the caller contexts passed in (default 128). The caller contexts will be sent over rpc, so it should be concise. The call context written into HDFS log and Yarn log consists of two parts: the information `A` specified by Spark itself and the value `B` of `spark.log.callerContext` property.  Currently `A` typically takes 64 to 74 characters,  so `B` can have up to 50 characters (mentioned in the doc `running-on-yarn.md`)
## How was this patch tested?

Manual tests. I have run some Spark applications with `spark.log.callerContext` configuration in Yarn client/cluster mode, and verified that the caller contexts were written into Yarn RM log and HDFS audit log correctly.

The ways to configure `spark.log.callerContext` property:
- In spark-defaults.conf:

```
spark.log.callerContext  infoSpecifiedByUpstreamApp
```
- In app's source code:

```
val spark = SparkSession
      .builder
      .appName("SparkKMeans")
      .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")
      .getOrCreate()
```

When running on Spark Yarn cluster mode, the driver is unable to pass 'spark.log.callerContext' to Yarn client and AM since Yarn client and AM have already started before the driver performs `.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")`.

The following  example shows the command line used to submit a SparkKMeans application and the corresponding records in Yarn RM log and HDFS audit log.

Command:

```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
```

Yarn RM log:

<img width="1440" alt="screen shot 2016-10-19 at 9 12 03 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547050/7d2f278c-9649-11e6-9df8-8d5ff12609f0.png">

HDFS audit log:

<img width="1400" alt="screen shot 2016-10-19 at 10 18 14 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547102/096060ae-964a-11e6-981a-cb28efd5a058.png">

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #15563 from weiqingy/SPARK-16759.
This commit is contained in:
Weiqing Yang 2016-11-11 18:36:23 -08:00 committed by Mridul Muralidharan
parent 46b2550bcd
commit 3af894511b
6 changed files with 59 additions and 22 deletions

View file

@ -207,6 +207,10 @@ package object config {
.booleanConf
.createWithDefault(false)
private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
.stringConf
.createOptional
private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf

View file

@ -26,6 +26,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config.APP_CALLER_CONTEXT
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.serializer.SerializerInstance
@ -92,8 +93,16 @@ private[spark] abstract class Task[T](
kill(interruptThread = false)
}
new CallerContext("TASK", appId, appAttemptId, jobId, Option(stageId), Option(stageAttemptId),
Option(taskAttemptId), Option(attemptNumber)).setCurrentContext()
new CallerContext(
"TASK",
SparkEnv.get.conf.get(APP_CALLER_CONTEXT),
appId,
appAttemptId,
jobId,
Option(stageId),
Option(stageAttemptId),
Option(taskAttemptId),
Option(attemptNumber)).setCurrentContext()
try {
runTask(context)

View file

@ -2569,6 +2569,7 @@ private[util] object CallerContext extends Logging {
* @param from who sets up the caller context (TASK, CLIENT, APPMASTER)
*
* The parameters below are optional:
* @param upstreamCallerContext caller context the upstream application passes in
* @param appId id of the app this task belongs to
* @param appAttemptId attempt id of the app this task belongs to
* @param jobId id of the job this task belongs to
@ -2578,26 +2579,38 @@ private[util] object CallerContext extends Logging {
* @param taskAttemptNumber task attempt id
*/
private[spark] class CallerContext(
from: String,
appId: Option[String] = None,
appAttemptId: Option[String] = None,
jobId: Option[Int] = None,
stageId: Option[Int] = None,
stageAttemptId: Option[Int] = None,
taskId: Option[Long] = None,
taskAttemptNumber: Option[Int] = None) extends Logging {
from: String,
upstreamCallerContext: Option[String] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None,
jobId: Option[Int] = None,
stageId: Option[Int] = None,
stageAttemptId: Option[Int] = None,
taskId: Option[Long] = None,
taskAttemptNumber: Option[Int] = None) extends Logging {
val appIdStr = if (appId.isDefined) s"_${appId.get}" else ""
val appAttemptIdStr = if (appAttemptId.isDefined) s"_${appAttemptId.get}" else ""
val jobIdStr = if (jobId.isDefined) s"_JId_${jobId.get}" else ""
val stageIdStr = if (stageId.isDefined) s"_SId_${stageId.get}" else ""
val stageAttemptIdStr = if (stageAttemptId.isDefined) s"_${stageAttemptId.get}" else ""
val taskIdStr = if (taskId.isDefined) s"_TId_${taskId.get}" else ""
val taskAttemptNumberStr =
if (taskAttemptNumber.isDefined) s"_${taskAttemptNumber.get}" else ""
private val context = prepareContext("SPARK_" +
from +
appId.map("_" + _).getOrElse("") +
appAttemptId.map("_" + _).getOrElse("") +
jobId.map("_JId_" + _).getOrElse("") +
stageId.map("_SId_" + _).getOrElse("") +
stageAttemptId.map("_" + _).getOrElse("") +
taskId.map("_TId_" + _).getOrElse("") +
taskAttemptNumber.map("_" + _).getOrElse("") +
upstreamCallerContext.map("_" + _).getOrElse(""))
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + taskAttemptNumberStr
private def prepareContext(context: String): String = {
// The default max size of Hadoop caller context is 128
lazy val len = SparkHadoopUtil.get.conf.getInt("hadoop.caller.context.max.size", 128)
if (context == null || context.length <= len) {
context
} else {
val finalContext = context.substring(0, len)
logWarning(s"Truncated Spark caller context from $context to $finalContext")
finalContext
}
}
/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of

View file

@ -202,6 +202,15 @@ of the most common options to set are:
or remotely ("cluster") on one of the nodes inside the cluster.
</td>
</tr>
<tr>
<td><code>spark.log.callerContext</code></td>
<td>(none)</td>
<td>
Application information that will be written into Yarn RM log/HDFS audit log when running on Yarn/HDFS.
Its length depends on the Hadoop configuration <code>hadoop.caller.context.max.size</code>. It should be concise,
and typically can have up to 50 characters.
</td>
</tr>
</table>
Apart from these, the following properties are also available, and may be useful in some situations:

View file

@ -202,7 +202,8 @@ private[spark] class ApplicationMaster(
attemptID = Option(appAttemptId.getAttemptId.toString)
}
new CallerContext("APPMASTER",
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
logInfo("ApplicationAttemptId: " + appAttemptId)

View file

@ -161,7 +161,8 @@ private[spark] class Client(
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
Option(appId.toString)).setCurrentContext()
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)