[SPARK-35436][SS] RocksDBFileManager - save checkpoint to DFS

### What changes were proposed in this pull request?
This PR implements the save operation of `RocksDBFileManager`.

### Why are the changes needed?
Save all the files in the given local checkpoint directory as a committed version in DFS.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New UT added.
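
For reference, a minimal sketch of how the new save path is driven (the DFS path, local paths, version, and key count below are illustrative, not part of this PR):

```scala
import java.io.File

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.state.RocksDBFileManager

object RocksDBCheckpointSaveExample {
  def main(args: Array[String]): Unit = {
    // File manager that syncs files from a local RocksDB checkpoint directory to a DFS root dir.
    val fileManager = new RocksDBFileManager(
      "hdfs://namenode:8020/checkpoints/state/0/0", // dfsRootDir: hypothetical DFS location
      new File("/tmp/rocksdb-file-manager"),        // localTempDir: local scratch directory
      new Configuration())                          // hadoopConf

    // After RocksDB has written a local checkpoint, upload it as committed version 1.
    // SST and archived log files go to shared DFS subdirectories (SSTs/, logs/); the remaining
    // files plus a generated `metadata` file are zipped into 1.zip under the DFS root.
    fileManager.saveCheckpointToDfs(
      new File("/tmp/rocksdb-local-checkpoint-1"),  // checkpointDir written by RocksDB
      version = 1L,
      numKeys = 100L)
  }
}
```

Reusing the same `RocksDBFileManager` across versions lets previously uploaded SST/log files be skipped, which is what the new test verifies.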

Closes #32582 from xuanyuanking/SPARK-35436.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Commit 9f010a8eb2 (parent 8013f985a4), committed by Jungtaek Lim on 2021-06-09 14:09:28 +09:00
3 changed files with 366 additions and 1 deletion

@@ -1129,6 +1129,22 @@ private[spark] object Utils extends Logging {
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
}
/**
* Lists files recursively.
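* For example (hypothetical path), recursiveList(new File("/tmp/ckpt")) returns every file and
* subdirectory nested under /tmp/ckpt, excluding /tmp/ckpt itself.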
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val result = f.listFiles.toBuffer
val dirList = result.filter(_.isDirectory)
while (dirList.nonEmpty) {
val curDir = dirList.remove(0)
val files = curDir.listFiles()
result ++= files
dirList ++= files.filter(_.isDirectory)
}
result.toArray
}
/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.

@@ -17,18 +17,266 @@
package org.apache.spark.sql.execution.streaming.state
import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.Seq
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils
/**
* Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
* For each version, the checkpoint is saved in a specific directory structure that allows
* successive versions to reuse SST data files and archived log files. This makes each commit
* incremental: only the new SST files and archived log files generated by RocksDB are uploaded.
* The directory structures on local disk and in DFS are as follows.
*
* Local checkpoint dir structure
* ------------------------------
* RocksDB generates a bunch of files in the local checkpoint directory. The most important among
* them are the SST files; they are the actual log-structured data files. The rest of the files
* contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
* Note that the SST files are hard links to files in RocksDB's working directory, and therefore
* successive checkpoints can share some of the SST files. So these SST files have to be copied to
* a shared directory in DFS such that different committed versions can share them.
*
* We consider both SST files and archived log files as immutable files which can be shared between
* different checkpoints.
*
* localCheckpointDir
* |
* +-- OPTIONS-000005
* +-- MANIFEST-000008
* +-- CURRENT
* +-- 00007.sst
* +-- 00011.sst
* +-- archive
* | +-- 00008.log
* | +-- 00013.log
* ...
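* (In the layout above, the SST files and the files under archive/ are the immutable files that
* get uploaded to the shared DFS directories, while OPTIONS-000005, MANIFEST-000008 and CURRENT
* are the "other" files that are zipped into [version].zip together with the generated
* metadata file; see listRocksDBFiles and saveCheckpointToDfs below.)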
*
*
* DFS directory structure after saving to DFS as version 10
* -----------------------------------------------------------
* The SST and archived log files are given unique file names and copied to the shared subdirectory.
* Every version maintains a mapping of local immutable file name to the unique file name in DFS.
* This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
* checkpoint files into a single file `[version].zip`.
*
* dfsRootDir
* |
* +-- SSTs
* | +-- 00007-[uuid1].sst
* | +-- 00011-[uuid2].sst
* +-- logs
* | +-- 00008-[uuid3].log
* | +-- 00013-[uuid4].log
* +-- 10.zip
* | +-- metadata <--- contains the mapping between 00007.sst and 00007-[uuid1].sst,
* and between 00008.log and 00008-[uuid3].log
* | +-- OPTIONS-000005
* | +-- MANIFEST-000008
* | +-- CURRENT
* | ...
* |
* +-- 9.zip
* +-- 8.zip
* ...
*
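* For illustration, the `metadata` file inside 10.zip above would roughly contain the following
* (sizes and key count are made up; the field names follow RocksDBCheckpointMetadata):
*
*   {
*     "sstFiles": [
*       {"localFileName": "00007.sst", "dfsSstFileName": "00007-[uuid1].sst", "sizeBytes": 1024},
*       {"localFileName": "00011.sst", "dfsSstFileName": "00011-[uuid2].sst", "sizeBytes": 2048}
*     ],
*     "logFiles": [
*       {"localFileName": "00008.log", "dfsLogFileName": "00008-[uuid3].log", "sizeBytes": 512},
*       {"localFileName": "00013.log", "dfsLogFileName": "00013-[uuid4].log", "sizeBytes": 512}
*     ],
*     "numKeys": 100
*   }
*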
* Note the following.
* - Each [version].zip is a complete description of all the data and metadata needed to recover
* a RocksDB instance at the corresponding version. The SST files and log files are not included
* in the zip files; they can be shared across different versions. This is unlike the
* [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
* to recover a version.
* - This is safe wrt speculatively executed tasks running concurrently in different executors
* as each task would upload a different copy of the generated immutable files and
* atomically update the [version].zip.
* - Immutable files are identified uniquely based on their file name and file size.
* - Immutable files can be reused only across adjacent checkpoints/versions.
* - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
* different thread than the task thread saving files.
*
* @param dfsRootDir Directory where the [version].zip files will be stored
* @param localTempDir Local directory for temporary work
* @param hadoopConf Hadoop configuration for talking to DFS
* @param loggingId Id that will be prepended to logs to distinguish concurrent RocksDB instances
*/
class RocksDBFileManager(
dfsRootDir: String,
localTempDir: File,
hadoopConf: Configuration,
loggingId: String = "")
extends Logging {
import RocksDBImmutableFile._
private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
/** Save all the files in the given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
if (version <= 1 && numKeys == 0) {
// If we're writing the initial version and there's no data, we have to explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
logInfo(s"Saved checkpoint file for version $version")
}
/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
f.localFileName -> f
}.toMap
var bytesCopied = 0L
var filesCopied = 0L
var filesReused = 0L
val immutableFiles = localFiles.map { localFile =>
prevFilesToSizes
.get(localFile.getName)
.filter(_.isSameFile(localFile))
.map { reusable =>
filesReused += 1
reusable
}.getOrElse {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize
RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)
immutableFiles
}
/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* If any error occurs while writing, the output is cancelled so that no partial zip file is written.
*/
private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
var in: InputStream = null
val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
var totalBytes = 0L
val zout = new ZipOutputStream(out)
try {
files.foreach { file =>
zout.putNextEntry(new ZipEntry(file.getName))
in = new FileInputStream(file)
val bytes = IOUtils.copy(in, zout)
in.close()
zout.closeEntry()
totalBytes += bytes
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
out.cancel()
logError(s"Error zipping to $filesStr", e)
throw e
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(zout)
}
}
/** Log the files present in a directory. This is useful for debugging. */
private def logFilesInDir(dir: File, msg: String): Unit = {
lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
s"${f.getAbsolutePath} - ${f.length()} bytes"
}
logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
}
private def newDFSFileName(localFileName: String): String = {
val baseName = FilenameUtils.getBaseName(localFileName)
val extension = FilenameUtils.getExtension(localFileName)
s"$baseName-${UUID.randomUUID}.$extension"
}
private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")
private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
private def dfsFilePath(fileName: String): Path = {
if (isSstFile(fileName)) {
new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
} else if (isLogFile(fileName)) {
new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName)
} else {
new Path(dfsRootDir, fileName)
}
}
/**
* List all the RocksDB files that need to be synced or recovered.
*/
private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
val topLevelFiles = localDir.listFiles.filter(!_.isDirectory)
val archivedLogFiles =
Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
.getOrElse(Array[File]())
// To ignore .log.crc files
.filter(file => isLogFile(file.getName))
val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
(topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles)
}
}
/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.

@@ -20,12 +20,87 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset
import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
class RocksDBSuite extends SparkFunSuite {
test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val fileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val sstDir = s"$dfsRootDir/SSTs"
def numRemoteSSTFiles: Int = listFiles(sstDir).length
val logDir = s"$dfsRootDir/logs"
def numRemoteLogFiles: Int = listFiles(logDir).length
// Save a version of checkpoint files
val cpFiles1 = Seq(
"sst-file1.sst" -> 10,
"sst-file2.sst" -> 20,
"other-file1" -> 100,
"other-file2" -> 200,
"archive/00001.log" -> 1000,
"archive/00002.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
assert(numRemoteSSTFiles == 2) // 2 sst files copied
assert(numRemoteLogFiles == 2) // 2 log files copied
// Save SAME version again with different checkpoint files and verify
val cpFiles1_ = Seq(
"sst-file1.sst" -> 10, // same SST file as before, should not get copied
"sst-file2.sst" -> 25, // new SST file with same name as before, but different length
"sst-file3.sst" -> 30, // new SST file
"other-file1" -> 100, // same non-SST file as before, should not get copied
"other-file2" -> 210, // new non-SST file with same name as before, but different length
"other-file3" -> 300, // new non-SST file
"archive/00001.log" -> 1000, // same log file as before, should not get copied
"archive/00002.log" -> 2500, // new log file with same name as before, but different length
"archive/00003.log" -> 3000 // new log file
)
saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
// Save another version and verify
val cpFiles2 = Seq(
"sst-file4.sst" -> 40,
"other-file4" -> 400,
"archive/00004.log" -> 4000
)
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
}
}
test("RocksDBFileManager: error writing [version].zip cancels the output stream") {
quietly {
val hadoopConf = new Configuration()
hadoopConf.set(
SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
classOf[CreateAtomicTestManager].getName)
val dfsRootDir = Utils.createTempDir().getAbsolutePath
val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), hadoopConf)
val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, "other-file1" -> 100)
CreateAtomicTestManager.shouldFailInCreateAtomic = true
intercept[IOException] {
saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101)
}
assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
}
}
test("checkpoint metadata serde roundtrip") {
def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
assert(metadata.json == json)
@@ -54,4 +129,30 @@ class RocksDBSuite extends SparkFunSuite {
"""{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
// scalastyle:on line.size.limit
}
def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = {
fileToLengths.foreach { case (fileName, length) =>
val file = new File(dir, fileName)
FileUtils.write(file, "a" * length)
}
}
def saveCheckpointFiles(
fileManager: RocksDBFileManager,
fileToLengths: Seq[(String, Int)],
version: Int,
numKeys: Int): Unit = {
val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
generateFiles(checkpointDir, fileToLengths)
fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
}
implicit def toFile(path: String): File = new File(path)
def listFiles(file: File): Seq[File] = {
if (!file.exists()) return Seq.empty
file.listFiles.filter(file => !file.getName.endsWith("crc") && !file.isDirectory)
}
def listFiles(file: String): Seq[File] = listFiles(new File(file))
}