[SPARK-25565][BUILD] Add scalastyle rule to check add Locale.ROOT to .toLowerCase and .toUpperCase for internal calls
## What changes were proposed in this pull request? This PR adds a rule to force `.toLowerCase(Locale.ROOT)` or `toUpperCase(Locale.ROOT)`. It produces an error as below: ``` [error] Are you sure that you want to use toUpperCase or toLowerCase without the root locale? In most cases, you [error] should use toUpperCase(Locale.ROOT) or toLowerCase(Locale.ROOT) instead. [error] If you must use toUpperCase or toLowerCase without the root locale, wrap the code block with [error] // scalastyle:off caselocale [error] .toUpperCase [error] .toLowerCase [error] // scalastyle:on caselocale ``` This PR excludes the cases above for SQL code path for external calls like table name, column name and etc. For test suites, or when it's clear there's no locale problem like Turkish locale problem, it uses `Locale.ROOT`. One minor problem is, `UTF8String` has both methods, `toLowerCase` and `toUpperCase`, and the new rule detects them as well. They are ignored. ## How was this patch tested? Manually tested, and Jenkins tests. Closes #22581 from HyukjinKwon/SPARK-25565. Authored-by: hyukjinkwon <gurwls223@apache.org> Signed-off-by: hyukjinkwon <gurwls223@apache.org>
This commit is contained in:
parent
b6b8a6632e
commit
a2f502cf53
|
@ -63,6 +63,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
|
|||
}
|
||||
}
|
||||
|
||||
// scalastyle:off caselocale
|
||||
test("toUpperCase") {
|
||||
forAll { (s: String) =>
|
||||
assert(toUTF8(s).toUpperCase === toUTF8(s.toUpperCase))
|
||||
|
@ -74,6 +75,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty
|
|||
assert(toUTF8(s).toLowerCase === toUTF8(s.toLowerCase))
|
||||
}
|
||||
}
|
||||
// scalastyle:on caselocale
|
||||
|
||||
test("compare") {
|
||||
forAll { (s1: String, s2: String) =>
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.metrics.sink
|
||||
|
||||
import java.util.Properties
|
||||
import java.util.{Locale, Properties}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
|
@ -52,7 +52,8 @@ private[spark] class StatsdSink(
|
|||
|
||||
val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
|
||||
val pollUnit =
|
||||
TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)
|
||||
TimeUnit.valueOf(
|
||||
property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase(Locale.ROOT))
|
||||
|
||||
val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
|
||||
|
||||
|
|
|
@ -35,7 +35,8 @@ import org.apache.spark.internal.Logging
|
|||
*
|
||||
* val rdd: RDD[(String, Int)] = ...
|
||||
* implicit val caseInsensitiveOrdering = new Ordering[String] {
|
||||
* override def compare(a: String, b: String) = a.toLowerCase.compare(b.toLowerCase)
|
||||
* override def compare(a: String, b: String) =
|
||||
* a.toLowerCase(Locale.ROOT).compare(b.toLowerCase(Locale.ROOT))
|
||||
* }
|
||||
*
|
||||
* // Sort by key, using the above case insensitive ordering.
|
||||
|
|
|
@ -2736,7 +2736,7 @@ private[spark] object Utils extends Logging {
|
|||
}
|
||||
|
||||
val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
|
||||
val resolvedURL = masterScheme.toLowerCase match {
|
||||
val resolvedURL = masterScheme.toLowerCase(Locale.ROOT) match {
|
||||
case "https" =>
|
||||
masterWithoutK8sPrefix
|
||||
case "http" =>
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
|
|||
|
||||
import java.io._
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.Date
|
||||
import java.util.{Date, Locale}
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.zip.{ZipInputStream, ZipOutputStream}
|
||||
|
||||
|
@ -834,7 +834,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
|||
doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open(
|
||||
argThat(new ArgumentMatcher[Path]() {
|
||||
override def matches(path: Any): Boolean = {
|
||||
path.asInstanceOf[Path].getName.toLowerCase == "accessdenied"
|
||||
path.asInstanceOf[Path].getName.toLowerCase(Locale.ROOT) == "accessdenied"
|
||||
}
|
||||
}))
|
||||
val mockedProvider = spy(provider)
|
||||
|
|
|
@ -118,7 +118,9 @@ class StopWordsRemover @Since("1.5.0") (@Since("1.5.0") override val uid: String
|
|||
}
|
||||
} else {
|
||||
val lc = new Locale($(locale))
|
||||
// scalastyle:off caselocale
|
||||
val toLower = (s: String) => if (s != null) s.toLowerCase(lc) else s
|
||||
// scalastyle:on caselocale
|
||||
val lowerStopWords = $(stopWords).map(toLower(_)).toSet
|
||||
udf { terms: Seq[String] =>
|
||||
terms.filter(s => !lowerStopWords.contains(toLower(s)))
|
||||
|
|
|
@ -36,7 +36,9 @@ class Tokenizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
|
|||
def this() = this(Identifiable.randomUID("tok"))
|
||||
|
||||
override protected def createTransformFunc: String => Seq[String] = {
|
||||
// scalastyle:off caselocale
|
||||
_.toLowerCase.split("\\s")
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
override protected def validateInputType(inputType: DataType): Unit = {
|
||||
|
@ -140,7 +142,9 @@ class RegexTokenizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
|
|||
|
||||
override protected def createTransformFunc: String => Seq[String] = { originStr =>
|
||||
val re = $(pattern).r
|
||||
// scalastyle:off caselocale
|
||||
val str = if ($(toLowercase)) originStr.toLowerCase() else originStr
|
||||
// scalastyle:on caselocale
|
||||
val tokens = if ($(gaps)) re.split(str).toSeq else re.findAllIn(str).toSeq
|
||||
val minLength = $(minTokenLength)
|
||||
tokens.filter(_.length >= minLength)
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
package org.apache.spark.deploy.k8s.submit
|
||||
|
||||
import java.io.StringWriter
|
||||
import java.util.{Collections, UUID}
|
||||
import java.util.{Collections, Locale, UUID}
|
||||
import java.util.Properties
|
||||
|
||||
import io.fabric8.kubernetes.api.model._
|
||||
|
@ -260,7 +260,7 @@ private[spark] object KubernetesClientApplication {
|
|||
val launchTime = System.currentTimeMillis()
|
||||
s"$appName-$launchTime"
|
||||
.trim
|
||||
.toLowerCase
|
||||
.toLowerCase(Locale.ROOT)
|
||||
.replaceAll("\\s+", "-")
|
||||
.replaceAll("\\.", "-")
|
||||
.replaceAll("[^a-z0-9\\-]", "")
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.spark.scheduler.cluster.k8s
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import io.fabric8.kubernetes.api.model.Pod
|
||||
|
||||
import org.apache.spark.deploy.k8s.Constants._
|
||||
|
@ -52,7 +54,7 @@ object ExecutorPodsSnapshot extends Logging {
|
|||
if (isDeleted(pod)) {
|
||||
PodDeleted(pod)
|
||||
} else {
|
||||
val phase = pod.getStatus.getPhase.toLowerCase
|
||||
val phase = pod.getStatus.getPhase.toLowerCase(Locale.ROOT)
|
||||
phase match {
|
||||
case "pending" =>
|
||||
PodPending(pod)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.spark.deploy.mesos
|
||||
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
|
||||
|
@ -60,7 +61,7 @@ private[mesos] class MesosClusterDispatcher(
|
|||
}
|
||||
|
||||
private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
|
||||
private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase()
|
||||
private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase(Locale.ROOT)
|
||||
logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
|
||||
|
||||
private val engineFactory = recoveryMode match {
|
||||
|
|
|
@ -227,6 +227,19 @@ This file is divided into 3 sections:
|
|||
]]></customMessage>
|
||||
</check>
|
||||
|
||||
<check customId="caselocale" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
|
||||
<parameters><parameter name="regex">(\.toUpperCase|\.toLowerCase)(?!(\(|\(Locale.ROOT\)))</parameter></parameters>
|
||||
<customMessage><![CDATA[
|
||||
Are you sure that you want to use toUpperCase or toLowerCase without the root locale? In most cases, you
|
||||
should use toUpperCase(Locale.ROOT) or toLowerCase(Locale.ROOT) instead.
|
||||
If you must use toUpperCase or toLowerCase without the root locale, wrap the code block with
|
||||
// scalastyle:off caselocale
|
||||
.toUpperCase
|
||||
.toLowerCase
|
||||
// scalastyle:on caselocale
|
||||
]]></customMessage>
|
||||
</check>
|
||||
|
||||
<!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters -->
|
||||
<check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true">
|
||||
<parameters><parameter name="regex">JavaConversions</parameter></parameters>
|
||||
|
|
|
@ -73,7 +73,9 @@ case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
|
|||
|
||||
private val canonicalizer = {
|
||||
if (!conf.caseSensitiveAnalysis) {
|
||||
// scalastyle:off caselocale
|
||||
s: String => s.toLowerCase
|
||||
// scalastyle:on caselocale
|
||||
} else {
|
||||
s: String => s
|
||||
}
|
||||
|
|
|
@ -330,7 +330,9 @@ trait String2StringExpression extends ImplicitCastInputTypes {
|
|||
case class Upper(child: Expression)
|
||||
extends UnaryExpression with String2StringExpression {
|
||||
|
||||
// scalastyle:off caselocale
|
||||
override def convert(v: UTF8String): UTF8String = v.toUpperCase
|
||||
// scalastyle:on caselocale
|
||||
|
||||
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
|
||||
defineCodeGen(ctx, ev, c => s"($c).toUpperCase()")
|
||||
|
@ -349,7 +351,9 @@ case class Upper(child: Expression)
|
|||
""")
|
||||
case class Lower(child: Expression) extends UnaryExpression with String2StringExpression {
|
||||
|
||||
// scalastyle:off caselocale
|
||||
override def convert(v: UTF8String): UTF8String = v.toLowerCase
|
||||
// scalastyle:on caselocale
|
||||
|
||||
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
|
||||
defineCodeGen(ctx, ev, c => s"($c).toLowerCase()")
|
||||
|
@ -1389,7 +1393,9 @@ case class InitCap(child: Expression) extends UnaryExpression with ImplicitCastI
|
|||
override def dataType: DataType = StringType
|
||||
|
||||
override def nullSafeEval(string: Any): Any = {
|
||||
// scalastyle:off caselocale
|
||||
string.asInstanceOf[UTF8String].toLowerCase.toTitleCase
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
|
||||
defineCodeGen(ctx, ev, str => s"$str.toLowerCase().toTitleCase()")
|
||||
|
|
|
@ -663,7 +663,9 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
|
|||
UnresolvedGenerator(visitFunctionName(ctx.qualifiedName), expressions),
|
||||
unrequiredChildIndex = Nil,
|
||||
outer = ctx.OUTER != null,
|
||||
// scalastyle:off caselocale
|
||||
Some(ctx.tblName.getText.toLowerCase),
|
||||
// scalastyle:on caselocale
|
||||
ctx.colName.asScala.map(_.getText).map(UnresolvedAttribute.apply),
|
||||
query)
|
||||
}
|
||||
|
|
|
@ -62,8 +62,10 @@ object StringUtils {
|
|||
private[this] val trueStrings = Set("t", "true", "y", "yes", "1").map(UTF8String.fromString)
|
||||
private[this] val falseStrings = Set("f", "false", "n", "no", "0").map(UTF8String.fromString)
|
||||
|
||||
// scalastyle:off caselocale
|
||||
def isTrueString(s: UTF8String): Boolean = trueStrings.contains(s.toLowerCase)
|
||||
def isFalseString(s: UTF8String): Boolean = falseStrings.contains(s.toLowerCase)
|
||||
// scalastyle:on caselocale
|
||||
|
||||
/**
|
||||
* This utility can be used for filtering pattern in the "Like" of "Show Tables / Functions" DDL
|
||||
|
|
|
@ -974,8 +974,9 @@ object SQLConf {
|
|||
"Note: This configuration cannot be changed between query restarts from the same " +
|
||||
"checkpoint location.")
|
||||
.stringConf
|
||||
.transform(_.toLowerCase(Locale.ROOT))
|
||||
.checkValue(
|
||||
str => Set("min", "max").contains(str.toLowerCase),
|
||||
str => Set("min", "max").contains(str),
|
||||
"Invalid value for 'spark.sql.streaming.multipleWatermarkPolicy'. " +
|
||||
"Valid values are 'min' and 'max'")
|
||||
.createWithDefault("min") // must be same as MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
|
||||
|
|
|
@ -77,7 +77,9 @@ private[spark] object SchemaUtils {
|
|||
*/
|
||||
def checkColumnNameDuplication(
|
||||
columnNames: Seq[String], colType: String, caseSensitiveAnalysis: Boolean): Unit = {
|
||||
// scalastyle:off caselocale
|
||||
val names = if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
|
||||
// scalastyle:on caselocale
|
||||
if (names.distinct.length != names.length) {
|
||||
val duplicateColumns = names.groupBy(identity).collect {
|
||||
case (x, ys) if ys.length > 1 => s"`$x`"
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.util
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.catalyst.analysis._
|
||||
|
@ -39,7 +41,7 @@ class SchemaUtilsSuite extends SparkFunSuite {
|
|||
test(s"Check column name duplication in $testType cases") {
|
||||
def checkExceptionCases(schemaStr: String, duplicatedColumns: Seq[String]): Unit = {
|
||||
val expectedErrorMsg = "Found duplicate column(s) in SchemaUtilsSuite: " +
|
||||
duplicatedColumns.map(c => s"`${c.toLowerCase}`").mkString(", ")
|
||||
duplicatedColumns.map(c => s"`${c.toLowerCase(Locale.ROOT)}`").mkString(", ")
|
||||
val schema = StructType.fromDDL(schemaStr)
|
||||
var msg = intercept[AnalysisException] {
|
||||
SchemaUtils.checkSchemaColumnNameDuplication(
|
||||
|
|
|
@ -95,7 +95,9 @@ case class InsertIntoHadoopFsRelationCommand(
|
|||
val parameters = CaseInsensitiveMap(options)
|
||||
|
||||
val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
|
||||
// scalastyle:off caselocale
|
||||
.map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
|
||||
// scalastyle:on caselocale
|
||||
.getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
|
||||
val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
|
||||
// This config only makes sense when we are overwriting a partitioned dataset with dynamic
|
||||
|
|
|
@ -86,7 +86,9 @@ abstract class CSVDataSource extends Serializable {
|
|||
if (options.headerFlag) {
|
||||
val duplicates = {
|
||||
val headerNames = row.filter(_ != null)
|
||||
// scalastyle:off caselocale
|
||||
.map(name => if (caseSensitive) name else name.toLowerCase)
|
||||
// scalastyle:on caselocale
|
||||
headerNames.diff(headerNames.distinct).distinct
|
||||
}
|
||||
|
||||
|
@ -95,7 +97,9 @@ abstract class CSVDataSource extends Serializable {
|
|||
// When there are empty strings or the values set in `nullValue`, put the
|
||||
// index as the suffix.
|
||||
s"_c$index"
|
||||
// scalastyle:off caselocale
|
||||
} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
|
||||
// scalastyle:on caselocale
|
||||
// When there are case-insensitive duplicates, put the index as the suffix.
|
||||
s"$value$index"
|
||||
} else if (duplicates.contains(value)) {
|
||||
|
@ -153,8 +157,10 @@ object CSVDataSource extends Logging {
|
|||
while (errorMessage.isEmpty && i < headerLen) {
|
||||
var (nameInSchema, nameInHeader) = (fieldNames(i), columnNames(i))
|
||||
if (!caseSensitive) {
|
||||
// scalastyle:off caselocale
|
||||
nameInSchema = nameInSchema.toLowerCase
|
||||
nameInHeader = nameInHeader.toLowerCase
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
if (nameInHeader != nameInSchema) {
|
||||
errorMessage = Some(
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.execution.streaming
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
|
@ -36,7 +38,7 @@ object MultipleWatermarkPolicy {
|
|||
val DEFAULT_POLICY_NAME = "min"
|
||||
|
||||
def apply(policyName: String): MultipleWatermarkPolicy = {
|
||||
policyName.toLowerCase match {
|
||||
policyName.toLowerCase(Locale.ROOT) match {
|
||||
case DEFAULT_POLICY_NAME => MinWatermark
|
||||
case "max" => MaxWatermark
|
||||
case _ =>
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.execution.streaming.state
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.spark.TaskContext
|
||||
|
@ -263,7 +265,7 @@ class SymmetricHashJoinStateManager(
|
|||
def metrics: StateStoreMetrics = {
|
||||
val keyToNumValuesMetrics = keyToNumValues.metrics
|
||||
val keyWithIndexToValueMetrics = keyWithIndexToValue.metrics
|
||||
def newDesc(desc: String): String = s"${joinSide.toString.toUpperCase}: $desc"
|
||||
def newDesc(desc: String): String = s"${joinSide.toString.toUpperCase(Locale.ROOT)}: $desc"
|
||||
|
||||
StateStoreMetrics(
|
||||
keyWithIndexToValueMetrics.numKeys, // represent each buffered row only once
|
||||
|
|
|
@ -505,7 +505,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
|
|||
test("upper") {
|
||||
checkAnswer(
|
||||
lowerCaseData.select(upper('l)),
|
||||
('a' to 'd').map(c => Row(c.toString.toUpperCase))
|
||||
('a' to 'd').map(c => Row(c.toString.toUpperCase(Locale.ROOT)))
|
||||
)
|
||||
|
||||
checkAnswer(
|
||||
|
@ -526,7 +526,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
|
|||
test("lower") {
|
||||
checkAnswer(
|
||||
upperCaseData.select(lower('L)),
|
||||
('A' to 'F').map(c => Row(c.toString.toLowerCase))
|
||||
('A' to 'F').map(c => Row(c.toString.toLowerCase(Locale.ROOT)))
|
||||
)
|
||||
|
||||
checkAnswer(
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import org.apache.spark.sql.catalyst.expressions.aggregate.PivotFirst
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
|
@ -272,7 +274,7 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext {
|
|||
val expected = Row(2012, 15000.0, 20000.0) :: Row(2013, 48000.0, 30000.0) :: Nil
|
||||
val df = trainingSales
|
||||
.groupBy($"sales.year")
|
||||
.pivot(lower($"sales.course"), Seq("dotNet", "Java").map(_.toLowerCase))
|
||||
.pivot(lower($"sales.course"), Seq("dotNet", "Java").map(_.toLowerCase(Locale.ROOT)))
|
||||
.agg(sum($"sales.earnings"))
|
||||
|
||||
checkAnswer(df, expected)
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql
|
||||
|
||||
import java.util.Locale
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.language.existentials
|
||||
|
@ -831,7 +833,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
|
|||
case _ =>
|
||||
}
|
||||
val joinPairs = physicalJoins.zip(executedJoins)
|
||||
val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
|
||||
val numOfJoins = sqlString.split(" ").count(_.toUpperCase(Locale.ROOT) == "JOIN")
|
||||
assert(joinPairs.size == numOfJoins)
|
||||
|
||||
joinPairs.foreach {
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.sql.streaming
|
|||
import java.{util => ju}
|
||||
import java.io.File
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.{Calendar, Date}
|
||||
import java.util.{Calendar, Date, Locale}
|
||||
|
||||
import org.apache.commons.io.FileUtils
|
||||
import org.scalatest.{BeforeAndAfter, Matchers}
|
||||
|
@ -698,7 +698,7 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
|
|||
val e = intercept[IllegalArgumentException] {
|
||||
spark.conf.set(SQLConf.STREAMING_MULTIPLE_WATERMARK_POLICY.key, value)
|
||||
}
|
||||
assert(e.getMessage.toLowerCase.contains("valid values are 'min' and 'max'"))
|
||||
assert(e.getMessage.toLowerCase(Locale.ROOT).contains("valid values are 'min' and 'max'"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -868,7 +868,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
// and Hive will validate the column names in partition spec to make sure they are partition
|
||||
// columns. Here we Lowercase the column names before passing the partition spec to Hive
|
||||
// client, to satisfy Hive.
|
||||
// scalastyle:off caselocale
|
||||
orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
client.loadPartition(
|
||||
|
@ -896,7 +898,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
// and Hive will validate the column names in partition spec to make sure they are partition
|
||||
// columns. Here we Lowercase the column names before passing the partition spec to Hive
|
||||
// client, to satisfy Hive.
|
||||
// scalastyle:off caselocale
|
||||
orderedPartitionSpec.put(colName.toLowerCase, partition(colName))
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
client.loadDynamicPartitions(
|
||||
|
@ -916,13 +920,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
// to lower case the column names in partition specification before calling partition related Hive
|
||||
// APIs, to match this behaviour.
|
||||
private def lowerCasePartitionSpec(spec: TablePartitionSpec): TablePartitionSpec = {
|
||||
// scalastyle:off caselocale
|
||||
spec.map { case (k, v) => k.toLowerCase -> v }
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
// Build a map from lower-cased partition column names to exact column names for a given table
|
||||
private def buildLowerCasePartColNameMap(table: CatalogTable): Map[String, String] = {
|
||||
val actualPartColNames = table.partitionColumnNames
|
||||
// scalastyle:off caselocale
|
||||
actualPartColNames.map(colName => (colName.toLowerCase, colName)).toMap
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
// Hive metastore is not case preserving and the column names of the partition specification we
|
||||
|
@ -931,7 +939,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
private def restorePartitionSpec(
|
||||
spec: TablePartitionSpec,
|
||||
partColMap: Map[String, String]): TablePartitionSpec = {
|
||||
// scalastyle:off caselocale
|
||||
spec.map { case (k, v) => partColMap(k.toLowerCase) -> v }
|
||||
// scalastyle:on caselocale
|
||||
}
|
||||
|
||||
private def restorePartitionSpec(
|
||||
|
@ -990,7 +1000,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
// When Hive rename partition for managed tables, it will create the partition location with
|
||||
// a default path generate by the new spec with lower cased partition column names. This is
|
||||
// unexpected and we need to rename them manually and alter the partition location.
|
||||
// scalastyle:off caselocale
|
||||
val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
|
||||
// scalastyle:on caselocale
|
||||
if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
|
||||
val tablePath = new Path(tableMeta.location)
|
||||
val fs = tablePath.getFileSystem(hadoopConf)
|
||||
|
@ -1031,7 +1043,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
// another partition to `A=1/B=3`, then we will have `A=1/B=2` and `a=1/b=3`, and we should
|
||||
// just move `a=1/b=3` into `A=1` with new name `B=3`.
|
||||
} else {
|
||||
// scalastyle:off caselocale
|
||||
val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue)
|
||||
// scalastyle:on caselocale
|
||||
val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
|
||||
try {
|
||||
fs.rename(actualPartitionPath, expectedPartitionPath)
|
||||
|
@ -1182,7 +1196,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
clientPartitionNames.map { partitionPath =>
|
||||
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
|
||||
partSpec.map { case (partName, partValue) =>
|
||||
// scalastyle:off caselocale
|
||||
partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
|
||||
// scalastyle:on caselocale
|
||||
}.mkString("/")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,8 +59,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
|
|||
// For testing only
|
||||
private[hive] def getCachedDataSourceTable(table: TableIdentifier): LogicalPlan = {
|
||||
val key = QualifiedTableName(
|
||||
// scalastyle:off caselocale
|
||||
table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
|
||||
table.table.toLowerCase)
|
||||
// scalastyle:on caselocale
|
||||
catalogProxy.getCachedTable(key)
|
||||
}
|
||||
|
||||
|
@ -273,6 +275,7 @@ private[hive] object HiveMetastoreCatalog {
|
|||
def mergeWithMetastoreSchema(
|
||||
metastoreSchema: StructType,
|
||||
inferredSchema: StructType): StructType = try {
|
||||
// scalastyle:off caselocale
|
||||
// Find any nullable fields in mestastore schema that are missing from the inferred schema.
|
||||
val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
|
||||
val missingNullables = metastoreFields
|
||||
|
@ -282,6 +285,7 @@ private[hive] object HiveMetastoreCatalog {
|
|||
// Merge missing nullable fields to inferred schema and build a case-insensitive field map.
|
||||
val inferredFields = StructType(inferredSchema ++ missingNullables)
|
||||
.map(f => f.name.toLowerCase -> f).toMap
|
||||
// scalastyle:on caselocale
|
||||
StructType(metastoreSchema.map(f => f.copy(name = inferredFields(f.name).name)))
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.hive
|
||||
|
||||
import java.io.File
|
||||
import java.util.Locale
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
|
@ -50,23 +51,29 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
|
|||
|
||||
private val maxRecordNum = 50
|
||||
|
||||
private def getConvertMetastoreConfName(format: String): String = format.toLowerCase match {
|
||||
case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
|
||||
case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
|
||||
private def getConvertMetastoreConfName(format: String): String = {
|
||||
format.toLowerCase(Locale.ROOT) match {
|
||||
case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
|
||||
case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
|
||||
}
|
||||
}
|
||||
|
||||
private def getSparkCompressionConfName(format: String): String = format.toLowerCase match {
|
||||
case "parquet" => SQLConf.PARQUET_COMPRESSION.key
|
||||
case "orc" => SQLConf.ORC_COMPRESSION.key
|
||||
private def getSparkCompressionConfName(format: String): String = {
|
||||
format.toLowerCase(Locale.ROOT) match {
|
||||
case "parquet" => SQLConf.PARQUET_COMPRESSION.key
|
||||
case "orc" => SQLConf.ORC_COMPRESSION.key
|
||||
}
|
||||
}
|
||||
|
||||
private def getHiveCompressPropName(format: String): String = format.toLowerCase match {
|
||||
case "parquet" => ParquetOutputFormat.COMPRESSION
|
||||
case "orc" => COMPRESS.getAttribute
|
||||
private def getHiveCompressPropName(format: String): String = {
|
||||
format.toLowerCase(Locale.ROOT) match {
|
||||
case "parquet" => ParquetOutputFormat.COMPRESSION
|
||||
case "orc" => COMPRESS.getAttribute
|
||||
}
|
||||
}
|
||||
|
||||
private def normalizeCodecName(format: String, name: String): String = {
|
||||
format.toLowerCase match {
|
||||
format.toLowerCase(Locale.ROOT) match {
|
||||
case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
|
||||
case "orc" => OrcOptions.getORCCompressionCodecName(name)
|
||||
}
|
||||
|
@ -74,7 +81,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
|
|||
|
||||
private def getTableCompressionCodec(path: String, format: String): Seq[String] = {
|
||||
val hadoopConf = spark.sessionState.newHadoopConf()
|
||||
val codecs = format.toLowerCase match {
|
||||
val codecs = format.toLowerCase(Locale.ROOT) match {
|
||||
case "parquet" => for {
|
||||
footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
|
||||
block <- footer.getParquetMetadata.getBlocks.asScala
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.hive
|
||||
|
||||
import java.io.File
|
||||
import java.util.Locale
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
|
@ -56,7 +57,7 @@ class HiveSchemaInferenceSuite
|
|||
|
||||
// Return a copy of the given schema with all field names converted to lower case.
|
||||
private def lowerCaseSchema(schema: StructType): StructType = {
|
||||
StructType(schema.map(f => f.copy(name = f.name.toLowerCase)))
|
||||
StructType(schema.map(f => f.copy(name = f.name.toLowerCase(Locale.ROOT))))
|
||||
}
|
||||
|
||||
// Create a Hive external test table containing the given field and partition column names.
|
||||
|
@ -78,7 +79,7 @@ class HiveSchemaInferenceSuite
|
|||
val partitionStructFields = partitionCols.map { field =>
|
||||
StructField(
|
||||
// Partition column case isn't preserved
|
||||
name = field.toLowerCase,
|
||||
name = field.toLowerCase(Locale.ROOT),
|
||||
dataType = IntegerType,
|
||||
nullable = true,
|
||||
metadata = Metadata.empty)
|
||||
|
@ -113,7 +114,7 @@ class HiveSchemaInferenceSuite
|
|||
properties = Map("serialization.format" -> "1")),
|
||||
schema = schema,
|
||||
provider = Option("hive"),
|
||||
partitionColumnNames = partitionCols.map(_.toLowerCase),
|
||||
partitionColumnNames = partitionCols.map(_.toLowerCase(Locale.ROOT)),
|
||||
properties = Map.empty),
|
||||
true)
|
||||
|
||||
|
@ -180,7 +181,7 @@ class HiveSchemaInferenceSuite
|
|||
val catalogTable = externalCatalog.getTable(DATABASE, TEST_TABLE_NAME)
|
||||
assert(catalogTable.schemaPreservesCase)
|
||||
assert(catalogTable.schema == schema)
|
||||
assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase))
|
||||
assert(catalogTable.partitionColumnNames == partCols.map(_.toLowerCase(Locale.ROOT)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
|
|||
|
||||
import java.io.{File, PrintWriter}
|
||||
import java.sql.Timestamp
|
||||
import java.util.Locale
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.matching.Regex
|
||||
|
@ -489,7 +490,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
|
|||
sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS")
|
||||
}.getMessage
|
||||
assert(message.contains(
|
||||
s"DS is not a valid partition column in table `default`.`${tableName.toLowerCase}`"))
|
||||
"DS is not a valid partition column in table " +
|
||||
s"`default`.`${tableName.toLowerCase(Locale.ROOT)}`"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -503,8 +505,9 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
|
|||
sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS")
|
||||
}.getMessage
|
||||
assert(message.contains("The list of partition columns with values " +
|
||||
s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " +
|
||||
"is not a prefix of the list of partition columns defined in the table schema"))
|
||||
s"in partition specification for table '${tableName.toLowerCase(Locale.ROOT)}' in " +
|
||||
"database 'default' is not a prefix of the list of partition columns defined in " +
|
||||
"the table schema"))
|
||||
}
|
||||
|
||||
withTable(tableName) {
|
||||
|
@ -550,12 +553,14 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
|
|||
|
||||
assertAnalysisException(
|
||||
s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS",
|
||||
s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
|
||||
"hour is not a valid partition column in table " +
|
||||
s"`default`.`${tableName.toLowerCase(Locale.ROOT)}`"
|
||||
)
|
||||
|
||||
assertAnalysisException(
|
||||
s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS",
|
||||
s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
|
||||
"hour is not a valid partition column in table " +
|
||||
s"`default`.`${tableName.toLowerCase(Locale.ROOT)}`"
|
||||
)
|
||||
|
||||
intercept[NoSuchPartitionException] {
|
||||
|
|
Loading…
Reference in a new issue