From e635cbb6e61dee450db0ef45f3d82ac282a85d3c Mon Sep 17 00:00:00 2001 From: jiangxingbo Date: Mon, 16 Jan 2017 19:11:21 +0800 Subject: [PATCH] [SPARK-18801][SQL][FOLLOWUP] Alias the view with its child ## What changes were proposed in this pull request? This PR is a follow-up to address the comments https://github.com/apache/spark/pull/16233/files#r95669988 and https://github.com/apache/spark/pull/16233/files#r95662299. We try to wrap the child by: 1. Generate the `queryOutput` by: 1.1. If the query column names are defined, map the column names to attributes in the child output by name; 1.2. Else set the child output attributes to `queryOutput`. 2. Map the `queryQutput` to view output by index, if the corresponding attributes don't match, try to up cast and alias the attribute in `queryOutput` to the attribute in the view output. 3. Add a Project over the child, with the new output generated by the previous steps. If the view output doesn't have the same number of columns neither with the child output, nor with the query column names, throw an AnalysisException. ## How was this patch tested? Add new test cases in `SQLViewSuite`. Author: jiangxingbo Closes #16561 from jiangxb1987/alias-view. --- .../sql/catalyst/analysis/Analyzer.scala | 24 +--- .../spark/sql/catalyst/analysis/view.scala | 66 +++++++++-- .../sql/catalyst/catalog/interface.scala | 49 +++++++- .../spark/sql/catalyst/expressions/Cast.scala | 21 +++- .../sql/hive/execution/SQLViewSuite.scala | 111 ++++++++++++++---- 5 files changed, 214 insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1957df89e6..bd9037ec43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2281,12 +2281,6 @@ class Analyzer( "type of the field in the target object") } - private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = { - val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) - val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) - toPrecedence > 0 && fromPrecedence > toPrecedence - } - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case p if !p.childrenResolved => p case p if p.resolved => p @@ -2294,19 +2288,11 @@ class Analyzer( case p => p transformExpressions { case u @ UpCast(child, _, _) if !child.resolved => u - case UpCast(child, dataType, walkedTypePath) => (child.dataType, dataType) match { - case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => - fail(child, to, walkedTypePath) - case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => - fail(child, to, walkedTypePath) - case (from, to) if illegalNumericPrecedence(from, to) => - fail(child, to, walkedTypePath) - case (TimestampType, DateType) => - fail(child, DateType, walkedTypePath) - case (StringType, to: NumericType) => - fail(child, to, walkedTypePath) - case _ => Cast(child, dataType.asNullable) - } + case UpCast(child, dataType, walkedTypePath) + if Cast.mayTruncate(child.dataType, dataType) => + fail(child, dataType, walkedTypePath) + + case UpCast(child, dataType, walkedTypePath) => Cast(child, dataType.asNullable) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index 737f846ef4..a5640a6c96 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -28,22 +28,60 @@ import org.apache.spark.sql.catalyst.rules.Rule */ /** - * Make sure that a view's child plan produces the view's output attributes. We wrap the child - * with a Project and add an alias for each output attribute. The attributes are resolved by - * name. This should be only done after the batch of Resolution, because the view attributes are - * not completely resolved during the batch of Resolution. + * Make sure that a view's child plan produces the view's output attributes. We try to wrap the + * child by: + * 1. Generate the `queryOutput` by: + * 1.1. If the query column names are defined, map the column names to attributes in the child + * output by name(This is mostly for handling view queries like SELECT * FROM ..., the + * schema of the referenced table/view may change after the view has been created, so we + * have to save the output of the query to `viewQueryColumnNames`, and restore them during + * view resolution, in this way, we are able to get the correct view column ordering and + * omit the extra columns that we don't require); + * 1.2. Else set the child output attributes to `queryOutput`. + * 2. Map the `queryQutput` to view output by index, if the corresponding attributes don't match, + * try to up cast and alias the attribute in `queryOutput` to the attribute in the view output. + * 3. Add a Project over the child, with the new output generated by the previous steps. + * If the view output doesn't have the same number of columns neither with the child output, nor + * with the query column names, throw an AnalysisException. + * + * This should be only done after the batch of Resolution, because the view attributes are not + * completely resolved during the batch of Resolution. */ case class AliasViewChild(conf: CatalystConf) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case v @ View(_, output, child) if child.resolved => + case v @ View(desc, output, child) if child.resolved && output != child.output => val resolver = conf.resolver - val newOutput = output.map { attr => - val originAttr = findAttributeByName(attr.name, child.output, resolver) - // The dataType of the output attributes may be not the same with that of the view output, - // so we should cast the attribute to the dataType of the view output attribute. If the - // cast can't perform, will throw an AnalysisException. - Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId, - qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata)) + val queryColumnNames = desc.viewQueryColumnNames + val queryOutput = if (queryColumnNames.nonEmpty) { + // If the view output doesn't have the same number of columns with the query column names, + // throw an AnalysisException. + if (output.length != queryColumnNames.length) { + throw new AnalysisException( + s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " + + s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}") + } + desc.viewQueryColumnNames.map { colName => + findAttributeByName(colName, child.output, resolver) + } + } else { + // For view created before Spark 2.2.0, the view text is already fully qualified, the plan + // output is the same with the view output. + child.output + } + // Map the attributes in the query output to the attributes in the view output by index. + val newOutput = output.zip(queryOutput).map { + case (attr, originAttr) if attr != originAttr => + // The dataType of the output attributes may be not the same with that of the view + // output, so we should cast the attribute to the dataType of the view output attribute. + // Will throw an AnalysisException if the cast can't perform or might truncate. + if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) { + throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " + + s"${originAttr.dataType.simpleString} to ${attr.simpleString} as it may truncate\n") + } else { + Alias(Cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId, + qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata)) + } + case (_, originAttr) => originAttr } v.copy(child = Project(newOutput, child)) } @@ -74,7 +112,9 @@ object EliminateView extends Rule[LogicalPlan] { // The child should have the same output attributes with the View operator, so we simply // remove the View operator. case View(_, output, child) => - assert(output == child.output, "The output of the child is different from the view output") + assert(output == child.output, + s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " + + s"view output ${output.mkString("[", ",", "]")}") child } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index a9de10717e..2adccdd7bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -19,12 +19,15 @@ package org.apache.spark.sql.catalyst.catalog import java.util.Date +import scala.collection.mutable + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.StructType + /** @@ -178,6 +181,8 @@ case class CatalogTable( unsupportedFeatures: Seq[String] = Seq.empty, tracksPartitionsInCatalog: Boolean = false) { + import CatalogTable._ + /** schema of this table's partition columns */ def partitionSchema: StructType = StructType(schema.filter { c => partitionColumnNames.contains(c.name) @@ -198,9 +203,44 @@ case class CatalogTable( /** * Return the default database name we use to resolve a view, should be None if the CatalogTable - * is not a View. + * is not a View or created by older versions of Spark(before 2.2.0). */ - def viewDefaultDatabase: Option[String] = properties.get(CatalogTable.VIEW_DEFAULT_DATABASE) + def viewDefaultDatabase: Option[String] = properties.get(VIEW_DEFAULT_DATABASE) + + /** + * Return the output column names of the query that creates a view, the column names are used to + * resolve a view, should be empty if the CatalogTable is not a View or created by older versions + * of Spark(before 2.2.0). + */ + def viewQueryColumnNames: Seq[String] = { + for { + numCols <- properties.get(VIEW_QUERY_OUTPUT_NUM_COLUMNS).toSeq + index <- 0 until numCols.toInt + } yield properties.getOrElse( + s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", + throw new AnalysisException("Corrupted view query output column names in catalog: " + + s"$numCols parts expected, but part $index is missing.") + ) + } + + /** + * Insert/Update the view query output column names in `properties`. + */ + def withQueryColumnNames(columns: Seq[String]): CatalogTable = { + val props = new mutable.HashMap[String, String] + if (columns.nonEmpty) { + props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString) + columns.zipWithIndex.foreach { case (colName, index) => + props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName) + } + } + + // We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable, + // while `CatalogTable` should be serializable. + copy(properties = properties.filterNot { case (key, _) => + key.startsWith(VIEW_QUERY_OUTPUT_PREFIX) + } ++ props) + } /** Syntactic sugar to update a field in `storage`. */ def withNewStorage( @@ -254,6 +294,9 @@ case class CatalogTable( object CatalogTable { val VIEW_DEFAULT_DATABASE = "view.default.database" + val VIEW_QUERY_OUTPUT_PREFIX = "view.query.out." + val VIEW_QUERY_OUTPUT_NUM_COLUMNS = VIEW_QUERY_OUTPUT_PREFIX + "numCols" + val VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX = VIEW_QUERY_OUTPUT_PREFIX + "col." } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 14e275bf88..ad59271e5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -21,7 +21,7 @@ import java.math.{BigDecimal => JavaBigDecimal} import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.types._ @@ -89,6 +89,25 @@ object Cast { case _ => false } + /** + * Return true iff we may truncate during casting `from` type to `to` type. e.g. long -> int, + * timestamp -> date. + */ + def mayTruncate(from: DataType, to: DataType): Boolean = (from, to) match { + case (from: NumericType, to: DecimalType) if !to.isWiderThan(from) => true + case (from: DecimalType, to: NumericType) if !from.isTighterThan(to) => true + case (from, to) if illegalNumericPrecedence(from, to) => true + case (TimestampType, DateType) => true + case (StringType, to: NumericType) => true + case _ => false + } + + private def illegalNumericPrecedence(from: DataType, to: DataType): Boolean = { + val fromPrecedence = TypeCoercion.numericPrecedence.indexOf(from) + val toPrecedence = TypeCoercion.numericPrecedence.indexOf(to) + toPrecedence > 0 && fromPrecedence > toPrecedence + } + def forceNullable(from: DataType, to: DataType): Boolean = (from, to) match { case (NullType, _) => true case (_, _) if from == to => false diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index e06d0ae045..9bc078dbb0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -553,18 +553,24 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { identifier = TableIdentifier("view1", Some(db)), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("x", "long").add("y", "long"), viewOriginalText = Some("SELECT * FROM jt"), viewText = Some("SELECT * FROM jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) val view2 = CatalogTable( identifier = TableIdentifier("view2", Some(db)), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("id", "long").add("id1", "long"), viewOriginalText = Some("SELECT * FROM view1"), viewText = Some("SELECT * FROM view1"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db)) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db, + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "x", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "y")) activateDatabase(db) { hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false) hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false) @@ -583,7 +589,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { schema = new StructType().add("n", "int"), viewOriginalText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"), viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "1", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "n")) hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false) checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) } @@ -595,10 +603,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { identifier = TableIdentifier("join_view"), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("id", "long").add("id1", "long"), viewOriginalText = Some("SELECT * FROM jt"), viewText = Some("SELECT * FROM jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false) checkAnswer( sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"), @@ -620,10 +631,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { identifier = TableIdentifier("view1"), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("id", "long").add("id1", "long"), viewOriginalText = Some("SELECT * FROM invalid_db.jt"), viewText = Some("SELECT * FROM invalid_db.jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false) assertInvalidReference("SELECT * FROM view1") @@ -632,10 +646,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { identifier = TableIdentifier("view2"), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("id", "long").add("id1", "long"), viewOriginalText = Some("SELECT * FROM invalid_table"), viewText = Some("SELECT * FROM invalid_table"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false) assertInvalidReference("SELECT * FROM view2") @@ -644,10 +661,13 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { identifier = TableIdentifier("view3"), tableType = CatalogTableType.VIEW, storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "int").add("id1", "int"), + schema = new StructType().add("id", "long").add("id1", "long"), viewOriginalText = Some("SELECT * FROM view2"), viewText = Some("SELECT * FROM view2"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default")) + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false) assertInvalidReference("SELECT * FROM view3") } @@ -680,21 +700,70 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("correctly handle type casting between view output and child output") { + test("resolve a view with custom column names") { withTable("testTable") { + spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable") withView("testView") { - spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("testTable") - sql("CREATE VIEW testView AS SELECT * FROM testTable") + val testView = CatalogTable( + identifier = TableIdentifier("testView"), + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("x", "long").add("y", "long"), + viewOriginalText = Some("SELECT * FROM testTable"), + viewText = Some("SELECT * FROM testTable"), + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) + hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false) + + // Correctly resolve a view with custom column names. + checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) + + // Correctly resolve a view when the referenced table schema changes. + spark.range(1, 10).selectExpr("id", "id + id dummy", "id + 1 id1") + .write.mode(SaveMode.Overwrite).saveAsTable("testTable") + checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) + + // Throw an AnalysisException if the column name is not found. + spark.range(1, 10).selectExpr("id", "id + 1 dummy") + .write.mode(SaveMode.Overwrite).saveAsTable("testTable") + intercept[AnalysisException](sql("SELECT * FROM testView")) + } + } + } + + test("resolve a view when the dataTypes of referenced table columns changed") { + withTable("testTable") { + spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable") + withView("testView") { + val testView = CatalogTable( + identifier = TableIdentifier("testView"), + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("id", "long").add("id1", "long"), + viewOriginalText = Some("SELECT * FROM testTable"), + viewText = Some("SELECT * FROM testTable"), + properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", + CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", + s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) + hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false) // Allow casting from IntegerType to LongType - val df = (1 until 10).map(i => i).toDF("id1") + val df = (1 until 10).map(i => (i, i + 1)).toDF("id", "id1") df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i))) + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i + 1))) + + // Casting from DoubleType to LongType might truncate, throw an AnalysisException. + val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", "id1") + df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") + intercept[AnalysisException](sql("SELECT * FROM testView")) // Can't cast from ArrayType to LongType, throw an AnalysisException. - val df2 = (1 until 10).map(i => Seq(i)).toDF("id1") - df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") - intercept[AnalysisException](sql("SELECT * FROM testView ORDER BY id1")) + val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1") + df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") + intercept[AnalysisException](sql("SELECT * FROM testView")) } } }