[SPARK-34506][CORE] ADD JAR with ivy coordinates should be compatible with Hive transitive behavior

### What changes were proposed in this pull request?
SPARK-33084 added the ability to use ivy coordinates with `SparkContext.addJar`. PR #29966 claims to mimic Hive behavior although I found a few cases where it doesn't

1) The default value of the transitive parameter is false, both in case of parameter not being specified in coordinate or parameter value being invalid. The Hive behavior is that transitive is [true if not specified](cb2ac3dcc6/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java (L169)) in the coordinate and [false for invalid values](cb2ac3dcc6/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java (L124)). Also, regardless of Hive, I think a default of true for the transitive parameter also matches [ivy's own defaults](https://ant.apache.org/ivy/history/2.5.0/ivyfile/dependency.html#_attributes).

2) The parameter value for transitive parameter is regarded as case-sensitive [based on the understanding](https://github.com/apache/spark/pull/29966#discussion_r547752259) that Hive behavior is case-sensitive. However, this is not correct, Hive [treats the parameter value case-insensitively](cb2ac3dcc6/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java (L122)).

I propose that we be compatible with Hive for these behaviors

### Why are the changes needed?
To make `ADD JAR` with ivy coordinates compatible with Hive's transitive behavior

### Does this PR introduce _any_ user-facing change?

The user-facing changes here are within master as the feature introduced in SPARK-33084 has not been released yet
1. Previously an ivy coordinate without `transitive` parameter specified did not resolve transitive dependency, now it does.
2. Previously an `transitive` parameter value was treated case-sensitively. e.g. `transitive=TRUE` would be treated as false as it did not match exactly `true`. Now it will be treated case-insensitively.

### How was this patch tested?

Modified existing unit tests to test new behavior
Add new unit test to cover usage of `exclude` with unspecified `transitive`

Closes #31623 from shardulm94/spark-34506.

Authored-by: Shardul Mahadik <smahadik@linkedin.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
This commit is contained in:
Shardul Mahadik 2021-03-01 09:10:20 +09:00 committed by Takeshi Yamamuro
parent d07fc3076b
commit 0216051aca
5 changed files with 42 additions and 22 deletions

View file

@ -59,8 +59,9 @@ private[spark] object DependencyUtils extends Logging {
* @param uri Ivy URI need to be downloaded.
* @return Tuple value of parameter `transitive` and `exclude` value.
*
* 1. transitive: whether to download dependency jar of Ivy URI, default value is false
* and this parameter value is case-sensitive. Invalid value will be treat as false.
* 1. transitive: whether to download dependency jar of Ivy URI, default value is true
* and this parameter value is case-insensitive. This mimics Hive's behaviour for
* parsing the transitive parameter. Invalid value will be treat as false.
* Example: Input: exclude=org.mortbay.jetty:jetty&transitive=true
* Output: true
*
@ -72,7 +73,7 @@ private[spark] object DependencyUtils extends Logging {
private def parseQueryParams(uri: URI): (Boolean, String) = {
val uriQuery = uri.getQuery
if (uriQuery == null) {
(false, "")
(true, "")
} else {
val mapTokens = uriQuery.split("&").map(_.split("="))
if (mapTokens.exists(isInvalidQueryString)) {
@ -81,14 +82,15 @@ private[spark] object DependencyUtils extends Logging {
}
val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
// Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is false
// Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is true
val transitiveParams = groupedParams.get("transitive")
if (transitiveParams.map(_.size).getOrElse(0) > 1) {
logWarning("It's best to specify `transitive` parameter in ivy URI query only once." +
" If there are multiple `transitive` parameter, we will select the last one")
}
val transitive =
transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)
transitiveParams.flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption)
.getOrElse(true)
// Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
// in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar
@ -125,7 +127,7 @@ private[spark] object DependencyUtils extends Logging {
* `parameter=value&parameter=value...`
* Note that currently Ivy URI query part support two parameters:
* 1. transitive: whether to download dependent jars related to your Ivy URI.
* transitive=false or `transitive=true`, if not set, the default value is false.
* transitive=false or `transitive=true`, if not set, the default value is true.
* 2. exclude: exclusion list when download Ivy URI jar and dependency jars.
* The `exclude` parameter content is a ',' separated `group:module` pair string :
* `exclude=group:module,group:module...`

View file

@ -1035,13 +1035,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
test("SPARK-33084: Add jar support Ivy URI -- default transitive = false") {
test("SPARK-33084: Add jar support Ivy URI -- default transitive = true") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
@ -1083,6 +1080,22 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
}
}
test("SPARK-34506: Add jar support Ivy URI -- transitive=false will not download " +
"dependency jars") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=false")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
test("SPARK-34506: Add jar support Ivy URI -- test exclude param when transitive unspecified") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?exclude=commons-lang:commons-lang")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(sc.listJars().exists(_.contains("org.slf4j_slf4j-api-1.7.10.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
test("SPARK-33084: Add jar support Ivy URI -- test exclude param when transitive=true") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0" +
@ -1131,24 +1144,24 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
test("SPARK-33084: Add jar support Ivy URI -- test param key case sensitive") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?TRANSITIVE=true")
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=false")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?TRANSITIVE=false")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
test("SPARK-33084: Add jar support Ivy URI -- test transitive value case sensitive") {
test("SPARK-33084: Add jar support Ivy URI -- test transitive value case insensitive") {
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=TRUE")
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=FALSE")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=true")
sc.addJar("ivy://org.apache.hive:hive-storage-api:2.7.0?transitive=false")
assert(sc.listJars().exists(_.contains("org.apache.hive_hive-storage-api-2.7.0.jar")))
assert(sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
assert(!sc.listJars().exists(_.contains("commons-lang_commons-lang-2.6.jar")))
}
test("SPARK-34346: hadoop configuration priority for spark/hive/hadoop configs") {

View file

@ -36,7 +36,7 @@ ADD JAR file_name
The name of the JAR file to be added. It could be either on a local file system or a distributed file system or an Ivy URI.
Apache Ivy is a popular dependency manager focusing on flexibility and simplicity. Now we support two parameter in URI query string:
* transitive: whether to download dependent jars related to your ivy URL. It is case-sensitive and only take last one if multiple transitive parameters are specified.
* transitive: whether to download dependent jars related to your ivy URL. The parameter name is case-sensitive, and the parameter value is case-insensitive. If multiple transitive parameters are specified, the last one wins.
* exclude: exclusion list during downloading Ivy URI jar and dependent jars.
User can write Ivy URI such as:

View file

@ -3726,13 +3726,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
test("SPARK-33084: Add jar support Ivy URI in SQL") {
val sc = spark.sparkContext
val hiveVersion = "2.3.8"
// default transitive=false, only download specified jar
sql(s"ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:$hiveVersion")
// transitive=false, only download specified jar
sql(s"ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:$hiveVersion?transitive=false")
assert(sc.listJars()
.exists(_.contains(s"org.apache.hive.hcatalog_hive-hcatalog-core-$hiveVersion.jar")))
// test download ivy URL jar return multiple jars
sql("ADD JAR ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0?transitive=true")
// default transitive=true, test download ivy URL jar return multiple jars
sql("ADD JAR ivy://org.scala-js:scalajs-test-interface_2.12:1.2.0")
assert(sc.listJars().exists(_.contains("scalajs-library_2.12")))
assert(sc.listJars().exists(_.contains("scalajs-test-interface_2.12")))

View file

@ -1224,7 +1224,12 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
test("SPARK-33084: Add jar support Ivy URI in SQL") {
val testData = TestHive.getHiveFile("data/files/sample.json").toURI
withTable("t") {
sql(s"ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:$hiveVersion")
// hive-catalog-core has some transitive dependencies which dont exist on maven central
// and hence cannot be found in the test environment or are non-jar (.pom) which cause
// failures in tests. Use transitive=false as it should be good enough to test the Ivy
// support in Hive ADD JAR
sql(s"ADD JAR ivy://org.apache.hive.hcatalog:hive-hcatalog-core:$hiveVersion" +
"?transitive=false")
sql(
"""CREATE TABLE t(a string, b string)
|ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'""".stripMargin)