From 976946a910d877c22213df8fe4508969f6472aa0 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 23 Jan 2020 13:02:10 -0800 Subject: [PATCH] [SPARK-29947][SQL][FOLLOWUP] Fix table lookup cache ### What changes were proposed in this pull request? Fix a bug in https://github.com/apache/spark/pull/26589 , to make this feature work. ### Why are the changes needed? This feature doesn't work actually. ### Does this PR introduce any user-facing change? no ### How was this patch tested? new test Closes #27341 from cloud-fan/cache. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/Analyzer.scala | 8 +- .../analysis/TableLookupCacheSuite.scala | 99 +++++++++++++++++++ 2 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 36e558b0dc..45547bff8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -917,14 +917,14 @@ class Analyzer( private def lookupRelation(identifier: Seq[String]): Option[LogicalPlan] = { expandRelationName(identifier) match { case SessionCatalogAndIdentifier(catalog, ident) => - CatalogV2Util.loadTable(catalog, ident).map { + def loaded = CatalogV2Util.loadTable(catalog, ident).map { case v1Table: V1Table => - val key = catalog.name +: ident.namespace :+ ident.name - AnalysisContext.get.relationCache.getOrElseUpdate( - key, v1SessionCatalog.getRelation(v1Table.v1Table)) + v1SessionCatalog.getRelation(v1Table.v1Table) case table => DataSourceV2Relation.create(table) } + val key = catalog.name +: ident.namespace :+ ident.name + Option(AnalysisContext.get.relationCache.getOrElseUpdate(key, loaded.orNull)) case _ => None } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala new file mode 100644 index 0000000000..eed962cd0f --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.io.File + +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.connector.InMemoryTableCatalog +import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +class TableLookupCacheSuite extends AnalysisTest with Matchers { + private def getAnalyzer(externalCatalog: ExternalCatalog, databasePath: File): Analyzer = { + val conf = new SQLConf() + val v1Catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf) + v1Catalog.createDatabase( + CatalogDatabase("default", "", databasePath.toURI, Map.empty), + ignoreIfExists = false) + v1Catalog.createTable( + CatalogTable( + TableIdentifier("t1", Some("default")), + CatalogTableType.MANAGED, + CatalogStorageFormat.empty, + StructType(Seq(StructField("a", IntegerType)))), + ignoreIfExists = false) + val v2Catalog = new InMemoryTableCatalog { + override def loadTable(ident: Identifier): Table = { + V1Table(externalCatalog.getTable("default", ident.name)) + } + override def name: String = CatalogManager.SESSION_CATALOG_NAME + } + val catalogManager = mock(classOf[CatalogManager]) + when(catalogManager.catalog(any())).thenAnswer((invocation: InvocationOnMock) => { + invocation.getArgument[String](0) match { + case CatalogManager.SESSION_CATALOG_NAME => v2Catalog + case name => + throw new CatalogNotFoundException(s"No such catalog: $name") + } + }) + when(catalogManager.v1SessionCatalog).thenReturn(v1Catalog) + when(catalogManager.currentCatalog).thenReturn(v2Catalog) + when(catalogManager.currentNamespace).thenReturn(Array("default")) + + new Analyzer(catalogManager, conf) + } + + test("table lookups to external catalog are cached") { + withTempDir { tempDir => + val inMemoryCatalog = new InMemoryCatalog + val catalog = spy(inMemoryCatalog) + val analyzer = getAnalyzer(catalog, tempDir) + reset(catalog) + analyzer.execute(table("t1").join(table("t1")).join(table("t1"))) + verify(catalog, times(1)).getTable("default", "t1") + } + } + + test("table lookups via nested views are cached") { + withTempDir { tempDir => + val inMemoryCatalog = new InMemoryCatalog + val catalog = spy(inMemoryCatalog) + val analyzer = getAnalyzer(catalog, tempDir) + val viewDef = CatalogTable( + TableIdentifier("view", Some("default")), + CatalogTableType.VIEW, + CatalogStorageFormat.empty, + StructType(Seq(StructField("a", IntegerType, nullable = true))), + viewText = Some("select * from t1") + ) + catalog.createTable(viewDef, ignoreIfExists = false) + reset(catalog) + analyzer.execute(table("t1").join(table("view")).join(table("view"))) + verify(catalog, times(1)).getTable("default", "t1") + } + } +}