[SPARK-35628][SS] RocksDBFileManager - load checkpoint from DFS

### What changes were proposed in this pull request?
Implements the load operation of RocksDBFileManager.

### Why are the changes needed?
Provides the ability to load all files needed for a specific checkpoint version from DFS into a given local directory.
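
For illustration, a minimal sketch of the intended call pattern (a sketch only: the DFS root path is a placeholder, and since `RocksDBFileManager` and `Utils` are Spark-internal, this assumes code running inside the Spark codebase):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.state.RocksDBFileManager
import org.apache.spark.util.Utils

// Placeholder DFS root; in practice this is a state store checkpoint directory.
val fileManager = new RocksDBFileManager(
  "/tmp/spark-checkpoint/state/0/0", Utils.createTempDir(), new Configuration)

// Latest committed version in DFS; 0 means nothing has been committed yet.
val latestVersion = fileManager.getLatestVersion()

// Materialize exactly that version's files into a local RocksDB working directory.
val workingDir = Utils.createTempDir()
val metadata = fileManager.loadCheckpointFromDfs(latestVersion, workingDir)
println(s"Restored version $latestVersion with ${metadata.numKeys} keys")
```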

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.

Closes #32767 from xuanyuanking/SPARK-35628.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Yuanjian Li 2021-06-25 18:38:26 +09:00 committed by Jungtaek Lim
parent c0cfbb1743
commit f2029e7442
3 changed files with 192 additions and 4 deletions

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -31,11 +31,11 @@ import java.security.SecureRandom
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.zip.GZIPInputStream
import java.util.zip.{GZIPInputStream, ZipInputStream}
import scala.annotation.tailrec
import scala.collection.{mutable, Map, Seq}
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
@@ -49,6 +49,7 @@ import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -3121,6 +3122,38 @@ private[spark] object Utils extends Logging {
"If %s is not used, set spark.security.credentials.%s.enabled to false."
message.format(serviceName, e, serviceName, serviceName)
}
/**
 * Decompress a zip file into a local dir. File names are read from the zip file. Note that
 * directory entries in the zip are skipped, and exception handling is left to the caller.
 */
def unzipFilesFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
val files = new mutable.ArrayBuffer[File]()
val in = new ZipInputStream(fs.open(dfsZipFile))
var out: OutputStream = null
try {
var entry = in.getNextEntry()
while (entry != null) {
if (!entry.isDirectory) {
val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
val outFile = new File(localDir, fileName)
files += outFile
out = new FileOutputStream(outFile)
IOUtils.copy(in, out)
out.close()
in.closeEntry()
}
entry = in.getNextEntry()
}
in.close() // so that any error in closing does not get ignored
logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(out)
}
files
}
}
private[util] object CallerContext extends Logging {
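
For reference, a usage sketch of the new helper (paths below are placeholders; `Utils` is `private[spark]`, so this assumes code inside the Spark codebase):

```scala
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.util.Utils

// Placeholder location of a committed checkpoint zip.
val zipPath = new Path("/tmp/spark-checkpoint/state/0/0/1.zip")
val fs = zipPath.getFileSystem(new Configuration())

// Extract every regular entry of the zip directly into localDir (directory entries are
// skipped) and get back the list of files that were written.
val localDir: File = Utils.createTempDir()
val extracted: Seq[File] = Utils.unzipFilesFromFile(fs, zipPath, localDir)
extracted.foreach(f => println(s"${f.getName}: ${f.length()} bytes"))
```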

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

@@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, PathFilter}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
@@ -130,6 +130,9 @@ class RocksDBFileManager(
private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
}
/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
@@ -153,6 +156,49 @@ class RocksDBFileManager(
logInfo(s"Saved checkpoint file for version $version")
}
/**
 * Load all necessary files for a specific checkpoint version from DFS into the given local
 * directory. If version is 0, it deletes all files in the directory. For other versions, it
 * ensures that only the exact files generated during checkpointing are present in the
 * local directory.
 */
def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
logInfo(s"Loading checkpoint files for version $version")
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
listRocksDBFiles(localDir)._2.foreach(_.delete())
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)
// Copy the necessary immutable files
val metadataFile = localMetadataFile(localDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
versionToRocksDBFiles.put(version, metadata.immutableFiles)
metadataFile.delete()
metadata
}
logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
metadata
}
/** Get the latest version available in the DFS directory. If no data is present, returns 0. */
def getLatestVersion(): Long = {
val path = new Path(dfsRootDir)
if (fm.exists(path)) {
fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip"))
.map(_.toLong)
.foldLeft(0L)(math.max)
} else {
0
}
}
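
As a side note, a tiny standalone illustration of the naming contract `getLatestVersion` relies on: each committed version is stored as `[version].zip` under `dfsRootDir` (the file names below are made up):

```scala
// Hypothetical committed zip names under dfsRootDir.
val committedZips = Seq("1.zip", "2.zip", "10.zip")
// Same reduction as above: strip ".zip", parse the version, take the max, defaulting to 0.
val latest = committedZips.map(_.stripSuffix(".zip").toLong).foldLeft(0L)(math.max)
assert(latest == 10L)
```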
/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
@@ -200,6 +246,56 @@ class RocksDBFileManager(
immutableFiles
}
/**
 * Copy files from the DFS directory to the given local directory. It figures out which
 * existing local files can be reused: unneeded immutable files are deleted, matching files
 * are kept, and required files that are missing locally are copied from DFS.
 */
private def loadImmutableFilesFromDfs(
immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
.foreach { existingFile =>
val isSameFile =
requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
if (!isSameFile) {
existingFile.delete()
logInfo(s"Deleted local file $existingFile")
}
}
var filesCopied = 0L
var bytesCopied = 0L
var filesReused = 0L
immutableFiles.foreach { file =>
val localFileName = file.localFileName
val localFile = localFilePath(localDir, localFileName)
if (!localFile.exists) {
val dfsFile = dfsFilePath(file.dfsFileName)
// Note: The implementation of copyToLocalFile() closes the output stream when there is
// any exception while copying, so this may leave a partial file in the local directory.
// That is okay because the size check below will detect it and fail the load.
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
val localFileSize = localFile.length()
val expectedSize = file.sizeBytes
if (localFileSize != expectedSize) {
throw new IllegalStateException(
s"Copied $dfsFile to $localFile," +
s" expected $expectedSize bytes, found $localFileSize bytes ")
}
filesCopied += 1
bytesCopied += localFileSize
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
} else {
filesReused += 1
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
s"$filesReused files reused.")
}
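
To recap the per-file decision made above, here is a sketch (not part of this commit; `planForLocalFile` is a hypothetical helper, and per the test below `isSameFile` treats a same-named file with a different size as a different file):

```scala
import java.io.File

// Hypothetical helper mirroring the logic above: decide what happens to a file that is
// already present in the local directory before required files are copied in.
def planForLocalFile(existing: File, required: Map[String, RocksDBImmutableFile]): String = {
  if (required.get(existing.getName).exists(_.isSameFile(existing))) {
    "reuse"  // same name and matching size: no DFS copy needed
  } else {
    "delete" // stale or unrelated immutable file: removed before loading
  }
}
// Required files that are missing locally are then copied with fs.copyToLocalFile and
// their sizes are verified against the checkpoint metadata.
```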
/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* If any error occurs while writing, the zip file will not be written to DFS.
@@ -262,6 +358,14 @@
}
}
private def localFilePath(localDir: File, fileName: String): File = {
if (isLogFile(fileName)) {
new File(new File(localDir, LOG_FILES_LOCAL_SUBDIR), fileName)
} else {
new File(localDir, fileName)
}
}
/**
* List all the RocksDB files that need to be synced or recovered.
*/

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

@@ -36,6 +36,7 @@ class RocksDBSuite extends SparkFunSuite {
test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val verificationDir = Utils.createTempDir().getAbsolutePath // local dir to load checkpoints
val fileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val sstDir = s"$dfsRootDir/SSTs"
@@ -43,6 +44,14 @@ class RocksDBSuite extends SparkFunSuite {
val logDir = s"$dfsRootDir/logs"
def numRemoteLogFiles: Int = listFiles(logDir).length
// Verify behavior before any saved checkpoints
assert(fileManager.getLatestVersion() === 0)
// Try to load incorrect versions
intercept[FileNotFoundException] {
fileManager.loadCheckpointFromDfs(1, Utils.createTempDir())
}
// Save a version of checkpoint files
val cpFiles1 = Seq(
"sst-file1.sst" -> 10,
@@ -53,10 +62,24 @@ class RocksDBSuite extends SparkFunSuite {
"archive/00002.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
assert(fileManager.getLatestVersion() === 1)
assert(numRemoteSSTFiles == 2) // 2 sst files copied
assert(numRemoteLogFiles == 2) // 2 log files copied
// Save SAME version again with different checkpoint files and verify
// Load back the checkpoint files into another local dir with existing files and verify
generateFiles(verificationDir, Seq(
"sst-file1.sst" -> 11, // files with same name but different sizes, should get overwritten
"other-file1" -> 101,
"archive/00001.log" -> 1001,
"random-sst-file.sst" -> 100, // unnecessary files, should get deleted
"random-other-file" -> 9,
"00005.log" -> 101,
"archive/00007.log" -> 101
))
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1, 101)
// Save SAME version again with different checkpoint files and load back again to verify
// whether files were overwritten.
val cpFiles1_ = Seq(
"sst-file1.sst" -> 10, // same SST file as before, should not get copied
"sst-file2.sst" -> 25, // new SST file with same name as before, but different length
@@ -71,6 +94,7 @@ class RocksDBSuite extends SparkFunSuite {
saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
// Save another version and verify
val cpFiles2 = Seq(
@@ -81,6 +105,19 @@ class RocksDBSuite extends SparkFunSuite {
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)
// Loading an older version should work
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
// Loading incorrect version should fail
intercept[FileNotFoundException] {
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 3, Nil, 1001)
}
// Loading 0 should delete all files
require(verificationDir.list().length > 0)
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 0, Nil, 0)
}
}
@@ -147,6 +184,20 @@ class RocksDBSuite extends SparkFunSuite {
fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
}
def loadAndVerifyCheckpointFiles(
fileManager: RocksDBFileManager,
verificationDir: String,
version: Int,
expectedFiles: Seq[(String, Int)],
expectedNumKeys: Int): Unit = {
val metadata = fileManager.loadCheckpointFromDfs(version, verificationDir)
val filesAndLengths =
listFiles(verificationDir).map(f => f.getName -> f.length).toSet ++
listFiles(verificationDir + "/archive").map(f => s"archive/${f.getName}" -> f.length()).toSet
assert(filesAndLengths === expectedFiles.toSet)
assert(metadata.numKeys === expectedNumKeys)
}
implicit def toFile(path: String): File = new File(path)
def listFiles(file: File): Seq[File] = {