[SPARK-32481][CORE][SQL] Support truncate table to move data to trash

### What changes were proposed in this pull request?
Instead of deleting the data immediately, we can move it to the Hadoop trash directory.
Based on the configuration provided by the user ('fs.trash.interval'), it is then deleted permanently from the trash after the retention time elapses.

### Why are the changes needed?
Instead of directly deleting the data, this gives users the flexibility to move it to the trash first and have it deleted permanently only later.

### Does this PR introduce _any_ user-facing change?
Yes. After `TRUNCATE TABLE`, the data is no longer deleted permanently right away.
It is first moved to the trash and then deleted permanently after the configured retention time.
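
For illustration, a minimal sketch of the new behavior (the table name is a placeholder; the config key and the `.Trash/Current` location come from this patch and its tests):

```scala
// Opt in to the new behavior (default remains false).
spark.conf.set("spark.sql.truncate.trash.enabled", "true")

// With a positive 'fs.trash.interval' in the Hadoop configuration, the table's
// data files are moved under the filesystem trash (e.g. ~/.Trash/Current/...)
// instead of being deleted immediately.
spark.sql("TRUNCATE TABLE tab1")
```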

### How was this patch tested?
New unit tests added.

Closes #29552 from Udbhav30/truncate.

Authored-by: Udbhav30 <u.agrawal30@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Commit 065f17386d (parent cfe012a431)
Udbhav30, 2020-08-30 10:25:32 -07:00, committed by Dongjoon Hyun
4 changed files with 116 additions and 2 deletions


@@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -269,6 +269,29 @@ private[spark] object Utils extends Logging {
    file.setExecutable(true, true)
  }

  /**
   * Move data to trash if 'spark.sql.truncate.trash.enabled' is true, else
   * delete the data permanently. If moving the data to trash fails, fall back to hard deletion.
   */
  def moveToTrashOrDelete(
      fs: FileSystem,
      partitionPath: Path,
      isTrashEnabled: Boolean,
      hadoopConf: Configuration): Boolean = {
    if (isTrashEnabled) {
      logDebug(s"Try to move data ${partitionPath.toString} to trash")
      val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
      if (!isSuccess) {
        logWarning(s"Failed to move data ${partitionPath.toString} to trash. " +
          "Fallback to hard deletion")
        return fs.delete(partitionPath, true)
      }
      isSuccess
    } else {
      fs.delete(partitionPath, true)
    }
  }

  /**
   * Create a directory given the abstract pathname
   * @return true, if the directory is successfully created; otherwise, return false.
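
For reference, a minimal usage sketch of the new helper, mirroring the `TruncateTableCommand` change further below (`path`, `isTrashEnabled`, and `hadoopConf` stand in for the caller's values):

```scala
val fs = path.getFileSystem(hadoopConf)
// Returns true on a successful move to trash; if the move fails (or trash is
// disabled), falls back to fs.delete(path, true) and returns its result.
Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
```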


@@ -2732,6 +2732,18 @@ object SQLConf {
      .booleanConf
      .createWithDefault(false)

  val TRUNCATE_TRASH_ENABLED =
    buildConf("spark.sql.truncate.trash.enabled")
      .doc("This configuration decides whether, when truncating a table, data files will be " +
        "moved to the trash directory or deleted permanently. The trash retention time is " +
        "controlled by 'fs.trash.interval', and by default the server-side configuration " +
        "value takes precedence over the client-side one. Note that if 'fs.trash.interval' " +
        "is non-positive, the move to trash is a no-op and a warning is logged. If the data " +
        "fails to be moved to trash, Spark falls back to deleting it permanently.")
      .version("3.1.0")
      .booleanConf
      .createWithDefault(false)

  /**
   * Holds information about keys that have been deprecated.
   *
@@ -3350,6 +3362,8 @@ class SQLConf extends Serializable with Logging {
  def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

  def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)

  /** ********************** SQLConf functionality methods ************ */

  /** Set Spark SQL configuration properties. */
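
As the doc string above notes, retention is governed by Hadoop's `fs.trash.interval` (in minutes), with the server-side value taking precedence by default. A sketch of the client-side setting as the tests below exercise it (whether it takes effect on a real cluster depends on the server-side configuration):

```scala
// Positive interval: TRUNCATE moves data to trash (first test below).
spark.sparkContext.hadoopConfiguration.set("fs.trash.interval", "5")

// Non-positive interval: the trash move is a no-op and Spark falls back to
// permanent deletion (second test below).
spark.sparkContext.hadoopConfiguration.set("fs.trash.interval", "0")
```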


@@ -48,6 +48,7 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.Utils

/**
 * A command to create a table with the same definition of the given existing table.
@@ -489,6 +490,7 @@ case class TruncateTableCommand(
    }
    val hadoopConf = spark.sessionState.newHadoopConf()
    val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
    val isTrashEnabled = SQLConf.get.truncateTrashEnabled
    locations.foreach { location =>
      if (location.isDefined) {
        val path = new Path(location.get)
@@ -513,7 +515,7 @@ case class TruncateTableCommand(
          }
        }
-        fs.delete(path, true)
+        Utils.moveToTrashOrDelete(fs, path, isTrashEnabled, hadoopConf)
        // We should keep original permission/acl of the path.
        // For owner/group, only super-user can set it, for example on HDFS. Because


@@ -3101,6 +3101,81 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
      assert(spark.sessionState.catalog.isRegisteredFunction(rand))
    }
  }
test("SPARK-32481 Move data to trash on truncate table if enabled") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "5")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(fs.exists(trashPath))
fs.delete(trashPath, true)
}
}
}
test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
val trashIntervalKey = "fs.trash.interval"
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
// scalastyle:off hadoopconfiguration
val hadoopConf = spark.sparkContext.hadoopConfiguration
// scalastyle:on hadoopconfiguration
val originalValue = hadoopConf.get(trashIntervalKey, "0")
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
assert(!fs.exists(trashPath))
try {
hadoopConf.set(trashIntervalKey, "0")
sql("TRUNCATE TABLE tab1")
} finally {
hadoopConf.set(trashIntervalKey, originalValue)
}
assert(!fs.exists(trashPath))
}
}
}
test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
withTable("tab1") {
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
sql("CREATE TABLE tab1 (col INT) USING parquet")
sql("INSERT INTO tab1 SELECT 1")
val hadoopConf = spark.sessionState.newHadoopConf()
val tablePath = new Path(spark.sessionState.catalog
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
val fs = tablePath.getFileSystem(hadoopConf)
val trashCurrent = new Path(fs.getHomeDirectory, ".Trash/Current")
val trashPath = Path.mergePaths(trashCurrent, tablePath)
sql("TRUNCATE TABLE tab1")
assert(!fs.exists(trashPath))
}
}
}
}
object FakeLocalFsFileSystem {