From 5ffc3897e0a238b369bf44970160e8647b9f4963 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 26 Mar 2021 10:29:50 +0900
Subject: [PATCH] [SPARK-34871][SS] Move the checkpoint location resolving into the rule ResolveWriteToStream

### What changes were proposed in this pull request?

Move the checkpoint location resolution into the rule ResolveWriteToStream, which was added in SPARK-34748.

### Why are the changes needed?

SPARK-34748 added the rule ResolveWriteToStream to hold the analysis logic for resolving stream write plans. Building on that, we can also move the checkpoint location resolution into the rule, so that all checkpoint resolution is done in the analyzer.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UT.

Closes #31963 from xuanyuanking/SPARK-34871.

Authored-by: Yuanjian Li
Signed-off-by: HyukjinKwon
---
 .../catalyst/streaming/WriteToStream.scala    |   2 +-
 .../streaming/MicroBatchExecution.scala       |   2 +-
 .../streaming/ResolveWriteToStream.scala      | 122 +++++++++++++-----
 .../execution/streaming/StreamExecution.scala |  47 +------
 .../continuous/ContinuousExecution.scala      |   2 +-
 5 files changed, 91 insertions(+), 84 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
index a40ab89e1e..9571cf4618 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming.OutputMode
  */
 case class WriteToStream(
     name: String,
-    checkpointLocation: String,
+    resolvedCheckpointLocation: String,
     sink: Table,
     outputMode: OutputMode,
     deleteCheckpointOnStop: Boolean,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 388fba1061..5075c682ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -41,7 +41,7 @@ class MicroBatchExecution(
     extraOptions: Map[String, String],
     plan: WriteToStream)
   extends StreamExecution(
-    sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink, trigger,
+    sparkSession, plan.name, plan.resolvedCheckpointLocation, plan.inputQuery, plan.sink, trigger,
     triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {

   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
index 9aeca86cc7..d05246468b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming

 import java.util.UUID

+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.Path

+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
@@ -37,40 +40,7 @@ import org.apache.spark.util.Utils
 object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
   def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
     case s: WriteToStreamStatement =>
-      var deleteCheckpointOnStop = false
-      val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified =>
-        new Path(userSpecified).toString
-      }.orElse {
-        conf.checkpointLocation.map { location =>
-          new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
-        }
-      }.getOrElse {
-        if (s.useTempCheckpointLocation) {
-          deleteCheckpointOnStop = true
-          val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-          logWarning("Temporary checkpoint location created which is deleted normally when" +
-            s" the query didn't fail: $tempDir. If it's required to delete it under any" +
-            s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
-            s" true. Important to know deleting temp checkpoint folder is best effort.")
-          tempDir
-        } else {
-          throw new AnalysisException(
-            "checkpointLocation must be specified either " +
-              """through option("checkpointLocation", ...) or """ +
-              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
-        }
-      }
-
-      // If offsets have already been created, we trying to resume a query.
-      if (!s.recoverFromCheckpointLocation) {
-        val checkpointPath = new Path(checkpointLocation, "offsets")
-        val fs = checkpointPath.getFileSystem(s.hadoopConf)
-        if (fs.exists(checkpointPath)) {
-          throw new AnalysisException(
-            s"This query does not support recovering from checkpoint location. " +
-            s"Delete $checkpointPath to start over.")
-        }
-      }
+      val (resolvedCheckpointLocation, deleteCheckpointOnStop) = resolveCheckpointLocation(s)

       if (conf.adaptiveExecutionEnabled) {
         logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
@@ -87,11 +57,93 @@ object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {

       WriteToStream(
         s.userSpecifiedName.orNull,
-        checkpointLocation,
+        resolvedCheckpointLocation,
         s.sink,
         s.outputMode,
         deleteCheckpointOnStop,
         s.inputQuery)
   }
+
+  def resolveCheckpointLocation(s: WriteToStreamStatement): (String, Boolean) = {
+    var deleteCheckpointOnStop = false
+    val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified =>
+      new Path(userSpecified).toString
+    }.orElse {
+      conf.checkpointLocation.map { location =>
+        new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
+      }
+    }.getOrElse {
+      if (s.useTempCheckpointLocation) {
+        deleteCheckpointOnStop = true
+        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        logWarning("Temporary checkpoint location created which is deleted normally when" +
+          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
+          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
+          s" true. Important to know deleting temp checkpoint folder is best effort.")
+        tempDir
+      } else {
+        throw new AnalysisException(
+          "checkpointLocation must be specified either " +
+            """through option("checkpointLocation", ...) or """ +
+            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+      }
+    }
+    // If offsets have already been created, we are trying to resume a query.
+    if (!s.recoverFromCheckpointLocation) {
+      val checkpointPath = new Path(checkpointLocation, "offsets")
+      val fs = checkpointPath.getFileSystem(s.hadoopConf)
+      if (fs.exists(checkpointPath)) {
+        throw new AnalysisException(
+          s"This query does not support recovering from checkpoint location. " +
+          s"Delete $checkpointPath to start over.")
+      }
+    }
+
+    val resolvedCheckpointRoot = {
+      val checkpointPath = new Path(checkpointLocation)
+      val fs = checkpointPath.getFileSystem(s.hadoopConf)
+      if (conf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
+        && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
+        // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
+        // calls). If this legacy checkpoint path exists, we will throw an error to tell the user
+        // how to migrate.
+        val legacyCheckpointDir =
+          new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
+        val legacyCheckpointDirExists =
+          try {
+            fs.exists(new Path(legacyCheckpointDir))
+          } catch {
+            case NonFatal(e) =>
+              // We may not have access to this directory. Don't fail the query if that happens.
+              logWarning(e.getMessage, e)
+              false
+          }
+        if (legacyCheckpointDirExists) {
+          throw new SparkException(
+            s"""Error: we detected a possible problem with the location of your checkpoint and you
+               |likely need to move it before restarting this query.
+               |
+               |Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
+               |structured streaming. While this was corrected in Spark 3.0, it appears that your
+               |query was started using an earlier version that incorrectly handled the checkpoint
+               |path.
+               |
+               |Correct Checkpoint Directory: $checkpointPath
+               |Incorrect Checkpoint Directory: $legacyCheckpointDir
+               |
+               |Please move the data from the incorrect directory to the correct one, delete the
+               |incorrect directory, and then restart this query. If you believe you are receiving
+               |this message in error, you can disable it with the SQL conf
+               |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
+              .stripMargin)
+        }
+      }
+      val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+      fs.mkdirs(checkpointDir)
+      checkpointDir.toString
+    }
+    logInfo(s"Checkpoint root $checkpointLocation resolved to $resolvedCheckpointRoot.")
+    (resolvedCheckpointRoot, deleteCheckpointOnStop)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 1483f7266d..0a3bbc5e49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -68,7 +68,7 @@ case object RECONFIGURING extends State
 abstract class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
-    private val checkpointRoot: String,
+    val resolvedCheckpointRoot: String,
     val analyzedPlan: LogicalPlan,
     val sink: Table,
     val trigger: Trigger,
@@ -94,51 +94,6 @@ abstract class StreamExecution(
   private val startLatch = new CountDownLatch(1)
   private val terminationLatch = new CountDownLatch(1)

-  val resolvedCheckpointRoot = {
-    val checkpointPath = new Path(checkpointRoot)
-    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-    if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
-      && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
-      // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
-      // calls). If this legacy checkpoint path exists, we will throw an error to tell the user how
-      // to migrate.
-      val legacyCheckpointDir =
-        new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
-      val legacyCheckpointDirExists =
-        try {
-          fs.exists(new Path(legacyCheckpointDir))
-        } catch {
-          case NonFatal(e) =>
-            // We may not have access to this directory. Don't fail the query if that happens.
-            logWarning(e.getMessage, e)
-            false
-        }
-      if (legacyCheckpointDirExists) {
-        throw new SparkException(
-          s"""Error: we detected a possible problem with the location of your checkpoint and you
-             |likely need to move it before restarting this query.
-             |
-             |Earlier version of Spark incorrectly escaped paths when writing out checkpoints for
-             |structured streaming. While this was corrected in Spark 3.0, it appears that your
-             |query was started using an earlier version that incorrectly handled the checkpoint
-             |path.
-             |
-             |Correct Checkpoint Directory: $checkpointPath
-             |Incorrect Checkpoint Directory: $legacyCheckpointDir
-             |
-             |Please move the data from the incorrect directory to the correct one, delete the
-             |incorrect directory, and then restart this query. If you believe you are receiving
-             |this message in error, you can disable it with the SQL conf
-             |${SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED.key}."""
-            .stripMargin)
-      }
-    }
-    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    fs.mkdirs(checkpointDir)
-    checkpointDir.toString
-  }
-  logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
-
   def logicalPlan: LogicalPlan

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 018b245890..7faa45e9d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -44,7 +44,7 @@ class ContinuousExecution(
     extraOptions: Map[String, String],
     plan: WriteToStream)
   extends StreamExecution(
-    sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink,
+    sparkSession, plan.name, plan.resolvedCheckpointLocation, plan.inputQuery, plan.sink,
     trigger, triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {

   @volatile protected var sources: Seq[ContinuousStream] = Seq()
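
For context, here is a minimal usage sketch (not part of this patch) of the two ways a query can supply the checkpoint location that ResolveWriteToStream now resolves at analysis time: an explicit per-query `option("checkpointLocation", ...)`, or the session-wide `spark.sql.streaming.checkpointLocation` conf, with the per-query option taking precedence. The app name, source, sink, and paths below are illustrative assumptions only.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch; the rate source, console sink, and /tmp paths are hypothetical choices.
object CheckpointLocationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("checkpoint-location-example")
      .master("local[2]")
      .getOrCreate()

    // Session-wide default root: per-query checkpoint directories are created under it,
    // named after the query name or a random UUID (see resolveCheckpointLocation above).
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/checkpoints")

    val input = spark.readStream.format("rate").load()

    // Per-query location: takes precedence over the session conf. If neither is set,
    // the rule either creates a temporary checkpoint directory (when
    // useTempCheckpointLocation is set for the sink) or throws an AnalysisException.
    val query = input.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/rate-to-console")
      .start()

    query.awaitTermination(10000L)
    spark.stop()
  }
}
```

Either way, the resolved and qualified checkpoint root now reaches MicroBatchExecution and ContinuousExecution through WriteToStream.resolvedCheckpointLocation instead of being recomputed inside StreamExecution.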