[SPARK-30312][SQL] Preserve path permission and acl when truncate table
### What changes were proposed in this pull request? This patch proposes to preserve the existing permission/ACLs of paths when truncating a table/partition. ### Why are the changes needed? When Spark SQL truncates a table, it deletes the paths of the table/partitions, then re-creates new ones. If permission/ACLs were set on the paths, the existing permission/ACLs will be deleted. We should preserve the permission/ACLs if possible. ### Does this PR introduce any user-facing change? Yes. When truncating a table/partition, Spark will keep the permission/ACLs of the paths. ### How was this patch tested? Unit test. Manual test: 1. Create a table. 2. Manually change its permission/ACL. 3. Truncate the table. 4. Check the permission/ACL. ```scala val df = Seq(1, 2, 3).toDF df.write.mode("overwrite").saveAsTable("test.test_truncate_table") val testTable = spark.table("test.test_truncate_table") testTable.show() +-----+ |value| +-----+ | 1| | 2| | 3| +-----+ // hdfs dfs -setfacl ... // hdfs dfs -getfacl ... sql("truncate table test.test_truncate_table") // hdfs dfs -getfacl ... val testTable2 = spark.table("test.test_truncate_table") testTable2.show() +-----+ |value| +-----+ +-----+ ``` ![Screen Shot 2019-12-30 at 3 12 15 PM](https://user-images.githubusercontent.com/68855/71604577-c7875a00-2b17-11ea-913a-ba88096d20ab.jpg) Closes #26956 from viirya/truncate-table-permission. Lead-authored-by: Liang-Chi Hsieh <liangchi@uber.com> Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
7fb17f5943
commit
b5bc3e12a6
|
@ -2020,6 +2020,14 @@ object SQLConf {
|
|||
.booleanConf
|
||||
.createWithDefault(false)
|
||||
|
||||
// Internal switch for TRUNCATE TABLE. It defaults to false, i.e. the command tries to put the
// original permission and ACLs back on the re-created table/partition paths.
val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
  buildConf("spark.sql.truncateTable.ignorePermissionAcl")
    .internal()
    .doc(
      "When set to true, TRUNCATE TABLE command will not try to set back " +
        "original permission and ACLs when re-creating the table/partition paths.")
    .booleanConf
    .createWithDefault(false)
|
||||
|
||||
val NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE =
|
||||
buildConf("spark.sql.legacy.dataset.nameNonStructGroupingKeyAsValue")
|
||||
.internal()
|
||||
|
@ -2683,6 +2691,9 @@ class SQLConf extends Serializable with Logging {
|
|||
|
||||
/** Returns the current value of [[SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG]]. */
def integralDivideReturnLong: Boolean = getConf(SQLConf.LEGACY_INTEGRALDIVIDE_RETURN_LONG)
|
||||
|
||||
/** Returns the current value of [[SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL]]. */
def truncateTableIgnorePermissionAcl: Boolean = {
  getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)
}
|
||||
|
||||
/** Returns the current value of [[SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE]]. */
def nameNonStructGroupingKeyAsValue: Boolean = {
  getConf(SQLConf.NAME_NON_STRUCT_GROUPING_KEY_AS_VALUE)
}
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@ import scala.util.Try
|
|||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
|
||||
import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}
|
||||
|
||||
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
|
@ -494,13 +495,59 @@ case class TruncateTableCommand(
|
|||
partLocations
|
||||
}
|
||||
val hadoopConf = spark.sessionState.newHadoopConf()
|
||||
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
|
||||
locations.foreach { location =>
|
||||
if (location.isDefined) {
|
||||
val path = new Path(location.get)
|
||||
try {
|
||||
val fs = path.getFileSystem(hadoopConf)
|
||||
|
||||
// Not all fs impl. support these APIs.
|
||||
var optPermission: Option[FsPermission] = None
|
||||
var optAcls: Option[java.util.List[AclEntry]] = None
|
||||
if (!ignorePermissionAcl) {
|
||||
val fileStatus = fs.getFileStatus(path)
|
||||
try {
|
||||
optPermission = Some(fileStatus.getPermission())
|
||||
} catch {
|
||||
case NonFatal(_) => // do nothing
|
||||
}
|
||||
|
||||
try {
|
||||
optAcls = Some(fs.getAclStatus(path).getEntries)
|
||||
} catch {
|
||||
case NonFatal(_) => // do nothing
|
||||
}
|
||||
}
|
||||
|
||||
fs.delete(path, true)
|
||||
|
||||
// We should keep original permission/acl of the path.
|
||||
// For owner/group, only super-user can set it, for example on HDFS. Because
|
||||
// current user can delete the path, we assume the user/group is correct or not an issue.
|
||||
fs.mkdirs(path)
|
||||
if (!ignorePermissionAcl) {
|
||||
optPermission.foreach { permission =>
|
||||
try {
|
||||
fs.setPermission(path, permission)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
throw new SecurityException(
|
||||
s"Failed to set original permission $permission back to " +
|
||||
s"the created path: $path. Exception: ${e.getMessage}")
|
||||
}
|
||||
}
|
||||
optAcls.foreach { acls =>
|
||||
try {
|
||||
fs.setAcl(path, acls)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
throw new SecurityException(
|
||||
s"Failed to set original ACL $acls back to " +
|
||||
s"the created path: $path. Exception: ${e.getMessage}")
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
throw new AnalysisException(
|
||||
|
|
|
@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
|
|||
import java.net.URI
|
||||
import java.util.Locale
|
||||
|
||||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
|
||||
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}
|
||||
|
||||
import org.apache.spark.{SparkException, SparkFiles}
|
||||
import org.apache.spark.internal.config
|
||||
|
@ -2013,6 +2014,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|
|||
}
|
||||
}
|
||||
|
||||
// Verifies that TRUNCATE TABLE puts the table path's permission and ACL back after re-creating
// the path, unless SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL is enabled. The scenario runs
// once per value of that conf, against FakeLocalFsFileSystem so that ACL calls are available
// on a local path.
test("SPARK-30312: truncate table - keep acl/permission") {
  import testImplicits._

  Seq(true, false).foreach { ignore =>
    withSQLConf(
      "fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
      "fs.file.impl.disable.cache" -> "true",
      SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
      withTable("tab1") {
        sql("CREATE TABLE tab1 (col INT) USING parquet")
        sql("INSERT INTO tab1 SELECT 1")
        checkAnswer(spark.table("tab1"), Row(1))

        val tablePath = new Path(spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)

        val hadoopConf = spark.sessionState.newHadoopConf()
        val fs = tablePath.getFileSystem(hadoopConf)

        // Set a custom permission on the table path. The status is fetched only after the
        // change, so the assertion checks the updated permission rather than a status object
        // that was obtained before setPermission ran.
        fs.setPermission(tablePath, new FsPermission("777"))
        assert(fs.getFileStatus(tablePath).getPermission().toString() == "rwxrwxrwx")

        // Set ACL to table path.
        val customAcl = new java.util.ArrayList[AclEntry]()
        customAcl.add(new AclEntry.Builder()
          .setType(AclEntryType.USER)
          .setScope(AclEntryScope.ACCESS)
          .setPermission(FsAction.READ).build())
        fs.setAcl(tablePath, customAcl)
        assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))

        sql("TRUNCATE TABLE tab1")
        assert(spark.table("tab1").collect().isEmpty)

        val permissionAfter = fs.getFileStatus(tablePath).getPermission().toString()
        val aclEntries = fs.getAclStatus(tablePath).getEntries()
        if (ignore) {
          // Nothing is restored: the re-created path has no ACL entry and a default
          // permission (NOTE(review): "rwxr-xr-x" presumably assumes a 022 umask - confirm).
          assert(permissionAfter == "rwxr-xr-x")
          assert(aclEntries.size() == 0)
        } else {
          // Both the custom permission and the single custom ACL entry are back.
          assert(permissionAfter == "rwxrwxrwx")
          assert(aclEntries.size() == 1)
          assert(aclEntries.get(0) == customAcl.get(0))
        }
      }
    }
  }
}
|
||||
|
||||
test("create temporary view with mismatched schema") {
|
||||
withTable("tab1") {
|
||||
spark.range(10).write.saveAsTable("tab1")
|
||||
|
@ -2929,3 +2984,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Holds the one ACL status that every [[FakeLocalFsFileSystem]] instance reads and writes. */
object FakeLocalFsFileSystem {
  var aclStatus: AclStatus = new AclStatus.Builder().build()
}

/**
 * A fake local filesystem used to test ACL handling.
 *
 * It keeps a single ACL status in its companion object, so the same status is reported for
 * every path (sufficient for test purposes). Setting an ACL replaces that status, and deleting
 * any path resets it to an empty one.
 */
class FakeLocalFsFileSystem extends RawLocalFileSystem {

  /** Replaces the shared ACL status with one built from `aclSpec`; `path` is not used. */
  override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
    val builder = new AclStatus.Builder()
    builder.addEntries(aclSpec)
    FakeLocalFsFileSystem.aclStatus = builder.build()
  }

  /** Returns the shared ACL status, whatever `path` is. */
  override def getAclStatus(path: Path): AclStatus = FakeLocalFsFileSystem.aclStatus

  /** Resets the shared ACL status to an empty one, then performs the real delete. */
  override def delete(f: Path, recursive: Boolean): Boolean = {
    FakeLocalFsFileSystem.aclStatus = new AclStatus.Builder().build()
    super.delete(f, recursive)
  }
}
|
||||
|
|
Loading…
Reference in a new issue