[SPARK-26630][SQL] Support reading Hive-serde tables whose INPUTFORMAT is org.apache.hadoop.mapreduce

## What changes were proposed in this pull request?

When we read a hive table and create RDDs in `TableReader`, it'll throw exception `java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TextInputFormat cannot be cast to org.apache.hadoop.mapred.InputFormat` if the input format class of the table is from mapreduce package.

Now we use NewHadoopRDD to deal with the new input format and keep HadoopRDD to the old one.

This PR is from #23506. We can reproduce this issue by executing the new test with the code in old version. When create a table with `org.apache.hadoop.mapreduce.....` input format, we will find the exception thrown in `org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:190)`

## How was this patch tested?

Added a new test.

Closes #23559 from Deegue/fix-hadoopRDD.

Lead-authored-by: heguozi <zyzzxycj@gmail.com>
Co-authored-by: Yizhong Zhang <zyzzxycj@163.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
heguozi 2019-01-26 10:17:03 -08:00 committed by gatorsmile
parent aa3d16d68b
commit e71acd9a23
2 changed files with 149 additions and 14 deletions

View file

@ -31,12 +31,13 @@ import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat => oldInputClass, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => newInputClass}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.CastSupport
@ -123,9 +124,7 @@ class HadoopTableReader(
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
val hadoopRDD = createHadoopRDD(localTableDesc, inputPathStr)
val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
@ -164,7 +163,7 @@ class HadoopTableReader(
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
if (!sparkSession.sessionState.conf.verifyPartitionPath) {
if (!conf.verifyPartitionPath) {
partitionToDeserializer
} else {
val existPathSet = collection.mutable.Set[String]()
@ -202,8 +201,6 @@ class HadoopTableReader(
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getDataLocation
val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
val ifc = partDesc.getInputFileFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
// Get partition field info
val partSpec = partDesc.getPartSpec
val partProps = partDesc.getProperties
@ -243,7 +240,7 @@ class HadoopTableReader(
// Create local references so that the outer object isn't serialized.
val localTableDesc = tableDesc
createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter =>
createHadoopRDD(localTableDesc, inputPathStr).mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.getConstructor().newInstance()
// SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema
@ -288,16 +285,29 @@ class HadoopTableReader(
}
}
/**
* The entry of creating a RDD.
* [SPARK-26630] Using which HadoopRDD will be decided by the input format of tables.
* The input format of NewHadoopRDD is from `org.apache.hadoop.mapreduce` package while
* the input format of HadoopRDD is from `org.apache.hadoop.mapred` package.
*/
private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
val inputFormatClazz = localTableDesc.getInputFileFormatClass
if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
createNewHadoopRDD(localTableDesc, inputPathStr)
} else {
createOldHadoopRDD(localTableDesc, inputPathStr)
}
}
/**
* Creates a HadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
private def createHadoopRdd(
tableDesc: TableDesc,
path: String,
inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {
private def createOldHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
val inputFormatClass = tableDesc.getInputFileFormatClass
.asInstanceOf[Class[oldInputClass[Writable, Writable]]]
val rdd = new HadoopRDD(
sparkSession.sparkContext,
@ -311,6 +321,29 @@ class HadoopTableReader(
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
}
/**
* Creates a NewHadoopRDD based on the broadcasted HiveConf and other job properties that will be
* applied locally on each slave.
*/
private def createNewHadoopRDD(tableDesc: TableDesc, path: String): RDD[Writable] = {
val newJobConf = new JobConf(hadoopConf)
HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc)(newJobConf)
val inputFormatClass = tableDesc.getInputFileFormatClass
.asInstanceOf[Class[newInputClass[Writable, Writable]]]
val rdd = new NewHadoopRDD(
sparkSession.sparkContext,
inputFormatClass,
classOf[Writable],
classOf[Writable],
newJobConf
)
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
}
}
private[hive] object HiveTableUtil {

View file

@ -260,6 +260,108 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
assert(err.contains("Cannot recognize hive type string:"))
}
}
test("SPARK-26630: table with old input format and without partitioned will use HadoopRDD") {
withTable("table_old", "table_ctas_old") {
sql(
"""
|CREATE TABLE table_old (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""".stripMargin)
sql(
"""
|INSERT INTO table_old
|VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
""".stripMargin)
checkAnswer(
sql("SELECT col1, col2, col3, col4 FROM table_old"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
sql("CREATE TABLE table_ctas_old AS SELECT col1, col2, col3, col4 FROM table_old")
checkAnswer(
sql("SELECT col1, col2, col3, col4 from table_ctas_old"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
}
}
test("SPARK-26630: table with old input format and partitioned will use HadoopRDD") {
withTable("table_pt_old", "table_ctas_pt_old") {
sql(
"""
|CREATE TABLE table_pt_old (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
|PARTITIONED BY (pt INT)
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""".stripMargin)
sql(
"""
|INSERT INTO table_pt_old PARTITION (pt = 1)
|VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
""".stripMargin)
checkAnswer(
sql("SELECT col1, col2, col3, col4 FROM table_pt_old WHERE pt = 1"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
sql("CREATE TABLE table_ctas_pt_old AS SELECT col1, col2, col3, col4 FROM table_pt_old")
checkAnswer(
sql("SELECT col1, col2, col3, col4 from table_ctas_pt_old"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
}
}
test("SPARK-26630: table with new input format and without partitioned will use NewHadoopRDD") {
withTable("table_new", "table_ctas_new") {
sql(
"""
|CREATE TABLE table_new (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""".stripMargin)
sql(
"""
|INSERT INTO table_new
|VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
""".stripMargin)
checkAnswer(
sql("SELECT col1, col2, col3, col4 FROM table_new"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
sql("CREATE TABLE table_ctas_new AS SELECT col1, col2, col3, col4 FROM table_new")
checkAnswer(
sql("SELECT col1, col2, col3, col4 from table_ctas_new"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
}
}
test("SPARK-26630: table with new input format and partitioned will use NewHadoopRDD") {
withTable("table_pt_new", "table_ctas_pt_new") {
sql(
"""
|CREATE TABLE table_pt_new (col1 LONG, col2 STRING, col3 DOUBLE, col4 BOOLEAN)
|PARTITIONED BY (pt INT)
|STORED AS
|INPUTFORMAT 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat'
|OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
""".stripMargin)
sql(
"""
|INSERT INTO table_pt_new PARTITION (pt = 1)
|VALUES (2147483648, 'AAA', 3.14, false), (2147483649, 'BBB', 3.142, true)
""".stripMargin)
checkAnswer(
sql("SELECT col1, col2, col3, col4 FROM table_pt_new WHERE pt = 1"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
sql("CREATE TABLE table_ctas_pt_new AS SELECT col1, col2, col3, col4 FROM table_pt_new")
checkAnswer(
sql("SELECT col1, col2, col3, col4 from table_ctas_pt_new"),
Row(2147483648L, "AAA", 3.14, false) :: Row(2147483649L, "BBB", 3.142, true) :: Nil)
}
}
}
class HiveDDLSuite