[SPARK-21934][CORE] Expose Shuffle Netty memory usage to MetricsSystem

## What changes were proposed in this pull request?

This is follow-up work to SPARK-9104 that exposes Netty memory usage to the MetricsSystem. Currently, the shuffle Netty memory usage of `NettyBlockTransferService` is exposed; if the external shuffle service is used, the Netty memory usage of `ExternalShuffleClient` and `ExternalShuffleService` is exposed instead. The Netty memory usage of `YarnShuffleService` is not exposed for now, because `YarnShuffleService` does not have a `MetricsSystem` of its own and would be better connected to Hadoop's MetricsSystem.

## How was this patch tested?

Manually verified in local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #19160 from jerryshao/SPARK-21934.
This commit is contained in:
jerryshao 2017-09-21 13:54:30 +08:00
parent 352bea5457
commit 1da5822e6a
7 changed files with 67 additions and 7 deletions

View file

@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import com.codahale.metrics.MetricSet;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,6 +118,12 @@ public class ExternalShuffleClient extends ShuffleClient {
}
}
/**
 * Returns the metrics exposed by this client's underlying client factory.
 *
 * {@code checkInit()} runs first, so the factory is only consulted once that check has passed.
 *
 * @return the {@link MetricSet} reported by {@code clientFactory.getAllMetrics()}
 */
@Override
public MetricSet shuffleMetrics() {
  checkInit();
  final MetricSet factoryMetrics = clientFactory.getAllMetrics();
  return factoryMetrics;
}
/**
* Registers this executor with an external shuffle server. This registration is required to
* inform the shuffle server about where and how we store our shuffle files.
@ -140,6 +147,7 @@ public class ExternalShuffleClient extends ShuffleClient {
/**
 * Closes this client by closing the underlying client factory.
 *
 * {@code checkInit()} is invoked before the factory is touched.
 * NOTE(review): checkInit() is defined outside this view; presumably it rejects use before
 * initialization, which would mean close() on a never-initialized client fails -- confirm.
 */
@Override
public void close() {
// Validate the initialization state before using clientFactory.
checkInit();
clientFactory.close();
}
}

View file

@ -18,6 +18,9 @@
package org.apache.spark.network.shuffle;
import java.io.Closeable;
import java.util.Collections;
import com.codahale.metrics.MetricSet;
/** Provides an interface for reading shuffle files, either from an Executor or external service. */
public abstract class ShuffleClient implements Closeable {
@ -52,4 +55,13 @@ public abstract class ShuffleClient implements Closeable {
String[] blockIds,
BlockFetchingListener listener,
TempShuffleFileManager tempShuffleFileManager);
/**
 * Supplies the shuffle-related {@link MetricSet} of this ShuffleClient so that it can be
 * registered with the MetricsSystem.
 *
 * The base implementation reports no metrics; subclasses override it to expose their own.
 *
 * @return a MetricSet whose metric map is empty
 */
public MetricSet shuffleMetrics() {
  // Default: a MetricSet backed by an empty map.
  return Collections::emptyMap;
}
}

View file

@ -56,7 +56,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private var server: TransportServer = _
private val shuffleServiceSource = new ExternalShuffleServiceSource(blockHandler)
private val shuffleServiceSource = new ExternalShuffleServiceSource
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
@ -83,6 +83,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
}
server = transportContext.createServer(port, bootstraps.asJava)
shuffleServiceSource.registerMetricSet(server.getAllMetrics)
shuffleServiceSource.registerMetricSet(blockHandler.getAllMetrics)
masterMetricsSystem.registerSource(shuffleServiceSource)
masterMetricsSystem.start()
}

View file

@ -19,19 +19,19 @@ package org.apache.spark.deploy
import javax.annotation.concurrent.ThreadSafe
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
/**
* Provides metrics source for external shuffle service
*/
@ThreadSafe
private class ExternalShuffleServiceSource
(blockHandler: ExternalShuffleBlockHandler) extends Source {
private class ExternalShuffleServiceSource extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "shuffleService"
metricRegistry.registerAll(blockHandler.getAllMetrics)
/**
 * Adds every metric contained in `metricSet` to this source's `metricRegistry`.
 *
 * @param metricSet the metrics to register with this source
 */
def registerMetricSet(metricSet: MetricSet): Unit =
  metricRegistry.registerAll(metricSet)
}

View file

@ -113,8 +113,9 @@ private[spark] class Executor(
private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, TaskReaper]()
if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
env.blockManager.initialize(conf.getAppId)
env.metricsSystem.registerSource(executorSource)
env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}
// Whether to load classes in user jars before those in Spark jars

View file

@ -18,11 +18,14 @@
package org.apache.spark.network.netty
import java.nio.ByteBuffer
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
import scala.reflect.ClassTag
import com.codahale.metrics.{Metric, MetricSet}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
@ -83,6 +86,19 @@ private[spark] class NettyBlockTransferService(
Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
}
/**
 * Returns a [[MetricSet]] that combines the metrics of the client factory and of the server.
 *
 * Requires both `server` and `clientFactory` to be non-null, i.e. the service must have been
 * initialized before this is called.
 *
 * @return a MetricSet whose `getMetrics` merges `clientFactory.getAllMetrics` and
 *         `server.getAllMetrics`; on a name clash the server's metric wins
 * @throws IllegalArgumentException if `server` or `clientFactory` is still null
 */
override def shuffleMetrics(): MetricSet = {
  require(server != null && clientFactory != null, "NettyBlockTransferServer is not initialized")

  new MetricSet {
    override def getMetrics: JMap[String, Metric] = {
      // Build a fresh map on every call instead of mutating one shared instance: a map
      // handed out earlier is never modified behind the caller's back, and entries that
      // are no longer reported do not linger.
      val allMetrics = new JHashMap[String, Metric]()
      allMetrics.putAll(clientFactory.getAllMetrics.getMetrics)
      // Server metrics are added last so they take precedence on duplicate names.
      allMetrics.putAll(server.getAllMetrics.getMetrics)
      allMetrics
    }
  }
}
override def fetchBlocks(
host: String,
port: Int,

View file

@ -29,10 +29,13 @@ import scala.reflect.ClassTag
import scala.util.Random
import scala.util.control.NonFatal
import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.metrics.source.Source
import org.apache.spark.network._
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.netty.SparkTransportConf
@ -248,6 +251,16 @@ private[spark] class BlockManager(
logInfo(s"Initialized BlockManager: $blockManagerId")
}
/**
 * Builds a metrics [[Source]] wrapping the shuffle client's metrics.
 *
 * The source is named "ExternalShuffle" when `externalShuffleServiceEnabled` is true and
 * "NettyBlockTransfer" otherwise; in both cases the metrics come from
 * `shuffleClient.shuffleMetrics()`.
 */
def shuffleMetricsSource: Source = {
  import BlockManager._

  val name = if (externalShuffleServiceEnabled) "ExternalShuffle" else "NettyBlockTransfer"
  new ShuffleMetricsSource(name, shuffleClient.shuffleMetrics())
}
private def registerWithExternalShuffleServer() {
logInfo("Registering executor with local external shuffle service.")
val shuffleConfig = new ExecutorShuffleInfo(
@ -1531,4 +1544,12 @@ private[spark] object BlockManager {
}
blockManagers.toMap
}
/**
 * A metrics [[Source]] with the given name whose registry is populated from a [[MetricSet]].
 *
 * @param sourceName name under which this source is reported
 * @param metricSet metrics registered into `metricRegistry` when the source is constructed
 */
private class ShuffleMetricsSource(
    override val sourceName: String,
    metricSet: MetricSet)
  extends Source {

  override val metricRegistry = new MetricRegistry()

  // Register everything from the supplied set at construction time.
  metricRegistry.registerAll(metricSet)
}
}