[SPARK-19611][SQL][FOLLOWUP] set dataSchema correctly in HiveMetastoreCatalog.convertToLogicalRelation
## What changes were proposed in this pull request? We made a mistake in https://github.com/apache/spark/pull/16944 . In `HiveMetastoreCatalog#inferIfNeeded` we infer the data schema, merge it with the full schema, and return the new full schema. On the caller side we then treat that full schema as the data schema and set it on `HadoopFsRelation`. This doesn't cause any problem, because both Parquet and ORC can work with a wrong data schema that has extra columns, but it's better to fix this mistake. This patch makes `inferIfNeeded` merge the inferred schema with the metastore data schema only and return just the updated `CatalogTable`, so that callers read the data schema from `updatedTable.dataSchema`. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #19615 from cloud-fan/infer.
This commit is contained in:
parent
59589bc654
commit
4d9ebf3835
|
@ -164,13 +164,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
|
|||
}
|
||||
}
|
||||
|
||||
val (dataSchema, updatedTable) =
|
||||
inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
|
||||
val updatedTable = inferIfNeeded(relation, options, fileFormat, Option(fileIndex))
|
||||
|
||||
val fsRelation = HadoopFsRelation(
|
||||
location = fileIndex,
|
||||
partitionSchema = partitionSchema,
|
||||
dataSchema = dataSchema,
|
||||
dataSchema = updatedTable.dataSchema,
|
||||
bucketSpec = None,
|
||||
fileFormat = fileFormat,
|
||||
options = options)(sparkSession = sparkSession)
|
||||
|
@ -191,13 +190,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
|
|||
fileFormatClass,
|
||||
None)
|
||||
val logicalRelation = cached.getOrElse {
|
||||
val (dataSchema, updatedTable) = inferIfNeeded(relation, options, fileFormat)
|
||||
val updatedTable = inferIfNeeded(relation, options, fileFormat)
|
||||
val created =
|
||||
LogicalRelation(
|
||||
DataSource(
|
||||
sparkSession = sparkSession,
|
||||
paths = rootPath.toString :: Nil,
|
||||
userSpecifiedSchema = Option(dataSchema),
|
||||
userSpecifiedSchema = Option(updatedTable.dataSchema),
|
||||
bucketSpec = None,
|
||||
options = options,
|
||||
className = fileType).resolveRelation(),
|
||||
|
@ -224,7 +223,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
|
|||
relation: HiveTableRelation,
|
||||
options: Map[String, String],
|
||||
fileFormat: FileFormat,
|
||||
fileIndexOpt: Option[FileIndex] = None): (StructType, CatalogTable) = {
|
||||
fileIndexOpt: Option[FileIndex] = None): CatalogTable = {
|
||||
val inferenceMode = sparkSession.sessionState.conf.caseSensitiveInferenceMode
|
||||
val shouldInfer = (inferenceMode != NEVER_INFER) && !relation.tableMeta.schemaPreservesCase
|
||||
val tableName = relation.tableMeta.identifier.unquotedString
|
||||
|
@ -241,21 +240,22 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
|
|||
sparkSession,
|
||||
options,
|
||||
fileIndex.listFiles(Nil, Nil).flatMap(_.files))
|
||||
.map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
|
||||
.map(mergeWithMetastoreSchema(relation.tableMeta.dataSchema, _))
|
||||
|
||||
inferredSchema match {
|
||||
case Some(schema) =>
|
||||
case Some(dataSchema) =>
|
||||
val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema)
|
||||
if (inferenceMode == INFER_AND_SAVE) {
|
||||
updateCatalogSchema(relation.tableMeta.identifier, schema)
|
||||
}
|
||||
(schema, relation.tableMeta.copy(schema = schema))
|
||||
relation.tableMeta.copy(schema = schema)
|
||||
case None =>
|
||||
logWarning(s"Unable to infer schema for table $tableName from file format " +
|
||||
s"$fileFormat (inference mode: $inferenceMode). Using metastore schema.")
|
||||
(relation.tableMeta.schema, relation.tableMeta)
|
||||
relation.tableMeta
|
||||
}
|
||||
} else {
|
||||
(relation.tableMeta.schema, relation.tableMeta)
|
||||
relation.tableMeta
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue