From f14f3742e0c98dd306abf02e93d2f10d89bc423f Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Thu, 27 Aug 2020 06:52:34 +0000
Subject: [PATCH] [SPARK-32696][SQL][TEST-HIVE1.2][TEST-HADOOP2.7] Get columns
 operation should handle interval column properly

### What changes were proposed in this pull request?

This PR lets JDBC clients identify Spark interval columns properly.

### Why are the changes needed?

JDBC users can query interval values through the thrift server and create views with interval columns, e.g.

```sql
CREATE global temp view view1 as select interval 1 day as i;
```

but when they try to get the details of the columns of `view1`, the call fails with `Unrecognized type name: INTERVAL`:

```
Caused by: java.lang.IllegalArgumentException: Unrecognized type name: INTERVAL
	at org.apache.hadoop.hive.serde2.thrift.Type.getType(Type.java:170)
	at org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils$.toJavaSQLType(ThriftserverShimUtils.scala:53)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$addToRowSet$1(SparkGetColumnsOperation.scala:157)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.addToRowSet(SparkGetColumnsOperation.scala:149)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$6(SparkGetColumnsOperation.scala:113)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$6$adapted(SparkGetColumnsOperation.scala:112)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$5(SparkGetColumnsOperation.scala:112)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$5$adapted(SparkGetColumnsOperation.scala:111)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.runInternal(SparkGetColumnsOperation.scala:111)
	... 34 more
```

### Does this PR introduce _any_ user-facing change?

Yes.

#### Before

![image](https://user-images.githubusercontent.com/8326978/91162239-6cd1ec80-e6fe-11ea-8c2c-914ddb325c4e.png)

#### After

![image](https://user-images.githubusercontent.com/8326978/91162025-1a90cb80-e6fe-11ea-94c4-03a6f2ec296b.png)

### How was this patch tested?

New tests.
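For reference, a minimal sketch of how a JDBC client exercises this code path. This is not part of the patch: the object name, connection URL, and driver setup are illustrative assumptions; only the `getColumns` arguments and the expected values mirror the tests below.

```scala
// Illustrative sketch only: assumes a Thrift server at localhost:10000 and the
// Hive JDBC driver on the classpath; adjust the URL for a real deployment.
import java.sql.DriverManager

object GetIntervalColumnsSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    try {
      conn.createStatement()
        .execute("CREATE GLOBAL TEMP VIEW view1 AS SELECT interval 1 day AS i")
      // Before this patch, the call below threw
      // java.lang.IllegalArgumentException: Unrecognized type name: INTERVAL.
      val cols = conn.getMetaData.getColumns("", "global_temp", "view1", null)
      while (cols.next()) {
        // With the patch, the interval column is reported as
        // DATA_TYPE = java.sql.Types.OTHER (1111) and TYPE_NAME = "INTERVAL".
        println(s"${cols.getString("COLUMN_NAME")}: " +
          s"${cols.getInt("DATA_TYPE")} / ${cols.getString("TYPE_NAME")}")
      }
    } finally {
      conn.close()
    }
  }
}
```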
Closes #29539 from yaooqinn/SPARK-32696.

Authored-by: Kent Yao
Signed-off-by: Wenchen Fan
---
 .../SparkGetColumnsOperation.scala            | 28 ++++++++++++++++--
 .../HiveThriftServer2Suites.scala             | 17 +++++++++++
 .../SparkMetadataOperationSuite.scala         | 29 ++++++++++++++++++-
 .../thriftserver/ThriftserverShimUtils.scala  |  2 --
 .../thriftserver/ThriftserverShimUtils.scala  |  2 --
 5 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 069517acd6..0a46c83718 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType
 import org.apache.spark.sql.types._
 
 /**
@@ -131,7 +130,8 @@ private[hive] class SparkGetColumnsOperation(
    * For array, map, string, and binaries, the column size is variable, return null as unknown.
    */
   private def getColumnSize(typ: DataType): Option[Int] = typ match {
-    case dt @ (BooleanType | _: NumericType | DateType | TimestampType) => Some(dt.defaultSize)
+    case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType) =>
+      Some(dt.defaultSize)
     case StructType(fields) =>
       val sizeArr = fields.map(f => getColumnSize(f.dataType))
       if (sizeArr.contains(None)) {
@@ -164,6 +164,28 @@ private[hive] class SparkGetColumnsOperation(
     case _ => None
   }
 
+  private def toJavaSQLType(typ: DataType): Integer = typ match {
+    case NullType => java.sql.Types.NULL
+    case BooleanType => java.sql.Types.BOOLEAN
+    case ByteType => java.sql.Types.TINYINT
+    case ShortType => java.sql.Types.SMALLINT
+    case IntegerType => java.sql.Types.INTEGER
+    case LongType => java.sql.Types.BIGINT
+    case FloatType => java.sql.Types.FLOAT
+    case DoubleType => java.sql.Types.DOUBLE
+    case _: DecimalType => java.sql.Types.DECIMAL
+    case StringType => java.sql.Types.VARCHAR
+    case BinaryType => java.sql.Types.BINARY
+    case DateType => java.sql.Types.DATE
+    case TimestampType => java.sql.Types.TIMESTAMP
+    case _: ArrayType => java.sql.Types.ARRAY
+    case _: MapType => java.sql.Types.JAVA_OBJECT
+    case _: StructType => java.sql.Types.STRUCT
+    // Hive's year-month and day-time intervals map to java.sql.Types.OTHER
+    case _: CalendarIntervalType => java.sql.Types.OTHER
+    case _ => throw new IllegalArgumentException(s"Unrecognized type name: ${typ.sql}")
+  }
+
   private def addToRowSet(
       columnPattern: Pattern,
       dbName: String,
@@ -177,7 +199,7 @@ private[hive] class SparkGetColumnsOperation(
       dbName, // TABLE_SCHEM
       tableName, // TABLE_NAME
       column.name, // COLUMN_NAME
-      toJavaSQLType(column.dataType.sql).asInstanceOf[AnyRef], // DATA_TYPE
+      toJavaSQLType(column.dataType), // DATA_TYPE
       column.dataType.sql, // TYPE_NAME
       getColumnSize(column.dataType).map(_.asInstanceOf[AnyRef]).orNull, // COLUMN_SIZE
       null, // BUFFER_LENGTH, unused
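The core of the change above is that the JDBC type code is now derived directly from Spark's own Catalyst `DataType` instead of round-tripping the type's SQL string through Hive's type-name parser (the removed shim call). A minimal standalone sketch of how the new mapping behaves when walking a schema the way `addToRowSet` walks a table's columns; `TypeMappingSketch` and its trimmed local `toJavaSQLType` are illustrations, not the private method in `SparkGetColumnsOperation`:

```scala
// Illustrative sketch only: a trimmed, local copy of the mapping added above.
import org.apache.spark.sql.types._

object TypeMappingSketch {
  // Abbreviated mapping; the patched method handles all Catalyst types.
  private def toJavaSQLType(typ: DataType): Integer = typ match {
    case IntegerType => java.sql.Types.INTEGER
    case StringType => java.sql.Types.VARCHAR
    case _: CalendarIntervalType => java.sql.Types.OTHER // the new case
    case _ => java.sql.Types.JAVA_OBJECT // catch-all for brevity in this sketch
  }

  def main(args: Array[String]): Unit = {
    val schema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
      .add("i", CalendarIntervalType)
    schema.foreach { f =>
      // Prints e.g. "i -> 1111 (INTERVAL)", since java.sql.Types.OTHER == 1111.
      println(s"${f.name} -> ${toJavaSQLType(f.dataType)} (${f.dataType.sql})")
    }
  }
}
```

Deriving the code from `DataType` also keeps `DATA_TYPE` and `TYPE_NAME` consistent in the metadata row, since both now come from the same Catalyst type.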
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 3fd46dc82f..ad0f97cae3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -686,6 +686,23 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     }
   }
 
+  test("Query Intervals in VIEWs through thrift server") {
+    val viewName1 = "view_interval_1"
+    val viewName2 = "view_interval_2"
+    val ddl1 = s"CREATE GLOBAL TEMP VIEW $viewName1 AS SELECT INTERVAL 1 DAY AS i"
+    val ddl2 = s"CREATE TEMP VIEW $viewName2 as select * from global_temp.$viewName1"
+    withJdbcStatement(viewName1, viewName2) { statement =>
+      statement.executeQuery(ddl1)
+      statement.executeQuery(ddl2)
+      val rs = statement.executeQuery(s"SELECT v1.i as a, v2.i as b FROM global_temp.$viewName1" +
+        s" v1 join $viewName2 v2 on date_part('DAY', v1.i) = date_part('DAY', v2.i)")
+      while (rs.next()) {
+        assert(rs.getString("a") === "1 days")
+        assert(rs.getString("b") === "1 days")
+      }
+    }
+  }
+
   test("ThriftCLIService FetchResults FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR") {
     def checkResult(rows: RowSet, start: Long, end: Long): Unit = {
       assert(rows.getStartOffset() == start)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index 5df3370444..e8932491d1 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{DatabaseMetaData, ResultSet}
 
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DecimalType, DoubleType, FloatType, IntegerType, MapType, NumericType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, CalendarIntervalType, DecimalType, DoubleType, FloatType, IntegerType, MapType, NumericType, StringType, StructType, TimestampType}
 
 class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
 
@@ -333,4 +333,31 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
       assert(pos === 17, "all columns should have been verified")
     }
   }
+
+  test("get columns operation should handle interval column properly") {
+    val viewName = "view_interval"
+    val ddl = s"CREATE GLOBAL TEMP VIEW $viewName as select interval 1 day as i"
+
+    withJdbcStatement(viewName) { statement =>
+      statement.execute(ddl)
+      val data = statement.getConnection.getMetaData
+      val rowSet = data.getColumns("", "global_temp", viewName, null)
+      while (rowSet.next()) {
+        assert(rowSet.getString("TABLE_CAT") === null)
+        assert(rowSet.getString("TABLE_SCHEM") === "global_temp")
+        assert(rowSet.getString("TABLE_NAME") === viewName)
+        assert(rowSet.getString("COLUMN_NAME") === "i")
+        assert(rowSet.getInt("DATA_TYPE") === java.sql.Types.OTHER)
+        assert(rowSet.getString("TYPE_NAME").equalsIgnoreCase(CalendarIntervalType.sql))
+        assert(rowSet.getInt("COLUMN_SIZE") === CalendarIntervalType.defaultSize)
+        assert(rowSet.getInt("DECIMAL_DIGITS") === 0)
+        assert(rowSet.getInt("NUM_PREC_RADIX") === 0)
+        assert(rowSet.getInt("NULLABLE") === 0)
+        assert(rowSet.getString("REMARKS") === "")
+        assert(rowSet.getInt("ORDINAL_POSITION") === 0)
+        assert(rowSet.getString("IS_NULLABLE") === "YES")
"YES") + assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO") + } + } + } } diff --git a/sql/hive-thriftserver/v1.2/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala b/sql/hive-thriftserver/v1.2/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala index fbfc698ecb..ceb7447333 100644 --- a/sql/hive-thriftserver/v1.2/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala +++ b/sql/hive-thriftserver/v1.2/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala @@ -49,8 +49,6 @@ private[thriftserver] object ThriftserverShimUtils { RowSetFactory.create(getResultSetSchema, getProtocolVersion) } - private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType - private[thriftserver] def supportedType(): Seq[Type] = { Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE, TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE, diff --git a/sql/hive-thriftserver/v2.3/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala b/sql/hive-thriftserver/v2.3/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala index 850382fe2b..1f9fd6338a 100644 --- a/sql/hive-thriftserver/v2.3/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala +++ b/sql/hive-thriftserver/v2.3/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala @@ -50,8 +50,6 @@ private[thriftserver] object ThriftserverShimUtils { RowSetFactory.create(getResultSetSchema, getProtocolVersion, false) } - private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType - private[thriftserver] def supportedType(): Seq[Type] = { Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE, TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,