[MINOR][SQL] Add missing functions for some options in SQLConf and use them where applicable

## What changes were proposed in this pull request?

I first assumed these accessor functions were omitted intentionally because the options are somewhat hidden, but it seems they are simply missing.

For example, `spark.sql.parquet.mergeSchema` is documented in [sql-programming-guide.md](https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md) but has no accessor function, whereas many options such as `spark.sql.join.preferSortMergeJoin` are not documented yet each have their own function.

So, this PR makes them consistent by adding the missing functions for some options in `SQLConf` and using them where applicable, so that the call sites become more readable.
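
For illustration, here is a minimal, self-contained sketch of the pattern this PR applies. The names below (`SQLConfSketch`, `SQLConfLike`, `ConfigEntry`, `setConfString`) are simplified stand-ins for this sketch only, not the actual Spark internals: an option entry gets a dedicated accessor, so call sites read `conf.isParquetSchemaMergingEnabled` instead of going through the entry object directly.

```scala
// Minimal sketch of "option entry + dedicated accessor" (not the real Spark classes).
object SQLConfSketch {

  // Hypothetical stand-in for Spark's internal config entry type.
  final case class ConfigEntry[T](key: String, defaultValue: T)

  class SQLConfLike {
    private val settings = scala.collection.mutable.Map.empty[String, String]

    def setConfString(key: String, value: String): Unit = settings(key) = value

    def getConf(entry: ConfigEntry[Boolean]): Boolean =
      settings.get(entry.key).map(_.toBoolean).getOrElse(entry.defaultValue)

    val PARQUET_SCHEMA_MERGING_ENABLED: ConfigEntry[Boolean] =
      ConfigEntry("spark.sql.parquet.mergeSchema", defaultValue = false)

    // The kind of accessor this PR adds for options that were missing one.
    def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SQLConfLike
    println(conf.isParquetSchemaMergingEnabled)             // false (default)
    conf.setConfString("spark.sql.parquet.mergeSchema", "true")
    println(conf.isParquetSchemaMergingEnabled)             // true
  }
}
```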

## How was this patch tested?

Existing tests should cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14678 from HyukjinKwon/sqlconf-cleanup.
hyukjinkwon 2016-09-15 01:33:56 +08:00 committed by Wenchen Fan
parent 6d06ff6f7e
commit a79838bdee
12 changed files with 49 additions and 31 deletions

RelationalGroupedDataset.scala

@@ -313,7 +313,7 @@ class RelationalGroupedDataset protected[sql](
  */
 def pivot(pivotColumn: String): RelationalGroupedDataset = {
   // This is to prevent unintended OOM errors when the number of distinct values is large
-  val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
+  val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
   // Get the distinct values of the column and sort them so its consistent
   val values = df.select(pivotColumn)
     .distinct()

QueryExecution.scala

@@ -55,7 +55,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 }
 def assertSupported(): Unit = {
-  if (sparkSession.sessionState.conf.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
     UnsupportedOperationChecker.checkForBatch(analyzed)
   }
 }

DataSource.scala

@@ -231,7 +231,7 @@ case class DataSource(
     }
   }
-  val isSchemaInferenceEnabled = sparkSession.conf.get(SQLConf.STREAMING_SCHEMA_INFERENCE)
+  val isSchemaInferenceEnabled = sparkSession.sessionState.conf.streamingSchemaInference
   val isTextSource = providingClass == classOf[text.TextFileFormat]
   // If the schema inference is disabled, only text sources require schema to be specified
   if (!isSchemaInferenceEnabled && !isTextSource && userSpecifiedSchema.isEmpty) {

InsertIntoHadoopFsRelationCommand.scala

@@ -131,7 +131,7 @@ case class InsertIntoHadoopFsRelationCommand(
     dataColumns = dataColumns,
     inputSchema = query.output,
     PartitioningUtils.DEFAULT_PARTITION_NAME,
-    sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
+    sparkSession.sessionState.conf.partitionMaxFiles,
     isAppend)
 }

PartitioningAwareFileCatalog.scala

@@ -126,7 +126,7 @@ abstract class PartitioningAwareFileCatalog(
   PartitioningUtils.parsePartitions(
     leafDirs,
     PartitioningUtils.DEFAULT_PARTITION_NAME,
-    typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled(),
+    typeInference = sparkSession.sessionState.conf.partitionColumnTypeInferenceEnabled,
     basePaths = basePaths)
 }
 }

ParquetFileFormat.scala

@@ -151,7 +151,7 @@ class ParquetFileFormat
 // Should we merge schemas from all Parquet part-files?
 val shouldMergeSchemas = parquetOptions.mergeSchema
-val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+val mergeRespectSummaries = sparkSession.sessionState.conf.isParquetSchemaRespectSummaries
 val filesByType = splitFiles(files)
@@ -308,14 +308,14 @@ class ParquetFileFormat
 // Sets flags for `CatalystSchemaConverter`
 hadoopConf.setBoolean(
   SQLConf.PARQUET_BINARY_AS_STRING.key,
-  sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
+  sparkSession.sessionState.conf.isParquetBinaryAsString)
 hadoopConf.setBoolean(
   SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-  sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+  sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 // Try to push down filters when filter push-down is enabled.
 val pushed =
-  if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+  if (sparkSession.sessionState.conf.parquetFilterPushDown) {
     filters
     // Collects all converted Parquet filter predicates. Notice that not all predicates can be
     // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`

ParquetOptions.scala

@@ -52,7 +52,7 @@ private[parquet] class ParquetOptions(
 val mergeSchema: Boolean = parameters
   .get(MERGE_SCHEMA)
   .map(_.toBoolean)
-  .getOrElse(sqlConf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
+  .getOrElse(sqlConf.isParquetSchemaMergingEnabled)
 }

FileStreamSinkLog.scala

@@ -93,11 +93,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
  * a live lock may happen if the compaction happens too frequently: one processing keeps deleting
  * old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
  */
-private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+private val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
-private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+private val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
-private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+private val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompatInterval
 require(compactInterval > 0,
   s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
   "to a positive value.")

StreamExecution.scala

@@ -58,7 +58,7 @@ class StreamExecution(
 import org.apache.spark.sql.streaming.StreamingQueryListener._
-private val pollingDelayMs = sparkSession.conf.get(SQLConf.STREAMING_POLLING_DELAY)
+private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 /**
  * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.

StateStoreConf.scala

@@ -24,11 +24,9 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 def this() = this(new SQLConf)
-import SQLConf._
-val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+val minDeltasForSnapshot = conf.stateStoreMinDeltasForSnapshot
-val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
+val minVersionsToRetain = conf.stateStoreMinVersionsToRetain
 }
private[streaming] object StateStoreConf {

SQLConf.scala

@@ -338,11 +338,6 @@ object SQLConf {
   .intConf
   .createWithDefault(4000)
-val PARTITION_DISCOVERY_ENABLED = SQLConfigBuilder("spark.sql.sources.partitionDiscovery.enabled")
-  .doc("When true, automatically discover data partitions.")
-  .booleanConf
-  .createWithDefault(true)
 val PARTITION_COLUMN_TYPE_INFERENCE =
   SQLConfigBuilder("spark.sql.sources.partitionColumnTypeInference.enabled")
     .doc("When true, automatically infer the data types for partitioned columns.")
@@ -391,8 +386,10 @@ object SQLConf {
 val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
   SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
-    .doc("The degree of parallelism for schema merging and partition discovery of " +
-      "Parquet data sources.")
+    .doc("The maximum number of files allowed for listing files at driver side. If the number " +
+      "of detected files exceeds this value during partition discovery, it tries to list the " +
+      "files with another Spark distributed job. This applies to Parquet, ORC, CSV, JSON and " +
+      "LibSVM data sources.")
     .intConf
     .createWithDefault(32)
@@ -592,8 +589,24 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
+def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+def stateStoreMinVersionsToRetain: Int = getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
 def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
+def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
+def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
+def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
@@ -657,6 +670,12 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
+def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED)
+def isParquetSchemaRespectSummaries: Boolean = getConf(PARQUET_SCHEMA_RESPECT_SUMMARIES)
+def parquetOutputCommitterClass: String = getConf(PARQUET_OUTPUT_COMMITTER_CLASS)
 def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
@@ -673,12 +692,11 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 def convertCTAS: Boolean = getConf(CONVERT_CTAS)
-def partitionDiscoveryEnabled(): Boolean =
-  getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
-def partitionColumnTypeInferenceEnabled(): Boolean =
+def partitionColumnTypeInferenceEnabled: Boolean =
   getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
+def partitionMaxFiles: Int = getConf(PARTITION_MAX_FILES)
 def parallelPartitionDiscoveryThreshold: Int =
   getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
@@ -695,6 +713,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
+def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)
 override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

StreamingQueryManager.scala

@@ -204,7 +204,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
 val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
   new Path(userSpecified).toUri.toString
 }.orElse {
-  df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+  df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
     new Path(location, name).toUri.toString
   }
 }.getOrElse {
@@ -232,7 +232,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
 val analyzedPlan = df.queryExecution.analyzed
 df.queryExecution.assertAnalyzed()
-if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
   UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
 }