[SPARK-14877][SQL] Remove HiveMetastoreTypes class
## What changes were proposed in this pull request?

It is unnecessary, as `DataType.catalogString` largely replaces the need for this class.

## How was this patch tested?

This mostly removes dead code and should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12644 from rxin/SPARK-14877.
commit 162e12b085 (parent e3c1366bbc)
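The gist of the change is visible in the hunks below: every call site that built a Hive metastore type string by hand through `HiveMetastoreTypes.toMetastoreType` now asks the type itself via `DataType.catalogString`. A minimal standalone sketch of what that method returns, assuming a Spark 2.x `spark-catalyst` dependency; the object name and the sample schema are invented for illustration and are not part of the patch:

```scala
// Illustrative only: what DataType.catalogString emits for a few common types,
// matching the strings the removed HiveMetastoreTypes.toMetastoreType built by hand.
import org.apache.spark.sql.types._

object CatalogStringDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType),
      StructField("scores", ArrayType(DoubleType)),
      StructField("price", DecimalType(10, 2))))

    println(IntegerType.catalogString)                       // int
    println(MapType(StringType, IntegerType).catalogString)  // map<string,int>
    println(schema.catalogString)
    // struct<id:bigint,name:string,scores:array<double>,price:decimal(10,2)>
  }
}
```

For the types the removed class handled explicitly (int, bigint, array<...>, struct<...>, decimal(p,s), and so on), these strings come out the same, which is what lets the helper be dropped without changing behavior.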
@@ -35,7 +35,7 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow}
 import org.apache.spark.sql.execution.command.SetCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, HiveUtils}
+import org.apache.spark.sql.hive.{HiveContext, HiveUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{Utils => SparkUtils}
@@ -60,7 +60,7 @@ private[hive] class SparkExecuteStatementOperation(
     } else {
       logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
-        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+        new FieldSchema(attr.name, attr.dataType.catalogString, "")
       }
       new TableSchema(schema.asJava)
     }

@@ -29,10 +29,9 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.hive.HiveContext

-private[hive] class SparkSQLDriver(
-    val context: HiveContext = SparkSQLEnv.hiveContext)
+private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
   extends Driver
   with Logging {

@@ -49,7 +48,7 @@ private[hive] class SparkSQLDriver(
       new Schema(Arrays.asList(new FieldSchema("Response code", "string", "")), null)
     } else {
       val fieldSchemas = analyzed.output.map { attr =>
-        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
+        new FieldSchema(attr.name, attr.dataType.catalogString, "")
       }

       new Schema(fieldSchemas.asJava, null)

@@ -28,7 +28,6 @@ import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -270,7 +269,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
         serdeProperties = options
       ),
       schema = relation.schema.map { f =>
-        CatalogColumn(f.name, HiveMetastoreTypes.toMetastoreType(f.dataType))
+        CatalogColumn(f.name, f.dataType.catalogString)
       },
       properties = tableProperties.toMap,
       viewText = None) // TODO: We need to place the SQL string here
@@ -637,7 +636,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       table.schema
     } else {
       child.output.map { a =>
-        CatalogColumn(a.name, HiveMetastoreTypes.toMetastoreType(a.dataType), a.nullable)
+        CatalogColumn(a.name, a.dataType.catalogString, a.nullable)
       }
     }

@@ -770,35 +769,3 @@ private[hive] case class InsertIntoHiveTable(
     case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
   }
 }
-
-
-private[hive] object HiveMetastoreTypes {
-  def toDataType(metastoreType: String): DataType = DataTypeParser.parse(metastoreType)
-
-  def decimalMetastoreString(decimalType: DecimalType): String = decimalType match {
-    case DecimalType.Fixed(precision, scale) => s"decimal($precision,$scale)"
-    case _ => s"decimal($HiveShim.UNLIMITED_DECIMAL_PRECISION,$HiveShim.UNLIMITED_DECIMAL_SCALE)"
-  }
-
-  def toMetastoreType(dt: DataType): String = dt match {
-    case ArrayType(elementType, _) => s"array<${toMetastoreType(elementType)}>"
-    case StructType(fields) =>
-      s"struct<${fields.map(f => s"${f.name}:${toMetastoreType(f.dataType)}").mkString(",")}>"
-    case MapType(keyType, valueType, _) =>
-      s"map<${toMetastoreType(keyType)},${toMetastoreType(valueType)}>"
-    case StringType => "string"
-    case FloatType => "float"
-    case IntegerType => "int"
-    case ByteType => "tinyint"
-    case ShortType => "smallint"
-    case DoubleType => "double"
-    case LongType => "bigint"
-    case BinaryType => "binary"
-    case BooleanType => "boolean"
-    case DateType => "date"
-    case d: DecimalType => decimalMetastoreString(d)
-    case TimestampType => "timestamp"
-    case NullType => "void"
-    case udt: UserDefinedType[_] => toMetastoreType(udt.sqlType)
-  }
-}

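The removed object's `toDataType` was already a one-line wrapper over `DataTypeParser.parse`, so the remaining callers switch to the parser directly (see the OrcFileOperator and test hunks further down). A small sketch of the round trip under the same assumptions as above; the object name is again invented:

```scala
// Illustrative sketch, not part of the patch: parsing a Hive/metastore type
// string with DataTypeParser (what HiveMetastoreTypes.toDataType delegated to)
// and printing it back out with catalogString.
import org.apache.spark.sql.catalyst.parser.DataTypeParser
import org.apache.spark.sql.types.{DataType, StructType}

object TypeStringRoundTrip {
  def main(args: Array[String]): Unit = {
    val hiveTypeStr = "struct<a:int,b_1:string,c:array<double>>"

    val parsed: DataType = DataTypeParser.parse(hiveTypeStr) // string -> DataType
    assert(parsed.isInstanceOf[StructType])

    // For these simple types the catalog string round-trips to the same form.
    println(parsed.catalogString) // struct<a:int,b_1:string,c:array<double>>
  }
}
```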
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.hive.execution

 import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes, MetastoreRelation}
+import org.apache.spark.sql.hive.MetastoreRelation

 /**
  * Create table and insert the query result into it.
@@ -62,7 +61,7 @@ case class CreateTableAsSelect(
       // Hive doesn't support specifying the column list for target table in CTAS
       // However we don't think SparkSQL should follow that.
       tableDesc.copy(schema = query.output.map { c =>
-        CatalogColumn(c.name, HiveMetastoreTypes.toMetastoreType(c.dataType))
+        CatalogColumn(c.name, c.dataType.catalogString)
       })
     } else {
       withFormat
@@ -85,7 +84,8 @@ case class CreateTableAsSelect(
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sqlContext.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true, false)).toRdd
+      sqlContext.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
     }

     Seq.empty[Row]

@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.HiveMetastoreTypes
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.types.StructType

 private[orc] object OrcFileOperator extends Logging {
@@ -78,7 +78,7 @@ private[orc] object OrcFileOperator extends Logging {
       val readerInspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
       val schema = readerInspector.getTypeName
       logDebug(s"Reading schema from file $paths, got Hive schema string: $schema")
-      HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType]
+      DataTypeParser.parse(schema).asInstanceOf[StructType]
     }
   }

@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -186,9 +186,7 @@ private[orc] class OrcOutputWriter(
   private val serializer = {
     val table = new Properties()
     table.setProperty("columns", dataSchema.fieldNames.mkString(","))
-    table.setProperty("columns.types", dataSchema.map { f =>
-      HiveMetastoreTypes.toMetastoreType(f.dataType)
-    }.mkString(":"))
+    table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))

     val serde = new OrcSerde
     val configuration = context.getConfiguration
@@ -198,10 +196,7 @@ private[orc] class OrcOutputWriter(

   // Object inspector converted from the schema of the relation to be written.
   private val structOI = {
-    val typeInfo =
-      TypeInfoUtils.getTypeInfoFromTypeString(
-        HiveMetastoreTypes.toMetastoreType(dataSchema))
-
+    val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString)
     OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo])
       .asInstanceOf[SettableStructObjectInspector]
   }

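In the ORC writer the same `catalogString` strings feed Hive's serde table properties: column names joined with commas and their type strings joined with colons. A standalone sketch of just that property construction, with an invented object name and example schema; only Spark's type API is used here:

```scala
// Illustrative only: how the ORC writer hunk above derives the "columns" and
// "columns.types" serde properties from the Spark schema via catalogString.
import java.util.Properties
import org.apache.spark.sql.types._

object OrcSerdePropsDemo {
  def main(args: Array[String]): Unit = {
    val dataSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("tags", ArrayType(StringType))))

    val table = new Properties()
    table.setProperty("columns", dataSchema.fieldNames.mkString(","))
    table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":"))

    println(table.getProperty("columns"))        // id,tags
    println(table.getProperty("columns.types"))  // bigint:array<string>
  }
}
```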
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePointUDT, SQLTestUtils}
@@ -32,14 +33,14 @@ class HiveMetastoreCatalogSuite extends TestHiveSingleton {

   test("struct field should accept underscore in sub-column name") {
     val hiveTypeStr = "struct<a: int, b_1: string, c: string>"
-    val dateType = HiveMetastoreTypes.toDataType(hiveTypeStr)
+    val dateType = DataTypeParser.parse(hiveTypeStr)
     assert(dateType.isInstanceOf[StructType])
   }

   test("udt to metastore type conversion") {
     val udt = new ExamplePointUDT
-    assertResult(HiveMetastoreTypes.toMetastoreType(udt.sqlType)) {
-      HiveMetastoreTypes.toMetastoreType(udt)
+    assertResult(udt.sqlType.catalogString) {
+      udt.catalogString
     }
   }

@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.DataTypeParser
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -918,7 +919,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in Hive compatible format,
       // we verify that each column of the table is of native type StringType.
       assert(sharedState.externalCatalog.getTable("default", "not_skip_hive_metadata").schema
-        .forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
+        .forall(column => DataTypeParser.parse(column.dataType) == StringType))

       sessionState.catalog.createDataSourceTable(
         name = TableIdentifier("skip_hive_metadata"),
@@ -932,7 +933,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // As a proxy for verifying that the table was stored in SparkSQL format,
       // we verify that the table has a column type as array of StringType.
       assert(sharedState.externalCatalog.getTable("default", "skip_hive_metadata")
-        .schema.forall { c => HiveMetastoreTypes.toDataType(c.dataType) == ArrayType(StringType) })
+        .schema.forall { c => DataTypeParser.parse(c.dataType) == ArrayType(StringType) })
     }
   }
 }