[SPARK-34748][SS] Create a rule of the analysis logic for streaming write

### What changes were proposed in this pull request?
- Create a new analyzer rule `ResolveWriteToStream` that holds all of the analysis logic for streaming writes.
- Add the corresponding logical plans `WriteToStreamStatement` and `WriteToStream` (the sketch below shows how the rule connects them).
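
To make the flow concrete, here is a minimal, illustrative sketch (not part of this PR; the local master, app name, and checkpoint path below are arbitrary choices). Starting any streaming query now goes through the new plans: `StreamingQueryManager` wraps the analyzed query in a `WriteToStreamStatement`, and the analyzer rule `ResolveWriteToStream` resolves it into a `WriteToStream` before a `StreamExecution` is created, so no change is needed in user code:

```scala
// Illustrative only: "local[2]", the app name and the checkpoint path are arbitrary
// values chosen for this sketch, not anything prescribed by the PR.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("resolve-write-to-stream-demo")
  .getOrCreate()

// A trivial streaming source.
val stream = spark.readStream.format("rate").load()

// start() builds a WriteToStreamStatement internally; the analyzer rule added by this PR
// (ResolveWriteToStream) rewrites it into a WriteToStream, which MicroBatchExecution /
// ContinuousExecution now consume directly.
val query = stream.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/resolve-write-to-stream-demo")
  .outputMode("append")
  .start()

query.awaitTermination(5000)
query.stop()
spark.stop()
```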

### Why are the changes needed?
Currently, the analysis logic for streaming writes lives inside `StreamingQueryManager`. Moving it into a dedicated analyzer rule with separate logical plans keeps the analysis in one place and makes it easier to extend.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #31842 from xuanyuanking/SPARK-34748.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Yuanjian Li 2021-03-22 06:39:39 +00:00 committed by Wenchen Fan
parent 3bef2dc01a
commit 45235ac4bc
9 changed files with 236 additions and 86 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala (new file)

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.streaming

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.streaming.OutputMode

/**
 * Used to create a [[StreamExecution]].
 */
case class WriteToStream(
    name: String,
    checkpointLocation: String,
    sink: Table,
    outputMode: OutputMode,
    deleteCheckpointOnStop: Boolean,
    inputQuery: LogicalPlan) extends LogicalPlan {

  override def isStreaming: Boolean = true

  override def output: Seq[Attribute] = Nil

  override def children: Seq[LogicalPlan] = inputQuery :: Nil
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala (new file)

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.streaming

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.streaming.OutputMode

/**
 * A statement for streaming writes. It carries all the necessary parameters and is resolved by
 * the rule [[ResolveWriteToStream]].
 *
 * @param userSpecifiedName Query name optionally specified by the user.
 * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user.
 * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user
 *                                  has not specified one. If false, an error will be thrown.
 * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint location.
 *                                      If false and the checkpoint location exists, an error
 *                                      will be thrown.
 * @param sink Sink to write the streaming outputs.
 * @param outputMode Output mode for the sink.
 * @param hadoopConf The Hadoop Configuration used to get a FileSystem instance.
 * @param isContinuousTrigger Whether the statement is triggered by a continuous query or not.
 * @param inputQuery The analyzed query plan from the streaming DataFrame.
 */
case class WriteToStreamStatement(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    sink: Table,
    outputMode: OutputMode,
    hadoopConf: Configuration,
    isContinuousTrigger: Boolean,
    inputQuery: LogicalPlan) extends LogicalPlan {

  override def isStreaming: Boolean = true

  override def output: Seq[Attribute] = Nil

  override def children: Seq[LogicalPlan] = inputQuery :: Nil
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

@@ -23,31 +23,26 @@ import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, Utils}

 class MicroBatchExecution(
     sparkSession: SparkSession,
-    name: String,
-    checkpointRoot: String,
-    analyzedPlan: LogicalPlan,
-    sink: Table,
     trigger: Trigger,
     triggerClock: Clock,
-    outputMode: OutputMode,
     extraOptions: Map[String, String],
-    deleteCheckpointOnStop: Boolean)
+    plan: WriteToStream)
   extends StreamExecution(
-    sparkSession, name, checkpointRoot, analyzedPlan, sink,
-    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+    sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink, trigger,
+    triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {

   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala (new file)

@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming

import java.util.UUID

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils

/**
 * Replaces the logical [[WriteToStreamStatement]] operator with a [[WriteToStream]] operator.
 */
object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    case s: WriteToStreamStatement =>
      var deleteCheckpointOnStop = false
      // Resolve the checkpoint location: prefer the user-specified one, then the session
      // default, and finally fall back to a temporary directory if that is allowed.
      val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified =>
        new Path(userSpecified).toString
      }.orElse {
        conf.checkpointLocation.map { location =>
          new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
        }
      }.getOrElse {
        if (s.useTempCheckpointLocation) {
          deleteCheckpointOnStop = true
          val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
          logWarning("Temporary checkpoint location created which is deleted normally when" +
            s" the query didn't fail: $tempDir. If it's required to delete it under any" +
            s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
            s" true. Important to know deleting temp checkpoint folder is best effort.")
          tempDir
        } else {
          throw new AnalysisException(
            "checkpointLocation must be specified either " +
              """through option("checkpointLocation", ...) or """ +
              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
        }
      }

      // If offsets have already been created, we are trying to resume a query.
      if (!s.recoverFromCheckpointLocation) {
        val checkpointPath = new Path(checkpointLocation, "offsets")
        val fs = checkpointPath.getFileSystem(s.hadoopConf)
        if (fs.exists(checkpointPath)) {
          throw new AnalysisException(
            s"This query does not support recovering from checkpoint location. " +
              s"Delete $checkpointPath to start over.")
        }
      }

      if (conf.adaptiveExecutionEnabled) {
        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
          "is not supported in streaming DataFrames/Datasets and will be disabled.")
      }

      if (conf.isUnsupportedOperationCheckEnabled) {
        if (s.sink.isInstanceOf[SupportsWrite] && s.isContinuousTrigger) {
          UnsupportedOperationChecker.checkForContinuous(s.inputQuery, s.outputMode)
        } else {
          UnsupportedOperationChecker.checkForStreaming(s.inputQuery, s.outputMode)
        }
      }

      WriteToStream(
        s.userSpecifiedName.orNull,
        checkpointLocation,
        s.sink,
        s.outputMode,
        deleteCheckpointOnStop,
        s.inputQuery)
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

@@ -69,7 +69,7 @@ abstract class StreamExecution(
     override val sparkSession: SparkSession,
     override val name: String,
     private val checkpointRoot: String,
-    analyzedPlan: LogicalPlan,
+    val analyzedPlan: LogicalPlan,
     val sink: Table,
     val trigger: Trigger,
     val triggerClock: Clock,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

@@ -28,29 +28,24 @@ import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
+import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream}
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.streaming.{OutputMode, Trigger}
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.Clock

 class ContinuousExecution(
     sparkSession: SparkSession,
-    name: String,
-    checkpointRoot: String,
-    analyzedPlan: LogicalPlan,
-    sink: SupportsWrite,
     trigger: Trigger,
     triggerClock: Clock,
-    outputMode: OutputMode,
     extraOptions: Map[String, String],
-    deleteCheckpointOnStop: Boolean)
+    plan: WriteToStream)
   extends StreamExecution(
-    sparkSession, name, checkpointRoot, analyzedPlan, sink,
-    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
+    sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink,
+    trigger, triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) {

   @volatile protected var sources: Seq[ContinuousStream] = Seq()
@@ -90,7 +85,8 @@ class ContinuousExecution(
     // TODO (SPARK-27484): we should add the writing node before the plan is analyzed.
     WriteToContinuousDataSource(
-      createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
+      createStreamingWrite(
+        plan.sink.asInstanceOf[SupportsWrite], extraOptions, _logicalPlan), _logicalPlan)
   }

   private val triggerExecutor = trigger match {

sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala

@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
 import org.apache.spark.sql.execution.command.CommandCheck
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog}
+import org.apache.spark.sql.execution.streaming.ResolveWriteToStream
 import org.apache.spark.sql.streaming.StreamingQueryManager
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -170,6 +171,7 @@ abstract class BaseSessionStateBuilder(
         new FallBackFileSourceV2(session) +:
         ResolveEncodersInScalaAgg +:
         new ResolveSessionCatalog(catalogManager) +:
+        ResolveWriteToStream +:
         customResolutionRules

     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

@@ -24,13 +24,11 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.JavaConverters._
 import scala.collection.mutable

-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement}
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -239,82 +237,39 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       recoverFromCheckpointLocation: Boolean,
       trigger: Trigger,
       triggerClock: Clock): StreamingQueryWrapper = {
-    var deleteCheckpointOnStop = false
-    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
-      new Path(userSpecified).toString
-    }.orElse {
-      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
-        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString
-      }
-    }.getOrElse {
-      if (useTempCheckpointLocation) {
-        deleteCheckpointOnStop = true
-        val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
-        logWarning("Temporary checkpoint location created which is deleted normally when" +
-          s" the query didn't fail: $tempDir. If it's required to delete it under any" +
-          s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
-          s" true. Important to know deleting temp checkpoint folder is best effort.")
-        tempDir
-      } else {
-        throw new AnalysisException(
-          "checkpointLocation must be specified either " +
-            """through option("checkpointLocation", ...) or """ +
-            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
-      }
-    }
-
-    // If offsets have already been created, we trying to resume a query.
-    if (!recoverFromCheckpointLocation) {
-      val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-      if (fs.exists(checkpointPath)) {
-        throw new AnalysisException(
-          s"This query does not support recovering from checkpoint location. " +
-            s"Delete $checkpointPath to start over.")
-      }
-    }
-
     val analyzedPlan = df.queryExecution.analyzed
     df.queryExecution.assertAnalyzed()

-    val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled
+    val dataStreamWritePlan = WriteToStreamStatement(
+      userSpecifiedName,
+      userSpecifiedCheckpointLocation,
+      useTempCheckpointLocation,
+      recoverFromCheckpointLocation,
+      sink,
+      outputMode,
+      df.sparkSession.sessionState.newHadoopConf(),
+      trigger.isInstanceOf[ContinuousTrigger],
+      analyzedPlan)

-    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
-      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
-        "is not supported in streaming DataFrames/Datasets and will be disabled.")
-    }
+    val analyzedStreamWritePlan =
+      sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed
+        .asInstanceOf[WriteToStream]

     (sink, trigger) match {
-      case (table: SupportsWrite, trigger: ContinuousTrigger) =>
-        if (operationCheckEnabled) {
-          UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
-        }
+      case (_: SupportsWrite, trigger: ContinuousTrigger) =>
         new StreamingQueryWrapper(new ContinuousExecution(
           sparkSession,
-          userSpecifiedName.orNull,
-          checkpointLocation,
-          analyzedPlan,
-          table,
           trigger,
           triggerClock,
-          outputMode,
           extraOptions,
-          deleteCheckpointOnStop))
+          analyzedStreamWritePlan))
       case _ =>
-        if (operationCheckEnabled) {
-          UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-        }
         new StreamingQueryWrapper(new MicroBatchExecution(
           sparkSession,
-          userSpecifiedName.orNull,
-          checkpointLocation,
-          analyzedPlan,
-          sink,
           trigger,
           triggerClock,
-          outputMode,
           extraOptions,
-          deleteCheckpointOnStop))
+          analyzedStreamWritePlan))
     }
   }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala

@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
 import org.apache.spark.sql.execution.command.CommandCheck
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
+import org.apache.spark.sql.execution.streaming.ResolveWriteToStream
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.hive.execution.PruneHiveTablePartitions
 import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState}
@@ -79,6 +80,7 @@ class HiveSessionStateBuilder(
         new FallBackFileSourceV2(session) +:
         ResolveEncodersInScalaAgg +:
         new ResolveSessionCatalog(catalogManager) +:
+        ResolveWriteToStream +:
         customResolutionRules

     override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =