From 3c0c2d09ca89c6b6247137823169db17847dfae3 Mon Sep 17 00:00:00 2001 From: Jose Torres Date: Tue, 22 Aug 2017 19:07:43 -0700 Subject: [PATCH] [SPARK-21765] Set isStreaming on leaf nodes for streaming plans. ## What changes were proposed in this pull request? All streaming logical plans will now have isStreaming set. This involved adding isStreaming as a case class arg in a few cases, since a node might be logically streaming depending on where it came from. ## How was this patch tested? Existing unit tests - no functional change is intended in this PR. Author: Jose Torres Author: Tathagata Das Closes #18973 from joseph-torres/SPARK-21765. --- .../spark/sql/kafka010/KafkaSource.scala | 2 +- .../sql/catalyst/optimizer/Optimizer.scala | 10 +++--- .../plans/logical/LocalRelation.scala | 5 ++- .../catalyst/plans/logical/LogicalPlan.scala | 2 +- .../plans/logical/basicLogicalOperators.scala | 11 ++++--- .../analysis/ResolveInlineTablesSuite.scala | 2 +- .../analysis/UnsupportedOperationsSuite.scala | 6 ++-- .../optimizer/ReplaceOperatorSuite.scala | 6 ++-- .../sql/catalyst/plans/LogicalPlanSuite.scala | 6 ++-- .../apache/spark/sql/DataFrameReader.scala | 4 +-- .../apache/spark/sql/DataFrameWriter.scala | 4 +-- .../scala/org/apache/spark/sql/Dataset.scala | 7 +++-- .../org/apache/spark/sql/SQLContext.scala | 7 +++-- .../org/apache/spark/sql/SparkSession.scala | 8 +++-- .../spark/sql/execution/ExistingRDD.scala | 8 +++-- .../execution/OptimizeMetadataOnlyQuery.scala | 6 ++-- .../spark/sql/execution/SparkStrategies.scala | 8 +++-- .../execution/datasources/DataSource.scala | 2 +- .../datasources/DataSourceStrategy.scala | 15 ++++----- .../datasources/FileSourceStrategy.scala | 2 +- .../datasources/LogicalRelation.scala | 12 ++++--- .../PruneFileSourcePartitions.scala | 1 + .../sql/execution/datasources/rules.scala | 10 +++--- .../streaming/FileStreamSource.scala | 2 +- .../streaming/RateSourceProvider.scala | 5 +-- .../execution/streaming/StreamExecution.scala | 3 ++ 
.../sql/execution/streaming/memory.scala | 31 +++++++++++++++---- .../OptimizeMetadataOnlyQuerySuite.scala | 4 +-- .../sql/execution/SparkPlannerSuite.scala | 2 +- .../datasources/FileSourceStrategySuite.scala | 2 +- .../parquet/ParquetFilterSuite.scala | 3 +- .../ParquetPartitionDiscoverySuite.scala | 2 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 +- .../spark/sql/sources/FilteredScanSuite.scala | 2 +- .../spark/sql/sources/PathOptionSuite.scala | 2 +- .../sql/streaming/FileStreamSinkSuite.scala | 2 +- .../sql/streaming/FileStreamSourceSuite.scala | 5 ++- .../spark/sql/streaming/StreamSuite.scala | 12 ++++++- .../streaming/StreamingAggregationSuite.scala | 29 +++++++++++++++-- .../sql/streaming/StreamingQuerySuite.scala | 5 ++- .../test/DataStreamReaderWriterSuite.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 2 +- .../spark/sql/hive/orc/OrcFilterSuite.scala | 4 +-- .../apache/spark/sql/hive/parquetSuites.scala | 8 ++--- 46 files changed, 180 insertions(+), 97 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 7ac183776e..e9cff04ba5 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -310,7 +310,7 @@ private[kafka010] class KafkaSource( currentPartitionOffsets = Some(untilPartitionOffsets) } - sqlContext.internalCreateDataFrame(rdd, schema) + sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } /** Stop this source and free any resources it has allocated. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e2d7164d93..75d83bc6e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1175,14 +1175,14 @@ object DecimalAggregates extends Rule[LogicalPlan] { */ object ConvertToLocalRelation extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Project(projectList, LocalRelation(output, data)) + case Project(projectList, LocalRelation(output, data, isStreaming)) if !projectList.exists(hasUnevaluableExpr) => val projection = new InterpretedProjection(projectList, output) projection.initialize(0) - LocalRelation(projectList.map(_.toAttribute), data.map(projection)) + LocalRelation(projectList.map(_.toAttribute), data.map(projection), isStreaming) - case Limit(IntegerLiteral(limit), LocalRelation(output, data)) => - LocalRelation(output, data.take(limit)) + case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) => + LocalRelation(output, data.take(limit), isStreaming) } private def hasUnevaluableExpr(expr: Expression): Boolean = { @@ -1207,7 +1207,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] { */ object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Deduplicate(keys, child, streaming) if !streaming => + case Deduplicate(keys, child) if !child.isStreaming => val keyExprIds = keys.map(_.exprId) val aggCols = child.output.map { attr => if (keyExprIds.contains(attr.exprId)) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 
1c986fbde7..7a21183664 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -43,7 +43,10 @@ object LocalRelation { } } -case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) +case class LocalRelation(output: Seq[Attribute], + data: Seq[InternalRow] = Nil, + // Indicates whether this relation has data from a streaming source. + override val isStreaming: Boolean = false) extends LeafNode with analysis.MultiInstanceRelation { // A local relation must have resolved output. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 9b440cd99f..d893b392e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -47,7 +47,7 @@ abstract class LogicalPlan */ def analyzed: Boolean = _analyzed - /** Returns true if this subtree contains any streaming data sources. */ + /** Returns true if this subtree has data from a streaming data source. */ def isStreaming: Boolean = children.exists(_.isStreaming == true) /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 303014e0b8..4b3054dbfe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -429,9 +429,10 @@ case class Sort( /** Factory for constructing new `Range` nodes. 
*/ object Range { - def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range = { + def apply(start: Long, end: Long, step: Long, + numSlices: Option[Int], isStreaming: Boolean = false): Range = { val output = StructType(StructField("id", LongType, nullable = false) :: Nil).toAttributes - new Range(start, end, step, numSlices, output) + new Range(start, end, step, numSlices, output, isStreaming) } def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = { Range(start, end, step, Some(numSlices)) @@ -443,7 +444,8 @@ case class Range( end: Long, step: Long, numSlices: Option[Int], - output: Seq[Attribute]) + output: Seq[Attribute], + override val isStreaming: Boolean) extends LeafNode with MultiInstanceRelation { require(step != 0, s"step ($step) cannot be 0") @@ -784,8 +786,7 @@ case class OneRowRelation() extends LeafNode { /** A logical plan for `dropDuplicates`. */ case class Deduplicate( keys: Seq[Attribute], - child: LogicalPlan, - streaming: Boolean) extends UnaryNode { + child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index d0fe815052..9e99c8e11c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -93,7 +93,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType)))) val withTimeZone = ResolveTimeZone(conf).apply(table) - val LocalRelation(output, data) = ResolveInlineTables(conf).apply(withTimeZone) + val LocalRelation(output, data, _) = ResolveInlineTables(conf).apply(withTimeZone) val 
correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType) .withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long] assert(output.map(_.dataType) == Seq(TimestampType)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index f68d930f60..4de75866e0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -368,18 +368,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite { Aggregate( Seq(attributeWithWatermark), aggExprs("c"), - Deduplicate(Seq(att), streamRelation, streaming = true)), + Deduplicate(Seq(att), streamRelation)), outputMode = Append) assertNotSupportedInStreamingPlan( "Deduplicate - Deduplicate on streaming relation after aggregation", - Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation), streaming = true), + Deduplicate(Seq(att), Aggregate(Nil, aggExprs("c"), streamRelation)), outputMode = Complete, expectedMsgs = Seq("dropDuplicates")) assertSupportedInStreamingPlan( "Deduplicate - Deduplicate on batch relation inside a streaming query", - Deduplicate(Seq(att), batchRelation, streaming = false), + Deduplicate(Seq(att), batchRelation), outputMode = Append ) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index e68423f85c..85988d2fb9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest { val input = LocalRelation('a.int, 
'b.int) val attrA = input.output(0) val attrB = input.output(1) - val query = Deduplicate(Seq(attrA), input, streaming = false) // dropDuplicates("a") + val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a") val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -95,9 +95,9 @@ class ReplaceOperatorSuite extends PlanTest { } test("don't replace streaming Deduplicate") { - val input = LocalRelation('a.int, 'b.int) + val input = LocalRelation(Seq('a.int, 'b.int), isStreaming = true) val attrA = input.output(0) - val query = Deduplicate(Seq(attrA), input, streaming = true) // dropDuplicates("a") + val query = Deduplicate(Seq(attrA), input) // dropDuplicates("a") val optimized = Optimize.execute(query.analyze) comparePlans(optimized, query) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala index cc86f1f6e2..cdf912df7c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala @@ -73,10 +73,8 @@ class LogicalPlanSuite extends SparkFunSuite { test("isStreaming") { val relation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)()) - val incrementalRelation = new LocalRelation( - Seq(AttributeReference("a", IntegerType, nullable = true)())) { - override def isStreaming(): Boolean = true - } + val incrementalRelation = LocalRelation( + Seq(AttributeReference("a", IntegerType, nullable = true)()), isStreaming = true) case class TestBinaryRelation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { override def output: Seq[Attribute] = left.output ++ right.output diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 10b28ce812..41cb019499 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -410,7 +410,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { Dataset.ofRows( sparkSession, - LogicalRDD(schema.toAttributes, parsed)(sparkSession)) + LogicalRDD(schema.toAttributes, parsed, isStreaming = jsonDataset.isStreaming)(sparkSession)) } /** @@ -473,7 +473,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { Dataset.ofRows( sparkSession, - LogicalRDD(schema.toAttributes, parsed)(sparkSession)) + LogicalRDD(schema.toAttributes, parsed, isStreaming = csvDataset.isStreaming)(sparkSession)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 877051a60e..cca93525d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -371,14 +371,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { case (true, SaveMode.Overwrite) => // Get all input data source or hive relations of the query. val srcRelations = df.logicalPlan.collect { - case LogicalRelation(src: BaseRelation, _, _) => src + case LogicalRelation(src: BaseRelation, _, _, _) => src case relation: HiveTableRelation => relation.tableMeta.identifier } val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed EliminateSubqueryAliases(tableRelation) match { // check if the table is a data source table (the relation is a BaseRelation). 
- case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) => + case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) => throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") // check hive table relation when overwrite mode diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 615686ccbe..c6707396af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -569,7 +569,8 @@ class Dataset[T] private[sql]( logicalPlan.output, internalRdd, outputPartitioning, - physicalPlan.outputOrdering + physicalPlan.outputOrdering, + isStreaming )(sparkSession)).as[T] } @@ -2233,7 +2234,7 @@ class Dataset[T] private[sql]( } cols } - Deduplicate(groupCols, logicalPlan, isStreaming) + Deduplicate(groupCols, logicalPlan) } /** @@ -2993,7 +2994,7 @@ class Dataset[T] private[sql]( */ def inputFiles: Array[String] = { val files: Seq[String] = queryExecution.optimizedPlan.collect { - case LogicalRelation(fsBasedRelation: FileRelation, _, _) => + case LogicalRelation(fsBasedRelation: FileRelation, _, _, _) => fsBasedRelation.inputFiles case fr: FileRelation => fr.inputFiles diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 7fde6e9469..af6018472c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -420,8 +420,11 @@ class SQLContext private[sql](val sparkSession: SparkSession) * converted to Catalyst rows. 
*/ private[sql] - def internalCreateDataFrame(catalystRows: RDD[InternalRow], schema: StructType) = { - sparkSession.internalCreateDataFrame(catalystRows, schema) + def internalCreateDataFrame( + catalystRows: RDD[InternalRow], + schema: StructType, + isStreaming: Boolean = false) = { + sparkSession.internalCreateDataFrame(catalystRows, schema, isStreaming) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 6dfe8a66ba..863c316bba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -564,10 +564,14 @@ class SparkSession private( */ private[sql] def internalCreateDataFrame( catalystRows: RDD[InternalRow], - schema: StructType): DataFrame = { + schema: StructType, + isStreaming: Boolean = false): DataFrame = { // TODO: use MutableProjection when rowRDD is another DataFrame and the applied // schema differs from the existing schema on any field data type. 
- val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) + val logicalPlan = LogicalRDD( + schema.toAttributes, + catalystRows, + isStreaming = isStreaming)(self) Dataset.ofRows(self, logicalPlan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index dcb918eeb9..f355550818 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -125,7 +125,8 @@ case class LogicalRDD( output: Seq[Attribute], rdd: RDD[InternalRow], outputPartitioning: Partitioning = UnknownPartitioning(0), - outputOrdering: Seq[SortOrder] = Nil)(session: SparkSession) + outputOrdering: Seq[SortOrder] = Nil, + override val isStreaming: Boolean = false)(session: SparkSession) extends LeafNode with MultiInstanceRelation { override protected final def otherCopyArgs: Seq[AnyRef] = session :: Nil @@ -150,11 +151,12 @@ case class LogicalRDD( output.map(rewrite), rdd, rewrittenPartitioning, - rewrittenOrdering + rewrittenOrdering, + isStreaming )(session).asInstanceOf[this.type] } - override protected def stringArgs: Iterator[Any] = Iterator(output) + override protected def stringArgs: Iterator[Any] = Iterator(output, isStreaming) override def computeStats(): Statistics = Statistics( // TODO: Instead of returning a default value here, find a way to return a meaningful size diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 301c4f0264..18f6f697bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -94,10 +94,10 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends 
Rule[Logic child transform { case plan if plan eq relation => relation match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) => + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, isStreaming) => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) val partitionData = fsRelation.location.listFiles(Nil, Nil) - LocalRelation(partAttrs, partitionData.map(_.values)) + LocalRelation(partAttrs, partitionData.map(_.values), isStreaming) case relation: HiveTableRelation => val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) @@ -130,7 +130,7 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic object PartitionedRelation { def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = plan match { - case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) + case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _) if fsRelation.partitionSchema.nonEmpty => val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) Some((AttributeSet(partAttrs), l)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c115cb6e80..6b16408e27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -221,12 +221,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } /** - * Used to plan aggregation queries that are computed incrementally as part of a + * Used to plan streaming aggregation queries that are computed incrementally as part of a * [[StreamingQuery]]. 
Currently this rule is injected into the planner * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case _ if !plan.isStreaming => Nil + case EventTimeWatermark(columnName, delay, child) => EventTimeWatermarkExec(columnName, delay, planLater(child)) :: Nil @@ -248,7 +250,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ object StreamingDeduplicationStrategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case Deduplicate(keys, child, true) => + case Deduplicate(keys, child) if child.isStreaming => StreamingDeduplicateExec(keys, planLater(child)) :: Nil case _ => Nil @@ -410,7 +412,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.window.WindowExec(windowExprs, partitionSpec, orderSpec, planLater(child)) :: Nil case logical.Sample(lb, ub, withReplacement, seed, child) => execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil - case logical.LocalRelation(output, data) => + case logical.LocalRelation(output, data, _) => LocalTableScanExec(output, data) :: Nil case logical.LocalLimit(IntegerLiteral(limit), child) => execution.LocalLimitExec(limit, planLater(child)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 567ff49773..b9502a95a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -455,7 +455,7 @@ case class DataSource( val fileIndex = catalogTable.map(_.identifier).map { tableIdent => sparkSession.table(tableIdent).queryExecution.analyzed.collect { - case LogicalRelation(t: HadoopFsRelation, _, 
_) => t.location + case LogicalRelation(t: HadoopFsRelation, _, _, _) => t.location }.head } // For partitioned relation r, r.schema's column ordering can be different from the column diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 2370177427..0deac1984b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -136,12 +136,12 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast if query.resolved && DDLUtils.isDatasourceTable(tableDesc) => CreateDataSourceTableAsSelectCommand(tableDesc, mode, query) - case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _), + case InsertIntoTable(l @ LogicalRelation(_: InsertableRelation, _, _, _), parts, query, overwrite, false) if parts.isEmpty => InsertIntoDataSourceCommand(l, query, overwrite) case i @ InsertIntoTable( - l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, _) => + l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, query, overwrite, _) => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and // the user has specified static partitions, we add a Project operator on top of the query // to include those constant column values in the query result. 
@@ -177,7 +177,7 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast val outputPath = t.location.rootPaths.head val inputPaths = actualQuery.collect { - case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths + case LogicalRelation(r: HadoopFsRelation, _, _, _) => r.location.rootPaths }.flatten val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append @@ -268,7 +268,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with import DataSourceStrategy._ def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) => pruneFilterProjectRaw( l, projects, @@ -276,21 +276,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with (requestedColumns, allPredicates, _) => toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) => + case PhysicalOperation(projects, filters, + l @ LogicalRelation(t: PrunedFilteredScan, _, _, _)) => pruneFilterProject( l, projects, filters, (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil - case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _, _)) => pruneFilterProject( l, projects, filters, (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil - case l @ LogicalRelation(baseRelation: TableScan, _, _) => + case l @ LogicalRelation(baseRelation: TableScan, _, _, _) => RowDataSourceScanExec( l.output, l.output.indices, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 17f7e0e601..16b22717b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -52,7 +52,7 @@ import org.apache.spark.sql.execution.SparkPlan object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, - l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) => + l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table, _)) => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 699f1bad9c..17a61074d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -30,12 +30,14 @@ import org.apache.spark.util.Utils case class LogicalRelation( relation: BaseRelation, output: Seq[AttributeReference], - catalogTable: Option[CatalogTable]) + catalogTable: Option[CatalogTable], + override val isStreaming: Boolean) extends LeafNode with MultiInstanceRelation { // Logical Relations are distinct if they have different output for the sake of transformations. 
override def equals(other: Any): Boolean = other match { - case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output + case l @ LogicalRelation(otherRelation, _, _, otherIsStreaming) => + relation == otherRelation && output == l.output && isStreaming == otherIsStreaming case _ => false } @@ -76,9 +78,9 @@ case class LogicalRelation( } object LogicalRelation { - def apply(relation: BaseRelation): LogicalRelation = - LogicalRelation(relation, relation.schema.toAttributes, None) + def apply(relation: BaseRelation, isStreaming: Boolean = false): LogicalRelation = + LogicalRelation(relation, relation.schema.toAttributes, None, isStreaming) def apply(relation: BaseRelation, table: CatalogTable): LogicalRelation = - LogicalRelation(relation, relation.schema.toAttributes, Some(table)) + LogicalRelation(relation, relation.schema.toAttributes, Some(table), false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala index f5df1848a3..3b830accb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala @@ -36,6 +36,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { _, _), _, + _, _)) if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined => // The attribute name of predicate could be different than the one in schema in case of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 84acca242a..7a2c85e8e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -385,10 +385,10 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] wit case relation: HiveTableRelation => val metadata = relation.tableMeta preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames) - case LogicalRelation(h: HadoopFsRelation, _, catalogTable) => + case LogicalRelation(h: HadoopFsRelation, _, catalogTable, _) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") preprocess(i, tblName, h.partitionSchema.map(_.name)) - case LogicalRelation(_: InsertableRelation, _, catalogTable) => + case LogicalRelation(_: InsertableRelation, _, catalogTable, _) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") preprocess(i, tblName, Nil) case _ => i @@ -428,7 +428,7 @@ object PreReadCheck extends (LogicalPlan => Unit) { private def checkNumInputFileBlockSources(e: Expression, operator: LogicalPlan): Int = { operator match { case _: HiveTableRelation => 1 - case _ @ LogicalRelation(_: HadoopFsRelation, _, _) => 1 + case _ @ LogicalRelation(_: HadoopFsRelation, _, _, _) => 1 case _: LeafNode => 0 // UNION ALL has multiple children, but these children do not concurrently use InputFileBlock. case u: Union => @@ -454,10 +454,10 @@ object PreWriteCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case InsertIntoTable(l @ LogicalRelation(relation, _, _), partition, query, _, _) => + case InsertIntoTable(l @ LogicalRelation(relation, _, _, _), partition, query, _, _) => // Get all input data source relations of the query. 
val srcRelations = query.collect { - case LogicalRelation(src, _, _) => src + case LogicalRelation(src, _, _, _) => src } if (srcRelations.contains(relation)) { failAnalysis("Cannot insert into table that is also being read from.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 4b1b252039..f17417343e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -171,7 +171,7 @@ class FileStreamSource( className = fileFormatClassName, options = optionsWithPartitionBasePath) Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation( - checkFilesExist = false))) + checkFilesExist = false), isStreaming = true)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala index e76d4dc612..077a4778e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala @@ -200,7 +200,8 @@ class RateStreamSource( s"rangeStart: $rangeStart, rangeEnd: $rangeEnd") if (rangeStart == rangeEnd) { - return sqlContext.internalCreateDataFrame(sqlContext.sparkContext.emptyRDD, schema) + return sqlContext.internalCreateDataFrame( + sqlContext.sparkContext.emptyRDD, schema, isStreaming = true) } val localStartTimeMs = startTimeMs + TimeUnit.SECONDS.toMillis(startSeconds) @@ -211,7 +212,7 @@ class RateStreamSource( val relative = math.round((v - rangeStart) * relativeMsPerValue) InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v) } - sqlContext.internalCreateDataFrame(rdd, schema) + 
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true) } override def stop(): Unit = {} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9bc114f138..432b2d4925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -609,6 +609,9 @@ class StreamExecution( if committedOffsets.get(source).map(_ != available).getOrElse(true) => val current = committedOffsets.get(source) val batch = source.getBatch(current, available) + assert(batch.isStreaming, + s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" + + s"${batch.queryExecution.logical}") logDebug(s"Retrieving data from $source: $current -> $available") Some(source -> batch) case _ => None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 587ae2bfb6..c9784c093b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.atomic.AtomicInteger import javax.annotation.concurrent.GuardedBy +import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.util.control.NonFatal @@ -27,13 +29,14 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import 
org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, Statistics} import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils + object MemoryStream { protected val currentBlockId = new AtomicInteger(0) protected val memoryStreamId = new AtomicInteger(0) @@ -44,7 +47,7 @@ object MemoryStream { /** * A [[Source]] that produces value stored in memory as they are added by the user. This [[Source]] - * is primarily intended for use in unit tests as it can only replay data when the object is still + * is intended for use in unit tests as it can only replay data when the object is still * available. */ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) @@ -85,8 +88,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } def addData(data: TraversableOnce[A]): Offset = { - import sqlContext.implicits._ - val ds = data.toVector.toDS() + val encoded = data.toVector.map(d => encoder.toRow(d).copy()) + val plan = new LocalRelation(schema.toAttributes, encoded, isStreaming = true) + val ds = Dataset[A](sqlContext.sparkSession, plan) logDebug(s"Adding ds: $ds") this.synchronized { currentOffset = currentOffset + 1 @@ -118,8 +122,8 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) batches.slice(sliceStart, sliceEnd) } - logDebug( - s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}") + logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal)) + newBlocks .map(_.toDF()) .reduceOption(_ union _) @@ -128,6 +132,21 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } + private def generateDebugString( + blocks: TraversableOnce[Dataset[A]], + startOrdinal: Int, + endOrdinal: Int): String = { + val originalUnsupportedCheck = + 
sqlContext.getConf("spark.sql.streaming.unsupportedOperationCheck") + try { + sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", "false") + s"MemoryBatch [$startOrdinal, $endOrdinal]: " + + s"${blocks.flatMap(_.collect()).mkString(", ")}" + } finally { + sqlContext.setConf("spark.sql.streaming.unsupportedOperationCheck", originalUnsupportedCheck) + } + } + override def commit(end: Offset): Unit = synchronized { def check(newOffset: LongOffset): Unit = { val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala index 58c310596c..223c3d7729 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala @@ -42,14 +42,14 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext { private def assertMetadataOnlyQuery(df: DataFrame): Unit = { val localRelations = df.queryExecution.optimizedPlan.collect { - case l @ LocalRelation(_, _) => l + case l @ LocalRelation(_, _, _) => l } assert(localRelations.size == 1) } private def assertNotMetadataOnlyQuery(df: DataFrame): Unit = { val localRelations = df.queryExecution.optimizedPlan.collect { - case l @ LocalRelation(_, _) => l + case l @ LocalRelation(_, _, _) => l } assert(localRelations.size == 0) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala index aecfd30621..5828f9783d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -40,7 +40,7 @@ class SparkPlannerSuite extends SharedSQLContext { 
case Union(children) => planned += 1 UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil - case LocalRelation(output, data) => + case LocalRelation(output, data, _) => planned += 1 LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil case NeverPlanned => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index d77f0c298f..c1d61b843d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -556,7 +556,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi if (buckets > 0) { val bucketed = df.queryExecution.analyzed transform { - case l @ LogicalRelation(r: HadoopFsRelation, _, _) => + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => l.copy(relation = r.copy(bucketSpec = Some(BucketSpec(numBuckets = buckets, "c1" :: Nil, Nil)))(r.sparkSession)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c43c1ec8b9..28e8521b35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -63,7 +63,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex var maybeRelation: Option[HadoopFsRelation] = None val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(relation: HadoopFsRelation, _, _)) => + case PhysicalOperation(_, filters, + LogicalRelation(relation: 
HadoopFsRelation, _, _, _)) => maybeRelation = Some(relation) filters }.flatten.reduceLeftOption(_ && _) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 2f5fd8438f..837a0872d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -651,7 +651,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution queryExecution.analyzed.collectFirst { case LogicalRelation( - HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) => + HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _, _) => assert(location.partitionSpec() === PartitionSpec.emptySpec) }.getOrElse { fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 8dc11d80c3..f951b46e4d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -247,7 +247,7 @@ class JDBCSuite extends SparkFunSuite // Check whether the tables are fetched in the expected degree of parallelism def checkNumPartitions(df: DataFrame, expectedNumPartitions: Int): Unit = { val jdbcRelations = df.queryExecution.analyzed.collect { - case LogicalRelation(r: JDBCRelation, _, _) => r + case LogicalRelation(r: JDBCRelation, _, _, _) => r } assert(jdbcRelations.length == 1) assert(jdbcRelations.head.parts.length == expectedNumPartitions, diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index fe9469b49e..c45b507d2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -327,7 +327,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic val table = spark.table("oneToTenFiltered") val relation = table.queryExecution.logical.collectFirst { - case LogicalRelation(r, _, _) => r + case LogicalRelation(r, _, _, _) => r }.get assert( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala index 3fd7a5be1d..85da3f0e38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PathOptionSuite.scala @@ -135,7 +135,7 @@ class PathOptionSuite extends DataSourceTest with SharedSQLContext { private def getPathOption(tableName: String): Option[String] = { spark.table(tableName).queryExecution.analyzed.collect { - case LogicalRelation(r: TestOptionsRelation, _, _) => r.pathOption + case LogicalRelation(r: TestOptionsRelation, _, _, _) => r.pathOption }.head } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index a5cf40c358..08db06b949 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -127,7 +127,7 @@ class FileStreamSinkSuite extends StreamTest { // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has // been inferred val hadoopdFsRelations = 
outputDf.queryExecution.analyzed.collect { - case LogicalRelation(baseRelation: HadoopFsRelation, _, _) => baseRelation + case LogicalRelation(baseRelation: HadoopFsRelation, _, _, _) => baseRelation } assert(hadoopdFsRelations.size === 1) assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index e2ec690d90..b6baaed192 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1105,7 +1105,10 @@ class FileStreamSourceSuite extends FileStreamSourceTest { def verify(startId: Option[Int], endId: Int, expected: String*): Unit = { val start = startId.map(new FileStreamSourceOffset(_)) val end = FileStreamSourceOffset(endId) - assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected) + + withSQLConf("spark.sql.streaming.unsupportedOperationCheck" -> "false") { + assert(fileSource.getBatch(start, end).as[String].collect().toSeq === expected) + } } verify(startId = None, endId = 2, "keep1", "keep2", "keep3") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 6f7b9d35a6..012cccfdd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import 
org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.streaming._ @@ -728,7 +729,16 @@ class FakeDefaultSource extends FakeSource { override def getBatch(start: Option[Offset], end: Offset): DataFrame = { val startOffset = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L) + 1 - spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a") + val ds = new Dataset[java.lang.Long]( + spark.sparkSession, + Range( + startOffset, + end.asInstanceOf[LongOffset].offset + 1, + 1, + Some(spark.sparkSession.sparkContext.defaultParallelism), + isStreaming = true), + Encoders.LONG) + ds.toDF("a") } override def stop() {} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index b6e82b621c..e0979ce296 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.streaming import java.util.{Locale, TimeZone} +import org.scalatest.Assertions import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.{AnalysisException, DataFrame} +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming._ @@ -31,12 +33,14 @@ import org.apache.spark.sql.expressions.scalalang.typed import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.OutputMode._ import org.apache.spark.sql.streaming.util.StreamManualClock +import org.apache.spark.sql.types.StructType object FailureSinglton { var firstTime = true } -class StreamingAggregationSuite extends 
StateStoreMetricsTest with BeforeAndAfterAll { +class StreamingAggregationSuite extends StateStoreMetricsTest + with BeforeAndAfterAll with Assertions { override def afterAll(): Unit = { super.afterAll() @@ -356,4 +360,25 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte CheckLastBatch((90L, 1), (100L, 1), (105L, 1)) ) } + + test("SPARK-19690: do not convert batch aggregation in streaming query to streaming") { + val streamInput = MemoryStream[Int] + val batchDF = Seq(1, 2, 3, 4, 5) + .toDF("value") + .withColumn("parity", 'value % 2) + .groupBy('parity) + .agg(count("*") as 'joinValue) + val joinDF = streamInput + .toDF() + .join(batchDF, 'value === 'parity) + + // make sure we're planning an aggregate in the first place + assert(batchDF.queryExecution.optimizedPlan.isInstanceOf[Aggregate]) + + testStream(joinDF, Append)( + AddData(streamInput, 0, 1, 2, 3), + CheckLastBatch((0, 0, 2), (1, 1, 3)), + AddData(streamInput, 0, 1, 2, 3), + CheckLastBatch((0, 0, 2), (1, 1, 3))) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 27ea6902fa..969f594edf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -647,7 +647,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi val source = new Source() { override def schema: StructType = triggerDF.schema override def getOffset: Option[Offset] = Some(LongOffset(0)) - override def getBatch(start: Option[Offset], end: Offset): DataFrame = triggerDF + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + sqlContext.internalCreateDataFrame( + triggerDF.queryExecution.toRdd, triggerDF.schema, isStreaming = true) + } override def stop(): Unit = {} }
StreamingExecutionRelation(source) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index e8a6202b8a..aa163d2211 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -88,7 +88,7 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider { override def getBatch(start: Option[Offset], end: Offset): DataFrame = { import spark.implicits._ - Seq[Int]().toDS().toDF() + spark.internalCreateDataFrame(spark.sparkContext.emptyRDD, schema, isStreaming = true) } override def stop() {} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 8bab059ed5..f0f2c49349 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -73,7 +73,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log catalogProxy.getCachedTable(tableIdentifier) match { case null => None // Cache miss - case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) => + case logical @ LogicalRelation(relation: HadoopFsRelation, _, _, _) => val cachedRelationFileFormatClass = relation.fileFormat.getClass expectedFileFormat match { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index e01198dd53..83cee5d1b8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -583,7 
+583,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv Row(3) :: Row(4) :: Nil) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(p: HadoopFsRelation, _, _) => // OK + case LogicalRelation(p: HadoopFsRelation, _, _, _) => // OK case _ => fail(s"test_parquet_ctas should have be converted to ${classOf[HadoopFsRelation]}") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 02cfa02a37..d2a6ef7b2b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -411,7 +411,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { val catalogTable = sessionState.catalog.getTableMetadata(TableIdentifier(tableName)) relation match { - case LogicalRelation(r: HadoopFsRelation, _, _) => + case LogicalRelation(r: HadoopFsRelation, _, _, _) => if (!isDataSourceTable) { fail( s"${classOf[HiveTableRelation].getCanonicalName} is expected, but found " + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala index 222c24927a..de6f0d67f1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala @@ -45,7 +45,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { var maybeRelation: Option[HadoopFsRelation] = None val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) => + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => maybeRelation = Some(orcRelation) filters 
}.flatten.reduceLeftOption(_ && _) @@ -89,7 +89,7 @@ class OrcFilterSuite extends QueryTest with OrcTest { var maybeRelation: Option[HadoopFsRelation] = None val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect { - case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _)) => + case PhysicalOperation(_, filters, LogicalRelation(orcRelation: HadoopFsRelation, _, _, _)) => maybeRelation = Some(orcRelation) filters }.flatten.reduceLeftOption(_ && _) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index 303884da19..740e083735 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -285,7 +285,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { ) table("test_parquet_ctas").queryExecution.optimizedPlan match { - case LogicalRelation(_: HadoopFsRelation, _, _) => // OK + case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK case _ => fail( "test_parquet_ctas should be converted to " + s"${classOf[HadoopFsRelation ].getCanonicalName }") @@ -370,7 +370,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { assertResult(2) { analyzed.collect { - case r @ LogicalRelation(_: HadoopFsRelation, _, _) => r + case r @ LogicalRelation(_: HadoopFsRelation, _, _, _) => r }.size } } @@ -379,7 +379,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { def collectHadoopFsRelation(df: DataFrame): HadoopFsRelation = { val plan = df.queryExecution.analyzed plan.collectFirst { - case LogicalRelation(r: HadoopFsRelation, _, _) => r + case LogicalRelation(r: HadoopFsRelation, _, _, _) => r }.getOrElse { fail(s"Expecting a HadoopFsRelation 2, but got:\n$plan") } @@ -459,7 +459,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { // Converted test_parquet should be cached. 
getCachedDataSourceTable(tableIdentifier) match { case null => fail("Converted test_parquet should be cached in the cache.") - case LogicalRelation(_: HadoopFsRelation, _, _) => // OK + case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK case other => fail( "The cached test_parquet should be a Parquet Relation. " +