[SPARK-35700][SQL][FOLLOWUP] Read schema from ORC files should strip CHAR/VARCHAR types

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/33001, providing a more direct fix.

The regression in 3.1 was caused by the fact that we changed the parser to allow it to return CHAR/VARCHAR types. We should have replaced CHAR/VARCHAR with STRING before the data type flows into the query engine; however, `OrcUtils` was missed.
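To illustrate the point, here is a minimal, hand-written sketch (not code from this PR; the schema string `struct<c:char(5),v:varchar(7)>` is a made-up example): the parser now yields CHAR/VARCHAR types, and `CharVarcharUtils` can strip them back to STRING before the schema reaches the engine.

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.StructType

// Since 3.1 the parser returns CHAR/VARCHAR types instead of silently mapping them to STRING.
val parsed = CatalystSqlParser
  .parseDataType("struct<c:char(5),v:varchar(7)>")
  .asInstanceOf[StructType]

// Stripping CHAR/VARCHAR before the schema reaches the engine yields plain STRING fields.
val stripped = CharVarcharUtils.replaceCharVarcharWithStringInSchema(parsed)
// stripped("c").dataType == StringType; stripped("v").dataType == StringType
```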

When reading ORC files, the task side reads the real schema from the ORC file metadata and then applies filter pushdown. For some reason, the implementation converts the ORC schema to a Spark schema before filter pushdown, and this step does not strip CHAR/VARCHAR. Note that for Parquet we use the Parquet schema directly in filter pushdown, so Parquet does not have this problem.
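To make the ordering issue concrete, here is a rough sketch (not the actual `OrcFilters` code; it returns strings instead of `PredicateLeaf.Type` values to stay self-contained) of the type mapping that ORC filter pushdown performs on the task-side Catalyst schema. If that schema still carries `CharType`/`VarcharType`, the mapping has no case for them:

```scala
import org.apache.spark.sql.types._

// Simplified stand-in for the Catalyst-type -> ORC-predicate-type mapping used by pushdown.
def predicateLeafType(dt: DataType): String = dt match {
  case BooleanType => "BOOLEAN"
  case ByteType | ShortType | IntegerType | LongType => "LONG"
  case FloatType | DoubleType => "FLOAT"
  case StringType => "STRING"
  case DateType => "DATE"
  case TimestampType => "TIMESTAMP"
  case _: DecimalType => "DECIMAL"
  case other => throw new IllegalArgumentException(s"Type $other not supported for pushdown")
}

// With an un-stripped task-side schema, a filter on a CHAR(5) column arrives here as
// CharType(5) and hits the error case; #33001 worked around this inside OrcFilters,
// while this followup strips the types earlier, in OrcUtils.
```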

This PR proposes to replace CHAR/VARCHAR with STRING when converting the ORC schema to a Spark schema.
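As a usage-level illustration of the net effect (a sketch only; the path and data are hypothetical, and the local session is just for the example), ORC files whose file schema declares CHAR/VARCHAR columns are read back with STRING columns, so filters on those columns are built against STRING:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("orc-char-varchar").getOrCreate()

// Hypothetical path standing in for ORC files written by an external system with
// columns declared as c CHAR(5) and v VARCHAR(7).
val df = spark.read.orc("/tmp/external_orc_table")
df.printSchema()
// root
//  |-- c: string (nullable = true)
//  |-- v: string (nullable = true)

// The pushed-down predicate on c is built against StringType, not CharType(5).
df.filter(df("c") === "Spark").show()
```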

### Why are the changes needed?

A more direct bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #33030 from cloud-fan/help.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Wenchen Fan 2021-06-22 13:50:49 -07:00 committed by Dongjoon Hyun
parent c418803df7
commit a2c1a55b1f
3 changed files with 12 additions and 8 deletions

OrcFilters.scala

@@ -142,7 +142,7 @@ private[sql] object OrcFilters extends OrcFiltersBase {
     case BooleanType => PredicateLeaf.Type.BOOLEAN
     case ByteType | ShortType | IntegerType | LongType => PredicateLeaf.Type.LONG
     case FloatType | DoubleType => PredicateLeaf.Type.FLOAT
-    case StringType | _: CharType | _: VarcharType => PredicateLeaf.Type.STRING
+    case StringType => PredicateLeaf.Type.STRING
     case DateType => PredicateLeaf.Type.DATE
     case TimestampType => PredicateLeaf.Type.TIMESTAMP
     case _: DecimalType => PredicateLeaf.Type.DECIMAL

OrcUtils.scala

@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SPARK_VERSION_METADATA_KEY, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
 import org.apache.spark.sql.types._
@@ -84,6 +84,13 @@ object OrcUtils extends Logging {
     }
   }
 
+  private def toCatalystSchema(schema: TypeDescription): StructType = {
+    // The Spark query engine has not completely supported CHAR/VARCHAR type yet, and here we
+    // replace the orc CHAR/VARCHAR with STRING type.
+    CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+      CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType])
+  }
+
   def readSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
       : Option[StructType] = {
     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
@@ -91,7 +98,7 @@ object OrcUtils extends Logging {
     files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
       case Some(schema) =>
         logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
-        CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
+        toCatalystSchema(schema)
     }
   }
@@ -100,8 +107,7 @@ object OrcUtils extends Logging {
       conf: Configuration,
       ignoreCorruptFiles: Boolean): Option[StructType] = {
     readSchema(file, conf, ignoreCorruptFiles) match {
-      case Some(schema) =>
-        Some(CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType])
+      case Some(schema) => Some(toCatalystSchema(schema))
       case None =>
         // Field names is empty or `FileFormatException` was thrown but ignoreCorruptFiles is true.
@@ -116,8 +122,7 @@ object OrcUtils extends Logging {
   def readOrcSchemasInParallel(
       files: Seq[FileStatus], conf: Configuration, ignoreCorruptFiles: Boolean): Seq[StructType] = {
     ThreadUtils.parmap(files, "readingOrcSchemas", 8) { currentFile =>
-      OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles)
-        .map(s => CatalystSqlParser.parseDataType(s.toString).asInstanceOf[StructType])
+      OrcUtils.readSchema(currentFile.getPath, conf, ignoreCorruptFiles).map(toCatalystSchema)
     }.flatten
   }

HiveCharVarcharTestSuite.scala

@@ -52,7 +52,6 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSingleton {
}
test("SPARK-35700: Read char/varchar orc table with created and written by external systems") {
withTable("t") {
hiveClient.runSqlHive("CREATE TABLE t(c CHAR(5), v VARCHAR(7)) STORED AS ORC")
hiveClient.runSqlHive("INSERT INTO t VALUES('Spark', 'kyuubi')")