[SPARK-10113][SQL] Explicit error message for unsigned Parquet logical types

Parquet supports some unsigned datatypes. However, since Spark does not support unsigned datatypes, it needs to emit an exception with a clear message rather than one saying the datatype is illegal.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9646 from HyukjinKwon/SPARK-10113.
This commit is contained in:
hyukjinkwon 2015-11-12 12:29:50 -08:00 committed by Michael Armbrust
parent 4fe99c72c6
commit f5a9526fec
2 changed files with 31 additions and 0 deletions

View file

@@ -108,6 +108,9 @@ private[parquet] class CatalystSchemaConverter(
// Human-readable rendering of this Parquet type for error messages,
// including the logical (original) type annotation when present.
def typeString =
if (originalType == null) s"$typeName" else s"$typeName ($originalType)"
// Fails conversion for Parquet types Spark deliberately does not support
// (e.g. unsigned integer types, which have no Spark SQL counterpart).
def typeNotSupported() =
throw new AnalysisException(s"Parquet type not supported: $typeString")
// Fails conversion for Parquet types that could be supported in a future
// release but are not implemented yet.
def typeNotImplemented() =
throw new AnalysisException(s"Parquet type not yet supported: $typeString")
@@ -142,6 +145,9 @@ private[parquet] class CatalystSchemaConverter(
// INT32 physical type: dispatch on the logical (original) type annotation.
case INT_32 | null => IntegerType
case DATE => DateType
case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT32)
// Unsigned 8/16/32-bit integers have no Spark SQL counterpart; raise an
// explicit "not supported" error instead of the generic illegal-type one.
case UINT_8 => typeNotSupported()
case UINT_16 => typeNotSupported()
case UINT_32 => typeNotSupported()
// Could be supported later; distinct "not yet supported" message.
case TIME_MILLIS => typeNotImplemented()
case _ => illegalType()
}
@@ -150,6 +156,7 @@ private[parquet] class CatalystSchemaConverter(
// INT64 physical type: dispatch on the logical (original) type annotation.
originalType match {
case INT_64 | null => LongType
case DECIMAL => makeDecimalType(MAX_PRECISION_FOR_INT64)
// Unsigned 64-bit integers cannot be represented in Spark SQL; raise an
// explicit "not supported" error instead of the generic illegal-type one.
case UINT_64 => typeNotSupported()
// Could be supported later; distinct "not yet supported" message.
case TIMESTAMP_MILLIS => typeNotImplemented()
case _ => illegalType()
}

View file

@@ -206,6 +206,30 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
// SPARK-10113: reading a Parquet file whose schema contains an unsigned
// logical type must fail with an explicit "Parquet type not supported"
// message rather than a generic illegal-type error.
test("SPARK-10113 Support for unsigned Parquet logical types") {
  val schema = MessageTypeParser.parseMessageType(
    """message root {
      | required int32 c(UINT_32);
      |}
    """.stripMargin)
  withTempPath { location =>
    val outputPath = new Path(location.getCanonicalPath)
    // Spark itself cannot produce unsigned columns, so hand-craft a
    // metadata-only Parquet footer carrying the UINT_32 schema.
    val metadata = new FileMetaData(schema, Map.empty[String, String].asJava, "Spark")
    val footers =
      Seq(new Footer(outputPath, new ParquetMetadata(metadata, Collections.emptyList()))).asJava
    ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, outputPath, footers)
    // Schema resolution should reject the unsigned type with a clear message.
    val caught = intercept[Throwable] {
      sqlContext.read.parquet(outputPath.toString).printSchema()
    }
    assert(caught.toString.contains("Parquet type not supported"))
  }
}
test("compression codec") {
def compressionCodecFor(path: String, codecName: String): String = {
val codecs = for {