diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 6005d35f01..2c2f7c35df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -26,6 +26,11 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 
 import org.apache.spark.sql.catalyst.CatalystConf
 
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines the configuration options for Spark SQL.
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
 private[spark] object SQLConf {
 
   private val sqlConfEntries = java.util.Collections.synchronizedMap(
@@ -184,17 +189,20 @@ private[spark] object SQLConf {
   val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
     defaultValue = Some(true),
     doc = "When set to true Spark SQL will automatically select a compression codec for each " +
-      "column based on statistics of the data.")
+      "column based on statistics of the data.",
+    isPublic = false)
 
   val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
     defaultValue = Some(10000),
     doc = "Controls the size of batches for columnar caching. Larger batch sizes can improve " +
-      "memory utilization and compression, but risk OOMs when caching data.")
+      "memory utilization and compression, but risk OOMs when caching data.",
+    isPublic = false)
 
   val IN_MEMORY_PARTITION_PRUNING =
     booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
       defaultValue = Some(false),
-      doc = "")
+      doc = "When true, enable partition pruning for in-memory columnar tables.",
+      isPublic = false)
 
   val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
     defaultValue = Some(10 * 1024 * 1024),
@@ -203,29 +211,35 @@ private[spark] object SQLConf {
     "Note that currently statistics are only supported for Hive Metastore tables where the " +
     "commandANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.")
 
-  val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", isPublic = false)
+  val DEFAULT_SIZE_IN_BYTES = longConf(
+    "spark.sql.defaultSizeInBytes",
+    doc = "The default table size used in query planning. By default, it is set to a larger " +
+      "value than `spark.sql.autoBroadcastJoinThreshold` to be more conservative. That is to say " +
+      "by default the optimizer will not choose to broadcast a table unless it knows for sure " +
+      "its size is small enough.",
+    isPublic = false)
 
   val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
     defaultValue = Some(200),
-    doc = "Configures the number of partitions to use when shuffling data for joins or " +
-      "aggregations.")
+    doc = "The default number of partitions to use when shuffling data for joins or aggregations.")
 
   val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
     defaultValue = Some(true),
     doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
-      " a specific query. For some queries with complicated expression this option can lead to " +
-      "significant speed-ups. However, for simple queries this can actually slow down query " +
-      "execution.")
+      " a specific query.")
 
   val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
     defaultValue = Some(false),
-    doc = "")
+    doc = "When true, use the new optimized Tungsten physical execution backend.")
 
-  val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), doc = "")
+  val DIALECT = stringConf(
+    "spark.sql.dialect",
+    defaultValue = Some("sql"),
+    doc = "The default SQL dialect to use.")
 
   val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
     defaultValue = Some(true),
-    doc = "")
+    doc = "Whether the query analyzer should be case sensitive or not.")
 
   val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
     defaultValue = Some(true),
@@ -273,9 +287,8 @@ private[spark] object SQLConf {
   val PARQUET_FOLLOW_PARQUET_FORMAT_SPEC = booleanConf(
     key = "spark.sql.parquet.followParquetFormatSpec",
     defaultValue = Some(false),
-    doc = "Whether to stick to Parquet format specification when converting Parquet schema to " +
-      "Spark SQL schema and vice versa. Sticks to the specification if set to true; falls back " +
-      "to compatible mode if set to false.",
+    doc = "Whether to follow Parquet's format specification when converting Parquet schema to " +
+      "Spark SQL schema and vice versa.",
     isPublic = false)
 
   val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
@@ -290,7 +303,7 @@ private[spark] object SQLConf {
 
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
-    doc = "")
+    doc = "When true, enable filter pushdown for ORC files.")
 
   val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
     defaultValue = Some(true),
@@ -302,7 +315,7 @@ private[spark] object SQLConf {
 
   val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
     defaultValue = Some(5 * 60),
-    doc = "")
+    doc = "Timeout in seconds for the broadcast wait time in broadcast joins.")
 
   // Options that control which operators can be chosen by the query planner. These should be
   // considered hints and may be ignored by future versions of Spark SQL.
@@ -313,7 +326,7 @@ private[spark] object SQLConf {
 
   val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
     defaultValue = Some(false),
-    doc = "")
+    doc = "When true, use sort merge join (as opposed to hash join) by default for large joins.")
 
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
@@ -321,16 +334,16 @@ private[spark] object SQLConf {
 
   val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
     defaultValue = Some(200),
-    doc = "")
+    doc = "The number of SQL statements kept in the JDBC/ODBC web UI history.")
 
   val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
     defaultValue = Some(200),
-    doc = "")
+    doc = "The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
     defaultValue = Some("org.apache.spark.sql.parquet"),
-    doc = "")
+    doc = "The default data source to use in input/output.")
 
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
@@ -338,18 +351,20 @@ private[spark] object SQLConf {
   // to its length exceeds the threshold.
   val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
     defaultValue = Some(4000),
-    doc = "")
+    doc = "The maximum length allowed in a single cell when " +
+      "storing additional schema information in Hive's metastore.",
+    isPublic = false)
 
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
     defaultValue = Some(true),
-    doc = "")
+    doc = "When true, automatically discover data partitions.")
 
   // Whether to perform partition column type inference. Default to true.
   val PARTITION_COLUMN_TYPE_INFERENCE =
     booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
       defaultValue = Some(true),
-      doc = "")
+      doc = "When true, automatically infer the data types for partitioned columns.")
 
   // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
@@ -363,22 +378,28 @@ private[spark] object SQLConf {
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
-  val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis",
+  val DATAFRAME_EAGER_ANALYSIS = booleanConf(
+    "spark.sql.eagerAnalysis",
     defaultValue = Some(true),
-    doc = "")
+    doc = "When true, eagerly applies query analysis on DataFrame operations.",
+    isPublic = false)
 
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
-  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
-    booleanConf("spark.sql.selfJoinAutoResolveAmbiguity", defaultValue = Some(true), doc = "")
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = booleanConf(
+    "spark.sql.selfJoinAutoResolveAmbiguity",
+    defaultValue = Some(true),
+    isPublic = false)
 
   // Whether to retain group by columns or not in GroupedData.agg.
-  val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf("spark.sql.retainGroupColumns",
+  val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf(
+    "spark.sql.retainGroupColumns",
     defaultValue = Some(true),
-    doc = "")
+    isPublic = false)
 
-  val USE_SQL_SERIALIZER2 = booleanConf("spark.sql.useSerializer2",
-    defaultValue = Some(true), doc = "")
+  val USE_SQL_SERIALIZER2 = booleanConf(
+    "spark.sql.useSerializer2",
+    defaultValue = Some(true), isPublic = false)
 
   val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
     defaultValue = Some(true), doc = "")
@@ -422,112 +443,53 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    */
   private[spark] def dialect: String = getConf(DIALECT)
 
-  /** When true tables cached using the in-memory columnar caching will be compressed. */
   private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
-  /** The compression codec for writing to a Parquetfile */
   private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
 
   private[spark] def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
 
-  /** The number of rows that will be */
   private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
-  /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
-  /** When true predicates will be passed to the parquet record reader when possible. */
   private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
-  /** When true uses Parquet implementation based on data source API */
   private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)
 
   private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
-  /** When true uses verifyPartitionPath to prune the path which is not exists. */
   private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
 
-  /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
 
-  /**
-   * Sort merge join would sort the two side of join first, and then iterate both sides together
-   * only once to get all matches. Using sort merge join can save a lot of memory usage compared
-   * to HashJoin.
-   */
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
-  /**
-   * When set to true, Spark SQL will use the Janino at runtime to generate custom bytecode
-   * that evaluates expressions found in queries. In general this custom code runs much faster
-   * than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation.
-   */
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED)
 
-  /**
-   * caseSensitive analysis true by default
-   */
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
-  /**
-   * When set to true, Spark SQL will use managed memory for certain operations. This option only
-   * takes effect if codegen is enabled.
-   *
-   * Defaults to false as this feature is currently experimental.
-   */
   private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED)
 
   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
 
-  /**
-   * Selects between the new (true) and old (false) JSON handlers, to be removed in Spark 1.5.0
-   */
   private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
 
-  /**
-   * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations. Setting this to -1
-   * effectively disables auto conversion.
-   *
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000.
-   */
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
-  /**
-   * The default size in bytes to assign to a logical operator's estimation statistics. By default,
-   * it is set to a larger value than `autoBroadcastJoinThreshold`, hence any logical operator
-   * without a properly implemented estimation of this statistic will not be incorrectly broadcasted
-   * in joins.
-   */
   private[spark] def defaultSizeInBytes: Long =
     getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
 
-  /**
-   * When set to true, we always treat byte arrays in Parquet files as strings.
-   */
   private[spark] def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 
-  /**
-   * When set to true, we always treat INT96Values in Parquet files as timestamp.
-   */
   private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
 
-  /**
-   * When set to true, sticks to Parquet format spec when converting Parquet schema to Spark SQL
-   * schema and vice versa. Otherwise, falls back to compatible mode.
-   */
   private[spark] def followParquetFormatSpec: Boolean = getConf(PARQUET_FOLLOW_PARQUET_FORMAT_SPEC)
 
-  /**
-   * When set to true, partition pruning for in-memory columnar tables is enabled.
-   */
   private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
   private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
 
-  /**
-   * Timeout in seconds for the broadcast wait time in hash join
-   */
   private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
 
   private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
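
For orientation (not part of the patch): the entries above are exposed through the string-keyed conf API, so the new doc strings describe keys that users can set and read at runtime. Below is a minimal, hypothetical Scala sketch of that usage against the Spark 1.x SQLContext API; the application name, master URL, and the override values chosen are illustrative assumptions, while the key names come from the diff itself.

// Hypothetical usage sketch, not part of the patch: exercising a few of the keys
// documented above via the public SQLContext conf API (Spark 1.x).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object SQLConfUsageSketch {
  def main(args: Array[String]): Unit = {
    // Local master and app name are placeholder choices for this sketch.
    val sc = new SparkContext(new SparkConf().setAppName("sqlconf-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Override two of the documented defaults for this session.
    sqlContext.setConf("spark.sql.shuffle.partitions", "64")
    sqlContext.setConf("spark.sql.planner.sortMergeJoin", "true")

    // Reading a key that was not set explicitly should fall back to the entry's registered default.
    println(sqlContext.getConf("spark.sql.shuffle.partitions"))         // "64", set above
    println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold")) // default, 10 * 1024 * 1024

    sc.stop()
  }
}

Note that the entries this patch marks with isPublic = false remain settable through the same API; as far as SQLConf is concerned they are internal options, which keeps them out of the user-facing list of defined configurations even though they now carry documentation.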