[SPARK-32696][SQL][TEST-HIVE1.2][TEST-HADOOP2.7] Get columns operation should handle interval column properly
### What changes were proposed in this pull request?

This PR lets JDBC clients identify Spark interval columns properly.

### Why are the changes needed?

JDBC users can query interval values through the Thrift server and create views with interval columns, e.g.

```sql
CREATE global temp view view1 as select interval 1 day as i;
```

but when they try to get the details of view1's columns, the call fails with `Unrecognized type name: INTERVAL`:

```
Caused by: java.lang.IllegalArgumentException: Unrecognized type name: INTERVAL
	at org.apache.hadoop.hive.serde2.thrift.Type.getType(Type.java:170)
	at org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils$.toJavaSQLType(ThriftserverShimUtils.scala:53)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$addToRowSet$1(SparkGetColumnsOperation.scala:157)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.addToRowSet(SparkGetColumnsOperation.scala:149)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$6(SparkGetColumnsOperation.scala:113)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$6$adapted(SparkGetColumnsOperation.scala:112)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$5(SparkGetColumnsOperation.scala:112)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.$anonfun$runInternal$5$adapted(SparkGetColumnsOperation.scala:111)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.hive.thriftserver.SparkGetColumnsOperation.runInternal(SparkGetColumnsOperation.scala:111)
	... 34 more
```

### Does this PR introduce _any_ user-facing change?

Yes.

#### Before

![image](https://user-images.githubusercontent.com/8326978/91162239-6cd1ec80-e6fe-11ea-8c2c-914ddb325c4e.png)

#### After

![image](https://user-images.githubusercontent.com/8326978/91162025-1a90cb80-e6fe-11ea-94c4-03a6f2ec296b.png)

### How was this patch tested?

New tests.

Closes #29539 from yaooqinn/SPARK-32696.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent eb379766f4
commit f14f3742e0
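Before the diff, here is a minimal JDBC sketch of the operation this commit fixes. The connection URL, database, and view name are illustrative placeholders; the `getColumns` call itself is standard `java.sql.DatabaseMetaData`:

```scala
import java.sql.DriverManager

// Sketch only: assumes a Spark Thrift server listening on localhost:10000;
// the URL and view name are placeholders, not part of this commit.
object IntervalColumnsDemo {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    try {
      val stmt = conn.createStatement()
      stmt.execute("CREATE GLOBAL TEMP VIEW view1 AS SELECT interval 1 day AS i")
      // Before this commit, this call threw IllegalArgumentException
      // ("Unrecognized type name: INTERVAL"); with it, the interval column
      // is reported as java.sql.Types.OTHER with TYPE_NAME "INTERVAL".
      val rs = conn.getMetaData.getColumns(null, "global_temp", "view1", null)
      while (rs.next()) {
        println(s"${rs.getString("COLUMN_NAME")}: data_type=${rs.getInt("DATA_TYPE")}" +
          s", type_name=${rs.getString("TYPE_NAME")}")
      }
    } finally {
      conn.close()
    }
  }
}
```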
@@ -31,7 +31,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType
 import org.apache.spark.sql.types._
 
 /**
@@ -131,7 +130,8 @@ private[hive] class SparkGetColumnsOperation(
    * For array, map, string, and binaries, the column size is variable, return null as unknown.
    */
   private def getColumnSize(typ: DataType): Option[Int] = typ match {
-    case dt @ (BooleanType | _: NumericType | DateType | TimestampType) => Some(dt.defaultSize)
+    case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType) =>
+      Some(dt.defaultSize)
     case StructType(fields) =>
       val sizeArr = fields.map(f => getColumnSize(f.dataType))
       if (sizeArr.contains(None)) {
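The new case works because `CalendarIntervalType`, like the other types in that pattern, has a fixed `defaultSize`. A small sketch of the sizes involved (values as defined in `org.apache.spark.sql.types` for Spark 3.x; run in a Spark REPL):

```scala
import org.apache.spark.sql.types._

// Fixed-width types report a constant defaultSize, which getColumnSize
// can surface directly as COLUMN_SIZE.
println(IntegerType.defaultSize)          // 4
println(TimestampType.defaultSize)        // 8
println(CalendarIntervalType.defaultSize) // 16: (months: Int, days: Int, microseconds: Long)
// Variable-length types only have an estimated defaultSize, so getColumnSize
// returns None (null over JDBC) for strings, binaries, arrays, and maps.
```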
@@ -164,6 +164,28 @@ private[hive] class SparkGetColumnsOperation(
     case _ => None
   }
 
+  private def toJavaSQLType(typ: DataType): Integer = typ match {
+    case NullType => java.sql.Types.NULL
+    case BooleanType => java.sql.Types.BOOLEAN
+    case ByteType => java.sql.Types.TINYINT
+    case ShortType => java.sql.Types.SMALLINT
+    case IntegerType => java.sql.Types.INTEGER
+    case LongType => java.sql.Types.BIGINT
+    case FloatType => java.sql.Types.FLOAT
+    case DoubleType => java.sql.Types.DOUBLE
+    case _: DecimalType => java.sql.Types.DECIMAL
+    case StringType => java.sql.Types.VARCHAR
+    case BinaryType => java.sql.Types.BINARY
+    case DateType => java.sql.Types.DATE
+    case TimestampType => java.sql.Types.TIMESTAMP
+    case _: ArrayType => java.sql.Types.ARRAY
+    case _: MapType => java.sql.Types.JAVA_OBJECT
+    case _: StructType => java.sql.Types.STRUCT
+    // Hive's year-month and day-time intervals map to java.sql.Types.OTHER
+    case _: CalendarIntervalType => java.sql.Types.OTHER
+    case _ => throw new IllegalArgumentException(s"Unrecognized type name: ${typ.sql}")
+  }
+
   private def addToRowSet(
       columnPattern: Pattern,
       dbName: String,
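On the client side, `java.sql.Types.OTHER` is the standard escape hatch for vendor-specific types. A hypothetical consumer helper (not part of this commit) might branch on the reported type code like this:

```scala
import java.sql.{ResultSet, Types}

// Hypothetical helper: interprets one row returned by getColumns. Intervals
// now arrive as Types.OTHER with a readable TYPE_NAME instead of failing
// the whole metadata call.
def describeColumn(row: ResultSet): String = row.getInt("DATA_TYPE") match {
  case Types.OTHER  => s"vendor type: ${row.getString("TYPE_NAME")}" // e.g. INTERVAL
  case Types.ARRAY  => "array"
  case Types.STRUCT => "struct"
  case code         => s"standard JDBC type code $code"
}
```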
@@ -177,7 +199,7 @@ private[hive] class SparkGetColumnsOperation(
       dbName, // TABLE_SCHEM
       tableName, // TABLE_NAME
       column.name, // COLUMN_NAME
-      toJavaSQLType(column.dataType.sql).asInstanceOf[AnyRef], // DATA_TYPE
+      toJavaSQLType(column.dataType), // DATA_TYPE
       column.dataType.sql, // TYPE_NAME
       getColumnSize(column.dataType).map(_.asInstanceOf[AnyRef]).orNull, // COLUMN_SIZE
       null, // BUFFER_LENGTH, unused
@@ -686,6 +686,23 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     }
   }
 
+  test("Query Intervals in VIEWs through thrift server") {
+    val viewName1 = "view_interval_1"
+    val viewName2 = "view_interval_2"
+    val ddl1 = s"CREATE GLOBAL TEMP VIEW $viewName1 AS SELECT INTERVAL 1 DAY AS i"
+    val ddl2 = s"CREATE TEMP VIEW $viewName2 as select * from global_temp.$viewName1"
+    withJdbcStatement(viewName1, viewName2) { statement =>
+      statement.executeQuery(ddl1)
+      statement.executeQuery(ddl2)
+      val rs = statement.executeQuery(s"SELECT v1.i as a, v2.i as b FROM global_temp.$viewName1" +
+        s" v1 join $viewName2 v2 on date_part('DAY', v1.i) = date_part('DAY', v2.i)")
+      while (rs.next()) {
+        assert(rs.getString("a") === "1 days")
+        assert(rs.getString("b") === "1 days")
+      }
+    }
+  }
+
   test("ThriftCLIService FetchResults FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR") {
     def checkResult(rows: RowSet, start: Long, end: Long): Unit = {
       assert(rows.getStartOffset() == start)
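The join predicate in the test relies on `date_part` extracting the day field from an interval. A standalone sketch of the same call through a local SparkSession (session setup is boilerplate, not part of this commit; output per Spark 3.x behavior):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("date-part-demo").getOrCreate()
// date_part('DAY', <interval>) extracts the days component of an interval.
spark.sql("SELECT date_part('DAY', interval 1 day) AS d").show()
// +---+
// |  d|
// +---+
// |  1|
// +---+
spark.stop()
```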
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.sql.{DatabaseMetaData, ResultSet}
 
-import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DecimalType, DoubleType, FloatType, IntegerType, MapType, NumericType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, CalendarIntervalType, DecimalType, DoubleType, FloatType, IntegerType, MapType, NumericType, StringType, StructType, TimestampType}
 
 class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
 
@@ -333,4 +333,31 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
     assert(pos === 17, "all columns should have been verified")
     }
   }
+
+  test("get columns operation should handle interval column properly") {
+    val viewName = "view_interval"
+    val ddl = s"CREATE GLOBAL TEMP VIEW $viewName as select interval 1 day as i"
+
+    withJdbcStatement(viewName) { statement =>
+      statement.execute(ddl)
+      val data = statement.getConnection.getMetaData
+      val rowSet = data.getColumns("", "global_temp", viewName, null)
+      while (rowSet.next()) {
+        assert(rowSet.getString("TABLE_CAT") === null)
+        assert(rowSet.getString("TABLE_SCHEM") === "global_temp")
+        assert(rowSet.getString("TABLE_NAME") === viewName)
+        assert(rowSet.getString("COLUMN_NAME") === "i")
+        assert(rowSet.getInt("DATA_TYPE") === java.sql.Types.OTHER)
+        assert(rowSet.getString("TYPE_NAME").equalsIgnoreCase(CalendarIntervalType.sql))
+        assert(rowSet.getInt("COLUMN_SIZE") === CalendarIntervalType.defaultSize)
+        assert(rowSet.getInt("DECIMAL_DIGITS") === 0)
+        assert(rowSet.getInt("NUM_PREC_RADIX") === 0)
+        assert(rowSet.getInt("NULLABLE") === 0)
+        assert(rowSet.getString("REMARKS") === "")
+        assert(rowSet.getInt("ORDINAL_POSITION") === 0)
+        assert(rowSet.getString("IS_NULLABLE") === "YES")
+        assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
+      }
+    }
+  }
 }
@@ -49,8 +49,6 @@ private[thriftserver] object ThriftserverShimUtils {
     RowSetFactory.create(getResultSetSchema, getProtocolVersion)
   }
 
-  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
-
   private[thriftserver] def supportedType(): Seq[Type] = {
     Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
       TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,
@@ -50,8 +50,6 @@ private[thriftserver] object ThriftserverShimUtils {
     RowSetFactory.create(getResultSetSchema, getProtocolVersion, false)
   }
 
-  private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
-
   private[thriftserver] def supportedType(): Seq[Type] = {
     Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
       TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,