[SPARK-35020][SQL] Group exception messages in catalyst/util

### What changes were proposed in this pull request?
This PR groups exception messages in `sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util`.
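
To make the refactoring pattern concrete, here is a minimal sketch based on the `ArrayBasedMapBuilder` hunk in this diff. `ArrayBasedMapBuilderSketch` is a hypothetical wrapper used only for illustration and assumes the `spark-catalyst` module is on the classpath; the `nullAsMapKeyNotAllowedError` definition quoted in the trailing comment predates this PR and is shown only for context.

```scala
import org.apache.spark.sql.errors.QueryExecutionErrors

object ArrayBasedMapBuilderSketch {
  // Before this PR, the message text was assembled inline at the call site:
  //   throw new RuntimeException("Cannot use null as map key.")
  // After, the call site delegates to a named method on QueryExecutionErrors,
  // so every occurrence of the error shares one centrally maintained message.
  def put(key: Any, value: Any): Unit = {
    if (key == null) {
      throw QueryExecutionErrors.nullAsMapKeyNotAllowedError()
    }
    // ... store the entry, as the real ArrayBasedMapBuilder does ...
  }
}

// The target method lives in org.apache.spark.sql.errors.QueryExecutionErrors
// (it predates this PR; quoted here only to make the sketch self-contained):
//   def nullAsMapKeyNotAllowedError(): Throwable = {
//     new RuntimeException("Cannot use null as map key.")
//   }
```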

### Why are the changes needed?
It will largely help with the standardization of error messages and their maintenance.

### Does this PR introduce _any_ user-facing change?
No. Error messages remain unchanged.

### How was this patch tested?
No new tests - all existing tests pass, to make sure the change doesn't break any existing behavior.

Closes #32367 from beliefer/SPARK-35020.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
gengjiaan authored on 2021-05-07 08:30:30 +00:00, committed by Wenchen Fan
parent e83910f1f8, commit cf2c4ba584
7 changed files with 77 additions and 32 deletions


@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.util
import scala.collection.mutable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -52,24 +53,20 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
def put(key: Any, value: Any): Unit = {
if (key == null) {
throw new RuntimeException("Cannot use null as map key.")
throw QueryExecutionErrors.nullAsMapKeyNotAllowedError()
}
val index = keyToIndex.getOrDefault(key, -1)
if (index == -1) {
if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
throw new RuntimeException(s"Unsuccessful attempt to build maps with $size elements " +
s"due to exceeding the map size limit ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
throw QueryExecutionErrors.exceedMapSizeLimitError(size)
}
keyToIndex.put(key, values.length)
keys.append(key)
values.append(value)
} else {
if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.EXCEPTION.toString) {
throw new RuntimeException(s"Duplicate map key $key was found, please check the input " +
"data. If you want to remove the duplicated keys, you can set " +
s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " +
"the key inserted at last takes precedence.")
throw QueryExecutionErrors.duplicateMapKeyFoundError(key)
} else if (mapKeyDedupPolicy == SQLConf.MapKeyDedupPolicy.LAST_WIN.toString) {
// Overwrite the previous value, as the policy is last wins.
values(index) = value
@@ -82,15 +79,14 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
// write a 2-field row, the first field is key and the second field is value.
def put(entry: InternalRow): Unit = {
if (entry.isNullAt(0)) {
throw new RuntimeException("Cannot use null as map key.")
throw QueryExecutionErrors.nullAsMapKeyNotAllowedError()
}
put(keyGetter(entry, 0), valueGetter(entry, 1))
}
def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
if (keyArray.numElements() != valueArray.numElements()) {
throw new RuntimeException(
"The key array and value array of MapData must have the same length.")
throw QueryExecutionErrors.mapDataKeyArrayLengthDiffersFromValueArrayLengthError()
}
var i = 0


@@ -25,8 +25,8 @@ import java.util.{Date, Locale}
import com.google.common.cache.CacheBuilder
import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
@@ -45,8 +45,8 @@ trait DateTimeFormatterHelper {
val actual = accessor.get(field)
val expected = candidate.get(field)
if (actual != expected) {
throw new DateTimeException(s"Conflict found: Field $field $actual differs from" +
s" $field $expected derived from $candidate")
throw QueryExecutionErrors.fieldDiffersFromDerivedLocalDateError(
field, actual, expected, candidate)
}
}
}
@@ -145,9 +145,7 @@ trait DateTimeFormatterHelper {
} catch {
case _: Throwable => throw e
}
throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
throw QueryExecutionErrors.failToParseDateTimeInNewParserError(s, e)
}
// When legacy time parser policy set to EXCEPTION, check whether we will get different results
@@ -163,10 +161,7 @@ trait DateTimeFormatterHelper {
} catch {
case _: Throwable => throw e
}
throw new SparkUpgradeException("3.0", s"Fail to format it to '$resultCandidate' in the new" +
s" formatter. You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore" +
" the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid" +
" datetime string.", e)
throw QueryExecutionErrors.failToFormatDateTimeInNewFormatterError(resultCandidate, e)
}
/**
@@ -189,11 +184,7 @@ trait DateTimeFormatterHelper {
} catch {
case _: Throwable => throw e
}
throw new SparkUpgradeException("3.0", s"Fail to recognize '$pattern' pattern in the" +
s" DateTimeFormatter. 1) You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY" +
s" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern" +
s" with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html",
e)
throw QueryExecutionErrors.failToRecognizePatternInDateTimeFormatterError(pattern, e)
}
}


@@ -30,7 +30,8 @@ import sun.util.calendar.ZoneInfo
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.types.Decimal
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.types.{DateType, Decimal, TimestampType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
/**
@@ -369,7 +370,7 @@ object DateTimeUtils {
def stringToTimestampAnsi(s: UTF8String, timeZoneId: ZoneId): Long = {
stringToTimestamp(s, timeZoneId).getOrElse {
throw new DateTimeException(s"Cannot cast $s to TimestampType.")
throw QueryExecutionErrors.cannotCastUTF8StringToDataTypeError(s, TimestampType)
}
}
@@ -468,7 +469,7 @@ object DateTimeUtils {
def stringToDateAnsi(s: UTF8String, zoneId: ZoneId): Int = {
stringToDate(s, zoneId).getOrElse {
throw new DateTimeException(s"Cannot cast $s to DateType.")
throw QueryExecutionErrors.cannotCastUTF8StringToDataTypeError(s, DateType)
}
}


@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.millisToMicros
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.{ANSI_STYLE, HIVE_STYLE, IntervalStyle}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.Decimal
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
@@ -584,7 +585,7 @@ object IntervalUtils {
* @throws ArithmeticException if the result overflows any field value or divided by zero
*/
def divideExact(interval: CalendarInterval, num: Double): CalendarInterval = {
if (num == 0) throw new ArithmeticException("divide by zero")
if (num == 0) throw QueryExecutionErrors.divideByZeroError()
fromDoubles(interval.months / num, interval.days / num, interval.microseconds / num)
}


@@ -17,9 +17,9 @@
package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.RowOrdering
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._
/**
@@ -110,7 +110,7 @@ object TypeUtils {
def failWithIntervalType(dataType: DataType): Unit = {
invokeOnceForInterval(dataType) {
throw new AnalysisException("Cannot use interval type in the table schema.")
throw QueryCompilationErrors.cannotUseIntervalTypeInTableSchemaError()
}
}


@@ -1351,4 +1351,8 @@ private[spark] object QueryCompilationErrors {
new AnalysisException(
s"Ambiguous field name: $fieldName. Found multiple columns that can match: $names")
}
def cannotUseIntervalTypeInTableSchemaError(): Throwable = {
new AnalysisException("Cannot use interval type in the table schema.")
}
}


@@ -20,7 +20,8 @@ package org.apache.spark.sql.errors
import java.io.{FileNotFoundException, IOException}
import java.net.URISyntaxException
import java.sql.{SQLException, SQLFeatureNotSupportedException}
import java.time.DateTimeException
import java.time.{DateTimeException, LocalDate}
import java.time.temporal.ChronoField
import org.apache.hadoop.fs.{FileStatus, Path}
import org.codehaus.commons.compiler.CompileException
@@ -823,4 +824,55 @@ object QueryExecutionErrors {
new SparkException(s"Failed to merge incompatible data types ${left.catalogString}" +
s" and ${right.catalogString}")
}
def exceedMapSizeLimitError(size: Int): Throwable = {
new RuntimeException(s"Unsuccessful attempt to build maps with $size elements " +
s"due to exceeding the map size limit ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
}
def duplicateMapKeyFoundError(key: Any): Throwable = {
new RuntimeException(s"Duplicate map key $key was found, please check the input " +
"data. If you want to remove the duplicated keys, you can set " +
s"${SQLConf.MAP_KEY_DEDUP_POLICY.key} to ${SQLConf.MapKeyDedupPolicy.LAST_WIN} so that " +
"the key inserted at last takes precedence.")
}
def mapDataKeyArrayLengthDiffersFromValueArrayLengthError(): Throwable = {
new RuntimeException("The key array and value array of MapData must have the same length.")
}
def fieldDiffersFromDerivedLocalDateError(
field: ChronoField, actual: Int, expected: Int, candidate: LocalDate): Throwable = {
new DateTimeException(s"Conflict found: Field $field $actual differs from" +
s" $field $expected derived from $candidate")
}
def failToParseDateTimeInNewParserError(s: String, e: Throwable): Throwable = {
new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. You can " +
s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " +
s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e)
}
def failToFormatDateTimeInNewFormatterError(
resultCandidate: String, e: Throwable): Throwable = {
new SparkUpgradeException("3.0",
s"""
|Fail to format it to '$resultCandidate' in the new formatter. You can set
|${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before
|Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
""".stripMargin.replaceAll("\n", " "), e)
}
def failToRecognizePatternInDateTimeFormatterError(
pattern: String, e: Throwable): Throwable = {
new SparkUpgradeException("3.0", s"Fail to recognize '$pattern' pattern in the" +
s" DateTimeFormatter. 1) You can set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY" +
s" to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern" +
s" with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html",
e)
}
def cannotCastUTF8StringToDataTypeError(s: UTF8String, to: DataType): Throwable = {
new DateTimeException(s"Cannot cast $s to $to.")
}
}