diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala new file mode 100644 index 0000000000..a40ab89e1e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.streaming.OutputMode + +/** + * Used to create a [[StreamExecution]]. + */ +case class WriteToStream( + name: String, + checkpointLocation: String, + sink: Table, + outputMode: OutputMode, + deleteCheckpointOnStop: Boolean, + inputQuery: LogicalPlan) extends LogicalPlan { + + override def isStreaming: Boolean = true + + override def output: Seq[Attribute] = Nil + + override def children: Seq[LogicalPlan] = inputQuery :: Nil +} + diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala new file mode 100644 index 0000000000..c1e2f017cc --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.streaming.OutputMode + +/** + * A statement for stream writing. It contains all the necessary parameters and will be resolved in the + * rule [[ResolveWriteToStream]].
+ * + * @param userSpecifiedName Query name optionally specified by the user. + * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. + * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user + * has not specified one. If false, then an error will be thrown. + * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint location. + * If false and the checkpoint location exists, then an error + * will be thrown. + * @param sink Sink to write the streaming outputs. + * @param outputMode Output mode for the sink. + * @param hadoopConf The Hadoop Configuration used to get a FileSystem instance. + * @param isContinuousTrigger Whether the statement is triggered by a continuous query or not. + * @param inputQuery The analyzed query plan from the streaming DataFrame. + */ +case class WriteToStreamStatement( + userSpecifiedName: Option[String], + userSpecifiedCheckpointLocation: Option[String], + useTempCheckpointLocation: Boolean, + recoverFromCheckpointLocation: Boolean, + sink: Table, + outputMode: OutputMode, + hadoopConf: Configuration, + isContinuousTrigger: Boolean, + inputQuery: LogicalPlan) extends LogicalPlan { + + override def isStreaming: Boolean = true + + override def output: Seq[Attribute] = Nil + + override def children: Seq[LogicalPlan] = inputQuery :: Nil +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index a9cb345c4a..388fba1061 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -23,31 +23,26 @@ import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project} -import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} +import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability} import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset => OffsetV2, ReadLimit, SparkDataStream, SupportsAdmissionControl} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.streaming.{OutputMode, Trigger} +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.{Clock, Utils} class MicroBatchExecution( sparkSession: SparkSession, - name: String, - checkpointRoot: String, - analyzedPlan: LogicalPlan, - sink: Table, trigger: Trigger, triggerClock: Clock, - outputMode: OutputMode, extraOptions: Map[String, String], - deleteCheckpointOnStop: Boolean) + plan: WriteToStream) extends StreamExecution( - sparkSession, name, checkpointRoot,
analyzedPlan, sink, - trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + sparkSession, plan.name, plan.checkpointLocation, plan.inputQuery, plan.sink, trigger, + triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) { @volatile protected var sources: Seq[SparkDataStream] = Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala new file mode 100644 index 0000000000..9aeca86cc7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ResolveWriteToStream.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.UUID + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement} +import org.apache.spark.sql.connector.catalog.SupportsWrite +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils + +/** + * Replaces the logical [[WriteToStreamStatement]] operator with a [[WriteToStream]] operator. + */ +object ResolveWriteToStream extends Rule[LogicalPlan] with SQLConfHelper { + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case s: WriteToStreamStatement => + var deleteCheckpointOnStop = false + val checkpointLocation = s.userSpecifiedCheckpointLocation.map { userSpecified => + new Path(userSpecified).toString + }.orElse { + conf.checkpointLocation.map { location => + new Path(location, s.userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString + } + }.getOrElse { + if (s.useTempCheckpointLocation) { + deleteCheckpointOnStop = true + val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath + logWarning("Temporary checkpoint location created which is deleted normally when" + + s" the query didn't fail: $tempDir. If it's required to delete it under any" + + s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" + + s" true. Important to know deleting temp checkpoint folder is best effort.") + tempDir + } else { + throw new AnalysisException( + "checkpointLocation must be specified either " + + """through option("checkpointLocation", ...) or """ + + s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") + } + } + + // If offsets have already been created, we are trying to resume a query.
+ if (!s.recoverFromCheckpointLocation) { + val checkpointPath = new Path(checkpointLocation, "offsets") + val fs = checkpointPath.getFileSystem(s.hadoopConf) + if (fs.exists(checkpointPath)) { + throw new AnalysisException( + s"This query does not support recovering from checkpoint location. " + + s"Delete $checkpointPath to start over.") + } + } + + if (conf.adaptiveExecutionEnabled) { + logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + + "is not supported in streaming DataFrames/Datasets and will be disabled.") + } + + if (conf.isUnsupportedOperationCheckEnabled) { + if (s.sink.isInstanceOf[SupportsWrite] && s.isContinuousTrigger) { + UnsupportedOperationChecker.checkForContinuous(s.inputQuery, s.outputMode) + } else { + UnsupportedOperationChecker.checkForStreaming(s.inputQuery, s.outputMode) + } + } + + WriteToStream( + s.userSpecifiedName.orNull, + checkpointLocation, + s.sink, + s.outputMode, + deleteCheckpointOnStop, + s.inputQuery) + } +} + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ae010f984f..1483f7266d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -69,7 +69,7 @@ abstract class StreamExecution( override val sparkSession: SparkSession, override val name: String, private val checkpointRoot: String, - analyzedPlan: LogicalPlan, + val analyzedPlan: LogicalPlan, val sink: Table, val trigger: Trigger, val triggerClock: Clock, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index ad041ceeba..018b245890 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -28,29 +28,24 @@ import org.apache.spark.SparkEnv import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 +import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2, WriteToStream} import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability} import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, Offset => OffsetV2, PartitionOffset, ReadLimit} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.streaming.{OutputMode, Trigger} +import org.apache.spark.sql.streaming.Trigger import org.apache.spark.util.Clock class ContinuousExecution( sparkSession: SparkSession, - name: String, - checkpointRoot: String, - analyzedPlan: LogicalPlan, - sink: SupportsWrite, trigger: Trigger, triggerClock: Clock, - outputMode: OutputMode, extraOptions: Map[String, String], - deleteCheckpointOnStop: Boolean) + plan: WriteToStream) extends StreamExecution( - sparkSession, name, checkpointRoot, analyzedPlan, sink, - trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + sparkSession, plan.name, 
plan.checkpointLocation, plan.inputQuery, plan.sink, + trigger, triggerClock, plan.outputMode, plan.deleteCheckpointOnStop) { @volatile protected var sources: Seq[ContinuousStream] = Seq() @@ -90,7 +85,8 @@ class ContinuousExecution( // TODO (SPARK-27484): we should add the writing node before the plan is analyzed. WriteToContinuousDataSource( - createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan) + createStreamingWrite( + plan.sink.asInstanceOf[SupportsWrite], extraOptions, _logicalPlan), _logicalPlan) } private val triggerExecutor = trigger match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index eb769340d7..c905f5b915 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.command.CommandCheck import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog} +import org.apache.spark.sql.execution.streaming.ResolveWriteToStream import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.util.ExecutionListenerManager @@ -170,6 +171,7 @@ abstract class BaseSessionStateBuilder( new FallBackFileSourceV2(session) +: ResolveEncodersInScalaAgg +: new ResolveSessionCatalog(catalogManager) +: + ResolveWriteToStream +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index b66037d009..1f29b9be1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -24,13 +24,11 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.hadoop.fs.Path - import org.apache.spark.SparkException import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} -import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.streaming.{WriteToStream, WriteToStreamStatement} import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution @@ -239,82 +237,39 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo recoverFromCheckpointLocation: Boolean, trigger: Trigger, triggerClock: Clock): StreamingQueryWrapper = { - var deleteCheckpointOnStop = false - val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => - new Path(userSpecified).toString - }.orElse { - df.sparkSession.sessionState.conf.checkpointLocation.map { location => - new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toString - } - }.getOrElse { - if (useTempCheckpointLocation) { - deleteCheckpointOnStop = true - val 
tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath - logWarning("Temporary checkpoint location created which is deleted normally when" + - s" the query didn't fail: $tempDir. If it's required to delete it under any" + - s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" + - s" true. Important to know deleting temp checkpoint folder is best effort.") - tempDir - } else { - throw new AnalysisException( - "checkpointLocation must be specified either " + - """through option("checkpointLocation", ...) or """ + - s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") - } - } - - // If offsets have already been created, we trying to resume a query. - if (!recoverFromCheckpointLocation) { - val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) - if (fs.exists(checkpointPath)) { - throw new AnalysisException( - s"This query does not support recovering from checkpoint location. " + - s"Delete $checkpointPath to start over.") - } - } - val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed() - val operationCheckEnabled = sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled + val dataStreamWritePlan = WriteToStreamStatement( + userSpecifiedName, + userSpecifiedCheckpointLocation, + useTempCheckpointLocation, + recoverFromCheckpointLocation, + sink, + outputMode, + df.sparkSession.sessionState.newHadoopConf(), + trigger.isInstanceOf[ContinuousTrigger], + analyzedPlan) - if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) { - logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " + - "is not supported in streaming DataFrames/Datasets and will be disabled.") - } + val analyzedStreamWritePlan = + sparkSession.sessionState.executePlan(dataStreamWritePlan).analyzed + .asInstanceOf[WriteToStream] (sink, trigger) match { - case (table: SupportsWrite, trigger: ContinuousTrigger) => - if (operationCheckEnabled) { - UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode) - } + case (_: SupportsWrite, trigger: ContinuousTrigger) => new StreamingQueryWrapper(new ContinuousExecution( sparkSession, - userSpecifiedName.orNull, - checkpointLocation, - analyzedPlan, - table, trigger, triggerClock, - outputMode, extraOptions, - deleteCheckpointOnStop)) + analyzedStreamWritePlan)) case _ => - if (operationCheckEnabled) { - UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) - } new StreamingQueryWrapper(new MicroBatchExecution( sparkSession, - userSpecifiedName.orNull, - checkpointLocation, - analyzedPlan, - sink, trigger, triggerClock, - outputMode, extraOptions, - deleteCheckpointOnStop)) + analyzedStreamWritePlan)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index b98a956dfc..11e8333567 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin import org.apache.spark.sql.execution.command.CommandCheck import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck +import org.apache.spark.sql.execution.streaming.ResolveWriteToStream import 
org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.hive.execution.PruneHiveTablePartitions import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionResourceLoader, SessionState} @@ -79,6 +80,7 @@ class HiveSessionStateBuilder( new FallBackFileSourceV2(session) +: ResolveEncodersInScalaAgg +: new ResolveSessionCatalog(catalogManager) +: + ResolveWriteToStream +: customResolutionRules override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
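For reference, a minimal end-to-end sketch of the path this change rewires. Starting any streaming query now builds a WriteToStreamStatement, which the ResolveWriteToStream rule (registered in both session state builders above) resolves into a WriteToStream before MicroBatchExecution or ContinuousExecution is constructed. The snippet below is illustrative only and not part of the patch: the rate source, console sink, and object name are arbitrary stand-ins, and any streaming source/sink pair would exercise the same code path. Because no checkpointLocation option is set and the console sink is started with useTempCheckpointLocation enabled, it is ResolveWriteToStream that creates the temporary checkpoint directory and sets deleteCheckpointOnStop = true on the resulting WriteToStream node.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ResolveWriteToStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("ResolveWriteToStreamExample")
      .getOrCreate()

    // A simple unbounded input: one row per second from the built-in rate source.
    val rates = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    // start() goes through StreamingQueryManager: it builds a WriteToStreamStatement,
    // runs it through the analyzer so ResolveWriteToStream rewrites it into WriteToStream,
    // and then hands the resolved plan to MicroBatchExecution.
    val query = rates.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start()

    query.awaitTermination(10000) // let a few micro-batches run
    query.stop()
    spark.stop()
  }
}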