[SPARK-26288][CORE] add initRegisteredExecutorsDB

## What changes were proposed in this pull request?

As we all know, Spark on YARN uses a DB (see https://github.com/apache/spark/pull/7943) to record RegisteredExecutors information so that it can be reloaded and used again when the ExternalShuffleService is restarted.

The RegisteredExecutors information cannot be recorded in either standalone mode or Spark on K8s, so that information is lost whenever the ExternalShuffleService is restarted.

To solve this problem, this change lets the ExternalShuffleService persist RegisteredExecutors information to a local DB in those modes as well.
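
To make the intended usage concrete, here is a minimal sketch of how a standalone worker could be configured once this change is in (the config keys come from this patch; the local-dir path is only an illustrative placeholder):

```scala
import org.apache.spark.SparkConf

// Minimal sketch: enable the external shuffle service together with the new
// registeredExecutors DB. The DB file ("registeredExecutors.ldb") is created
// under spark.local.dir, so that directory must survive a shuffle service
// restart for recovery to work.
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.db.enabled", "true") // new config, defaults to true
  .set("spark.local.dir", "/var/lib/spark/local")  // placeholder path
```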

## How was this patch tested?
New unit tests.

Closes #23393 from weixiuli/SPARK-26288.

Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
Commit 8b0aa59218 (parent 6783831f68)
weixiuli, 2019-03-19 16:16:43 -05:00, committed by Imran Rashid
8 changed files with 253 additions and 2 deletions


@@ -66,6 +66,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
}
@VisibleForTesting
public ExternalShuffleBlockResolver getBlockResolver() {
return blockManager;
}
/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
public ExternalShuffleBlockHandler(


@@ -372,6 +372,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude


@@ -17,6 +17,7 @@
package org.apache.spark.deploy
import java.io.File
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
@@ -49,6 +50,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val enabled = sparkConf.get(config.SHUFFLE_SERVICE_ENABLED)
private val port = sparkConf.get(config.SHUFFLE_SERVICE_PORT)
private val registeredExecutorsDB = "registeredExecutors.ldb"
private val transportConf =
SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
private val blockHandler = newShuffleBlockHandler(transportConf)
@@ -58,10 +61,30 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val shuffleServiceSource = new ExternalShuffleServiceSource
protected def findRegisteredExecutorsDBFile(dbName: String): File = {
val localDirs = sparkConf.getOption("spark.local.dir").map(_.split(",")).getOrElse(Array())
if (localDirs.length >= 1) {
new File(localDirs.find(new File(_, dbName).exists()).getOrElse(localDirs(0)), dbName)
} else {
logWarning(s"'spark.local.dir' should be set first when we use db in " +
s"ExternalShuffleService. Note that this only affects standalone mode.")
null
}
}
/** Get blockhandler */
def getBlockHandler: ExternalShuffleBlockHandler = {
blockHandler
}
/** Create a new shuffle block handler. Factored out for subclasses to override. */
protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
if (sparkConf.get(config.SHUFFLE_SERVICE_DB_ENABLED) && enabled) {
new ExternalShuffleBlockHandler(conf, findRegisteredExecutorsDBFile(registeredExecutorsDB))
} else {
new ExternalShuffleBlockHandler(conf, null)
}
}
/** Starts the external shuffle service if the user has configured us to. */
def startIfEnabled() {
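
To make the lookup in `findRegisteredExecutorsDBFile` above concrete, here is a small self-contained sketch (plain Scala, hypothetical helper name, no Spark types) of the resolution rule: prefer a local dir that already contains the DB file, otherwise fall back to the first configured dir.

```scala
import java.io.File

// Sketch of the resolution rule: reuse an existing "registeredExecutors.ldb"
// if one of the configured local dirs already contains it; otherwise place it
// under the first local dir.
def resolveDbFile(localDirs: Seq[String], dbName: String): Option[File] = {
  if (localDirs.nonEmpty) {
    val dirWithDb = localDirs.find(dir => new File(dir, dbName).exists())
    Some(new File(dirWithDb.getOrElse(localDirs.head), dbName))
  } else {
    None // mirrors the warning-and-null branch when spark.local.dir is unset
  }
}

// Example: with spark.local.dir=/tmp/a,/tmp/b and an existing DB in /tmp/b,
// resolveDbFile(Seq("/tmp/a", "/tmp/b"), "registeredExecutors.ldb") points at
// /tmp/b/registeredExecutors.ldb.
```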


@@ -466,6 +466,15 @@ private[deploy] class Worker(
}.foreach { dir =>
logInfo(s"Removing directory: ${dir.getPath}")
Utils.deleteRecursively(dir)
// Also remove the application's entry from the external shuffle service's DB when
// spark.shuffle.service.db.enabled=true. Otherwise, if an application is stopped
// while the external shuffle service is down, its entry is left behind in the DB
// and never cleaned up, so remove it here.
if (conf.get(config.SHUFFLE_SERVICE_DB_ENABLED) &&
conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
shuffleService.applicationRemoved(dir.getName)
}
}
}(cleanupThreadExecutor)
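
As a hedged sketch of the cleanup step above (a hypothetical helper, not part of the patch): when the worker cleans an application's work directory, it also asks the shuffle service to drop that application's entry, but only when the DB feature is enabled.

```scala
import java.io.File

// Hypothetical helper mirroring the WorkDirCleanup logic: delete the app's
// work dir and, when the shuffle service DB is enabled, also drop the app's
// entry so stale executors are not reloaded after a shuffle service restart.
def cleanupAppDir(appDir: File, dbCleanupEnabled: Boolean,
    removeAppFromDb: String => Unit): Unit = {
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    f.delete()
  }
  deleteRecursively(appDir)
  if (dbCleanupEnabled) {
    // Work directories are named after the application id (dir.getName above).
    removeAppFromDb(appDir.getName)
  }
}
```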


@@ -359,6 +359,13 @@ package object config {
private[spark] val SHUFFLE_SERVICE_ENABLED =
ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
ConfigBuilder("spark.shuffle.service.db.enabled")
.doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
"standalone mode.")
.booleanConf
.createWithDefault(true)
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)


@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy
import java.io._
import java.nio.charset.StandardCharsets
import com.google.common.io.CharStreams
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
import org.apache.spark.network.shuffle.TestShuffleDataContext
import org.apache.spark.util.Utils
/**
* This suite gets BlockData when the ExternalShuffleService is restarted
* with spark.shuffle.service.db.enabled = true or false.
* Note that, with spark.shuffle.service.db.enabled = false, the shuffle data is
* expected to be unreadable after the restart.
*/
class ExternalShuffleServiceDbSuite extends SparkFunSuite {
val sortBlock0 = "Hello!"
val sortBlock1 = "World!"
val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
var sparkConf: SparkConf = _
var dataContext: TestShuffleDataContext = _
var securityManager: SecurityManager = _
var externalShuffleService: ExternalShuffleService = _
var blockHandler: ExternalShuffleBlockHandler = _
var blockResolver: ExternalShuffleBlockResolver = _
override def beforeAll() {
super.beforeAll()
sparkConf = new SparkConf()
sparkConf.set("spark.shuffle.service.enabled", "true")
sparkConf.set("spark.local.dir", System.getProperty("java.io.tmpdir"))
Utils.loadDefaultSparkProperties(sparkConf, null)
securityManager = new SecurityManager(sparkConf)
dataContext = new TestShuffleDataContext(2, 5)
dataContext.create()
// Write some sort data.
dataContext.insertSortShuffleData(0, 0,
Array[Array[Byte]](sortBlock0.getBytes(StandardCharsets.UTF_8),
sortBlock1.getBytes(StandardCharsets.UTF_8)))
registerExecutor()
}
override def afterAll() {
try {
dataContext.cleanup()
} finally {
super.afterAll()
}
}
def registerExecutor(): Unit = {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// external Shuffle Service start
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver
blockResolver.registerExecutor("app0", "exec0", dataContext.createExecutorInfo(SORT_MANAGER))
} finally {
blockHandler.close()
// external Shuffle Service stop
externalShuffleService.stop()
}
}
// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service
// and make sure we can read the shuffle data.
test("Recover shuffle data with spark.shuffle.service.db.enabled=true after " +
"shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "true")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver
val block0Stream = blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
val block0 = CharStreams.toString(new InputStreamReader(block0Stream, StandardCharsets.UTF_8))
block0Stream.close()
assert(sortBlock0 == block0)
// pass
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}
}
// The beforeAll ensures the shuffle data was already written, and then
// the shuffle service was stopped. Here we restart the shuffle service,
// but this time we can't read the shuffle data.
test("Can't recover shuffle data with spark.shuffle.service.db.enabled=false after" +
" shuffle service restart") {
try {
sparkConf.set("spark.shuffle.service.db.enabled", "false")
externalShuffleService = new ExternalShuffleService(sparkConf, securityManager)
// externalShuffleService restart
externalShuffleService.start()
blockHandler = externalShuffleService.getBlockHandler
blockResolver = blockHandler.getBlockResolver
val error = intercept[RuntimeException] {
blockResolver.getBlockData("app0", "exec0", 0, 0, 0).createInputStream
}.getMessage
assert(error.contains("not registered"))
} finally {
blockHandler.close()
// externalShuffleService stop
externalShuffleService.stop()
}
}
}


@@ -17,9 +17,12 @@
package org.apache.spark.deploy.worker
import java.io.{File, IOException}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Supplier
import scala.concurrent.duration._
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
@@ -27,14 +30,16 @@ import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.{Command, ExecutorState, ExternalShuffleService}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged}
import org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorStateChanged, WorkDirCleanup}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Worker._
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.util.Utils
class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
@@ -245,4 +250,48 @@ class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
ExecutorStateChanged("app1", 0, ExecutorState.EXITED, None, None))
assert(cleanupCalled.get() == value)
}
test("WorkDirCleanup cleans app dirs and shuffle metadata when " +
"spark.shuffle.service.db.enabled=true") {
testWorkDirCleanupAndRemoveMetadataWithConfig(true)
}
test("WorkdDirCleanup cleans only app dirs when" +
"spark.shuffle.service.db.enabled=false") {
testWorkDirCleanupAndRemoveMetadataWithConfig(false)
}
private def testWorkDirCleanupAndRemoveMetadataWithConfig(dbCleanupEnabled: Boolean) = {
val conf = new SparkConf().set("spark.shuffle.service.db.enabled", dbCleanupEnabled.toString)
conf.set("spark.worker.cleanup.appDataTtl", "60")
conf.set("spark.shuffle.service.enabled", "true")
val appId = "app1"
val execId = "exec1"
val cleanupCalled = new AtomicBoolean(false)
when(shuffleService.applicationRemoved(any[String])).thenAnswer(new Answer[Unit] {
override def answer(invocations: InvocationOnMock): Unit = {
cleanupCalled.set(true)
}
})
val externalShuffleServiceSupplier = new Supplier[ExternalShuffleService] {
override def get: ExternalShuffleService = shuffleService
}
val worker = makeWorker(conf, externalShuffleServiceSupplier)
val workDir = Utils.createTempDir(namePrefix = "work")
// initialize the worker's work directory
worker.workDir = workDir
// Create the executor's working directory
val executorDir = new File(worker.workDir, appId + "/" + execId)
if (!executorDir.exists && !executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
executorDir.setLastModified(System.currentTimeMillis - (1000 * 120))
worker.receive(WorkDirCleanup)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
assert(!executorDir.exists())
assert(cleanupCalled.get() == dbCleanupEnabled)
}
}
}


@@ -240,6 +240,7 @@ SPARK_WORKER_OPTS supports the following system properties:
<td>
Enable periodic cleanup of worker / application directories. Note that this only affects standalone
mode, as YARN works differently. Only the directories of stopped applications are cleaned up.
This should be enabled if <code>spark.shuffle.service.db.enabled</code> is "true".
</td>
</tr>
<tr>
@@ -260,6 +261,16 @@ SPARK_WORKER_OPTS supports the following system properties:
especially if you run jobs very frequently.
</td>
</tr>
<tr>
<td><code>spark.shuffle.service.db.enabled</code></td>
<td>true</td>
<td>
Store External Shuffle Service state on local disk so that when the external shuffle service is restarted, it will
automatically reload information about current executors. This only affects standalone mode (YARN always has this
behavior enabled). You should also enable <code>spark.worker.cleanup.enabled</code> to ensure that the state
eventually gets cleaned up. This config may be removed in the future.
</td>
</tr>
<tr>
<td><code>spark.storage.cleanupFilesAfterExecutorExit</code></td>
<td>true</td>