[MINOR][SQL] Use SQLConf.resolver for caseSensitiveResolution/caseInsensitiveResolution
### What changes were proposed in this pull request?

Use `SQLConf.resolver` for `caseSensitiveResolution`/`caseInsensitiveResolution` instead of defining a new helper method in each place.

### Why are the changes needed?

Removes redundant code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #34171 from huaxingao/minor.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
parent 38d39812c1
commit 167664896d
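The duplication being removed: both `PartitioningUtils` and `JdbcUtils` carried a private helper that branched on a `caseSensitive` flag to pick one of the two Catalyst resolution functions, which is exactly what `SQLConf.resolver` already does. A minimal sketch of the pre-existing definitions this PR leans on, paraphrased from Catalyst (the `Resolver` alias and the two resolution functions live in the `org.apache.spark.sql.catalyst.analysis` package object; `resolver` is a method on `SQLConf` that reads `spark.sql.caseSensitive`):

```scala
// Paraphrased sketch, not the verbatim Spark sources.
object ResolutionSketch {
  // Name resolution is just a string-equality predicate.
  type Resolver = (String, String) => Boolean

  // Exact match, used when spark.sql.caseSensitive=true.
  val caseSensitiveResolution: Resolver = (a, b) => a == b
  // Case-insensitive match, the default.
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // SQLConf.resolver already selects between the two; in the real class the
  // flag comes from conf.caseSensitiveAnalysis rather than a parameter.
  def resolver(caseSensitiveAnalysis: Boolean): Resolver =
    if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
}
```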
PartitioningUtils.scala:

```diff
@@ -29,14 +29,13 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionValueString
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.unsafe.types.UTF8String
@@ -62,7 +61,7 @@ object PartitionSpec {
   val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionPath])
 }
 
-object PartitioningUtils {
+object PartitioningUtils extends SQLConfHelper {
 
   val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
 
@@ -491,7 +490,7 @@ object PartitioningUtils {
     val timestampTry = Try {
       val unescapedRaw = unescapePathName(raw)
       // the inferred data type is consistent with the default timestamp type
-      val timestampType = SQLConf.get.timestampType
+      val timestampType = conf.timestampType
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // TimestampType or TimestampNTZType
       timestampType match {
@@ -556,7 +555,7 @@ object PartitioningUtils {
     SchemaUtils.checkColumnNameDuplication(
       partitionColumns, partitionColumns.mkString(", "), caseSensitive)
 
-    partitionColumnsSchema(schema, partitionColumns, caseSensitive).foreach {
+    partitionColumnsSchema(schema, partitionColumns).foreach {
       field => field.dataType match {
         case _: AtomicType => // OK
         case _ => throw QueryCompilationErrors.cannotUseDataTypeForPartitionColumnError(field)
@@ -570,11 +569,9 @@ object PartitioningUtils {
 
   def partitionColumnsSchema(
       schema: StructType,
-      partitionColumns: Seq[String],
-      caseSensitive: Boolean): StructType = {
-    val equality = columnNameEquality(caseSensitive)
+      partitionColumns: Seq[String]): StructType = {
     StructType(partitionColumns.map { col =>
-      schema.find(f => equality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         val schemaCatalog = schema.catalogString
         throw QueryCompilationErrors.partitionColumnNotFoundInSchemaError(col, schemaCatalog)
       }
@@ -610,14 +607,6 @@ object PartitioningUtils {
     }
   }
 
-  private def columnNameEquality(caseSensitive: Boolean): (String, String) => Boolean = {
-    if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
-  }
-
   /**
    * Given a collection of [[Literal]]s, resolves possible type conflicts by
   * [[findWiderTypeForPartitionColumn]].
```
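Both utility objects obtain `conf` by mixing in `SQLConfHelper`. A minimal sketch of that trait, paraphrased from `org.apache.spark.sql.catalyst` (not code from this diff):

```scala
import org.apache.spark.sql.internal.SQLConf

// Paraphrased sketch of org.apache.spark.sql.catalyst.SQLConfHelper.
trait SQLConfHelper {
  // The SQLConf bound to the active SparkSession on the current thread,
  // falling back to the default conf when no session is active.
  // Mixing this in lets call sites write conf.resolver, conf.timestampType,
  // etc. instead of repeating SQLConf.get everywhere.
  def conf: SQLConf = SQLConf.get
}
```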
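The same cleanup repeats in `JdbcUtils.scala` below. Note the asymmetry: `partitionColumnsSchema` drops its `caseSensitive` parameter entirely, while the touched `JdbcUtils` methods keep their `isCaseSensitive`/`caseSensitive` parameters as now-unused arguments (see the untouched `caseSensitive: Boolean,` context line in the last hunk), presumably to keep signature changes in a minor PR to a minimum.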
JdbcUtils.scala:

```diff
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localD
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -49,7 +48,7 @@ import org.apache.spark.util.NextIterator
 /**
  * Util functions for JDBC tables.
  */
-object JdbcUtils extends Logging {
+object JdbcUtils extends Logging with SQLConfHelper {
   /**
    * Returns a factory for creating connections to the given JDBC URL.
    *
@@ -131,18 +130,13 @@ object JdbcUtils extends Logging {
     val columns = if (tableSchema.isEmpty) {
       rddSchema.fields.map(x => dialect.quoteIdentifier(x.name)).mkString(",")
     } else {
-      val columnNameEquality = if (isCaseSensitive) {
-        org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-      } else {
-        org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-      }
       // The generated insert statement needs to follow rddSchema's column sequence and
       // tableSchema's column names. When appending data into some case-sensitive DBMSs like
       // PostgreSQL/Oracle, we need to respect the existing case-sensitive column names instead of
       // RDD column names for user convenience.
       val tableColumnNames = tableSchema.get.fieldNames
       rddSchema.fields.map { col =>
-        val normalizedName = tableColumnNames.find(f => columnNameEquality(f, col.name)).getOrElse {
+        val normalizedName = tableColumnNames.find(f => conf.resolver(f, col.name)).getOrElse {
           throw QueryCompilationErrors.columnNotFoundInSchemaError(col, tableSchema)
         }
         dialect.quoteIdentifier(normalizedName)
@@ -475,7 +469,7 @@ object JdbcUtils extends Logging {
       val localTimeMicro = TimeUnit.NANOSECONDS.toMicros(
         rawTime.toLocalTime().toNanoOfDay())
       val utcTimeMicro = DateTimeUtils.toUTCTime(
-        localTimeMicro, SQLConf.get.sessionLocalTimeZone)
+        localTimeMicro, conf.sessionLocalTimeZone)
       row.setLong(pos, utcTimeMicro)
     } else {
       row.update(pos, null)
@@ -594,7 +588,7 @@ object JdbcUtils extends Logging {
       stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
 
     case TimestampType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
        (stmt: PreparedStatement, row: Row, pos: Int) =>
          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
      } else {
@@ -603,7 +597,7 @@ object JdbcUtils extends Logging {
       }
 
     case DateType =>
-      if (SQLConf.get.datetimeJava8ApiEnabled) {
+      if (conf.datetimeJava8ApiEnabled) {
        (stmt: PreparedStatement, row: Row, pos: Int) =>
          stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
      } else {
@@ -812,19 +806,14 @@ object JdbcUtils extends Logging {
       caseSensitive: Boolean,
       createTableColumnTypes: String): Map[String, String] = {
     val userSchema = CatalystSqlParser.parseTableSchema(createTableColumnTypes)
-    val nameEquality = if (caseSensitive) {
-      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
-    } else {
-      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
-    }
 
     // checks duplicate columns in the user specified column types.
     SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the createTableColumnTypes option value", nameEquality)
+      userSchema.map(_.name), "in the createTableColumnTypes option value", conf.resolver)
 
     // checks if user specified column names exist in the DataFrame schema
     userSchema.fieldNames.foreach { col =>
-      schema.find(f => nameEquality(f.name, col)).getOrElse {
+      schema.find(f => conf.resolver(f.name, col)).getOrElse {
         throw QueryCompilationErrors.createTableColumnTypesOptionColumnNotFoundInSchemaError(
           col, schema)
       }
```
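Since `conf.resolver` branches on the same `spark.sql.caseSensitive` setting the deleted helpers did, behavior is unchanged. A hypothetical `spark-shell` snippet (not part of the PR) illustrating the toggle:

```scala
import org.apache.spark.sql.internal.SQLConf

val conf = SQLConf.get
conf.setConf(SQLConf.CASE_SENSITIVE, false) // the default
assert(conf.resolver("ID", "id"))           // case-insensitive: names match
conf.setConf(SQLConf.CASE_SENSITIVE, true)
assert(!conf.resolver("ID", "id"))          // case-sensitive: exact match required
```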