[SPARK-30500][SPARK-30501][SQL] Remove SQL configs deprecated in Spark 2.1 and 2.3
### What changes were proposed in this pull request?

In the PR, I propose to remove already deprecated SQL configs:
- `spark.sql.variable.substitute.depth`, deprecated in Spark 2.1
- `spark.sql.parquet.int64AsTimestampMillis`, deprecated in Spark 2.3

Also, I moved `removedSQLConfigs` closer to `deprecatedSQLConfigs`; this allows its entries to reference other config entries, such as `PARQUET_OUTPUT_TIMESTAMP_TYPE`.

### Why are the changes needed?

To improve code maintainability.

### Does this PR introduce any user-facing change?

Yes.

### How was this patch tested?

By the existing test suites `ParquetQuerySuite` and `SQLConfSuite`.

Closes #27169 from MaxGekk/remove-deprecated-conf-2.4.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
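Concretely, after this change setting one of the removed configs to a non-default value is expected to fail rather than be silently accepted. A behavior sketch, assuming an active SparkSession `spark`; the exception type and message below are illustrative, not verbatim output:

```scala
// Behavior sketch, assuming an active SparkSession `spark`; the exception
// type and message are illustrative, not verbatim output.
spark.conf.set("spark.sql.parquet.int64AsTimestampMillis", "true")
// ==> AnalysisException: The SQL config 'spark.sql.parquet.int64AsTimestampMillis'
//     was removed in the version 3.0.0. ...
```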
parent 6646b3e13e
commit 1846b0261b
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -171,36 +171,6 @@ object SQLConf {
     }
   }
 
-  /**
-   * Holds information about keys that have been removed.
-   *
-   * @param key The removed config key.
-   * @param version Version of Spark where key was removed.
-   * @param defaultValue The default config value. It can be used to notice
-   *                     users that they set non-default value to an already removed config.
-   * @param comment Additional info regarding to the removed config.
-   */
-  case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)
-
-  /**
-   * The map contains info about removed SQL configs. Keys are SQL config names,
-   * map values contain extra information like the version in which the config was removed,
-   * config's default value and a comment.
-   */
-  val removedSQLConfigs: Map[String, RemovedConfig] = {
-    val configs = Seq(
-      RemovedConfig("spark.sql.fromJsonForceNullableSchema", "3.0.0", "true",
-        "It was removed to prevent errors like SPARK-23173 for non-default value."),
-      RemovedConfig(
-        "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "3.0.0", "false",
-        "It was removed to prevent loosing of users data for non-default value."),
-      RemovedConfig("spark.sql.legacy.compareDateTimestampInTimestamp", "3.0.0", "true",
-        "It was removed to prevent errors like SPARK-23549 for non-default value.")
-    )
-
-    Map(configs.map { cfg => cfg.key -> cfg } : _*)
-  }
-
   val ANALYZER_MAX_ITERATIONS = buildConf("spark.sql.analyzer.maxIterations")
     .internal()
     .doc("The max number of iterations the analyzer runs.")
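To make the removal mechanism concrete, here is a minimal, self-contained sketch of how a map like `removedSQLConfigs` can be consumed when a config is set. Only `RemovedConfig` and the map shape come from the diff; `checkRemoved` and the exception type are hypothetical stand-ins for Spark's internal check:

```scala
// Sketch only: RemovedConfig and the map shape come from the diff above;
// checkRemoved and the exception type are hypothetical stand-ins.
case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)

object RemovedConfigCheck {
  val removedSQLConfigs: Map[String, RemovedConfig] = Seq(
    RemovedConfig("spark.sql.fromJsonForceNullableSchema", "3.0.0", "true",
      "It was removed to prevent errors like SPARK-23173 for non-default value.")
  ).map(cfg => cfg.key -> cfg).toMap

  // Reject a non-default value for a removed key; accept everything else.
  def checkRemoved(key: String, value: String): Unit = {
    removedSQLConfigs.get(key).filter(_.defaultValue != value).foreach { cfg =>
      throw new IllegalArgumentException(
        s"The SQL config '$key' was removed in the version ${cfg.version}. ${cfg.comment}")
    }
  }
}
```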
@@ -555,14 +525,6 @@ object SQLConf {
     .checkValues(ParquetOutputTimestampType.values.map(_.toString))
     .createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
 
-  val PARQUET_INT64_AS_TIMESTAMP_MILLIS = buildConf("spark.sql.parquet.int64AsTimestampMillis")
-    .doc(s"(Deprecated since Spark 2.3, please set ${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}.) " +
-      "When true, timestamp values will be stored as INT64 with TIMESTAMP_MILLIS as the " +
-      "extended type. In this mode, the microsecond portion of the timestamp value will be " +
-      "truncated.")
-    .booleanConf
-    .createWithDefault(false)
-
   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
     .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
       "`parquet.compression` is specified in the table-specific options/properties, the " +
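With `spark.sql.parquet.int64AsTimestampMillis` gone, the same on-disk layout is requested through `spark.sql.parquet.outputTimestampType` (the `PARQUET_OUTPUT_TIMESTAMP_TYPE` entry above). A minimal migration sketch, assuming a local session and a writable `/tmp/ts_millis` path:

```scala
import org.apache.spark.sql.SparkSession

object TimestampMillisMigration {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("timestamp-millis-migration")
      .getOrCreate()

    // Replacement for the removed spark.sql.parquet.int64AsTimestampMillis=true:
    // store timestamps as INT64 annotated with TIMESTAMP_MILLIS (microseconds truncated).
    spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MILLIS")

    spark.range(1).selectExpr("current_timestamp() AS ts")
      .write.mode("overwrite").parquet("/tmp/ts_millis")

    spark.stop()
  }
}
```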
@@ -1171,13 +1133,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val VARIABLE_SUBSTITUTE_DEPTH =
-    buildConf("spark.sql.variable.substitute.depth")
-      .internal()
-      .doc("Deprecated: The maximum replacements the substitution engine will do.")
-      .intConf
-      .createWithDefault(40)
-
   val ENABLE_TWOLEVEL_AGG_MAP =
     buildConf("spark.sql.codegen.aggregate.map.twolevel.enabled")
      .internal()
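Only the unused depth knob is removed here; variable substitution itself (guarded by `spark.sql.variable.substitute`) keeps working. A quick sketch, assuming an active SparkSession `spark`:

```scala
// Substitution is applied by the SQL parser, so ${myvar} is replaced with the
// literal 10 before the query is parsed.
spark.sql("SET myvar=10")
spark.sql("SELECT ${myvar} AS v").show()
// +---+
// |  v|
// +---+
// | 10|
// +---+
```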
@@ -2186,13 +2141,9 @@ object SQLConf {
    */
   val deprecatedSQLConfigs: Map[String, DeprecatedConfig] = {
     val configs = Seq(
-      DeprecatedConfig(VARIABLE_SUBSTITUTE_DEPTH.key, "2.1",
-        "The SQL config is not used by Spark anymore."),
       DeprecatedConfig(PANDAS_RESPECT_SESSION_LOCAL_TIMEZONE.key, "2.3",
         "Behavior for `false` config value is considered as a bug, and " +
-        "it will be prohibited in the future releases."),
-      DeprecatedConfig(PARQUET_INT64_AS_TIMESTAMP_MILLIS.key, "2.3",
-        s"Use '${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}' instead of it."),
+        "it will be prohibited in the future releases."),
       DeprecatedConfig(
         PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key, "2.4",
         "The config allows to switch to the behaviour before Spark 2.4 " +
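Unlike removal, deprecation only warns on use. A minimal, self-contained sketch of how a `deprecatedSQLConfigs`-style map can drive such a warning; `logDeprecation` and the message text are stand-ins, not Spark's exact logging:

```scala
// Sketch only: DeprecatedConfig mirrors the shape used above; logDeprecation
// is a hypothetical stand-in for Spark's real warning call.
object DeprecatedConfigCheck {
  case class DeprecatedConfig(key: String, version: String, comment: String)

  val deprecatedSQLConfigs: Map[String, DeprecatedConfig] = Seq(
    DeprecatedConfig("spark.sql.execution.pandas.respectSessionTimeZone", "2.3",
      "Behavior for `false` config value is considered as a bug, and " +
        "it will be prohibited in the future releases.")
  ).map(cfg => cfg.key -> cfg).toMap

  // Deprecated keys warn on set; removed keys (previous sketch) fail instead.
  def logDeprecation(key: String): Unit = {
    deprecatedSQLConfigs.get(key).foreach { cfg =>
      Console.err.println(s"The SQL config '$key' has been deprecated in " +
        s"Spark v${cfg.version} and may be removed in the future. ${cfg.comment}")
    }
  }
}
```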
@@ -2207,6 +2158,41 @@ object SQLConf {
 
     Map(configs.map { cfg => cfg.key -> cfg } : _*)
   }
+
+  /**
+   * Holds information about keys that have been removed.
+   *
+   * @param key The removed config key.
+   * @param version Version of Spark where key was removed.
+   * @param defaultValue The default config value. It can be used to notice
+   *                     users that they set non-default value to an already removed config.
+   * @param comment Additional info regarding to the removed config.
+   */
+  case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)
+
+  /**
+   * The map contains info about removed SQL configs. Keys are SQL config names,
+   * map values contain extra information like the version in which the config was removed,
+   * config's default value and a comment.
+   */
+  val removedSQLConfigs: Map[String, RemovedConfig] = {
+    val configs = Seq(
+      RemovedConfig("spark.sql.fromJsonForceNullableSchema", "3.0.0", "true",
+        "It was removed to prevent errors like SPARK-23173 for non-default value."),
+      RemovedConfig(
+        "spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "3.0.0", "false",
+        "It was removed to prevent loosing of users data for non-default value."),
+      RemovedConfig("spark.sql.legacy.compareDateTimestampInTimestamp", "3.0.0", "true",
+        "It was removed to prevent errors like SPARK-23549 for non-default value."),
+      RemovedConfig("spark.sql.variable.substitute.depth", "3.0.0", "40",
+        "It was deprecated since Spark 2.1, and not used in Spark 2.4."),
+      RemovedConfig("spark.sql.parquet.int64AsTimestampMillis", "3.0.0", "false",
+        "The config was deprecated since Spark 2.3. " +
+        s"Use '${PARQUET_OUTPUT_TIMESTAMP_TYPE.key}' instead of it.")
+    )
+
+    Map(configs.map { cfg => cfg.key -> cfg } : _*)
+  }
 }
 
 /**
@@ -2470,18 +2456,8 @@ class SQLConf extends Serializable with Logging {
 
   def isParquetINT96TimestampConversion: Boolean = getConf(PARQUET_INT96_TIMESTAMP_CONVERSION)
 
-  def isParquetINT64AsTimestampMillis: Boolean = getConf(PARQUET_INT64_AS_TIMESTAMP_MILLIS)
-
   def parquetOutputTimestampType: ParquetOutputTimestampType.Value = {
-    val isOutputTimestampTypeSet = settings.containsKey(PARQUET_OUTPUT_TIMESTAMP_TYPE.key)
-    if (!isOutputTimestampTypeSet && isParquetINT64AsTimestampMillis) {
-      // If PARQUET_OUTPUT_TIMESTAMP_TYPE is not set and PARQUET_INT64_AS_TIMESTAMP_MILLIS is set,
-      // respect PARQUET_INT64_AS_TIMESTAMP_MILLIS and use TIMESTAMP_MILLIS. Otherwise,
-      // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority.
-      ParquetOutputTimestampType.TIMESTAMP_MILLIS
-    } else {
-      ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
-    }
+    ParquetOutputTimestampType.withName(getConf(PARQUET_OUTPUT_TIMESTAMP_TYPE))
   }
 
   def writeLegacyParquetFormat: Boolean = getConf(PARQUET_WRITE_LEGACY_FORMAT)
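With the fallback branch gone, resolution collapses to a single enumeration lookup. A self-contained stand-in for `SQLConf.ParquetOutputTimestampType` illustrating the `withName` call the simplified method relies on:

```scala
// Self-contained stand-in for SQLConf.ParquetOutputTimestampType; not Spark's code.
object ResolveTimestampType extends App {
  object ParquetOutputTimestampType extends Enumeration {
    val INT96, TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
  }

  // A single lookup replaces the removed INT64-as-millis fallback branch.
  // withName throws NoSuchElementException for unknown names, which is why the
  // config entry keeps its checkValues guard at definition time.
  println(ParquetOutputTimestampType.withName("TIMESTAMP_MICROS")) // TIMESTAMP_MICROS
}
```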
@@ -2539,8 +2515,6 @@ class SQLConf extends Serializable with Logging {
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
 
-  def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
-
   def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
 
   def hiveThriftServerSingleSession: Boolean =
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala

@@ -141,24 +141,6 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-10634 timestamp written and read as INT64 - TIMESTAMP_MILLIS") {
-    val data = (1 to 10).map(i => Row(i, new java.sql.Timestamp(i)))
-    val schema = StructType(List(StructField("d", IntegerType, false),
-      StructField("time", TimestampType, false)).toArray)
-    withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
-      withTempPath { file =>
-        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
-        df.write.parquet(file.getCanonicalPath)
-        ("true" :: "false" :: Nil).foreach { vectorized =>
-          withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
-            val df2 = spark.read.parquet(file.getCanonicalPath)
-            checkAnswer(df2, df.collect().toSeq)
-          }
-        }
-      }
-    }
-  }
-
   test("SPARK-10634 timestamp written and read as INT64 - truncation") {
     withTable("ts") {
       sql("create table ts (c1 int, c2 timestamp) using parquet")
@@ -172,45 +154,6 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSparkSession {
         .toDS().select('_1, $"_2".cast("timestamp"))
       checkAnswer(sql("select * from ts"), expected)
     }
-
-    // The microsecond portion is truncated when written as TIMESTAMP_MILLIS.
-    withTable("ts") {
-      withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "true") {
-        sql("create table ts (c1 int, c2 timestamp) using parquet")
-        sql("insert into ts values (1, timestamp'2016-01-01 10:11:12.123456')")
-        sql("insert into ts values (2, null)")
-        sql("insert into ts values (3, timestamp'1965-01-01 10:11:12.125456')")
-        sql("insert into ts values (4, timestamp'1965-01-01 10:11:12.125')")
-        sql("insert into ts values (5, timestamp'1965-01-01 10:11:12.1')")
-        sql("insert into ts values (6, timestamp'1965-01-01 10:11:12.123456789')")
-        sql("insert into ts values (7, timestamp'0001-01-01 00:00:00.000000')")
-        val expected = Seq(
-          (1, "2016-01-01 10:11:12.123"),
-          (2, null),
-          (3, "1965-01-01 10:11:12.125"),
-          (4, "1965-01-01 10:11:12.125"),
-          (5, "1965-01-01 10:11:12.1"),
-          (6, "1965-01-01 10:11:12.123"),
-          (7, "0001-01-01 00:00:00.000"))
-          .toDS().select('_1, $"_2".cast("timestamp"))
-        checkAnswer(sql("select * from ts"), expected)
-
-        // Read timestamps that were encoded as TIMESTAMP_MILLIS annotated as INT64
-        // with PARQUET_INT64_AS_TIMESTAMP_MILLIS set to false.
-        withSQLConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS.key -> "false") {
-          val expected = Seq(
-            (1, "2016-01-01 10:11:12.123"),
-            (2, null),
-            (3, "1965-01-01 10:11:12.125"),
-            (4, "1965-01-01 10:11:12.125"),
-            (5, "1965-01-01 10:11:12.1"),
-            (6, "1965-01-01 10:11:12.123"),
-            (7, "0001-01-01 00:00:00.000"))
-            .toDS().select('_1, $"_2".cast("timestamp"))
-          checkAnswer(sql("select * from ts"), expected)
-        }
-      }
-    }
   }
 
   test("SPARK-10365 timestamp written and read as INT64 - TIMESTAMP_MICROS") {
sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

@@ -263,12 +263,6 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
 
-    // PARQUET_INT64_AS_TIMESTAMP_MILLIS should be respected.
-    spark.sessionState.conf.setConf(SQLConf.PARQUET_INT64_AS_TIMESTAMP_MILLIS, true)
-    assert(spark.sessionState.conf.parquetOutputTimestampType ==
-      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
-
-    // PARQUET_OUTPUT_TIMESTAMP_TYPE has higher priority over PARQUET_INT64_AS_TIMESTAMP_MILLIS
     spark.sessionState.conf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
@@ -350,13 +344,13 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
           e.getRenderedMessage.contains(config)))
     }
 
-    val config1 = "spark.sql.hive.verifyPartitionPath"
+    val config1 = SQLConf.HIVE_VERIFY_PARTITION_PATH.key
     withLogAppender(logAppender) {
       spark.conf.set(config1, true)
     }
     check(config1)
 
-    val config2 = "spark.sql.execution.pandas.respectSessionTimeZone"
+    val config2 = SQLConf.ARROW_EXECUTION_ENABLED.key
     withLogAppender(logAppender) {
       spark.conf.unset(config2)
     }
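The updated assertions key off config entry constants instead of string literals. For readers unfamiliar with the pattern, here is an illustrative, dependency-free sketch of the test's shape; `setConf`, the `deprecated` set, and the warning buffer are hypothetical stand-ins for Spark's `withLogAppender` helper:

```scala
import scala.collection.mutable.ArrayBuffer

object DeprecationWarningSpec extends App {
  // Keys the sketch treats as deprecated.
  val deprecated = Set("spark.sql.hive.verifyPartitionPath")
  // Captured warnings; stands in for the suite's log appender.
  val warnings = ArrayBuffer.empty[String]

  // Stand-in for setting a session conf: warn when the key is deprecated.
  def setConf(key: String, value: String): Unit = {
    if (deprecated.contains(key)) {
      warnings += s"The SQL config '$key' has been deprecated and may be removed in the future."
    }
  }

  val config = "spark.sql.hive.verifyPartitionPath"
  setConf(config, "true")
  assert(warnings.exists(_.contains(config)), s"expected a warning mentioning $config")
}
```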