From 657d151395ca996f8ec0eed695a873abbd63d760 Mon Sep 17 00:00:00 2001
From: Ajith
Date: Tue, 18 Feb 2020 09:42:31 +0900
Subject: [PATCH] [SPARK-29174][SQL] Support LOCAL in INSERT OVERWRITE DIRECTORY to data source

### What changes were proposed in this pull request?

`INSERT OVERWRITE LOCAL DIRECTORY` is now supported. The provided path is forced to use `file://` as its scheme, and the check that threw an exception when a directory was specified with the `LOCAL` keyword is removed.

### Why are the changes needed?

Without the modification in this PR, a statement of the form

```
insert overwrite local directory <path> using <format>
```

throws an exception:

```
Error: org.apache.spark.sql.catalyst.parser.ParseException:
LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source(line 1, pos 0)
```

This restriction was introduced in https://github.com/apache/spark/pull/18975, but it is not needed, so it is dropped here. This keeps the behaviour of `INSERT OVERWRITE DIRECTORY` consistent between local and remote file systems.

### Does this PR introduce any user-facing change?

Yes. After this change, `INSERT OVERWRITE LOCAL DIRECTORY` no longer throws an exception.
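For example, a statement of the following shape (a minimal sketch; the table name and output path are illustrative) now writes the query result to the local file system, forcing the `file://` scheme even when `fs.default.name` points at a remote file system:

```
-- hypothetical local output path; resolved with the file:// scheme after this change
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/spark-output'
USING parquet
SELECT * FROM tab2;
```

Combining `LOCAL` with a non-`file:` scheme such as `hdfs:` still fails, now with `LOCAL is supported only with file: scheme`.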
### How was this patch tested?

Added unit tests.

Closes #27039 from ajithme/insertoverwrite2.

Authored-by: Ajith
Signed-off-by: HyukjinKwon
---
 .../spark/sql/execution/SparkSqlParser.scala | 21 ++++++++++++-----
 .../spark/sql/sources/InsertSuite.scala      | 23 +++++++++++++++++++
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index aa139cb6b0..078813b7d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import java.util.Locale
+import javax.ws.rs.core.UriBuilder
 
 import scala.collection.JavaConverters._
 
@@ -753,7 +754,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    *
    * Expected format:
    * {{{
-   *   INSERT OVERWRITE DIRECTORY
+   *   INSERT OVERWRITE [LOCAL] DIRECTORY
    *   [path]
    *   [OPTIONS table_property_list]
    *   select_statement;
@@ -761,11 +762,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    */
  override def visitInsertOverwriteDir(
      ctx: InsertOverwriteDirContext): InsertDirParams = withOrigin(ctx) {
-    if (ctx.LOCAL != null) {
-      throw new ParseException(
-        "LOCAL is not supported in INSERT OVERWRITE DIRECTORY to data source", ctx)
-    }
-
     val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
     var storage = DataSource.buildStorageFormatFromOptions(options)
 
@@ -781,6 +777,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       storage = storage.copy(locationUri = customLocation)
     }
 
+    if (ctx.LOCAL() != null) {
+      // assert that the directory is local when the LOCAL keyword is specified
+      val scheme = Option(storage.locationUri.get.getScheme)
+      scheme match {
+        case None =>
+          // force the scheme to be file rather than fs.default.name
+          val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
+          storage = storage.copy(locationUri = loc)
+        case Some(pathScheme) if (!pathScheme.equals("file")) =>
+          throw new ParseException("LOCAL is supported only with file: scheme", ctx)
+      }
+    }
+
     val provider = ctx.tableProvider.multipartIdentifier.getText
 
     (false, storage, Some(provider))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index bcff30a51c..0101803561 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSparkSession
@@ -820,6 +821,28 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-29174 Support LOCAL in INSERT OVERWRITE DIRECTORY to data source") {
+    withTempPath { dir =>
+      val path = dir.toURI.getPath
+      sql(s"""create table tab1 ( a int) location '$path'""")
+      sql("insert into tab1 values(1)")
+      checkAnswer(sql("select * from tab1"), Seq(1).map(i => Row(i)))
+      sql("create table tab2 ( a int)")
+      sql("insert into tab2 values(2)")
+      checkAnswer(sql("select * from tab2"), Seq(2).map(i => Row(i)))
+      sql(s"""insert overwrite local directory '$path' using parquet select * from tab2""")
+      sql("refresh table tab1")
+      checkAnswer(sql("select * from tab1"), Seq(2).map(i => Row(i)))
+    }
+  }
+
+  test("SPARK-29174 fail LOCAL in INSERT OVERWRITE DIRECTORY with a remote path") {
+    val message = intercept[ParseException] {
+      sql("insert overwrite local directory 'hdfs:/abcd' using parquet select 1")
+    }.getMessage
+    assert(message.contains("LOCAL is supported only with file: scheme"))
+  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {