[SPARK-35105][SQL] Support multiple paths for ADD FILE/JAR/ARCHIVE commands

### What changes were proposed in this pull request?

This PR extends `ADD FILE/JAR/ARCHIVE` commands to be able to take multiple path arguments like Hive.

### Why are the changes needed?

To make those commands more useful.

### Does this PR introduce _any_ user-facing change?

Yes. In the current implementation, those commands accept a path that contains whitespace even when it is not enclosed in `'` or `"`. After this change, users need to enclose such paths in quotes.
I've noted this incompatibility in the migration guide.

### How was this patch tested?

New tests.

Closes #32205 from sarutak/add-multiple-files.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
This commit is contained in:
Kousuke Saruta 2021-04-29 13:58:51 +09:00
parent 529b875901
commit 132cbf0c8c
10 changed files with 135 additions and 30 deletions

View file

@ -24,6 +24,8 @@ license: |
## Upgrading from Spark SQL 3.1 to 3.2
- Since Spark 3.2, ADD FILE/JAR/ARCHIVE commands require each path to be enclosed by `"` or `'` if the path contains whitespaces.
- Since Spark 3.2, all the supported JDBC dialects use StringType for ROWID. In Spark 3.1 or earlier, Oracle dialect uses StringType and the other dialects use LongType.
- In Spark 3.2, PostgreSQL JDBC dialect uses StringType for MONEY and MONEY[] is not supported because the JDBC driver for PostgreSQL can't handle those types properly. In Spark 3.1 or earlier, DoubleType and ArrayType of DoubleType are used respectively.

View file

@ -26,7 +26,7 @@ license: |
### Syntax
```sql
ADD ARCHIVE file_name
ADD { ARCHIVE | ARCHIVES } file_name [ ... ]
```
### Parameters
@ -42,6 +42,7 @@ ADD ARCHIVE /tmp/test.tar.gz;
ADD ARCHIVE "/path/to/some.zip";
ADD ARCHIVE '/some/other.tgz';
ADD ARCHIVE "/path with space/abc.tar";
ADD ARCHIVES "/path with space/def.tgz" '/path with space/ghi.zip';
```
### Related Statements

View file

@ -26,7 +26,7 @@ license: |
### Syntax
```sql
ADD FILE resource_name
ADD { FILE | FILES } resource_name [ ... ]
```
### Parameters
@ -43,6 +43,7 @@ ADD FILE "/path/to/file/abc.txt";
ADD FILE '/another/test.txt';
ADD FILE "/path with space/abc.txt";
ADD FILE "/path/to/some/directory";
ADD FILES "/path with space/cde.txt" '/path with space/fgh.txt';
```
### Related Statements

View file

@ -26,7 +26,7 @@ license: |
### Syntax
```sql
ADD JAR file_name
ADD { JAR | JARS } file_name [ ... ]
```
### Parameters
@ -52,6 +52,7 @@ ADD JAR /tmp/test.jar;
ADD JAR "/path/to/some.jar";
ADD JAR '/some/other.jar';
ADD JAR "/path with space/abc.jar";
ADD JARS "/path with space/def.jar" '/path with space/ghi.jar';
ADD JAR "ivy://group:module:version";
ADD JAR "ivy://group:module:version?transitive=false";
ADD JAR "ivy://group:module:version?transitive=true";

View file

@ -335,7 +335,7 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
* Create a [[AddFileCommand]], [[AddJarCommand]], [[AddArchiveCommand]],
* Create a [[AddFilesCommand]], [[AddJarsCommand]], [[AddArchivesCommand]],
* [[ListFilesCommand]], [[ListJarsCommand]] or [[ListArchivesCommand]]
* command depending on the requested operation on resources.
* Expected format:
@ -356,15 +356,12 @@ class SparkSqlAstBuilder extends AstBuilder {
case p => p
}
// The implementation of pathForAdd is to keep the compatibility with before SPARK-34977.
val pathForAdd = strLiteralDef.findFirstIn(rawArg)
.find(p => p.startsWith("\"") || p.startsWith("'")).map(unescapeSQLString).getOrElse(rawArg)
ctx.op.getType match {
case SqlBaseParser.ADD =>
ctx.identifier.getText.toLowerCase(Locale.ROOT) match {
case "file" => AddFileCommand(pathForAdd)
case "jar" => AddJarCommand(pathForAdd)
case "archive" => AddArchiveCommand(pathForAdd)
case "files" | "file" => AddFilesCommand(maybePaths)
case "jars" | "jar" => AddJarsCommand(maybePaths)
case "archives" | "archive" => AddArchivesCommand(maybePaths)
case other => operationNotAllowed(s"ADD with resource type '$other'", ctx)
}
case SqlBaseParser.LIST =>

View file

@ -27,9 +27,9 @@ import org.apache.spark.util.Utils
/**
* Adds a jar to the current session so it can be used (for UDFs or serdes).
*/
case class AddJarCommand(path: String) extends LeafRunnableCommand {
case class AddJarsCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sessionState.resourceLoader.addJar(path)
paths.foreach(sparkSession.sessionState.resourceLoader.addJar(_))
Seq.empty[Row]
}
}
@ -37,10 +37,10 @@ case class AddJarCommand(path: String) extends LeafRunnableCommand {
/**
* Adds a file to the current session so it can be used.
*/
case class AddFileCommand(path: String) extends LeafRunnableCommand {
case class AddFilesCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val recursive = !sparkSession.sessionState.conf.addSingleFileInAddFile
sparkSession.sparkContext.addFile(path, recursive)
paths.foreach(sparkSession.sparkContext.addFile(_, recursive))
Seq.empty[Row]
}
}
@ -48,9 +48,9 @@ case class AddFileCommand(path: String) extends LeafRunnableCommand {
/**
* Adds an archive to the current session so it can be used.
*/
case class AddArchiveCommand(path: String) extends LeafRunnableCommand {
case class AddArchivesCommand(paths: Seq[String]) extends LeafRunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
sparkSession.sparkContext.addArchive(path)
paths.foreach(sparkSession.sparkContext.addArchive(_))
Seq.empty[Row]
}
}

View file

@ -245,21 +245,24 @@ class SparkSqlParserSuite extends AnalysisTest {
}
test("manage resources") {
assertEqual("ADD FILE abc.txt", AddFileCommand("abc.txt"))
assertEqual("ADD FILE 'abc.txt'", AddFileCommand("abc.txt"))
assertEqual("ADD FILE \"/path/to/abc.txt\"", AddFileCommand("/path/to/abc.txt"))
assertEqual("ADD FILE abc.txt", AddFilesCommand(Seq("abc.txt")))
assertEqual("ADD FILE 'abc.txt'", AddFilesCommand(Seq("abc.txt")))
assertEqual("ADD FILE \"/path/to/abc.txt\"", AddFilesCommand("/path/to/abc.txt"::Nil))
assertEqual("LIST FILE abc.txt", ListFilesCommand(Array("abc.txt")))
assertEqual("LIST FILE '/path//abc.txt'", ListFilesCommand(Array("/path//abc.txt")))
assertEqual("LIST FILE \"/path2/abc.txt\"", ListFilesCommand(Array("/path2/abc.txt")))
assertEqual("ADD JAR /path2/_2/abc.jar", AddJarCommand("/path2/_2/abc.jar"))
assertEqual("ADD JAR '/test/path_2/jar/abc.jar'", AddJarCommand("/test/path_2/jar/abc.jar"))
assertEqual("ADD JAR \"abc.jar\"", AddJarCommand("abc.jar"))
assertEqual("ADD JAR /path2/_2/abc.jar", AddJarsCommand(Seq("/path2/_2/abc.jar")))
assertEqual("ADD JAR '/test/path_2/jar/abc.jar'",
AddJarsCommand(Seq("/test/path_2/jar/abc.jar")))
assertEqual("ADD JAR \"abc.jar\"", AddJarsCommand(Seq("abc.jar")))
assertEqual("LIST JAR /path-with-dash/abc.jar",
ListJarsCommand(Array("/path-with-dash/abc.jar")))
assertEqual("LIST JAR 'abc.jar'", ListJarsCommand(Array("abc.jar")))
assertEqual("LIST JAR \"abc.jar\"", ListJarsCommand(Array("abc.jar")))
assertEqual("ADD FILE /path with space/abc.txt", AddFileCommand("/path with space/abc.txt"))
assertEqual("ADD JAR /path with space/abc.jar", AddJarCommand("/path with space/abc.jar"))
assertEqual("ADD FILE '/path with space/abc.txt'",
AddFilesCommand(Seq("/path with space/abc.txt")))
assertEqual("ADD JAR '/path with space/abc.jar'",
AddJarsCommand(Seq("/path with space/abc.jar")))
}
test("SPARK-32608: script transform with row format delimit") {

View file

@ -169,10 +169,10 @@ private[hive] object SparkSQLCLIDriver extends Logging {
}
// The class loader of CliSessionState's conf is current main thread's class loader
// used to load jars passed by --jars. One class loader used by AddJarCommand is
// used to load jars passed by --jars. One class loader used by AddJarsCommand is
// sharedState.jarClassLoader which contain jar path passed by --jars in main thread.
// We set CliSessionState's conf class loader to sharedState.jarClassLoader.
// Thus we can load all jars passed by --jars and AddJarCommand.
// Thus we can load all jars passed by --jars and AddJarsCommand.
sessionState.getConf.setClassLoader(SparkSQLEnv.sqlContext.sharedState.jarClassLoader)
// TODO work around for set the log output to console, because the HiveContext

View file

@ -186,8 +186,8 @@ private[hive] class HiveClientImpl(
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
// Hive 2.3 will set UDFClassLoader to hiveConf when initializing SessionState
// since HIVE-11878, and ADDJarCommand will add jars to clientLoader.classLoader.
// For this reason we cannot load the jars added by ADDJarCommand because of class loader
// since HIVE-11878, and ADDJarsCommand will add jars to clientLoader.classLoader.
// For this reason we cannot load the jars added by ADDJarsCommand because of class loader
// got changed. We reset it to clientLoader.ClassLoader here.
state.getConf.setClassLoader(clientLoader.classLoader)
SessionState.start(state)

View file

@ -836,7 +836,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
Files.write(file.toPath, "test_file1".getBytes)
val jarFile = new File(dir, "test file.jar")
TestUtils.createJar(Seq(file), jarFile)
sql(s"ADD JAR ${jarFile.getAbsolutePath}")
sql(s"ADD JAR '${jarFile.getAbsolutePath}'")
assert(sql("LIST JARS").
filter(_.getString(0).contains(s"${jarFile.getName}".replace(" ", "%20"))).count() > 0)
}
@ -964,6 +964,104 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
}
}
test("SPARK-35105: ADD FILES command with multiple files") {
  // Verifies that one ADD FILE / ADD FILES statement can register several paths at once,
  // mixing unquoted, single-quoted and double-quoted arguments.
  withTempDir { dir =>
    // file2 and file3 carry a space in their names (through the suffix), so they are always
    // passed quoted below; file1 and file4 have no whitespace and are passed bare.
    val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
    val file2 = File.createTempFile("someprefix2", "somesuffix 2", dir)
    val file3 = File.createTempFile("someprefix3", "somesuffix 3", dir)
    val file4 = File.createTempFile("someprefix4", "somesuffix4", dir)

    // Each file gets its own distinct payload.
    Files.write(file1.toPath, "file1".getBytes)
    Files.write(file2.toPath, "file2".getBytes)
    Files.write(file3.toPath, "file3".getBytes)
    Files.write(file4.toPath, "file4".getBytes)

    // Both the singular and the plural keyword are exercised with two paths each.
    sql(s"ADD FILE ${file1.getAbsolutePath} '${file2.getAbsoluteFile}'")
    sql(s"""ADD FILES "${file3.getAbsolutePath}" ${file4.getAbsoluteFile}""")

    val listFiles = sql(s"LIST FILES ${file1.getAbsolutePath} " +
      s"'${file2.getAbsolutePath}' '${file3.getAbsolutePath}' ${file4.getAbsolutePath}")
    assert(listFiles.count === 4)

    // Every added file must be listed exactly once. For the names containing a space the
    // listed entry is matched with the space written as "%20".
    assert(listFiles.filter(_.getString(0).contains(file1.getName)).count() === 1)
    assert(listFiles.filter(
      _.getString(0).contains(file2.getName.replace(" ", "%20"))).count() === 1)
    assert(listFiles.filter(
      _.getString(0).contains(file3.getName.replace(" ", "%20"))).count() === 1)
    assert(listFiles.filter(_.getString(0).contains(file4.getName)).count() === 1)
  }
}
test("SPARK-35105: ADD JARS command with multiple files") {
  // ADD JAR and ADD JARS are each given two paths in one statement, mixing quoted and
  // unquoted arguments; all four jars must then show up in LIST JARS.
  withTempDir { dir =>
    // Writes a small text file into `dir` and packs it into a freshly created temp file that
    // serves as the jar. The jar's name contains a space whenever `jarSuffix` does.
    def buildJar(index: Int, jarSuffix: String): File = {
      val payload = new File(dir, s"test$index.txt")
      Files.write(payload.toPath, s"file$index".getBytes)
      val jar = File.createTempFile(s"someprefix$index", jarSuffix, dir)
      TestUtils.createJar(Seq(payload), jar)
      jar
    }

    val jar1 = buildJar(1, "somesuffix 1")
    val jar2 = buildJar(2, "somesuffix2")
    val jar3 = buildJar(3, "somesuffix3")
    val jar4 = buildJar(4, "somesuffix 4")

    // The jars whose names contain a space (jar1, jar4) are the quoted ones.
    sql(s"""ADD JAR "${jar1.getAbsolutePath}" ${jar2.getAbsoluteFile}""")
    sql(s"ADD JARS ${jar3.getAbsolutePath} '${jar4.getAbsoluteFile}'")

    val listedJars = sql(s"LIST JARS '${jar1.getAbsolutePath}' " +
      s"${jar2.getAbsolutePath} ${jar3.getAbsolutePath} '${jar4.getAbsoluteFile}'")
    assert(listedJars.count === 4)

    // Each jar is expected exactly once. Replacing " " with "%20" is a no-op for the names
    // without whitespace, so the same check covers all four jars.
    Seq(jar1, jar2, jar3, jar4).foreach { jar =>
      val expectedName = jar.getName.replace(" ", "%20")
      assert(listedJars.filter(_.getString(0).contains(expectedName)).count() === 1)
    }
  }
}
test("SPARK-35105: ADD ARCHIVES command with multiple files") {
  // ADD ARCHIVE and ADD ARCHIVES are each given two paths in one statement, mixing quoted
  // and unquoted arguments; all four archives must then show up in LIST ARCHIVES.
  withTempDir { dir =>
    // Fixtures are grouped per archive: a text payload, then the jar-format archive built
    // from it. Archives 2 and 4 have a space in their file names.
    val text1 = new File(dir, "test1.txt")
    Files.write(text1.toPath, "file1".getBytes)
    val archive1 = File.createTempFile("someprefix1", "somesuffix1", dir)
    TestUtils.createJar(Seq(text1), archive1)

    val text2 = new File(dir, "test2.txt")
    Files.write(text2.toPath, "file2".getBytes)
    val archive2 = File.createTempFile("someprefix2", "somesuffix 2", dir)
    TestUtils.createJar(Seq(text2), archive2)

    val text3 = new File(dir, "test3.txt")
    Files.write(text3.toPath, "file3".getBytes)
    val archive3 = File.createTempFile("someprefix3", "somesuffix3", dir)
    TestUtils.createJar(Seq(text3), archive3)

    val text4 = new File(dir, "test4.txt")
    Files.write(text4.toPath, "file4".getBytes)
    val archive4 = File.createTempFile("someprefix4", "somesuffix 4", dir)
    TestUtils.createJar(Seq(text4), archive4)

    sql(s"""ADD ARCHIVE ${archive1.getAbsolutePath} "${archive2.getAbsoluteFile}"""")
    sql(s"ADD ARCHIVES ${archive3.getAbsolutePath} '${archive4.getAbsoluteFile}'")

    val listed = sql(s"LIST ARCHIVES ${archive1.getAbsolutePath} " +
      s"'${archive2.getAbsolutePath}' ${archive3.getAbsolutePath} '${archive4.getAbsolutePath}'")
    assert(listed.count === 4)

    // Counts the listed rows mentioning the archive, matching a space in its name as "%20".
    // For names without whitespace the replacement leaves the name unchanged.
    def timesListed(archive: File): Long = {
      val expectedName = archive.getName.replace(" ", "%20")
      listed.filter(_.getString(0).contains(expectedName)).count()
    }

    assert(timesListed(archive1) === 1)
    assert(timesListed(archive2) === 1)
    assert(timesListed(archive3) === 1)
    assert(timesListed(archive4) === 1)
  }
}
test("SPARK-34977: LIST FILES/JARS/ARCHIVES should handle multiple quoted path arguments") {
withTempDir { dir =>
val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@ -979,6 +1077,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
sql(s"ADD FILE '${file3.getAbsolutePath}'")
val listFiles = sql("LIST FILES " +
s"""'${file1.getAbsolutePath}' ${file2.getAbsolutePath} "${file3.getAbsolutePath}"""")
assert(listFiles.count === 3)
assert(listFiles.filter(_.getString(0).contains(file1.getName)).count() === 1)
assert(listFiles.filter(_.getString(0).contains(file2.getName)).count() === 1)
@ -1004,6 +1103,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
sql(s"ADD ARCHIVE '${jarFile3.getAbsolutePath}'")
val listArchives = sql(s"LIST ARCHIVES '${jarFile1.getAbsolutePath}' " +
s"""${jarFile2.getAbsolutePath} "${jarFile3.getAbsolutePath}"""")
assert(listArchives.count === 3)
assert(listArchives.filter(_.getString(0).contains(jarFile1.getName)).count() === 1)
assert(listArchives.filter(_.getString(0).contains(jarFile2.getName)).count() === 1)
@ -1026,7 +1126,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
sql(s"ADD JAR ${jarFile4.getAbsolutePath}")
sql(s"ADD JAR ${jarFile5.getAbsolutePath}")
sql(s"ADD JAR ${jarFile6.getAbsolutePath}")
sql(s"ADD JAR '${jarFile6.getAbsolutePath}'")
val listJars = sql(s"LIST JARS '${jarFile4.getAbsolutePath}' " +
s"""${jarFile5.getAbsolutePath} "${jarFile6.getAbsolutePath}"""")
assert(listJars.count === 3)