[SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.
Author: Reynold Xin <rxin@databricks.com>
Closes #6608 from rxin/parquet-analysis and squashes the following commits:
b5dc8e2 [Reynold Xin] Code review feedback.
5617cf6 [Reynold Xin] [SPARK-8074] Parquet should throw AnalysisException during setup for data type/name related failures.
(cherry picked from commit 939e4f3d8d
)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This commit is contained in:
parent
f67a27d026
commit
1f90a06bda
|
@ -19,7 +19,7 @@ package org.apache.spark.sql.parquet
|
||||||
|
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
|
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.JavaConversions._
|
||||||
import scala.util.Try
|
import scala.util.Try
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
|
@ -33,12 +33,11 @@ import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeNa
|
||||||
import parquet.schema.Type.Repetition
|
import parquet.schema.Type.Repetition
|
||||||
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
|
import parquet.schema.{ConversionPatterns, DecimalMetadata, GroupType => ParquetGroupType, MessageType, OriginalType => ParquetOriginalType, PrimitiveType => ParquetPrimitiveType, Type => ParquetType, Types => ParquetTypes}
|
||||||
|
|
||||||
|
import org.apache.spark.Logging
|
||||||
|
import org.apache.spark.sql.AnalysisException
|
||||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
|
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.apache.spark.{Logging, SparkException}
|
|
||||||
|
|
||||||
// Implicits
|
|
||||||
import scala.collection.JavaConversions._
|
|
||||||
|
|
||||||
/** A class representing Parquet info fields we care about, for passing back to Parquet */
|
/** A class representing Parquet info fields we care about, for passing back to Parquet */
|
||||||
private[parquet] case class ParquetTypeInfo(
|
private[parquet] case class ParquetTypeInfo(
|
||||||
|
@ -73,13 +72,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
|
||||||
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
|
case ParquetPrimitiveTypeName.INT96 if int96AsTimestamp => TimestampType
|
||||||
case ParquetPrimitiveTypeName.INT96 =>
|
case ParquetPrimitiveTypeName.INT96 =>
|
||||||
// TODO: add BigInteger type? TODO(andre) use DecimalType instead????
|
// TODO: add BigInteger type? TODO(andre) use DecimalType instead????
|
||||||
sys.error("Potential loss of precision: cannot convert INT96")
|
throw new AnalysisException("Potential loss of precision: cannot convert INT96")
|
||||||
case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|
case ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
|
||||||
if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
|
if (originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18) =>
|
||||||
// TODO: for now, our reader only supports decimals that fit in a Long
|
// TODO: for now, our reader only supports decimals that fit in a Long
|
||||||
DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
|
DecimalType(decimalInfo.getPrecision, decimalInfo.getScale)
|
||||||
case _ => sys.error(
|
case _ => throw new AnalysisException(s"Unsupported parquet datatype $parquetType")
|
||||||
s"Unsupported parquet datatype $parquetType")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -371,7 +369,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
|
||||||
parquetKeyType,
|
parquetKeyType,
|
||||||
parquetValueType)
|
parquetValueType)
|
||||||
}
|
}
|
||||||
case _ => sys.error(s"Unsupported datatype $ctype")
|
case _ => throw new AnalysisException(s"Unsupported datatype $ctype")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -403,7 +401,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
|
||||||
def convertFromString(string: String): Seq[Attribute] = {
|
def convertFromString(string: String): Seq[Attribute] = {
|
||||||
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
|
Try(DataType.fromJson(string)).getOrElse(DataType.fromCaseClassString(string)) match {
|
||||||
case s: StructType => s.toAttributes
|
case s: StructType => s.toAttributes
|
||||||
case other => sys.error(s"Can convert $string to row")
|
case other => throw new AnalysisException(s"Can convert $string to row")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -411,8 +409,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
|
||||||
// ,;{}()\n\t= and space character are special characters in Parquet schema
|
// ,;{}()\n\t= and space character are special characters in Parquet schema
|
||||||
schema.map(_.name).foreach { name =>
|
schema.map(_.name).foreach { name =>
|
||||||
if (name.matches(".*[ ,;{}()\n\t=].*")) {
|
if (name.matches(".*[ ,;{}()\n\t=].*")) {
|
||||||
sys.error(
|
throw new AnalysisException(
|
||||||
s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\n\t=".
|
s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
|
||||||
|Please use alias to rename it.
|
|Please use alias to rename it.
|
||||||
""".stripMargin.split("\n").mkString(" "))
|
""".stripMargin.split("\n").mkString(" "))
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.spark.broadcast.Broadcast
|
||||||
import org.apache.spark.deploy.SparkHadoopUtil
|
import org.apache.spark.deploy.SparkHadoopUtil
|
||||||
import org.apache.spark.rdd.RDD._
|
import org.apache.spark.rdd.RDD._
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
|
import org.apache.spark.sql.AnalysisException
|
||||||
import org.apache.spark.sql.sources._
|
import org.apache.spark.sql.sources._
|
||||||
import org.apache.spark.sql.types.{DataType, StructType}
|
import org.apache.spark.sql.types.{DataType, StructType}
|
||||||
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
|
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
|
||||||
|
@ -83,7 +84,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
|
||||||
case partFilePattern(id) => id.toInt
|
case partFilePattern(id) => id.toInt
|
||||||
case name if name.startsWith("_") => 0
|
case name if name.startsWith("_") => 0
|
||||||
case name if name.startsWith(".") => 0
|
case name if name.startsWith(".") => 0
|
||||||
case name => sys.error(
|
case name => throw new AnalysisException(
|
||||||
s"Trying to write Parquet files to directory $outputPath, " +
|
s"Trying to write Parquet files to directory $outputPath, " +
|
||||||
s"but found items with illegal name '$name'.")
|
s"but found items with illegal name '$name'.")
|
||||||
}.reduceOption(_ max _).getOrElse(0)
|
}.reduceOption(_ max _).getOrElse(0)
|
||||||
|
@ -380,11 +381,12 @@ private[sql] class ParquetRelation2(
|
||||||
// time-consuming.
|
// time-consuming.
|
||||||
if (dataSchema == null) {
|
if (dataSchema == null) {
|
||||||
dataSchema = {
|
dataSchema = {
|
||||||
val dataSchema0 =
|
val dataSchema0 = maybeDataSchema
|
||||||
maybeDataSchema
|
.orElse(readSchema())
|
||||||
.orElse(readSchema())
|
.orElse(maybeMetastoreSchema)
|
||||||
.orElse(maybeMetastoreSchema)
|
.getOrElse(throw new AnalysisException(
|
||||||
.getOrElse(sys.error("Failed to get the schema."))
|
s"Failed to discover schema of Parquet file(s) in the following location(s):\n" +
|
||||||
|
paths.mkString("\n\t")))
|
||||||
|
|
||||||
// If this Parquet relation is converted from a Hive Metastore table, must reconcile case
|
// If this Parquet relation is converted from a Hive Metastore table, must reconcile case
|
||||||
// case insensitivity issue and possible schema mismatch (probably caused by schema
|
// case insensitivity issue and possible schema mismatch (probably caused by schema
|
||||||
|
|
Loading…
Reference in a new issue