[SPARK-35236][SQL] Support archive files as resources for CREATE FUNCTION USING syntax

### What changes were proposed in this pull request?

This PR proposes to let the `CREATE FUNCTION USING` syntax take archives as resources.

### Why are the changes needed?

The `CREATE FUNCTION USING` syntax has not accepted archives as resources because Spark SQL itself did not support archives. Now that Spark SQL supports archives (`SparkContext.addArchive`), the syntax can accept them too.

### Does this PR introduce _any_ user-facing change?

Yes. Users can now specify archives as resources in the `CREATE FUNCTION USING` syntax.
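For example (a minimal sketch of the new syntax; the function class `my.pkg.MyUDF`, the archive path, and the `deps` alias are hypothetical):

```scala
// Plain form: the archive is shipped and unpacked on each node.
spark.sql(
  """CREATE FUNCTION myFunc AS 'my.pkg.MyUDF'
    |USING ARCHIVE '/tmp/deps.zip'""".stripMargin)

// file#alias form: the unpacked directory is registered under the alias,
// which SparkFiles.get("deps") resolves to a local path at runtime.
spark.sql(
  """CREATE FUNCTION myFunc2 AS 'my.pkg.MyUDF'
    |USING ARCHIVE '/tmp/deps.zip#deps'""".stripMargin)
```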

### How was this patch tested?

A new test was added to `HiveUDFSuite`.

Closes #32359 from sarutak/load-function-using-archive.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

@@ -150,10 +150,7 @@ class SessionResourceLoader(session: SparkSession) extends FunctionResourceLoader
     resource.resourceType match {
       case JarResource => addJar(resource.uri)
       case FileResource => session.sparkContext.addFile(resource.uri)
-      case ArchiveResource =>
-        throw new AnalysisException(
-          "Archive is not allowed to be loaded. If YARN mode is used, " +
-            "please use --archives options while calling spark-submit.")
+      case ArchiveResource => session.sparkContext.addArchive(resource.uri)
     }
   }
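Under the hood this reuses `SparkContext.addArchive` (available since Spark 3.1), which ships the archive and unpacks it on each node; `SparkFiles.get` then resolves the unpacked directory. A minimal standalone sketch (the archive path and the `res` alias are made up):

```scala
import java.io.File

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("archive-demo"))

// What SessionResourceLoader now does for an ArchiveResource.
// The "#res" suffix registers the unpacked directory under the alias "res".
sc.addArchive("/tmp/resources.zip#res")

// Resolve the alias to the local unpacked directory and list its contents.
val unpacked = new File(SparkFiles.get("res"))
unpacked.list().foreach(println)

sc.stop()
```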

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala

@@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.execution
 import java.io.{DataInput, DataOutput, File, PrintWriter}
 import java.util.{ArrayList, Arrays, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.exec.UDF
 import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType}
@@ -30,6 +32,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectIns
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
 import org.apache.hadoop.io.{LongWritable, Writable}
 
+import org.apache.spark.{SparkFiles, TestUtils}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.command.FunctionsCommand
@@ -676,6 +679,47 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       assert(msg2.contains(s"No handler for UDF/UDAF/UDTF '${classOf[ArraySumUDF].getName}'"))
     }
   }
+
+  test("SPARK-35236: CREATE FUNCTION should take an archive in USING clause") {
+    withTempDir { dir =>
+      withUserDefinedFunction("testListFiles1" -> false) {
+        val text1 = File.createTempFile("test1_", ".txt", dir)
+        val json1 = File.createTempFile("test1_", ".json", dir)
+        val zipFile1 = File.createTempFile("test1_", ".zip", dir)
+        TestUtils.createJar(Seq(text1, json1), zipFile1)
+
+        sql(s"CREATE FUNCTION testListFiles1 AS '${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile1.getAbsolutePath}'")
+        val df1 = sql(s"SELECT testListFiles1('${SparkFiles.get(zipFile1.getName)}')")
+        val fileList1 =
+          df1.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList1.length === 2)
+        assert(fileList1.contains(text1.getName))
+        assert(fileList1.contains(json1.getName))
+      }
+
+      // Test for file#alias style archive registration.
+      withUserDefinedFunction("testListFiles2" -> false) {
+        val text2 = File.createTempFile("test2_", ".txt", dir)
+        val json2 = File.createTempFile("test2_", ".json", dir)
+        val csv2 = File.createTempFile("test2", ".csv", dir)
+        val zipFile2 = File.createTempFile("test2_", ".zip", dir)
+        TestUtils.createJar(Seq(text2, json2, csv2), zipFile2)
+
+        sql(s"CREATE FUNCTION testListFiles2 AS '${classOf[ListFiles].getName}' " +
+          s"USING ARCHIVE '${zipFile2.getAbsolutePath}#foo'")
+        val df2 = sql(s"SELECT testListFiles2('${SparkFiles.get("foo")}')")
+        val fileList2 =
+          df2.collect().map(_.getList[String](0)).head.asScala.filter(_ != "META-INF")
+
+        assert(fileList2.length === 3)
+        assert(fileList2.contains(text2.getName))
+        assert(fileList2.contains(json2.getName))
+        assert(fileList2.contains(csv2.getName))
+      }
+    }
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {
@@ -761,7 +805,6 @@ class StatelessUDF extends UDF {
 }
 
 class ArraySumUDF extends UDF {
-  import scala.collection.JavaConverters._
   def evaluate(values: java.util.List[java.lang.Double]): java.lang.Double = {
     var r = 0d
     for (v <- values.asScala) {
@@ -770,3 +813,11 @@ class ArraySumUDF extends UDF {
     r
   }
 }
+
+class ListFiles extends UDF {
+  import java.util.{ArrayList, Arrays, List => JList}
+  def evaluate(path: String): JList[String] = {
+    val fileArray = new File(path).list()
+    if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]()
+  }
+}
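Putting it together, a rough end-to-end sketch against a Hive-enabled `SparkSession` (the `com.example.ListFiles` class name, archive path, and alias are illustrative, mirroring the test above):

```scala
import org.apache.spark.SparkFiles

// Register a UDF whose archive dependency is shipped and unpacked automatically.
spark.sql(
  """CREATE FUNCTION listFiles AS 'com.example.ListFiles'
    |USING ARCHIVE '/tmp/deps.zip#deps'""".stripMargin)

// The UDF receives the local path of the unpacked archive via SparkFiles.
spark.sql(s"SELECT listFiles('${SparkFiles.get("deps")}')").show()
```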