[SPARK-34055][SQL] Refresh cache in ALTER TABLE .. ADD PARTITION

### What changes were proposed in this pull request?
Invoke `refreshTable()` from `CatalogImpl` which refreshes the cache in v1 `ALTER TABLE .. ADD PARTITION`.

### Why are the changes needed?
This fixes the issues portrayed by the example:
```sql
spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
spark-sql> insert into tbl partition (part=0) select 0;
spark-sql> cache table tbl;
spark-sql> select * from tbl;
0	0
spark-sql> show table extended like 'tbl' partition(part=0);
default	tbl	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0
...
```
Create new partition by copying the existing one:
```
$ cp -r /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1
```
```sql
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0	0
```

The last query must return `0	1` since it has been added by `ALTER TABLE .. ADD PARTITION`.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes for the example above:
```sql
...
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0	0
0	1
```

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableAddPartitionSuite"
```

Closes #31101 from MaxGekk/add-partition-refresh-cache-2.

Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Max Gekk 2021-01-10 14:06:17 +09:00 committed by HyukjinKwon
parent 105ba6e5f0
commit e0e06c18fd
2 changed files with 33 additions and 1 deletions

View file

@@ -485,6 +485,7 @@ case class AlterTableAddPartitionCommand(
       catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
     }
+    sparkSession.catalog.refreshTable(table.identifier.quotedString)
     if (table.stats.nonEmpty) {
       if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
         val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,

View file

@@ -17,7 +17,11 @@
 package org.apache.spark.sql.execution.command.v1

-import org.apache.spark.sql.AnalysisException
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.command

 /**
@@ -41,6 +45,33 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase
         "The spec ([p1=]) contains an empty partition column value"))
     }
  }
test("SPARK-34055: refresh cache in partition adding") {
  withTable("t") {
    // Set up a cached, partitioned table holding a single row.
    sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
    sql("INSERT INTO t PARTITION (part=0) SELECT 0")
    assert(!spark.catalog.isCached("t"))
    sql("CACHE TABLE t")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))

    // Locate the filesystem directory of partition part=0 via the extended
    // table description, then clone it to materialize data for part=1.
    val tableInfo = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
      .select("information")
      .first().getString(0)
    val locationLine = tableInfo
      .split("\\r?\\n")
      .find(_.startsWith("Location:"))
      .get
    val sourceDir = locationLine.replace("Location: file:", "")
    val clonedDir = sourceDir.replace("part=0", "part=1")
    FileUtils.copyDirectory(new File(sourceDir), new File(clonedDir))

    // Registering the new partition must refresh the cached plan so the
    // copied row becomes visible, while the table itself remains cached.
    sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$clonedDir'")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
  }
}
}

/**