[SPARK-34871][SS] Move the checkpoint location resolving into the rule ResolveWriteToStream

### What changes were proposed in this pull request?
Move the checkpoint location resolution into the rule ResolveWriteToStream, which was added in SPARK-34748.

### Why are the changes needed?
After SPARK-34748, we have a rule ResolveWriteToStream that encapsulates the analysis logic for resolving stream write plans. Building on it, we can move the checkpoint location resolution into that rule as well, so that all checkpoint resolution is done in the analyzer.
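
For context, here is a condensed sketch of the shape the rule takes after this change, based on the diff below (the AQE warning and unsupported-operation checks are omitted for brevity):

```scala
// Sketch only: the analyzer rule now resolves the checkpoint location itself,
// so StreamExecution receives a fully resolved path.
object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case s: WriteToStreamStatement =>
      // Validates the user- or conf-supplied path (or creates a temp dir),
      // qualifies it against the file system, and creates the directory.
      val (resolvedCheckpointLocation, deleteCheckpointOnStop) = resolveCheckpointLocation(s)
      WriteToStream(
        s.userSpecifiedName.orNull,
        resolvedCheckpointLocation,
        s.sink,
        s.outputMode,
        deleteCheckpointOnStop,
        s.inputQuery)
  }
}
```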

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #31963 from xuanyuanking/SPARK-34871.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Yuanjian Li 2021-03-26 10:29:50 +09:00 committed by HyukjinKwon
parent 658e95c345
commit 5ffc3897e0
5 changed files with 91 additions and 84 deletions


@@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming.OutputMode
*/
case class WriteToStream(
name: String,
checkpointLocation: String,
resolvedCheckpointLocation: String,
sink: Table,
outputMode: OutputMode,
deleteCheckpointOnStop: Boolean,


@@ -41,7 +41,7 @@ class MicroBatchExecution(
extraOptions: Map[String, String],
plan: WriteToStream)
extends StreamExecution(
sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink, trigger,
sparkSession, plan.name, plan.resolvedCheckpointLocation, plan.inputQuery, plan.sink, trigger,
triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty


@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming
import java.util.UUID
import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
@@ -37,40 +40,7 @@ import org.apache.spark.util.Utils
object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case s: WriteToStreamStatement =>
var deleteCheckpointOnStop = false
val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toString
}.orElse {
conf.checkpointLocation.map { location =>
new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
}
}.getOrElse {
if (s.useTempCheckpointLocation) {
deleteCheckpointOnStop = true
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
logWarning("Temporary checkpoint location created which is deleted normally when" +
s" the query didn't fail: $tempDir. If it's required to delete it under any" +
s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
s" true. Important to know deleting temp checkpoint folder is best effort.")
tempDir
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
"""through option("checkpointLocation", ...) or """ +
s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
}
}
// If offsets have already been created, we trying to resume a query.
if (!s.recoverFromCheckpointLocation) {
val checkpointPath = new Path(checkpointLocation, "offsets")
val fs = checkpointPath.getFileSystem(s.hadoopConf)
if (fs.exists(checkpointPath)) {
throw new AnalysisException(
s"This query does not support recovering from checkpoint location. " +
s"Delete $checkpointPath to start over.")
}
}
val (resolvedCheckpointLocation, deleteCheckpointOnStop) = resolveCheckpointLocation(s)
if (conf.adaptiveExecutionEnabled) {
logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
@@ -87,11 +57,93 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
WriteToStream(
s.userSpecifiedName.orNull,
checkpointLocation,
resolvedCheckpointLocation,
s.sink,
s.outputMode,
deleteCheckpointOnStop,
s.inputQuery)
}
def resolveCheckpointLocation(s: WriteToStreamStatement): (String, Boolean) = {
var deleteCheckpointOnStop = false
val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toString
}.orElse {
conf.checkpointLocation.map { location =>
new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
}
}.getOrElse {
if (s.useTempCheckpointLocation) {
deleteCheckpointOnStop = true
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
logWarning("Temporary checkpoint location created which is deleted normally when" +
s" the query didn't fail: $tempDir. If it's required to delete it under any" +
s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
s" true. Important to know deleting temp checkpoint folder is best effort.")
tempDir
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
"""through option("checkpointLocation", ...) or """ +
s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
}
}
// If offsets have already been created, we trying to resume a query.
if (!s.recoverFromCheckpointLocation) {
val checkpointPath = new Path(checkpointLocation, "offsets")
val fs = checkpointPath.getFileSystem(s.hadoopConf)
if (fs.exists(checkpointPath)) {
throw new AnalysisException(
s"This query does not support recovering from checkpoint location. " +
s"Delete $checkpointPath to start over.")
}
}
val resolvedCheckpointRoot = {
val checkpointPath = new Path(checkpointLocation)
val fs = checkpointPath.getFileSystem(s.hadoopConf)
if (conf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
&& StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
// In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
// calls). If this legacy checkpoint path exists, we will throw an error to tell the user
// how to migrate.
val legacyCheckpointDir =
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
val legacyCheckpointDirExists =
try {
fs.exists(new Path(legacyCheckpointDir))
} catch {
case NonFatal(e) =>
// We may not have access to this directory. Don't fail the query if that happens.
logWarning(e.getMessage, e)
false
}
if (legacyCheckpointDirExists) {
throw new SparkException(
s"""Error: we detected a possible problem with the location of your checkpoint and you
|likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
|structured streaming. While this was corrected in Spark 3.0, it appears that your
|query was started using an earlier version that incorrectly handled the checkpoint
|path.
|
|Correct Checkpoint Directory: $checkpointPath
|Incorrect Checkpoint Directory: $legacyCheckpointDir
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.mkdirs(checkpointDir)
checkpointDir.toString
}
logInfo(s"Checkpoint root $checkpointLocation resolved to $resolvedCheckpointRoot.")
(resolvedCheckpointRoot, deleteCheckpointOnStop)
}
}


@@ -68,7 +68,7 @@ case object RECONFIGURING extends State
abstract class StreamExecution(
override val sparkSession: SparkSession,
override val name: String,
private val checkpointRoot: String,
val resolvedCheckpointRoot: String,
val analyzedPlan: LogicalPlan,
val sink: Table,
val trigger: Trigger,
@@ -94,51 +94,6 @@ abstract class StreamExecution(
private val startLatch = new CountDownLatch(1)
private val terminationLatch = new CountDownLatch(1)
val resolvedCheckpointRoot = {
val checkpointPath = new Path(checkpointRoot)
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
&& StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
// In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
// calls). If this legacy checkpoint path exists, we will throw an error to tell the user how
// to migrate.
val legacyCheckpointDir =
new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
val legacyCheckpointDirExists =
try {
fs.exists(new Path(legacyCheckpointDir))
} catch {
case NonFatal(e) =>
// We may not have access to this directory. Don't fail the query if that happens.
logWarning(e.getMessage, e)
false
}
if (legacyCheckpointDirExists) {
throw new SparkException(
s"""Error: we detected a possible problem with the location of your checkpoint and you
|likely need to move it before restarting this query.
|
|Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
|structured streaming. While this was corrected in Spark 3.0, it appears that your
|query was started using an earlier version that incorrectly handled the checkpoint
|path.
|
|Correct Checkpoint Directory: $checkpointPath
|Incorrect Checkpoint Directory: $legacyCheckpointDir
|
|Please move the data from the incorrect directory to the correct one, delete the
|incorrect directory, and then restart this query. If you believe you are receiving
|this message in error, you can disable it with the SQL conf
|${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
.stripMargin)
}
}
val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
fs.mkdirs(checkpointDir)
checkpointDir.toString
}
logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
def logicalPlan: LogicalPlan
/**


@@ -44,7 +44,7 @@ class ContinuousExecution(
extraOptions: Map[String, String],
plan: WriteToStream)
extends StreamExecution(
sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink,
sparkSession, plan.name, plan.resolvedCheckpointLocation, plan.inputQuery, plan.sink,
trigger, triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {
@volatile protected var sources: Seq[ContinuousStream] = Seq()