[SPARK-29174][SQL] Support LOCAL in INSERT OVERWRITE DIRECTORY to data source
### What changes were proposed in this pull request? `INSERT OVERWRITE LOCAL DIRECTORY` is supported with ensuring the provided path is always using `file://` as scheme and removing the check which throws exception if we do insert overwrite by mentioning directory with `LOCAL` syntax ### Why are the changes needed? without the modification in PR, ``` insert overwrite local directory <location> using ``` throws exception ``` Error: org.apache.spark.sql.catalyst.parser.ParseException: LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source(line 1, pos 0) ``` which was introduced in https://github.com/apache/spark/pull/18975, but this restriction is not needed, hence dropping the same. Keep behaviour consistent for local and remote file-system in `INSERT OVERWRITE DIRECTORY` ### Does this PR introduce any user-facing change? Yes, after this change `INSERT OVERWRITE LOCAL DIRECTORY` will not throw exception ### How was this patch tested? Added UT Closes #27039 from ajithme/insertoverwrite2. Authored-by: Ajith <ajith2489@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
parent
2854091d12
commit
657d151395
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.execution
|
||||
|
||||
import java.util.Locale
|
||||
import javax.ws.rs.core.UriBuilder
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
|
@ -753,7 +754,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
|
|||
*
|
||||
* Expected format:
|
||||
* {{{
|
||||
* INSERT OVERWRITE DIRECTORY
|
||||
* INSERT OVERWRITE [LOCAL] DIRECTORY
|
||||
* [path]
|
||||
* [OPTIONS table_property_list]
|
||||
* select_statement;
|
||||
|
@ -761,11 +762,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
|
|||
*/
|
||||
override def visitInsertOverwriteDir(
|
||||
ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
|
||||
if (ctx.LOCAL != null) {
|
||||
throw new ParseException(
|
||||
"LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
|
||||
}
|
||||
|
||||
val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
|
||||
var storage = DataSource.buildStorageFormatFromOptions(options)
|
||||
|
||||
|
@ -781,6 +777,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
|
|||
storage = storage.copy(locationUri = customLocation)
|
||||
}
|
||||
|
||||
if (ctx.LOCAL() != null) {
|
||||
// assert if directory is local when LOCAL keyword is mentioned
|
||||
val scheme = Option(storage.locationUri.get.getScheme)
|
||||
scheme match {
|
||||
case None =>
|
||||
// force scheme to be file rather than fs.default.name
|
||||
val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
|
||||
storage = storage.copy(locationUri = loc)
|
||||
case Some(pathScheme) if (!pathScheme.equals("file")) =>
|
||||
throw new ParseException("LOCAL is supported only with file: scheme", ctx)
|
||||
}
|
||||
}
|
||||
|
||||
val provider = ctx.tableProvider.multipartIdentifier.getText
|
||||
|
||||
(false, storage, Some(provider))
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.spark.SparkException
|
|||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.internal.SQLConf
|
||||
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
|
||||
import org.apache.spark.sql.test.SharedSparkSession
|
||||
|
@ -820,6 +821,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") {
|
||||
withTempPath { dir =>
|
||||
val path = dir.toURI.getPath
|
||||
sql(s"""create table tab1 ( a int) location '$path'""")
|
||||
sql("insert into tab1 values(1)")
|
||||
checkAnswer(sql("select * from tab1"), Seq(1).map(i => Row(i)))
|
||||
sql("create table tab2 ( a int)")
|
||||
sql("insert into tab2 values(2)")
|
||||
checkAnswer(sql("select * from tab2"), Seq(2).map(i => Row(i)))
|
||||
sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
|
||||
sql("refresh table tab1")
|
||||
checkAnswer(sql("select * from tab1"), Seq(2).map(i => Row(i)))
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-29174 fail LOCAL in INSERT OVERWRITE DIRECT remote path") {
|
||||
val message = intercept[ParseException] {
|
||||
sql("insert overwrite local directory 'hdfs:/abcd' using parquet select 1")
|
||||
}.getMessage
|
||||
assert(message.contains("LOCAL is supported only with file: scheme"))
|
||||
}
|
||||
}
|
||||
|
||||
class FileExistingTestFileSystem extends RawLocalFileSystem {
|
||||
|
|
Loading…
Reference in a new issue