[SPARK-17199] Use CatalystConf.resolver for case-sensitivity comparison
## What changes were proposed in this pull request? Use `CatalystConf.resolver` consistently for case-sensitivity comparison (removed dups). ## How was this patch tested? Local build. Waiting for Jenkins to ensure clean build and test. Author: Jacek Laskowski <jacek@japila.pl> Closes #14771 from jaceklaskowski/17199-catalystconf-resolver.
This commit is contained in:
parent
cc33460a51
commit
9d376ad76c
|
@ -64,13 +64,7 @@ class Analyzer(
|
|||
this(catalog, conf, conf.optimizerMaxIterations)
|
||||
}
|
||||
|
||||
def resolver: Resolver = {
|
||||
if (conf.caseSensitiveAnalysis) {
|
||||
caseSensitiveResolution
|
||||
} else {
|
||||
caseInsensitiveResolution
|
||||
}
|
||||
}
|
||||
def resolver: Resolver = conf.resolver
|
||||
|
||||
protected val fixedPoint = FixedPoint(maxIterations)
|
||||
|
||||
|
|
|
@ -394,13 +394,7 @@ case class DataSource(
|
|||
sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
|
||||
|
||||
val dataSchema = userSpecifiedSchema.map { schema =>
|
||||
val equality =
|
||||
if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
|
||||
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
|
||||
} else {
|
||||
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
|
||||
}
|
||||
|
||||
val equality = sparkSession.sessionState.conf.resolver
|
||||
StructType(schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
|
||||
}.orElse {
|
||||
format.inferSchema(
|
||||
|
@ -430,7 +424,7 @@ case class DataSource(
|
|||
relation
|
||||
}
|
||||
|
||||
/** Writes the give [[DataFrame]] out to this [[DataSource]]. */
|
||||
/** Writes the given [[DataFrame]] out to this [[DataSource]]. */
|
||||
def write(
|
||||
mode: SaveMode,
|
||||
data: DataFrame): BaseRelation = {
|
||||
|
|
|
@ -45,13 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
|
|||
*/
|
||||
case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
|
||||
|
||||
def resolver: Resolver = {
|
||||
if (conf.caseSensitiveAnalysis) {
|
||||
caseSensitiveResolution
|
||||
} else {
|
||||
caseInsensitiveResolution
|
||||
}
|
||||
}
|
||||
def resolver: Resolver = conf.resolver
|
||||
|
||||
// Visible for testing.
|
||||
def convertStaticPartitions(
|
||||
|
|
|
@ -102,11 +102,7 @@ class FileStreamSinkWriter(
|
|||
// Get the actual partition columns as attributes after matching them by name with
|
||||
// the given columns names.
|
||||
private val partitionColumns = partitionColumnNames.map { col =>
|
||||
val nameEquality = if (data.sparkSession.sessionState.conf.caseSensitiveAnalysis) {
|
||||
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
|
||||
} else {
|
||||
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
|
||||
}
|
||||
val nameEquality = data.sparkSession.sessionState.conf.resolver
|
||||
data.logicalPlan.output.find(f => nameEquality(f.name, col)).getOrElse {
|
||||
throw new RuntimeException(s"Partition column $col not found in schema $dataSchema")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue