Revert "[SPARK-32481][CORE][SQL] Support truncate table to move data to trash"
This reverts commit 5c077f0580.
This commit is contained in:
parent
d3304268d3
commit
2dee4352a0
|
@ -50,7 +50,7 @@ import com.google.common.net.InetAddresses
|
||||||
import org.apache.commons.codec.binary.Hex
|
import org.apache.commons.codec.binary.Hex
|
||||||
import org.apache.commons.lang3.SystemUtils
|
import org.apache.commons.lang3.SystemUtils
|
||||||
import org.apache.hadoop.conf.Configuration
|
import org.apache.hadoop.conf.Configuration
|
||||||
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path, Trash}
|
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
|
||||||
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
|
import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
|
||||||
import org.apache.hadoop.security.UserGroupInformation
|
import org.apache.hadoop.security.UserGroupInformation
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
import org.apache.hadoop.yarn.conf.YarnConfiguration
|
||||||
|
@ -269,27 +269,6 @@ private[spark] object Utils extends Logging {
|
||||||
file.setExecutable(true, true)
|
file.setExecutable(true, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Move data to trash if 'spark.sql.truncate.trash.enabled' is true
|
|
||||||
*/
|
|
||||||
def moveToTrashIfEnabled(
|
|
||||||
fs: FileSystem,
|
|
||||||
partitionPath: Path,
|
|
||||||
isTrashEnabled: Boolean,
|
|
||||||
hadoopConf: Configuration): Boolean = {
|
|
||||||
if (isTrashEnabled) {
|
|
||||||
logDebug(s"will move data ${partitionPath.toString} to trash")
|
|
||||||
val isSuccess = Trash.moveToAppropriateTrash(fs, partitionPath, hadoopConf)
|
|
||||||
if (!isSuccess) {
|
|
||||||
logWarning(s"Failed to move data ${partitionPath.toString} to trash")
|
|
||||||
return fs.delete(partitionPath, true)
|
|
||||||
}
|
|
||||||
isSuccess
|
|
||||||
} else {
|
|
||||||
fs.delete(partitionPath, true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a directory given the abstract pathname
|
* Create a directory given the abstract pathname
|
||||||
* @return true, if the directory is successfully created; otherwise, return false.
|
* @return true, if the directory is successfully created; otherwise, return false.
|
||||||
|
|
|
@ -2722,17 +2722,6 @@ object SQLConf {
|
||||||
.booleanConf
|
.booleanConf
|
||||||
.createWithDefault(false)
|
.createWithDefault(false)
|
||||||
|
|
||||||
val TRUNCATE_TRASH_ENABLED =
|
|
||||||
buildConf("spark.sql.truncate.trash.enabled")
|
|
||||||
.doc("This configuration decides when truncating table, whether data files will be moved " +
|
|
||||||
"to trash directory or deleted permanently. The trash retention time is controlled by " +
|
|
||||||
"fs.trash.interval, and in default, the server side configuration value takes " +
|
|
||||||
"precedence over the client-side one. Note that if fs.trash.interval is non-positive, " +
|
|
||||||
"this will be a no-op and log a warning message.")
|
|
||||||
.version("3.1.0")
|
|
||||||
.booleanConf
|
|
||||||
.createWithDefault(false)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Holds information about keys that have been deprecated.
|
* Holds information about keys that have been deprecated.
|
||||||
*
|
*
|
||||||
|
@ -3345,8 +3334,6 @@ class SQLConf extends Serializable with Logging {
|
||||||
|
|
||||||
def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
|
def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
|
||||||
|
|
||||||
def truncateTrashEnabled: Boolean = getConf(SQLConf.TRUNCATE_TRASH_ENABLED)
|
|
||||||
|
|
||||||
/** ********************** SQLConf functionality methods ************ */
|
/** ********************** SQLConf functionality methods ************ */
|
||||||
|
|
||||||
/** Set Spark SQL configuration properties. */
|
/** Set Spark SQL configuration properties. */
|
||||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2
|
||||||
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
|
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
|
||||||
import org.apache.spark.sql.types._
|
import org.apache.spark.sql.types._
|
||||||
import org.apache.spark.sql.util.SchemaUtils
|
import org.apache.spark.sql.util.SchemaUtils
|
||||||
import org.apache.spark.util.Utils
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A command to create a table with the same definition of the given existing table.
|
* A command to create a table with the same definition of the given existing table.
|
||||||
|
@ -490,7 +489,6 @@ case class TruncateTableCommand(
|
||||||
}
|
}
|
||||||
val hadoopConf = spark.sessionState.newHadoopConf()
|
val hadoopConf = spark.sessionState.newHadoopConf()
|
||||||
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
|
val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
|
||||||
val isTrashEnabled = SQLConf.get.truncateTrashEnabled
|
|
||||||
locations.foreach { location =>
|
locations.foreach { location =>
|
||||||
if (location.isDefined) {
|
if (location.isDefined) {
|
||||||
val path = new Path(location.get)
|
val path = new Path(location.get)
|
||||||
|
@ -515,7 +513,7 @@ case class TruncateTableCommand(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Utils.moveToTrashIfEnabled(fs, path, isTrashEnabled, hadoopConf)
|
fs.delete(path, true)
|
||||||
|
|
||||||
// We should keep original permission/acl of the path.
|
// We should keep original permission/acl of the path.
|
||||||
// For owner/group, only super-user can set it, for example on HDFS. Because
|
// For owner/group, only super-user can set it, for example on HDFS. Because
|
||||||
|
|
|
@ -3101,78 +3101,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
|
||||||
assert(spark.sessionState.catalog.isRegisteredFunction(rand))
|
assert(spark.sessionState.catalog.isRegisteredFunction(rand))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test("SPARK-32481 Move data to trash on truncate table if enabled") {
|
|
||||||
val trashIntervalKey = "fs.trash.interval"
|
|
||||||
withTable("tab1") {
|
|
||||||
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
|
|
||||||
sql("CREATE TABLE tab1 (col INT) USING parquet")
|
|
||||||
sql("INSERT INTO tab1 SELECT 1")
|
|
||||||
// scalastyle:off hadoopconfiguration
|
|
||||||
val hadoopConf = spark.sparkContext.hadoopConfiguration
|
|
||||||
// scalastyle:on hadoopconfiguration
|
|
||||||
val originalValue = hadoopConf.get(trashIntervalKey, "0")
|
|
||||||
val tablePath = new Path(spark.sessionState.catalog
|
|
||||||
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
|
|
||||||
|
|
||||||
val fs = tablePath.getFileSystem(hadoopConf)
|
|
||||||
val trashRoot = fs.getTrashRoot(tablePath)
|
|
||||||
assert(!fs.exists(trashRoot))
|
|
||||||
try {
|
|
||||||
hadoopConf.set(trashIntervalKey, "5")
|
|
||||||
sql("TRUNCATE TABLE tab1")
|
|
||||||
} finally {
|
|
||||||
hadoopConf.set(trashIntervalKey, originalValue)
|
|
||||||
}
|
|
||||||
assert(fs.exists(trashRoot))
|
|
||||||
fs.delete(trashRoot, true)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("SPARK-32481 delete data permanently on truncate table if trash interval is non-positive") {
|
|
||||||
val trashIntervalKey = "fs.trash.interval"
|
|
||||||
withTable("tab1") {
|
|
||||||
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "true") {
|
|
||||||
sql("CREATE TABLE tab1 (col INT) USING parquet")
|
|
||||||
sql("INSERT INTO tab1 SELECT 1")
|
|
||||||
// scalastyle:off hadoopconfiguration
|
|
||||||
val hadoopConf = spark.sparkContext.hadoopConfiguration
|
|
||||||
// scalastyle:on hadoopconfiguration
|
|
||||||
val originalValue = hadoopConf.get(trashIntervalKey, "0")
|
|
||||||
val tablePath = new Path(spark.sessionState.catalog
|
|
||||||
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
|
|
||||||
|
|
||||||
val fs = tablePath.getFileSystem(hadoopConf)
|
|
||||||
val trashRoot = fs.getTrashRoot(tablePath)
|
|
||||||
assert(!fs.exists(trashRoot))
|
|
||||||
try {
|
|
||||||
hadoopConf.set(trashIntervalKey, "0")
|
|
||||||
sql("TRUNCATE TABLE tab1")
|
|
||||||
} finally {
|
|
||||||
hadoopConf.set(trashIntervalKey, originalValue)
|
|
||||||
}
|
|
||||||
assert(!fs.exists(trashRoot))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
test("SPARK-32481 Do not move data to trash on truncate table if disabled") {
|
|
||||||
withTable("tab1") {
|
|
||||||
withSQLConf(SQLConf.TRUNCATE_TRASH_ENABLED.key -> "false") {
|
|
||||||
sql("CREATE TABLE tab1 (col INT) USING parquet")
|
|
||||||
sql("INSERT INTO tab1 SELECT 1")
|
|
||||||
val hadoopConf = spark.sessionState.newHadoopConf()
|
|
||||||
val tablePath = new Path(spark.sessionState.catalog
|
|
||||||
.getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
|
|
||||||
|
|
||||||
val fs = tablePath.getFileSystem(hadoopConf)
|
|
||||||
val trashRoot = fs.getTrashRoot(tablePath)
|
|
||||||
sql("TRUNCATE TABLE tab1")
|
|
||||||
assert(!fs.exists(trashRoot))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object FakeLocalFsFileSystem {
|
object FakeLocalFsFileSystem {
|
||||||
|
|
Loading…
Reference in a new issue