[SPARK-15012][SQL] Simplify configuration API further

## What changes were proposed in this pull request?

1. Remove all the `spark.setConf` / `spark.getConf` style methods from `SparkSession`; expose only `spark.conf`
2. Make `spark.conf` also pick up values set in the core `SparkConf`, so users are not confused when a setting made there appears to be missing

This was done for both the Python and Scala APIs.
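
For illustration, here is a minimal Scala sketch of the resulting API surface. The `ConfApiDemo` object and the fallback key are hypothetical examples, and the sketch assumes a `SparkSession` has already been created elsewhere (e.g. by a shell or a test harness):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: ConfApiDemo is a hypothetical helper; the SparkSession is assumed
// to be created elsewhere (shell, test harness, etc.).
object ConfApiDemo {
  def demo(spark: SparkSession): Unit = {
    // The old spark.setConf / spark.getConf / spark.getAllConfs methods are gone;
    // every read and write now goes through the single RuntimeConfig handle.
    spark.conf.set("spark.sql.shuffle.partitions", "50")
    println(spark.conf.get("spark.sql.shuffle.partitions"))   // 50
    println(spark.conf.get("not.set.key", "fallback"))        // fallback
    println(spark.conf.getAll.size)                           // count of entries set so far

    // Because SessionState now copies every core SparkConf entry into SQLConf,
    // values set only on the SparkConf (e.g. via --conf) show up here as well.
    println(spark.conf.getOption("spark.app.name"))           // Some(<application name>)
  }
}
```

The Python side is analogous (`spark.conf.set(...)` / `spark.conf.get(...)`), as the updated `test_conf` in the Python test suite below shows.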

## How was this patch tested?
`SQLConfSuite` and the Python tests.

This patch also fixes the tests that failed in #12787.

Closes #12787

Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #12798 from yhuai/conf-api.
Andrew Or authored on 2016-04-29 20:46:07 -07:00; committed by Yin Huai
parent b056e8cb0a
commit 66773eb8a5
18 changed files with 108 additions and 187 deletions


@ -114,7 +114,7 @@ class SQLContext(object):
def setConf(self, key, value):
"""Sets the given Spark SQL configuration property.
"""
self.sparkSession.setConf(key, value)
self.sparkSession.conf.set(key, value)
@ignore_unicode_prefix
@since(1.3)
@ -133,7 +133,7 @@ class SQLContext(object):
>>> sqlContext.getConf("spark.sql.shuffle.partitions", u"10")
u'50'
"""
return self.sparkSession.getConf(key, defaultValue)
return self.sparkSession.conf.get(key, defaultValue)
@property
@since("1.3.1")


@ -134,35 +134,6 @@ class SparkSession(object):
self._conf = RuntimeConfig(self._jsparkSession.conf())
return self._conf
@since(2.0)
def setConf(self, key, value):
"""
Sets the given Spark SQL configuration property.
"""
self._jsparkSession.setConf(key, value)
@ignore_unicode_prefix
@since(2.0)
def getConf(self, key, defaultValue=None):
"""Returns the value of Spark SQL configuration property for the given key.
If the key is not set and defaultValue is not None, return
defaultValue. If the key is not set and defaultValue is None, return
the system default value.
>>> spark.getConf("spark.sql.shuffle.partitions")
u'200'
>>> spark.getConf("spark.sql.shuffle.partitions", "10")
u'10'
>>> spark.setConf("spark.sql.shuffle.partitions", "50")
>>> spark.getConf("spark.sql.shuffle.partitions", "10")
u'50'
"""
if defaultValue is not None:
return self._jsparkSession.getConf(key, defaultValue)
else:
return self._jsparkSession.getConf(key)
@property
@since(2.0)
def catalog(self):


@ -1397,9 +1397,9 @@ class SQLTests(ReusedPySparkTestCase):
def test_conf(self):
spark = self.sparkSession
spark.setConf("bogo", "sipeo")
spark.conf.set("bogo", "sipeo")
self.assertEqual(self.sparkSession.conf.get("bogo"), "sipeo")
spark.setConf("bogo", "ta")
spark.conf.set("bogo", "ta")
self.assertEqual(spark.conf.get("bogo"), "ta")
self.assertEqual(spark.conf.get("bogo", "not.read"), "ta")
self.assertEqual(spark.conf.get("not.set", "ta"), "ta")


@ -184,7 +184,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
val analyzedPlan = df.queryExecution.analyzed
df.queryExecution.assertAnalyzed()
if (sparkSession.getConf(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}


@ -284,9 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
new Path(userSpecified).toUri.toString
}.orElse {
val checkpointConfig: Option[String] =
df.sparkSession.getConf(
SQLConf.CHECKPOINT_LOCATION,
None)
df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION, None)
checkpointConfig.map { location =>
new Path(location, queryName).toUri.toString


@ -302,7 +302,7 @@ class RelationalGroupedDataset protected[sql](
*/
def pivot(pivotColumn: String): RelationalGroupedDataset = {
// This is to prevent unintended OOM errors when the number of distinct values is large
val maxValues = df.sparkSession.getConf(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
val maxValues = df.sparkSession.conf.get(SQLConf.DATAFRAME_PIVOT_MAX_VALUES)
// Get the distinct values of the column and sort them so its consistent
val values = df.select(pivotColumn)
.distinct()


@ -17,8 +17,10 @@
package org.apache.spark.sql
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql.internal.SQLConf
/**
* Runtime configuration interface for Spark. To access this, use [[SparkSession.conf]].
*
@ -76,6 +78,30 @@ class RuntimeConfig private[sql](sqlConf: SQLConf = new SQLConf) {
sqlConf.getConfString(key, default)
}
/**
* Returns the value of Spark runtime configuration property for the given key.
*/
@throws[NoSuchElementException]("if the key is not set")
protected[sql] def get[T](entry: ConfigEntry[T]): T = {
sqlConf.getConf(entry)
}
/**
* Returns the value of Spark runtime configuration property for the given key.
*/
protected[sql] def get[T](entry: ConfigEntry[T], default: T): T = {
sqlConf.getConf(entry, default)
}
/**
* Returns all properties set in this conf.
*
* @since 2.0.0
*/
def getAll: Map[String, String] = {
sqlConf.getAllConfs
}
/**
* Returns the value of Spark runtime configuration property for the given key.
*


@ -134,13 +134,15 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
def setConf(props: Properties): Unit = sparkSession.setConf(props)
def setConf(props: Properties): Unit = {
sessionState.conf.setConf(props)
}
/**
* Set the given Spark SQL configuration property.
*/
private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
sparkSession.setConf(entry, value)
sessionState.conf.setConf(entry, value)
}
/**
@ -149,7 +151,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
def setConf(key: String, value: String): Unit = sparkSession.setConf(key, value)
def setConf(key: String, value: String): Unit = {
sparkSession.conf.set(key, value)
}
/**
* Return the value of Spark SQL configuration property for the given key.
@ -157,13 +161,17 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
def getConf(key: String): String = sparkSession.getConf(key)
def getConf(key: String): String = {
sparkSession.conf.get(key)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue` in [[ConfigEntry]].
*/
private[sql] def getConf[T](entry: ConfigEntry[T]): T = sparkSession.getConf(entry)
private[sql] def getConf[T](entry: ConfigEntry[T]): T = {
sparkSession.conf.get(entry)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
@ -171,7 +179,7 @@ class SQLContext private[sql](
* desired one.
*/
private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
sparkSession.getConf(entry, defaultValue)
sparkSession.conf.get(entry, defaultValue)
}
/**
@ -181,7 +189,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
def getConf(key: String, defaultValue: String): String = sparkSession.getConf(key, defaultValue)
def getConf(key: String, defaultValue: String): String = {
sparkSession.conf.get(key, defaultValue)
}
/**
* Return all the configuration properties that have been set (i.e. not the default).
@ -190,7 +200,9 @@ class SQLContext private[sql](
* @group config
* @since 1.0.0
*/
def getAllConfs: immutable.Map[String, String] = sparkSession.getAllConfs
def getAllConfs: immutable.Map[String, String] = {
sparkSession.conf.getAll
}
protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)


@ -108,6 +108,18 @@ class SparkSession private(
protected[sql] def listener: SQLListener = sharedState.listener
protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
/**
* Runtime configuration interface for Spark.
*
* This is the interface through which the user can get and set all Spark and Hadoop
* configurations that are relevant to Spark SQL. When getting the value of a config,
* this defaults to the value set in the underlying [[SparkContext]], if any.
*
* @group config
* @since 2.0.0
*/
@transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
/**
* :: Experimental ::
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@ -187,89 +199,6 @@ class SparkSession private(
}
/* -------------------------------------------------- *
| Methods for accessing or mutating configurations |
* -------------------------------------------------- */
/**
* Runtime configuration interface for Spark.
*
* This is the interface through which the user can get and set all Spark and Hadoop
* configurations that are relevant to Spark SQL. When getting the value of a config,
* this defaults to the value set in the underlying [[SparkContext]], if any.
*
* @group config
* @since 2.0.0
*/
@transient lazy val conf: RuntimeConfig = new RuntimeConfig(sessionState.conf)
/**
* Set Spark SQL configuration properties.
*
* @group config
* @since 2.0.0
*/
def setConf(props: Properties): Unit = sessionState.setConf(props)
/**
* Set the given Spark SQL configuration property.
*
* @group config
* @since 2.0.0
*/
def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)
/**
* Return the value of Spark SQL configuration property for the given key.
*
* @group config
* @since 2.0.0
*/
def getConf(key: String): String = sessionState.conf.getConfString(key)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue`.
*
* @group config
* @since 2.0.0
*/
def getConf(key: String, defaultValue: String): String = {
sessionState.conf.getConfString(key, defaultValue)
}
/**
* Return all the configuration properties that have been set (i.e. not the default).
* This creates a new copy of the config properties in the form of a Map.
*
* @group config
* @since 2.0.0
*/
def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
/**
* Set the given Spark SQL configuration property.
*/
protected[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
sessionState.setConf(entry, value)
}
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue` in [[ConfigEntry]].
*/
protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the
* desired one.
*/
protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
sessionState.conf.getConf(entry, defaultValue)
}
/* --------------------------------- *
| Methods for creating DataFrames |
* --------------------------------- */


@ -56,7 +56,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
"determining the number of reducers is not supported."
throw new IllegalArgumentException(msg)
} else {
sparkSession.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
}
}
@ -65,7 +65,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sparkSession: SparkSession) => {
sparkSession.setConf(key, value)
sparkSession.conf.set(key, value)
Seq(Row(key, value))
}
(keyValueOutput, runFunc)
@ -74,7 +74,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Queries all key-value pairs that are set in the SQLConf of the sparkSession.
case None =>
val runFunc = (sparkSession: SparkSession) => {
sparkSession.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
sparkSession.conf.getAll.map { case (k, v) => Row(k, v) }.toSeq
}
(keyValueOutput, runFunc)
@ -107,10 +107,7 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
// Queries a single property.
case Some((key, None)) =>
val runFunc = (sparkSession: SparkSession) => {
val value =
try sparkSession.getConf(key) catch {
case _: NoSuchElementException => "<undefined>"
}
val value = sparkSession.conf.getOption(key).getOrElse("<undefined>")
Seq(Row(key, value))
}
(keyValueOutput, runFunc)


@ -131,7 +131,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
dataColumns = dataColumns,
inputSchema = query.output,
PartitioningUtils.DEFAULT_PARTITION_NAME,
sparkSession.getConf(SQLConf.PARTITION_MAX_FILES),
sparkSession.conf.get(SQLConf.PARTITION_MAX_FILES),
isAppend)
}


@ -143,10 +143,9 @@ private[sql] class DefaultSource
parameters
.get(ParquetRelation.MERGE_SCHEMA)
.map(_.toBoolean)
.getOrElse(sparkSession.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
.getOrElse(sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
val mergeRespectSummaries =
sparkSession.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
val mergeRespectSummaries = sparkSession.conf.get(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
val filesByType = splitFiles(files)
@ -281,22 +280,23 @@ private[sql] class DefaultSource
// Sets flags for `CatalystSchemaConverter`
hadoopConf.setBoolean(
SQLConf.PARQUET_BINARY_AS_STRING.key,
sparkSession.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
sparkSession.conf.get(SQLConf.PARQUET_BINARY_AS_STRING))
hadoopConf.setBoolean(
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sparkSession.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
sparkSession.conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
// Try to push down filters when filter push-down is enabled.
val pushed = if (sparkSession.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}
val pushed =
if (sparkSession.conf.get(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(ParquetFilters.createFilter(requiredSchema, _))
.reduceOption(FilterApi.and)
} else {
None
}
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))


@ -80,11 +80,11 @@ class FileStreamSinkLog(sparkSession: SparkSession, path: String)
* a live lock may happen if the compaction happens too frequently: one processing keeps deleting
* old files while another one keeps retrying. Setting a reasonable cleanup delay could avoid it.
*/
private val fileCleanupDelayMs = sparkSession.getConf(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
private val isDeletingExpiredLog = sparkSession.getConf(SQLConf.FILE_SINK_LOG_DELETION)
private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
private val compactInterval = sparkSession.getConf(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")


@ -763,9 +763,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
}
private def setConfWithCheck(key: String, value: String): Unit = {
if (key.startsWith("spark.") && !key.startsWith("spark.sql.")) {
logWarning(s"Attempt to set non-Spark SQL config in SQLConf: key = $key, value = $value")
}
settings.put(key, value)
}


@ -152,9 +152,11 @@ private[sql] class SessionState(sparkSession: SparkSession) {
private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader
// Automatically extract `spark.sql.*` entries and put it in our SQLConf
// Automatically extract all entries and put it in our SQLConf
// We need to call it after all of vals have been initialized.
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))
sparkSession.sparkContext.getConf.getAll.foreach { case (k, v) =>
conf.setConfString(k, v)
}
// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@ -170,19 +172,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
}
final def setConf(properties: Properties): Unit = {
properties.asScala.foreach { case (k, v) => setConf(k, v) }
}
final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
conf.setConf(entry, value)
setConf(entry.key, entry.stringConverter(value))
}
def setConf(key: String, value: String): Unit = {
conf.setConfString(key, value)
}
def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)


@ -79,10 +79,4 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule))
}
test("SQLContext can access `spark.sql.*` configs") {
sc.conf.set("spark.sql.with.or.without.you", "my love")
val sqlContext = new SQLContext(sc)
assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
}
}


@ -17,7 +17,7 @@
package org.apache.spark.sql.internal
import org.apache.spark.sql.{QueryTest, SQLContext}
import org.apache.spark.sql.{QueryTest, SparkSession, SQLContext}
import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
class SQLConfSuite extends QueryTest with SharedSQLContext {
@ -125,4 +125,18 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
sqlContext.conf.clear()
}
test("SparkSession can access configs set in SparkConf") {
try {
sparkContext.conf.set("spark.to.be.or.not.to.be", "my love")
sparkContext.conf.set("spark.sql.with.or.without.you", "my love")
val spark = new SparkSession(sparkContext)
assert(spark.conf.get("spark.to.be.or.not.to.be") == "my love")
assert(spark.conf.get("spark.sql.with.or.without.you") == "my love")
} finally {
sparkContext.conf.remove("spark.to.be.or.not.to.be")
sparkContext.conf.remove("spark.sql.with.or.without.you")
}
}
}


@ -17,7 +17,6 @@
package org.apache.spark.sql.hive
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.sql._
@ -114,12 +113,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
* - allow SQL11 keywords to be used as identifiers
*/
def setDefaultOverrideConfs(): Unit = {
setConf(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
}
override def setConf(key: String, value: String): Unit = {
super.setConf(key, value)
metadataHive.runSqlHive(s"SET $key=$value")
conf.setConfString(ConfVars.HIVE_SUPPORT_SQL11_RESERVED_KEYWORDS.varname, "false")
}
override def addJar(path: String): Unit = {