[SPARK-28341][SQL] create a public API for V2SessionCatalog

## What changes were proposed in this pull request?

The `V2SessionCatalog` has two functionalities:
1. work as an adapter: provide v2 APIs and translate calls to the `SessionCatalog`.
2. allow users to extend it, so that they can add hooks to apply custom logic before calling methods of the built-in catalog (session catalog).

To leverage the second functionality, users must extend `V2SessionCatalog`, which is an internal class, and there is no documentation explaining this usage.

This PR does two things:
1. refine the documentation of the config `spark.sql.catalog.session`.
2. add a public interface `CatalogExtension`, plus a `DelegatingCatalogExtension` base class, for users to write their own implementations (see the sketch below).
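
For illustration, here is a minimal sketch of the intended extension pattern (the class name and the audit hook are hypothetical; it assumes the `DelegatingCatalogExtension` base class added by this PR is on the classpath):

```scala
import java.util

import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType

// Hypothetical extension: runs a custom hook, then delegates to the built-in session catalog
// that Spark injects via setDelegateCatalog.
class AuditingSessionCatalog extends DelegatingCatalogExtension {

  override def createTable(
      ident: Identifier,
      schema: StructType,
      partitions: Array[Transform],
      properties: util.Map[String, String]): Table = {
    // custom logic first...
    println(s"audit: creating table ${ident.name()} with schema ${schema.simpleString}")
    // ...then call the built-in session catalog.
    super.createTable(ident, schema, partitions, properties)
  }
}
```

Such a class is registered through the `spark.sql.catalog.session` config (see the config change further down).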

TODOs for follow-up PRs:
1. discuss whether we should allow users to completely replace the v2 session catalog with a new one.
2. discuss renaming the session catalog so that it is less likely to conflict with existing namespace names.

## How was this patch tested?

Existing tests.

Closes #25104 from cloud-fan/session-catalog.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Wenchen Fan committed 2019-09-09 21:14:37 +08:00
parent dadb72028a
commit abec6d7763
18 changed files with 246 additions and 94 deletions


@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalog.v2;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* An API to extend the Spark built-in session catalog. Implementations can obtain the built-in
* session catalog via {@link #setDelegateCatalog(TableCatalog)}, implement catalog functions with
* some custom logic, and call the built-in session catalog at the end. For example, they can
* implement {@code createTable}, run some custom logic first, and then call {@code createTable}
* of the built-in session catalog.
*/
@Experimental
public interface CatalogExtension extends TableCatalog {
/**
* This will be called only once by Spark to pass in the Spark built-in session catalog, after
* {@link #initialize(String, CaseInsensitiveStringMap)} is called.
*/
void setDelegateCatalog(TableCatalog delegate);
}


@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalog.v2;
import java.util.Map;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.catalog.v2.expressions.Transform;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* A simple implementation of {@link CatalogExtension} that implements all the catalog functions
* by calling the built-in session catalog directly. It exists for convenience, so that users only
* need to override the methods where they want to apply custom logic. For example, they can
* override {@code createTable} and run some custom logic before calling {@code super.createTable}.
*/
@Experimental
public abstract class DelegatingCatalogExtension implements CatalogExtension {
private TableCatalog delegate;
public final void setDelegateCatalog(TableCatalog delegate) {
this.delegate = delegate;
}
@Override
public String name() {
return delegate.name();
}
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {}
@Override
public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
return delegate.listTables(namespace);
}
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
return delegate.loadTable(ident);
}
@Override
public void invalidateTable(Identifier ident) {
delegate.invalidateTable(ident);
}
@Override
public boolean tableExists(Identifier ident) {
return delegate.tableExists(ident);
}
@Override
public Table createTable(
Identifier ident,
StructType schema,
Transform[] partitions,
Map<String, String> properties) throws TableAlreadyExistsException, NoSuchNamespaceException {
return delegate.createTable(ident, schema, partitions, properties);
}
@Override
public Table alterTable(
Identifier ident,
TableChange... changes) throws NoSuchTableException {
return delegate.alterTable(ident, changes);
}
@Override
public boolean dropTable(Identifier ident) {
return delegate.dropTable(ident);
}
@Override
public void renameTable(
Identifier oldIdent,
Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
delegate.renameTable(oldIdent, newIdent);
}
}


@ -27,13 +27,17 @@ import org.apache.spark.sql.internal.SQLConf
* A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs and allows
* the caller to look up a catalog by name.
*/
class CatalogManager(conf: SQLConf) extends Logging {
class CatalogManager(conf: SQLConf, defaultSessionCatalog: TableCatalog) extends Logging {
private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
def catalog(name: String): CatalogPlugin = synchronized {
if (name.equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)) {
v2SessionCatalog
} else {
catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))
}
}
def defaultCatalog: Option[CatalogPlugin] = {
conf.defaultV2Catalog.flatMap { catalogName =>
@ -47,16 +51,30 @@ class CatalogManager(conf: SQLConf) extends Logging {
}
}
def v2SessionCatalog: Option[CatalogPlugin] = {
try {
Some(catalog(CatalogManager.SESSION_CATALOG_NAME))
} catch {
case NonFatal(e) =>
logError("Cannot load v2 session catalog", e)
None
private def loadV2SessionCatalog(): CatalogPlugin = {
Catalogs.load(CatalogManager.SESSION_CATALOG_NAME, conf) match {
case extension: CatalogExtension =>
extension.setDelegateCatalog(defaultSessionCatalog)
extension
case other => other
}
}
// If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
// session catalog. Otherwise, return the default session catalog.
def v2SessionCatalog: CatalogPlugin = {
conf.getConf(SQLConf.V2_SESSION_CATALOG).map { customV2SessionCatalog =>
try {
catalogs.getOrElseUpdate(CatalogManager.SESSION_CATALOG_NAME, loadV2SessionCatalog())
} catch {
case NonFatal(_) =>
logError(
"Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)
defaultSessionCatalog
}
}.getOrElse(defaultSessionCatalog)
}
private def getDefaultNamespace(c: CatalogPlugin) = c match {
case c: SupportsNamespaces => c.defaultNamespace()
case _ => Array.empty[String]
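
To make the new resolution order concrete, a hedged test-style sketch follows (it assumes it lives in the `org.apache.spark.sql` package, since `sessionState` and `catalogManager` are internal, and it uses a hypothetical extension class that must be on the classpath): with `spark.sql.catalog.session` unset, the built-in catalog is returned; once the config points at a `CatalogExtension`, the loaded instance is returned with the built-in catalog injected as its delegate.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.v2.CatalogExtension
import org.apache.spark.sql.internal.SQLConf

// Sketch only: exercises CatalogManager.v2SessionCatalog through internal accessors.
def checkSessionCatalogResolution(spark: SparkSession): Unit = {
  // With the config unset, the default (built-in) v2 session catalog is returned.
  assert(spark.conf.getOption(SQLConf.V2_SESSION_CATALOG.key).isEmpty)
  val builtIn = spark.sessionState.catalogManager.v2SessionCatalog

  // Point the config at a hypothetical CatalogExtension implementation.
  spark.conf.set(SQLConf.V2_SESSION_CATALOG.key, "com.example.AuditingSessionCatalog")
  val custom = spark.sessionState.catalogManager.v2SessionCatalog

  // The custom catalog wraps the built-in one as its delegate.
  assert(custom.isInstanceOf[CatalogExtension])
  assert(custom ne builtIn)
}
```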


@ -45,7 +45,7 @@ trait LookupCatalog extends Logging {
* This happens when the source implementation extends the v2 TableProvider API and is not listed
* in the fallback configuration, spark.sql.sources.write.useV1SourceList
*/
def sessionCatalog: Option[CatalogPlugin] = catalogManager.v2SessionCatalog
def sessionCatalog: CatalogPlugin = catalogManager.v2SessionCatalog
/**
* Extract catalog plugin and remaining identifier names.


@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.analysis
import java.util
import java.util.Locale
import scala.collection.mutable
@ -25,7 +26,7 @@ import scala.util.Random
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2._
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.catalog.v2.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@ -45,6 +46,7 @@ import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssig
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.internal.V1Table
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
@ -60,6 +62,24 @@ object SimpleAnalyzer extends Analyzer(
},
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
object FakeV2SessionCatalog extends TableCatalog {
private def fail() = throw new UnsupportedOperationException
override def listTables(namespace: Array[String]): Array[Identifier] = fail()
override def loadTable(ident: Identifier): Table = {
throw new NoSuchTableException(ident.toString)
}
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = fail()
override def alterTable(ident: Identifier, changes: TableChange*): Table = fail()
override def dropTable(ident: Identifier): Boolean = fail()
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = fail()
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = fail()
override def name(): String = fail()
}
/**
* Provides a way to keep state during the analysis, this enables us to decouple the concerns
* of analysis environment from the catalog.
@ -101,15 +121,21 @@ object AnalysisContext {
*/
class Analyzer(
catalog: SessionCatalog,
v2SessionCatalog: TableCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
// Only for tests.
def this(catalog: SessionCatalog, conf: SQLConf) = {
this(catalog, conf, conf.optimizerMaxIterations)
this(catalog, FakeV2SessionCatalog, conf, conf.optimizerMaxIterations)
}
override val catalogManager: CatalogManager = new CatalogManager(conf)
def this(catalog: SessionCatalog, v2SessionCatalog: TableCatalog, conf: SQLConf) = {
this(catalog, v2SessionCatalog, conf, conf.optimizerMaxIterations)
}
override val catalogManager: CatalogManager = new CatalogManager(conf, v2SessionCatalog)
def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
@ -954,7 +980,7 @@ class Analyzer(
case scala.Right(tableOpt) =>
tableOpt.map { table =>
AlterTable(
sessionCatalog.get.asTableCatalog, // table being resolved means this exists
sessionCatalog.asTableCatalog,
Identifier.of(tableName.init.toArray, tableName.last),
DataSourceV2Relation.create(table),
changes
@ -2837,7 +2863,7 @@ class Analyzer(
case CatalogObjectIdentifier(Some(v2Catalog), ident) =>
scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident)))
case CatalogObjectIdentifier(None, ident) =>
catalogManager.v2SessionCatalog.flatMap(loadTable(_, ident)) match {
loadTable(catalogManager.v2SessionCatalog, ident) match {
case Some(_: V1Table) => scala.Right(None)
case other => scala.Right(other)
}


@ -1965,9 +1965,12 @@ object SQLConf {
.createOptional
val V2_SESSION_CATALOG = buildConf("spark.sql.catalog.session")
.doc("Name of the default v2 catalog, used when a catalog is not identified in queries")
.doc("A catalog implementation that will be used in place of the Spark built-in session " +
"catalog for v2 operations. The implementation may extend `CatalogExtension` to be " +
"passed the Spark built-in session catalog, so that it may delegate calls to the " +
"built-in session catalog.")
.stringConf
.createWithDefault("org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog")
.createOptional
val LEGACY_LOOSE_UPCAST = buildConf("spark.sql.legacy.looseUpcast")
.doc("When true, the upcast will be loose and allows string to atomic types.")


@ -21,6 +21,7 @@ import java.util
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalog.v2.{CatalogManager, NamespaceChange, SupportsNamespaces}
import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@ -28,7 +29,7 @@ class CatalogManagerSuite extends SparkFunSuite {
test("CatalogManager should reflect the changes of default catalog") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
assert(catalogManager.currentCatalog.isEmpty)
assert(catalogManager.currentNamespace.sameElements(Array("default")))
@ -42,7 +43,7 @@ class CatalogManagerSuite extends SparkFunSuite {
test("CatalogManager should keep the current catalog once set") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
assert(catalogManager.currentCatalog.isEmpty)
conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName)
catalogManager.setCurrentCatalog("dummy")
@ -57,7 +58,7 @@ class CatalogManagerSuite extends SparkFunSuite {
test("current namespace should be updated when switching current catalog") {
val conf = new SQLConf
val catalogManager = new CatalogManager(conf)
val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog)
catalogManager.setCurrentNamespace(Array("abc"))
assert(catalogManager.currentNamespace.sameElements(Array("abc")))


@ -354,15 +354,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
val sessionCatalog = session.sessionState.analyzer.sessionCatalog
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
insertInto(catalog, ident)
case CatalogObjectIdentifier(None, ident)
if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
insertInto(sessionCatalogOpt.get, ident)
case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
insertInto(sessionCatalog, ident)
case AsTableIdentifier(tableIdentifier) =>
insertInto(tableIdentifier)
@ -488,17 +487,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val session = df.sparkSession
val canUseV2 = lookupV2Provider().isDefined
val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog
val sessionCatalog = session.sessionState.analyzer.sessionCatalog
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case CatalogObjectIdentifier(Some(catalog), ident) =>
saveAsTable(catalog.asTableCatalog, ident, modeForDSV2)
case CatalogObjectIdentifier(None, ident)
if canUseV2 && sessionCatalogOpt.isDefined && ident.namespace().length <= 1 =>
case CatalogObjectIdentifier(None, ident) if canUseV2 && ident.namespace().length <= 1 =>
// We pass in the modeForDSV1, as using the V2 session catalog should maintain compatibility
// for now.
saveAsTable(sessionCatalogOpt.get.asTableCatalog, ident, modeForDSV1)
saveAsTable(sessionCatalog.asTableCatalog, ident, modeForDSV1)
case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)


@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
@ -40,9 +40,6 @@ case class DataSourceResolution(
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
def v2SessionCatalog: CatalogPlugin = sessionCatalog.getOrElse(
throw new AnalysisException("No v2 session catalog implementation is available"))
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case CreateTableStatement(
AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
@ -64,7 +61,7 @@ case class DataSourceResolution(
case _ =>
// the identifier had no catalog and no default catalog is set, but the source is v2.
// use the v2 session catalog, which delegates to the global v1 session catalog
convertCreateTable(v2SessionCatalog.asTableCatalog, identifier, create)
convertCreateTable(sessionCatalog.asTableCatalog, identifier, create)
}
case CreateTableAsSelectStatement(
@ -87,7 +84,7 @@ case class DataSourceResolution(
case _ =>
// the identifier had no catalog and no default catalog is set, but the source is v2.
// use the v2 session catalog, which delegates to the global v1 session catalog
convertCTAS(v2SessionCatalog.asTableCatalog, identifier, create)
convertCTAS(sessionCatalog.asTableCatalog, identifier, create)
}
case DescribeColumnStatement(
@ -119,19 +116,13 @@ case class DataSourceResolution(
case replace: ReplaceTableStatement =>
// the provider was not a v1 source, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = replace.tableName
val catalog = maybeCatalog.orElse(sessionCatalog)
.getOrElse(throw new AnalysisException(
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
.asTableCatalog
val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog
convertReplaceTable(catalog, identifier, replace)
case rtas: ReplaceTableAsSelectStatement =>
// the provider was not a v1 source, convert to a v2 plan
val CatalogObjectIdentifier(maybeCatalog, identifier) = rtas.tableName
val catalog = maybeCatalog.orElse(sessionCatalog)
.getOrElse(throw new AnalysisException(
s"No catalog specified for table ${identifier.quoted} and no default catalog is set"))
.asTableCatalog
val catalog = maybeCatalog.getOrElse(sessionCatalog).asTableCatalog
convertRTAS(catalog, identifier, rtas)
case DropTableStatement(CatalogObjectIdentifier(Some(catalog), ident), ifExists, _) =>


@ -24,15 +24,15 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty}
import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.internal.V1Table
import org.apache.spark.sql.types.StructType
@ -41,25 +41,17 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
* A [[TableCatalog]] that translates calls to the v1 SessionCatalog.
*/
class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces {
class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
extends TableCatalog with SupportsNamespaces {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
import V2SessionCatalog._
def this() = {
this(SparkSession.active.sessionState)
}
override val defaultNamespace: Array[String] = Array("default")
private lazy val catalog: SessionCatalog = sessionState.catalog
override def name: String = CatalogManager.SESSION_CATALOG_NAME
private var _name: String = _
override def name: String = _name
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
this._name = name
}
// This class is instantiated by Spark, so the `initialize` method will not be called.
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
override def listTables(namespace: Array[String]): Array[Identifier] = {
namespace match {
@ -92,7 +84,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup
properties: util.Map[String, String]): Table = {
val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions)
val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName)
val provider = properties.getOrDefault("provider", conf.defaultDataSourceName)
val tableProperties = properties.asScala
val location = Option(properties.get(LOCATION_TABLE_PROP))
val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap)
@ -108,7 +100,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions,
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(COMMENT_TABLE_PROP)))
try {


@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.TableCapabilityCheck
import org.apache.spark.sql.execution.datasources.v2.{TableCapabilityCheck, V2SessionCatalog}
import org.apache.spark.sql.streaming.StreamingQueryManager
import org.apache.spark.sql.util.ExecutionListenerManager
@ -151,6 +151,8 @@ abstract class BaseSessionStateBuilder(
catalog
}
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)
/**
* Interface exposed to the user for registering user-defined functions.
*
@ -164,7 +166,7 @@ abstract class BaseSessionStateBuilder(
*
* Note: this depends on the `conf` and `catalog` fields.
*/
protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:


@ -61,14 +61,12 @@ class PlanResolutionSuite extends AnalysisTest {
invocation.getArgument[String](0) match {
case "testcat" =>
testCat
case "session" =>
v2SessionCatalog
case name =>
throw new CatalogNotFoundException(s"No such catalog: $name")
}
})
when(manager.defaultCatalog).thenReturn(Some(testCat))
when(manager.v2SessionCatalog).thenCallRealMethod()
when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog)
manager
}
@ -78,14 +76,12 @@ class PlanResolutionSuite extends AnalysisTest {
invocation.getArgument[String](0) match {
case "testcat" =>
testCat
case "session" =>
v2SessionCatalog
case name =>
throw new CatalogNotFoundException(s"No such catalog: $name")
}
})
when(manager.defaultCatalog).thenReturn(None)
when(manager.v2SessionCatalog).thenCallRealMethod()
when(manager.v2SessionCatalog).thenReturn(v2SessionCatalog)
manager
}


@ -46,7 +46,7 @@ class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession wi
val testIdent: Identifier = Identifier.of(testNs, "test_table")
def newCatalog(): V2SessionCatalog = {
val newCatalog = new V2SessionCatalog(spark.sessionState)
val newCatalog = new V2SessionCatalog(spark.sessionState.catalog, spark.sessionState.conf)
newCatalog.initialize("test", CaseInsensitiveStringMap.empty())
newCatalog
}
@ -58,7 +58,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
override protected def beforeAll(): Unit = {
super.beforeAll()
// TODO: when there is a public API for v2 catalogs, use that instead
val catalog = newCatalog()
catalog.createNamespace(Array("db"), emptyProps)
catalog.createNamespace(Array("db2"), emptyProps)
@ -82,16 +81,6 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
private val testIdentNew = Identifier.of(testNs, "test_table_new")
test("Catalogs can load the catalog") {
val catalog = newCatalog()
val conf = new SQLConf
conf.setConfString("spark.sql.catalog.test", catalog.getClass.getName)
val loaded = Catalogs.load("test", conf)
assert(loaded.getClass == catalog.getClass)
}
test("listTables") {
val catalog = newCatalog()
val ident1 = Identifier.of(Array("ns"), "test_table_1")


@ -21,15 +21,14 @@ import java.util
import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog, TableChange}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.InMemoryTable
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode, V2_SESSION_CATALOG}
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
import org.apache.spark.sql.sources.v2.utils.TestV2SessionCatalogBase
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@ -89,7 +88,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
val t1 = "prop_table"
withTable(t1) {
spark.range(20).write.format(v2Format).option("path", "abc").saveAsTable(t1)
val cat = spark.sessionState.catalogManager.v2SessionCatalog.get.asInstanceOf[TableCatalog]
val cat = spark.sessionState.catalogManager.v2SessionCatalog.asInstanceOf[TableCatalog]
val tableInfo = cat.loadTable(Identifier.of(Array.empty, t1))
assert(tableInfo.properties().get("location") === "abc")
assert(tableInfo.properties().get("provider") === v2Format)
@ -156,7 +155,7 @@ private[v2] trait SessionCatalogTest[T <: Table, Catalog <: TestV2SessionCatalog
override def afterEach(): Unit = {
super.afterEach()
catalog("session").asInstanceOf[Catalog].clearTables()
spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName)
spark.conf.unset(V2_SESSION_CATALOG.key)
}
protected def verifyTable(tableName: String, expected: DataFrame): Unit


@ -46,7 +46,7 @@ class DataSourceV2SQLSessionCatalogSuite
}
override def getTableMetadata(tableName: String): Table = {
val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog.get
val v2Catalog = spark.sessionState.catalogManager.v2SessionCatalog
val nameParts = spark.sessionState.sqlParser.parseMultipartIdentifier(tableName)
v2Catalog.asInstanceOf[TableCatalog]
.loadTable(Identifier.of(Array.empty, nameParts.last))


@ -19,16 +19,14 @@ package org.apache.spark.sql.sources.v2
import scala.collection.JavaConverters._
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog, StagingInMemoryTableCatalog}
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
import org.apache.spark.sql.sources.v2.internal.V1Table
import org.apache.spark.sql.types.{ArrayType, BooleanType, DoubleType, IntegerType, LongType, MapType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class DataSourceV2SQLSuite
@ -512,7 +510,8 @@ class DataSourceV2SQLSuite
}
test("CreateTableAsSelect: v2 session catalog can load v1 source table") {
spark.conf.set(V2_SESSION_CATALOG.key, classOf[V2SessionCatalog].getName)
// unset this config to use the default v2 session catalog.
spark.conf.unset(V2_SESSION_CATALOG.key)
val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
df.createOrReplaceTempView("source")


@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalog.v2.Identifier
import org.apache.spark.sql.catalog.v2.{DelegatingCatalogExtension, Identifier}
import org.apache.spark.sql.catalog.v2.expressions.Transform
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.types.StructType
@ -33,7 +32,7 @@ import org.apache.spark.sql.types.StructType
* for testing DDL as well as write operations (through df.write.saveAsTable, df.write.insertInto
* and SQL).
*/
private[v2] trait TestV2SessionCatalogBase[T <: Table] extends V2SessionCatalog {
private[v2] abstract class TestV2SessionCatalogBase[T <: Table] extends DelegatingCatalogExtension {
protected val tables: util.Map[Identifier, T] = new ConcurrentHashMap[Identifier, T]()


@ -67,7 +67,7 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
/**
* A logical query plan `Analyzer` with rules specific to Hive.
*/
override protected def analyzer: Analyzer = new Analyzer(catalog, conf) {
override protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new ResolveHiveSerdeTable(session) +:
new FindDataSourceTable(session) +: