[SPARK-27011][SQL] reset command fails with cache
## What changes were proposed in this pull request? When caching is enabled (i.e., once a CACHE TABLE command has been executed), any subsequent SQL statement triggers CacheManager#lookupCachedData, which creates a copy of the tree node and in turn calls TreeNode#makeCopy. The problem is that makeCopy tries to instantiate a copy of the node, but since ResetCommand is a case object, this instantiation fails. ## How was this patch tested? Added a UT to reproduce the issue. Closes #23918 from ajithme/reset. Authored-by: Ajith <ajith2489@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
60be6d2ea3
commit
b8dd84b9e4
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.sql.catalyst.plans.logical
|
||||
|
||||
/**
 * Marker trait for [[LogicalPlan]] nodes that must be left untouched by the
 * CacheManager: plans mixing this in are never replaced with cached results
 * during cache lookup (see CacheManager#useCachedData).
 */
trait IgnoreCachedData extends LogicalPlan
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.{Dataset, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ResolvedHint}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
|
||||
import org.apache.spark.sql.execution.columnar.InMemoryRelation
|
||||
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
|
||||
import org.apache.spark.storage.StorageLevel
|
||||
|
@ -239,6 +239,7 @@ class CacheManager extends Logging {
|
|||
/** Replaces segments of the given logical plan with cached versions where possible. */
|
||||
def useCachedData(plan: LogicalPlan): LogicalPlan = {
|
||||
val newPlan = plan transformDown {
|
||||
case command: IgnoreCachedData => command
|
||||
// Do not lookup the cache by hint node. Hint node is special, we should ignore it when
|
||||
// canonicalizing plans, so that plans which are same except hint can hit the same cache.
|
||||
// However, we also want to keep the hint info after cache lookup. Here we skip the hint
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command
|
|||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.sql.{Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.expressions.Attribute
|
||||
import org.apache.spark.sql.catalyst.plans.logical.IgnoreCachedData
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
|
@ -161,7 +162,7 @@ object SetCommand {
|
|||
* reset;
|
||||
* }}}
|
||||
*/
|
||||
case object ResetCommand extends RunnableCommand with Logging {
|
||||
case object ResetCommand extends RunnableCommand with IgnoreCachedData with Logging {
|
||||
|
||||
override def run(sparkSession: SparkSession): Seq[Row] = {
|
||||
sparkSession.sessionState.conf.clear()
|
||||
|
|
|
@ -3003,6 +3003,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("reset command should not fail with cache") {
  withTable("tbl") {
    val dataSource = spark.sessionState.conf.defaultDataSourceName
    sql(s"CREATE TABLE tbl(i INT, j STRING) USING $dataSource")
    // RESET with nothing cached: the baseline path, which always worked.
    sql("reset")
    sql("cache table tbl")
    // RESET after caching: cache lookup used to copy the plan tree via
    // TreeNode#makeCopy, which fails for the ResetCommand case object
    // (SPARK-27011). This must now succeed.
    sql("reset")
  }
}
|
||||
}
|
||||
|
||||
// Simple data holder with a single optional String field — presumably a test
// fixture for this suite; verify usages before changing.
case class Foo(bar: Option[String])
|
||||
|
|
Loading…
Reference in a new issue