[SPARK-14963][YARN] Using recoveryPath if NM recovery is enabled

## What changes were proposed in this pull request?

As of Hadoop 2.5, the YARN NM supports NM recovery, which provides a recovery path for auxiliary services such as spark_shuffle and mapreduce_shuffle. This change uses that path instead of the NM local dirs when NM recovery is enabled.
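
For illustration, a minimal sketch of the selection logic this change introduces (the helper class and method names below are hypothetical, not the PR's code): prefer the recovery path that YARN hands to the auxiliary service, and fall back to an NM local dir otherwise.

```java
import java.io.File;
import org.apache.hadoop.fs.Path;

final class RecoveryDirs {
  // Prefer the NM-provided recovery path; fall back to the first NM local dir.
  // `recoveryPath` stays null unless YARN called setRecoveryPath() on the
  // auxiliary service, which only happens on Hadoop 2.5+ with NM recovery enabled.
  static File stateFile(Path recoveryPath, String[] localDirs, String fileName) {
    Path base = (recoveryPath != null) ? recoveryPath : new Path(localDirs[0]);
    return new File(base.toUri().getPath(), fileName);
  }
}
```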

## How was this patch tested?

Unit test + local test.

Author: jerryshao <sshao@hortonworks.com>

Closes #12994 from jerryshao/SPARK-14963.
Authored by jerryshao, committed by Tom Graves on 2016-05-10 10:28:36 -05:00.
Commit aab99d31a9 (parent a019e6efb7); 2 changed files with 144 additions and 23 deletions.

YarnShuffleService.java

@@ -68,6 +68,8 @@ public class YarnShuffleService extends AuxiliaryService {
   private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
   private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
+  private static final String RECOVERY_FILE_NAME = "registeredExecutor.ldb";
+
   // An entity that manages the shuffle secret per application
   // This is used only if authentication is enabled
   private ShuffleSecretManager secretManager;
@@ -75,6 +77,12 @@ public class YarnShuffleService extends AuxiliaryService {
   // The actual server that serves shuffle files
   private TransportServer shuffleServer = null;
+  private Configuration _conf = null;
+
+  // The recovery path used for shuffle service recovery
+  @VisibleForTesting
+  Path _recoveryPath = null;
+
   // Handles registering executors and opening shuffle blocks
   @VisibleForTesting
   ExternalShuffleBlockHandler blockHandler;
@@ -112,6 +120,7 @@ public class YarnShuffleService extends AuxiliaryService {
    */
   @Override
   protected void serviceInit(Configuration conf) {
+    _conf = conf;
     // In case this NM was killed while there were running spark applications, we need to restore
     // lost state for the existing executors. We look for an existing file in the NM's local dirs.
@@ -119,7 +128,7 @@ public class YarnShuffleService extends AuxiliaryService {
     // an application was stopped while the NM was down, we expect yarn to call stopApplication()
     // when it comes back
     registeredExecutorFile =
-      findRegisteredExecutorFile(conf.getTrimmedStrings("yarn.nodemanager.local-dirs"));
+      new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     // If authentication is enabled, set up the shuffle server to use a
@@ -190,16 +199,6 @@ public class YarnShuffleService extends AuxiliaryService {
     logger.info("Stopping container {}", containerId);
   }

-  private File findRegisteredExecutorFile(String[] localDirs) {
-    for (String dir: localDirs) {
-      File f = new File(new Path(dir).toUri().getPath(), "registeredExecutors.ldb");
-      if (f.exists()) {
-        return f;
-      }
-    }
-    return new File(new Path(localDirs[0]).toUri().getPath(), "registeredExecutors.ldb");
-  }
-
   /**
    * Close the shuffle server to clean up any associated state.
    */
@@ -222,4 +221,47 @@ public class YarnShuffleService extends AuxiliaryService {
   public ByteBuffer getMetaData() {
     return ByteBuffer.allocate(0);
   }
+
+  /**
+   * Set the recovery path for shuffle service recovery when the NM is restarted. This method
+   * will be overridden and called when the Hadoop version is 2.5+ and NM recovery is enabled;
+   * otherwise we have to call it manually to set our own recovery path.
+   */
+  public void setRecoveryPath(Path recoveryPath) {
+    _recoveryPath = recoveryPath;
+  }
+
+  /**
+   * Get the recovery path. This overrides the default implementation so that we return our own
+   * maintained recovery path.
+   */
+  protected Path getRecoveryPath() {
+    String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
+    for (String dir : localDirs) {
+      File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME);
+      if (f.exists()) {
+        if (_recoveryPath == null) {
+          // If NM recovery is not enabled, use the NM local dir that holds the existing
+          // recovery file, which is compatible with the old code.
+          _recoveryPath = new Path(dir);
+        } else {
+          // If NM recovery is enabled and the recovery file exists in an old NM local dir,
+          // an old version of Spark already generated it; move the old file to the new
+          // recovery path for compatibility.
+          if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) {
+            // Failed to move the recovery file to the new path
+            logger.error("Failed to move recovery file {} to the path {}",
+              RECOVERY_FILE_NAME, _recoveryPath.toString());
+          }
+        }
+        break;
+      }
+    }
+
+    if (_recoveryPath == null) {
+      _recoveryPath = new Path(localDirs[0]);
+    }
+
+    return _recoveryPath;
+  }
 }
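
For context, the recovery-path branch above is driven by the standard NM recovery settings. A sketch of enabling them programmatically, equivalent to setting the same keys in yarn-site.xml (the directory value here is only an example):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class EnableNmRecovery {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();
    // yarn.nodemanager.recovery.enabled: turn on NM recovery (Hadoop 2.5+)
    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
    // yarn.nodemanager.recovery.dir: root of the NM's recovery state; YARN derives each
    // auxiliary service's recovery path (passed via setRecoveryPath) from this directory
    conf.set(YarnConfiguration.NM_RECOVERY_DIR, "/var/lib/hadoop-yarn/nm-recovery");
    System.out.println(conf.get(YarnConfiguration.NM_RECOVERY_DIR));
  }
}
```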

YarnShuffleServiceSuite.scala

@@ -19,16 +19,20 @@ package org.apache.spark.network.yarn
 import java.io.{DataOutputStream, File, FileOutputStream}

 import scala.annotation.tailrec
+import scala.concurrent.duration._

-import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
 import org.scalatest.{BeforeAndAfterEach, Matchers}
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.util.Utils

 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
   private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
@@ -40,15 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
       classOf[YarnShuffleService].getCanonicalName)
     yarnConfig.setInt("spark.shuffle.service.port", 0)
-    yarnConfig.get("yarn.nodemanager.local-dirs").split(",").foreach { dir =>
-      val d = new File(dir)
-      if (d.exists()) {
-        FileUtils.deleteDirectory(d)
-      }
-      FileUtils.forceMkdir(d)
-      logInfo(s"creating yarn.nodemanager.local-dirs: $d")
-    }
+    val localDir = Utils.createTempDir()
+    yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath)
   }

   var s1: YarnShuffleService = null
@@ -234,7 +231,89 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s3.initializeApplication(app2Data)
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (Some(shuffleInfo2))
     s3.stop()
   }
+
+  test("get correct recovery path") {
+    // The recovery path is set from outside the shuffle service to simulate the scenario where
+    // NM recovery is enabled and the recovery path is set by YARN.
+    s1 = new YarnShuffleService
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    s1.setRecoveryPath(recoveryPath)
+    s1.init(yarnConfig)
+    s1._recoveryPath should be (recoveryPath)
+    s1.stop()
+
+    // The recovery path is set inside the shuffle service; this happens when NM recovery is not
+    // enabled or is unavailable (Hadoop before 2.5).
+    s2 = new YarnShuffleService
+    s2.init(yarnConfig)
+    s2._recoveryPath should be
+      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
+    s2.stop()
+  }
+
+  test("moving recovery file from NM local dir to recovery path") {
+    // This tests that when Hadoop is upgraded to 2.5+ and NM recovery is enabled, the old
+    // recovery file is moved to the new path to keep compatibility.

+    // Simulate s1 running on an old version of Hadoop where the recovery file lives in the
+    // NM local dir.
+    s1 = new YarnShuffleService
+    s1.init(yarnConfig)
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data: ApplicationInitializationContext =
+      new ApplicationInitializationContext("user", app1Id, null)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Data: ApplicationInitializationContext =
+      new ApplicationInitializationContext("user", app2Id, null)
+    s1.initializeApplication(app2Data)
+
+    val execStateFile = s1.registeredExecutorFile
+    execStateFile should not be (null)
+
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+
+    val blockHandler = s1.blockHandler
+    val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
+    ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
+
+    blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
+    blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
+      be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
+      be (Some(shuffleInfo2))
+
+    assert(execStateFile.exists(), s"$execStateFile did not exist")
+
+    s1.stop()
+
+    // Simulate s2 running on Hadoop 2.5+ with NM recovery enabled.
+    assert(execStateFile.exists())
+    val recoveryPath = new Path(Utils.createTempDir().toURI)
+
+    s2 = new YarnShuffleService
+    s2.setRecoveryPath(recoveryPath)
+    s2.init(yarnConfig)
+    val execStateFile2 = s2.registeredExecutorFile
+    recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
+    eventually(timeout(10 seconds), interval(5 millis)) {
+      assert(!execStateFile.exists())
+    }
+
+    val handler2 = s2.blockHandler
+    val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+
+    // Now we reinitialize only one of the apps and expect YARN to tell us that app2 was
+    // stopped during the restart. Since the recovery file was moved from the old path, the
+    // previous state should be preserved.
+    s2.initializeApplication(app1Data)
+    s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
+    ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+
+    s2.stop()
+  }
 }
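
A side note on the migration step exercised above: the service relies on File.renameTo(), which reports failure by returning false (for instance when the recovery dir and the local dirs sit on different filesystems) rather than throwing. A hypothetical alternative sketch using java.nio, which surfaces failures as exceptions (the class and method names are illustrative only, not part of this PR):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class RecoveryFileMover {
  // Move a recovery file into a new state directory. Files.move() throws
  // IOException on failure instead of returning a boolean, so a failed
  // migration cannot be silently ignored. Moving a non-empty directory
  // across filesystems would still require a recursive copy.
  static void migrate(Path oldFile, Path newDir) throws IOException {
    Files.createDirectories(newDir); // ensure the target directory exists
    Files.move(oldFile, newDir.resolve(oldFile.getFileName()));
  }
}
```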