[SPARK-36197][SQL] Use PartitionDesc instead of TableDesc for reading hive partitioned tables

### What changes were proposed in this pull request?

A Hive partition can have a `PartitionDesc` that differs from the table-level `TableDesc` in its Serde, InputFormatClass, and OutputFormatClass. When reading a Hive partitioned table, we shall respect the values in `PartitionDesc` rather than those in `TableDesc`.
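
For illustration, here is a minimal sketch of the situation this change handles, adapted from the test added in this PR (it assumes a Spark Hive test environment where `hiveClient.runSqlHive` is available; the table name and formats are just examples):

```scala
// Table-level storage is Avro, so the table's TableDesc carries the Avro
// serde and input/output format.
hiveClient.runSqlHive("create table t1(id int) partitioned by (pid int) stored as avro")
hiveClient.runSqlHive("insert into t1 partition(pid=1) select 2")

// One partition is switched to TEXTFILE. Its PartitionDesc now differs from
// the table-level TableDesc, so reading it with the table's Avro input
// format/serde is wrong; the reader must honor the partition-level descriptor.
hiveClient.runSqlHive("alter table t1 add partition (pid=2)")
hiveClient.runSqlHive("alter table t1 partition (pid=2) set fileformat textfile")
```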

### Why are the changes needed?

In many cases, Spark can read Hive tables with surprising results because of this issue: a partition whose serde or input/output format differs from the table-level definition is read with the table's descriptors instead of its own.

### Does this PR introduce _any_ user-facing change?

Yes. Hive partitioned tables whose partitions use a different serde or input/output format than the table-level definition can now be read correctly by Spark.

### How was this patch tested?

A new test was added.

Closes #33406 from yaooqinn/SPARK-36197.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
Kent Yao 2021-07-19 15:59:36 +08:00
parent 8396a70ddc
commit ef80356614
2 changed files with 62 additions and 4 deletions


@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.plan.{PartitionDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Deserializer
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.AvroTableProperties
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
@@ -201,7 +201,7 @@ class HadoopTableReader(
     val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
       .map { case (partition, partDeserializer) =>
-        val partDesc = Utilities.getPartitionDesc(partition)
+        val partDesc = Utilities.getPartitionDescFromTableDesc(tableDesc, partition, true)
         val partPath = partition.getDataLocation
         val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
         // Get partition field info
@@ -245,7 +245,8 @@ class HadoopTableReader(
         // Create local references so that the outer object isn't serialized.
         val localTableDesc = tableDesc
-        createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
+        createHadoopRDD(partDesc, inputPathStr).mapPartitions { iter =>
           val hconf = broadcastedHiveConf.value.value
           val deserializer = localDeserializer.getConstructor().newInstance()
           // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@@ -314,6 +315,15 @@ class HadoopTableReader(
     }
   }
 
+  private def createHadoopRDD(partitionDesc: PartitionDesc, inputPathStr: String): RDD[Writable] = {
+    val inputFormatClazz = partitionDesc.getInputFileFormatClass
+    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
+      createNewHadoopRDD(partitionDesc, inputPathStr)
+    } else {
+      createOldHadoopRDD(partitionDesc, inputPathStr)
+    }
+  }
+
   /**
    * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
    * applied locally on each executor.
@@ -322,7 +332,24 @@ class HadoopTableReader(
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
+
+  /**
+   * Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
+   * applied locally on each executor.
+   */
+  private def createOldHadoopRDD(partitionDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val initializeJobConfFunc =
+      HadoopTableReader.initializeLocalJobConfFunc(path, partitionDesc.getTableDesc) _
+    val inputFormatClass = partitionDesc.getInputFileFormatClass
+      .asInstanceOf[Class[oldInputClass[Writable, Writable]]]
+    createOldHadoopRDD(inputFormatClass, initializeJobConfFunc)
+  }
+
+  private def createOldHadoopRDD(
+      inputFormatClass: Class[oldInputClass[Writable, Writable]],
+      initializeJobConfFunc: JobConf => Unit): RDD[Writable] = {
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
       _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
@@ -345,13 +372,26 @@ class HadoopTableReader(
     HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
     val inputFormatClass = tableDesc.getInputFileFormatClass
       .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
+
+  private def createNewHadoopRDD(partDesc: PartitionDesc, path: String): RDD[Writable] = {
+    val newJobConf = new JobConf(hadoopConf)
+    HadoopTableReader.initializeLocalJobConfFunc(path, partDesc.getTableDesc)(newJobConf)
+    val inputFormatClass = partDesc.getInputFileFormatClass
+      .asInstanceOf[Class[newInputClass[Writable, Writable]]]
+    createNewHadoopRDD(inputFormatClass, newJobConf)
+  }
+
+  private def createNewHadoopRDD(
+      inputFormatClass: Class[newInputClass[Writable, Writable]],
+      jobConf: JobConf): RDD[Writable] = {
     val rdd = new NewHadoopRDD(
       sparkSession.sparkContext,
       inputFormatClass,
       classOf[Writable],
       classOf[Writable],
-      newJobConf
+      jobConf
     )
     // Only take the value (skip the key) because Hive works only with values.


@@ -2614,6 +2614,24 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-36197: Use PartitionDesc instead of TableDesc for reading hive partitioned tables") {
+    withTempDir { dir =>
+      val t1Loc = s"file:///$dir/t1"
+      val t2Loc = s"file:///$dir/t2"
+      withTable("t1", "t2") {
+        hiveClient.runSqlHive(
+          s"create table t1(id int) partitioned by(pid int) stored as avro location '$t1Loc'")
+        hiveClient.runSqlHive("insert into t1 partition(pid=1) select 2")
+        hiveClient.runSqlHive(
+          s"create table t2(id int) partitioned by(pid int) stored as textfile location '$t2Loc'")
+        hiveClient.runSqlHive("insert into t2 partition(pid=2) select 2")
+        hiveClient.runSqlHive(s"alter table t1 add partition (pid=2) location '$t2Loc/pid=2'")
+        hiveClient.runSqlHive("alter table t1 partition(pid=2) SET FILEFORMAT textfile")
+        checkAnswer(sql("select pid, id from t1 order by pid"), Seq(Row(1, 2), Row(2, 2)))
+      }
+    }
+  }
 }
 
 @SlowHiveTest