[SPARK-35064][SQL] Group error in spark-catalyst

### What changes were proposed in this pull request?
This PR groups exception messages under sql/catalyst/src/main/scala/org/apache/spark/sql (excluding the catalyst package) into the centralized QueryCompilationErrors and QueryExecutionErrors objects.
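
Concretely, each inline `throw new ...Exception(...)` is replaced by a call to a method on a shared error object (`QueryCompilationErrors` or `QueryExecutionErrors`) that builds the same message. Below is a minimal, self-contained sketch of the pattern, based on the `notPublicClassError` change in this diff; the `*Sketch` object names are illustrative stand-ins for the real `QueryExecutionErrors` object and the `Encoders` call site.

```scala
import java.lang.reflect.Modifier

import scala.reflect.{classTag, ClassTag}

// Stand-in for org.apache.spark.sql.errors.QueryExecutionErrors:
// the message text is kept identical to the old inline message.
object QueryExecutionErrorsSketch {
  def notPublicClassError(name: String): Throwable =
    new UnsupportedOperationException(
      s"$name is not a public class. Only public classes are supported.")
}

// Stand-in for the Encoders call site: instead of building the message
// inline, it delegates to the grouped error method.
object EncodersSketch {
  def validatePublicClass[T: ClassTag](): Unit = {
    if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
      throw QueryExecutionErrorsSketch.notPublicClassError(classTag[T].runtimeClass.getName)
    }
  }
}
```

Because each grouped method only centralizes message construction, the exception types and message text stay exactly as before, which is why no user-facing change is expected.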

### Why are the changes needed?
This helps standardize error messages and makes them easier to maintain.

### Does this PR introduce any user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests; all existing tests pass, confirming that the change does not break any existing behavior.

Closes #32916 from dgd-contributor/SPARK-35064_catalyst_group_error.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
dgd-contributor 2021-06-28 07:21:24 +00:00 committed by Wenchen Fan
parent 378ac78bdf
commit 1c81ad2029
18 changed files with 219 additions and 75 deletions


@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal
import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types._
/**
@@ -224,16 +225,14 @@ object Encoders {
/** Throws an exception if T is not a public class. */
private def validatePublicClass[T: ClassTag](): Unit = {
if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
throw new UnsupportedOperationException(
s"${classTag[T].runtimeClass.getName} is not a public class. " +
"Only public classes are supported.")
throw QueryExecutionErrors.notPublicClassError(classTag[T].runtimeClass.getName)
}
}
/** A way to construct encoders using generic serializers. */
private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
if (classTag[T].runtimeClass.isPrimitive) {
throw new UnsupportedOperationException("Primitive types are not supported.")
throw QueryExecutionErrors.primitiveTypesNotSupportedError()
}
validatePublicClass[T]()


@@ -33,6 +33,7 @@ import org.apache.spark.annotation.{Stable, Unstable}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -375,7 +376,7 @@ trait Row extends Serializable {
* @throws IllegalArgumentException when a field `name` does not exist.
*/
def fieldIndex(name: String): Int = {
throw new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.")
throw QueryExecutionErrors.fieldIndexOnRowWithoutSchemaError()
}
/**
@@ -520,7 +521,7 @@ trait Row extends Serializable {
* @throws NullPointerException when value is null.
*/
private def getAnyValAs[T <: AnyVal](i: Int): T =
if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
if (isNullAt(i)) throw QueryExecutionErrors.valueIsNullError(i)
else getAs[T](i)
/**


@@ -21,8 +21,8 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
@@ -103,9 +103,9 @@ class CatalogManager(
case _ if isSessionCatalog(currentCatalog) && namespace.length == 1 =>
v1SessionCatalog.setCurrentDatabase(namespace.head)
case _ if isSessionCatalog(currentCatalog) =>
throw new NoSuchNamespaceException(namespace)
throw QueryCompilationErrors.noSuchNamespaceError(namespace)
case catalog: SupportsNamespaces if !catalog.namespaceExists(namespace) =>
throw new NoSuchNamespaceException(namespace)
throw QueryCompilationErrors.noSuchNamespaceError(namespace)
case _ =>
_currentNamespace = Some(namespace)
}


@@ -23,6 +23,7 @@ import java.util.NoSuchElementException
import java.util.regex.Pattern
import org.apache.spark.SparkException
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@@ -47,35 +48,32 @@ private[sql] object Catalogs {
conf.getConfString("spark.sql.catalog." + name)
} catch {
case _: NoSuchElementException =>
throw new CatalogNotFoundException(
s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined")
throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)
}
val loader = Utils.getContextOrSparkClassLoader
try {
val pluginClass = loader.loadClass(pluginClassName)
if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {
throw new SparkException(
s"Plugin class for catalog '$name' does not implement CatalogPlugin: $pluginClassName")
throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)
}
val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]
plugin.initialize(name, catalogOptions(name, conf))
plugin
} catch {
case _: ClassNotFoundException =>
throw new SparkException(
s"Cannot find catalog plugin class for catalog '$name': $pluginClassName")
throw QueryExecutionErrors.catalogPluginClassNotFoundForCatalogError(name, pluginClassName)
case e: NoSuchMethodException =>
throw new SparkException(
s"Failed to find public no-arg constructor for catalog '$name': $pluginClassName)", e)
throw QueryExecutionErrors.catalogFailToFindPublicNoArgConstructorError(
name, pluginClassName, e)
case e: IllegalAccessException =>
throw new SparkException(
s"Failed to call public no-arg constructor for catalog '$name': $pluginClassName)", e)
throw QueryExecutionErrors.catalogFailToCallPublicNoArgConstructorError(
name, pluginClassName, e)
case e: InstantiationException =>
throw new SparkException("Cannot instantiate abstract catalog plugin class for " +
s"catalog '$name': $pluginClassName", e.getCause)
throw QueryExecutionErrors.cannotInstantiateAbstractCatalogPluginClassError(
name, pluginClassName, e)
case e: InvocationTargetException =>
throw new SparkException("Failed during instantiating constructor for catalog " +
s"'$name': $pluginClassName", e.getCause)
throw QueryExecutionErrors.failedToInstantiateConstructorForCatalogError(
name, pluginClassName, e)
}
}


@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction}
import org.apache.spark.sql.connector.expressions.{NamedReference, Transform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_POLICY
import org.apache.spark.sql.internal.SQLConf.{LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED, LEGACY_CTE_PRECEDENCE_POLICY}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types._
@@ -1647,4 +1647,51 @@ private[spark] object QueryCompilationErrors {
def invalidYearMonthIntervalType(startFieldName: String, endFieldName: String): Throwable = {
new AnalysisException(s"'interval $startFieldName to $endFieldName' is invalid.")
}
def configRemovedInVersionError(
configName: String,
version: String,
comment: String): Throwable = {
new AnalysisException(
s"The SQL config '$configName' was removed in the version $version. $comment")
}
def failedFallbackParsingError(msg: String, e1: Throwable, e2: Throwable): Throwable = {
new AnalysisException(s"$msg${e1.getMessage}\nFailed fallback parsing: ${e2.getMessage}",
cause = Some(e1.getCause))
}
def decimalCannotGreaterThanPrecisionError(scale: Int, precision: Int): Throwable = {
new AnalysisException(s"Decimal scale ($scale) cannot be greater than precision ($precision).")
}
def decimalOnlySupportPrecisionUptoError(decimalType: String, precision: Int): Throwable = {
new AnalysisException(s"$decimalType can only support precision up to $precision")
}
def negativeScaleNotAllowedError(scale: Int): Throwable = {
new AnalysisException(
s"""|Negative scale is not allowed: $scale.
|You can use ${LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key}=true
|to enable legacy mode to allow it.""".stripMargin.replaceAll("\n", " "))
}
def invalidPartitionColumnKeyInTableError(key: String, tblName: String): Throwable = {
new AnalysisException(s"$key is not a valid partition column in table $tblName.")
}
def invalidPartitionSpecError(
specKeys: String,
partitionColumnNames: Seq[String],
tableName: String): Throwable = {
new AnalysisException(
s"""|Partition spec is invalid. The spec ($specKeys) must match
|the partition spec (${partitionColumnNames.mkString(", ")}) defined in
|table '$tableName'""".stripMargin.replaceAll("\n", " "))
}
def foundDuplicateColumnError(colType: String, duplicateCol: Seq[String]): Throwable = {
new AnalysisException(
s"Found duplicate column(s) $colType: ${duplicateCol.sorted.mkString(", ")}")
}
}


@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{DomainJoin, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, FailFastMode}
import org.apache.spark.sql.connector.catalog.CatalogNotFoundException
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.Transform
@@ -752,8 +753,8 @@ object QueryExecutionErrors {
new IllegalArgumentException(s"Could not compare cost with $cost")
}
def unsupportedDataTypeError(dt: DataType): Throwable = {
new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
def unsupportedDataTypeError(dt: String): Throwable = {
new UnsupportedOperationException(s"Unsupported data type: ${dt}")
}
def notSupportTypeError(dataType: DataType): Throwable = {
@@ -1428,4 +1429,111 @@ object QueryExecutionErrors {
def invalidStreamingOutputModeError(outputMode: Option[OutputMode]): Throwable = {
new UnsupportedOperationException(s"Invalid output mode: $outputMode")
}
def catalogPluginClassNotFoundError(name: String): Throwable = {
new CatalogNotFoundException(
s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined")
}
def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = {
new SparkException(
s"Plugin class for catalog '$name' does not implement CatalogPlugin: $pluginClassName")
}
def catalogPluginClassNotFoundForCatalogError(
name: String,
pluginClassName: String): Throwable = {
new SparkException(s"Cannot find catalog plugin class for catalog '$name': $pluginClassName")
}
def catalogFailToFindPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
s"Failed to find public no-arg constructor for catalog '$name': $pluginClassName)", e)
}
def catalogFailToCallPublicNoArgConstructorError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException(
s"Failed to call public no-arg constructor for catalog '$name': $pluginClassName)", e)
}
def cannotInstantiateAbstractCatalogPluginClassError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException("Cannot instantiate abstract catalog plugin class for " +
s"catalog '$name': $pluginClassName", e.getCause)
}
def failedToInstantiateConstructorForCatalogError(
name: String,
pluginClassName: String,
e: Exception): Throwable = {
new SparkException("Failed during instantiating constructor for catalog " +
s"'$name': $pluginClassName", e.getCause)
}
def noSuchElementExceptionError(): Throwable = {
new NoSuchElementException
}
def noSuchElementExceptionError(key: String): Throwable = {
new NoSuchElementException(key)
}
def cannotMutateReadOnlySQLConfError(): Throwable = {
new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
}
def cannotCloneOrCopyReadOnlySQLConfError(): Throwable = {
new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
}
def cannotGetSQLConfInSchedulerEventLoopThreadError(): Throwable = {
new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
}
def unsupportedOperationExceptionError(): Throwable = {
new UnsupportedOperationException
}
def nullLiteralsCannotBeCastedError(name: String): Throwable = {
new UnsupportedOperationException(s"null literals can't be casted to $name")
}
def notUserDefinedTypeError(name: String, userClass: String): Throwable = {
new SparkException(s"$name is not an UserDefinedType. Please make sure registering " +
s"an UserDefinedType for ${userClass}")
}
def cannotLoadUserDefinedTypeError(name: String, userClass: String): Throwable = {
new SparkException(s"Can not load in UserDefinedType ${name} for user class ${userClass}.")
}
def timeZoneIdNotSpecifiedForTimestampTypeError(): Throwable = {
new UnsupportedOperationException(
s"${TimestampType.catalogString} must supply timeZoneId parameter")
}
def notPublicClassError(name: String): Throwable = {
new UnsupportedOperationException(
s"$name is not a public class. Only public classes are supported.")
}
def primitiveTypesNotSupportedError(): Throwable = {
new UnsupportedOperationException("Primitive types are not supported.")
}
def fieldIndexOnRowWithoutSchemaError(): Throwable = {
new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.")
}
def valueIsNullError(index: Int): Throwable = {
new NullPointerException(s"Value at index $index is null")
}
}


@@ -17,9 +17,8 @@
package org.apache.spark.sql.execution
import java.util.NoSuchElementException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* An internal iterator interface which presents a more restrictive API than
@@ -71,7 +70,7 @@ private final class RowIteratorToScala(val rowIter: RowIterator) extends Iterato
_hasNext
}
override def next(): InternalRow = {
if (!hasNext) throw new NoSuchElementException
if (!hasNext) throw QueryExecutionErrors.noSuchElementExceptionError()
hasNextWasCalled = false
rowIter.getRow
}


@@ -21,6 +21,7 @@ import java.util.{Map => JMap}
import org.apache.spark.TaskContext
import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* A readonly SQLConf that will be created by tasks running at the executor side. It reads the
@@ -37,27 +38,27 @@ class ReadOnlySQLConf(context: TaskContext) extends SQLConf {
}
override protected def setConfWithCheck(key: String, value: String): Unit = {
throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError()
}
override def unsetConf(key: String): Unit = {
throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError()
}
override def unsetConf(entry: ConfigEntry[_]): Unit = {
throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError()
}
override def clear(): Unit = {
throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError()
}
override def clone(): SQLConf = {
throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotCloneOrCopyReadOnlySQLConfError()
}
override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
throw QueryExecutionErrors.cannotCloneOrCopyReadOnlySQLConfError()
}
}


@@ -36,7 +36,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES}
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
@@ -44,6 +43,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
@@ -217,7 +217,7 @@ object SQLConf {
if (conf != null) {
conf
} else if (Utils.isTesting) {
throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
throw QueryExecutionErrors.cannotGetSQLConfInSchedulerEventLoopThreadError()
} else {
confGetter.get()()
}
@@ -3999,7 +3999,7 @@ class SQLConf extends Serializable with Logging {
// Try to use the default value
Option(getConfigEntry(key)).map { e => e.stringConverter(e.readFrom(reader)) }
}.
getOrElse(throw new NoSuchElementException(key))
getOrElse(throw QueryExecutionErrors.noSuchElementExceptionError(key))
}
/**
@@ -4132,8 +4132,7 @@ class SQLConf extends Serializable with Logging {
SQLConf.removedSQLConfigs.get(key).foreach {
case RemovedConfig(configName, version, defaultValue, comment) =>
if (value != defaultValue) {
throw new AnalysisException(
s"The SQL config '$configName' was removed in the version $version. $comment")
throw QueryCompilationErrors.configRemovedInVersionError(configName, version, comment)
}
}
}


@@ -21,6 +21,7 @@ import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
* A non-concrete data type, reserved for internal uses.
@@ -112,7 +113,8 @@ protected[sql] object AnyDataType extends AbstractDataType with Serializable {
// Note that since AnyDataType matches any concrete types, defaultConcreteType should never
// be invoked.
override private[sql] def defaultConcreteType: DataType = throw new UnsupportedOperationException
override private[sql] def defaultConcreteType: DataType =
throw QueryExecutionErrors.unsupportedOperationExceptionError()
override private[sql] def simpleString: String = "any"


@@ -28,12 +28,12 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.DataTypeJsonUtils.{DataTypeJsonDeserializer, DataTypeJsonSerializer}
import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT}
@@ -161,9 +161,7 @@ object DataType {
fallbackParser(schema)
} catch {
case NonFatal(e2) =>
throw new AnalysisException(
message = s"$errorMsg${e1.getMessage}\nFailed fallback parsing: ${e2.getMessage}",
cause = Some(e1.getCause))
throw QueryCompilationErrors.failedFallbackParsingError(errorMsg, e1, e2)
}
}
}


@@ -22,8 +22,8 @@ import java.util.Locale
import scala.reflect.runtime.universe.typeTag
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
@@ -45,13 +45,12 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType {
DecimalType.checkNegativeScale(scale)
if (scale > precision) {
throw new AnalysisException(
s"Decimal scale ($scale) cannot be greater than precision ($precision).")
throw QueryCompilationErrors.decimalCannotGreaterThanPrecisionError(scale, precision)
}
if (precision > DecimalType.MAX_PRECISION) {
throw new AnalysisException(
s"${DecimalType.simpleString} can only support precision up to ${DecimalType.MAX_PRECISION}")
throw QueryCompilationErrors.decimalOnlySupportPrecisionUptoError(
DecimalType.simpleString, DecimalType.MAX_PRECISION)
}
// default constructor for Java
@@ -158,9 +157,7 @@ object DecimalType extends AbstractDataType {
private[sql] def checkNegativeScale(scale: Int): Unit = {
if (scale < 0 && !SQLConf.get.allowNegativeScaleOfDecimalEnabled) {
throw new AnalysisException(s"Negative scale is not allowed: $scale. " +
s"You can use spark.sql.legacy.allowNegativeScaleOfDecimal=true " +
s"to enable legacy mode to allow it.")
throw QueryCompilationErrors.negativeScaleNotAllowedError(scale)
}
}


@@ -17,10 +17,11 @@
package org.apache.spark.sql.types
import org.apache.spark.sql.errors.QueryExecutionErrors
object ObjectType extends AbstractDataType {
override private[sql] def defaultConcreteType: DataType =
throw new UnsupportedOperationException(
s"null literals can't be casted to ${ObjectType.simpleString}")
throw QueryExecutionErrors.nullLiteralsCannotBeCastedError(ObjectType.simpleString)
override private[sql] def acceptsType(other: DataType): Boolean = other match {
case ObjectType(_) => true


@@ -19,9 +19,9 @@ package org.apache.spark.sql.types
import scala.collection.mutable
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.util.Utils
/**
@@ -77,13 +77,10 @@ object UDTRegistration extends Serializable with Logging {
if (classOf[UserDefinedType[_]].isAssignableFrom(udtClass)) {
udtClass
} else {
throw new SparkException(
s"${udtClass.getName} is not an UserDefinedType. Please make sure registering " +
s"an UserDefinedType for ${userClass}")
throw QueryExecutionErrors.notUserDefinedTypeError(udtClass.getName, userClass)
}
} else {
throw new SparkException(
s"Can not load in UserDefinedType ${udtClassName} for user class ${userClass}.")
throw QueryExecutionErrors.cannotLoadUserDefinedTypeError(udtClassName, userClass)
}
}
}


@@ -24,6 +24,7 @@ import org.apache.arrow.vector.complex.MapVector
import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit}
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -48,8 +49,7 @@ private[sql] object ArrowUtils {
case DateType => new ArrowType.Date(DateUnit.DAY)
case TimestampType =>
if (timeZoneId == null) {
throw new UnsupportedOperationException(
s"${TimestampType.catalogString} must supply timeZoneId parameter")
throw QueryExecutionErrors.timeZoneIdNotSpecifiedForTimestampTypeError()
} else {
new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId)
}
@@ -57,7 +57,7 @@ private[sql] object ArrowUtils {
case _: YearMonthIntervalType => new ArrowType.Interval(IntervalUnit.YEAR_MONTH)
case _: DayTimeIntervalType => new ArrowType.Interval(IntervalUnit.DAY_TIME)
case _ =>
throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString)
}
def fromArrowType(dt: ArrowType): DataType = dt match {
@@ -78,7 +78,7 @@ private[sql] object ArrowUtils {
case ArrowType.Null.INSTANCE => NullType
case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType()
case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType()
case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt")
case _ => throw QueryExecutionErrors.unsupportedDataTypeError(dt.toString)
}
/** Maps field from Spark to Arrow. NOTE: timeZoneId required for TimestampType */


@@ -17,12 +17,12 @@
package org.apache.spark.sql.util
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{CharType, StructType, VarcharType}
import org.apache.spark.unsafe.types.UTF8String
@@ -42,7 +42,7 @@ private[sql] object PartitioningUtils {
val rawSchema = CharVarcharUtils.getRawSchema(partCols)
val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) =>
val normalizedFiled = rawSchema.find(f => resolver(f.name, key)).getOrElse {
throw new AnalysisException(s"$key is not a valid partition column in table $tblName.")
throw QueryCompilationErrors.invalidPartitionColumnKeyInTableError(key, tblName)
}
val normalizedVal =
@@ -92,10 +92,8 @@ private[sql] object PartitioningUtils {
partitionColumnNames: Seq[String]): Unit = {
val defined = partitionColumnNames.sorted
if (spec.keys.toSeq.sorted != defined) {
throw new AnalysisException(
s"Partition spec is invalid. The spec (${spec.keys.mkString(", ")}) must match " +
s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in " +
s"table '$tableName'")
throw QueryCompilationErrors.invalidPartitionSpecError(spec.keys.mkString(", "),
partitionColumnNames, tableName)
}
}
}


@@ -22,7 +22,7 @@ import java.util.Locale
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
@@ -119,8 +119,7 @@ private[spark] object SchemaUtils {
val duplicateColumns = names.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => s"`$x`"
}
throw new AnalysisException(
s"Found duplicate column(s) $colType: ${duplicateColumns.toSeq.sorted.mkString(", ")}")
throw QueryCompilationErrors.foundDuplicateColumnError(colType, duplicateColumns.toSeq)
}
}


@@ -78,7 +78,7 @@ object ArrowWriter {
case (_: YearMonthIntervalType, vector: IntervalYearVector) => new IntervalYearWriter(vector)
case (_: DayTimeIntervalType, vector: IntervalDayVector) => new IntervalDayWriter(vector)
case (dt, _) =>
throw QueryExecutionErrors.unsupportedDataTypeError(dt)
throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString)
}
}
}