[SPARK-26092][SS] Use CheckpointFileManager to write the streaming metadata file
## What changes were proposed in this pull request?

Use CheckpointFileManager to write the streaming `metadata` file so that the `metadata` file will never be a partial file.

## How was this patch tested?

Jenkins

Closes #23060 from zsxwing/SPARK-26092.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
This commit is contained in:
parent
99cbc51b32
commit
058c4602b0
|
@ -56,7 +56,7 @@ trait CheckpointFileManager {
|
|||
* @param overwriteIfPossible If true, then the implementations must do a best-effort attempt to
|
||||
* overwrite the file if it already exists. It should not throw
|
||||
* any exception if the file exists. However, if false, then the
|
||||
* implementation must not overwrite if the file alraedy exists and
|
||||
* implementation must not overwrite if the file already exists and
|
||||
* must throw `FileAlreadyExistsException` in that case.
|
||||
*/
|
||||
def createAtomic(path: Path, overwriteIfPossible: Boolean): CancellableFSDataOutputStream
|
||||
|
|
|
@ -88,6 +88,7 @@ abstract class StreamExecution(
|
|||
val resolvedCheckpointRoot = {
|
||||
val checkpointPath = new Path(checkpointRoot)
|
||||
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
|
||||
fs.mkdirs(checkpointPath)
|
||||
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
|
||||
}
|
||||
|
||||
|
|
|
@ -19,16 +19,18 @@ package org.apache.spark.sql.execution.streaming
|
|||
|
||||
import java.io.{InputStreamReader, OutputStreamWriter}
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.ConcurrentModificationException
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.commons.io.IOUtils
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, FSDataOutputStream, Path}
|
||||
import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path}
|
||||
import org.json4s.NoTypeHints
|
||||
import org.json4s.jackson.Serialization
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
|
||||
import org.apache.spark.sql.streaming.StreamingQuery
|
||||
|
||||
/**
|
||||
|
@ -70,19 +72,26 @@ object StreamMetadata extends Logging {
|
|||
metadata: StreamMetadata,
|
||||
metadataFile: Path,
|
||||
hadoopConf: Configuration): Unit = {
|
||||
var output: FSDataOutputStream = null
|
||||
var output: CancellableFSDataOutputStream = null
|
||||
try {
|
||||
val fs = metadataFile.getFileSystem(hadoopConf)
|
||||
output = fs.create(metadataFile)
|
||||
val fileManager = CheckpointFileManager.create(metadataFile.getParent, hadoopConf)
|
||||
output = fileManager.createAtomic(metadataFile, overwriteIfPossible = false)
|
||||
val writer = new OutputStreamWriter(output)
|
||||
Serialization.write(metadata, writer)
|
||||
writer.close()
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
case e: FileAlreadyExistsException =>
|
||||
if (output != null) {
|
||||
output.cancel()
|
||||
}
|
||||
throw new ConcurrentModificationException(
|
||||
s"Multiple streaming queries are concurrently using $metadataFile", e)
|
||||
case e: Throwable =>
|
||||
if (output != null) {
|
||||
output.cancel()
|
||||
}
|
||||
logError(s"Error writing stream metadata $metadata to $metadataFile", e)
|
||||
throw e
|
||||
} finally {
|
||||
IOUtils.closeQuietly(output)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue